/*
 * Decompiled with CFR 0.152.
 */
package fileStreamTest;

import es.bsc.distrostreamlib.api.files.FileDistroStream;
import es.bsc.distrostreamlib.exceptions.BackendException;
import es.bsc.distrostreamlib.exceptions.RegistrationException;
import fileStreamTest.Tasks;
import fileStreamTest.Utils;
import java.io.IOException;
import java.util.List;

/**
 * Driver that exercises {@link FileDistroStream} with several producer/consumer layouts.
 *
 * <p>Each scenario recreates its working directory, opens a stream on it, launches the
 * producer task(s) from {@link Tasks}, consumes the published files and prints the number
 * of processed files to standard output.
 */
public class Main {
    /** Directory monitored by the default (non-aliased) stream scenarios. */
    public static final String TEST_PATH = "/tmp/file_stream/";
    public static final String BASE_FILENAME = "file";
    /** Alias shared by the two stream handles created in {@code byAlias}. */
    public static final String ALIAS = "custom-stream";
    /** Directory monitored by the aliased stream scenario. */
    public static final String TEST_PATH_ALIAS = "/tmp/file_custom_stream/";
    public static final int NUM_FILES = 10;
    // Sleep intervals handed to producers/consumers. produceSpawnConsumers passes its
    // consumer interval to Thread.sleep, i.e. milliseconds; the unit used inside Tasks
    // is presumably the same -- TODO confirm against Tasks.
    private static final int SLEEP_TIME1 = 400;
    private static final int SLEEP_TIME2 = 1000;
    private static final int SLEEP_TIME3 = 2000;
    /** Initial capacity of the result buffer used by {@code produceSpawnConsumers}. */
    private static final int MAX_ENTRIES = 100;

    /**
     * Runs every scenario in sequence and finally removes both working directories.
     *
     * @param args command-line arguments (unused)
     * @throws RegistrationException propagated from the stream library or the helpers
     * @throws IOException propagated from the stream library or the helpers
     * @throws BackendException propagated from the stream library or the helpers
     */
    public static void main(String[] args) throws RegistrationException, IOException, BackendException {
        Main.produceConsume(1, SLEEP_TIME2, 1, SLEEP_TIME1);
        Main.produceConsume(1, SLEEP_TIME2, 1, SLEEP_TIME3);
        Main.produceConsume(2, SLEEP_TIME2, 1, SLEEP_TIME1);
        Main.produceConsume(1, SLEEP_TIME1, 2, SLEEP_TIME2);
        Main.produceConsume(2, SLEEP_TIME2, 2, SLEEP_TIME1);
        Main.produceSpawnConsumers(1, SLEEP_TIME1, SLEEP_TIME2);
        Main.byAlias(1, SLEEP_TIME2, 1, SLEEP_TIME1);
        Utils.removeDirectory(TEST_PATH);
        Utils.removeDirectory(TEST_PATH_ALIAS);
    }

    /**
     * Launches {@code numProducers} writer tasks and {@code numConsumers} reader tasks on a
     * fresh stream over {@link #TEST_PATH}, then logs the total number of processed files.
     *
     * @param numProducers number of {@code Tasks.writeFiles} invocations
     * @param producerSleep sleep interval forwarded to each producer
     * @param numConsumers number of {@code Tasks.readFiles} invocations
     * @param consumerSleep sleep interval forwarded to each consumer
     */
    private static void produceConsume(int numProducers, int producerSleep, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        Utils.removeDirectory(TEST_PATH);
        Utils.createDirectory(TEST_PATH);
        FileDistroStream fds = new FileDistroStream(TEST_PATH);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeFiles(fds, producerSleep);
        }
        Main.consumeAndReport(fds, numConsumers, consumerSleep);
    }

    /**
     * Launches {@code numProducers} writer tasks and, from this method, polls the stream
     * until it is closed, sending every newly published file to {@code Tasks.processFile}.
     * Finally logs the sum of the per-file results.
     *
     * @param numProducers number of {@code Tasks.writeFiles} invocations
     * @param producerSleep sleep interval forwarded to each producer
     * @param consumerSleep milliseconds to sleep between two polls of the stream
     */
    private static void produceSpawnConsumers(int numProducers, int producerSleep, int consumerSleep) throws RegistrationException, IOException, BackendException {
        Utils.removeDirectory(TEST_PATH);
        Utils.createDirectory(TEST_PATH);
        FileDistroStream fds = new FileDistroStream(TEST_PATH);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeFiles(fds, producerSleep);
        }
        Integer[] totalFiles = new Integer[MAX_ENTRIES];
        int pos = 0;
        boolean closed;
        do {
            // Sample the closed flag BEFORE polling: once the stream reports closed we still
            // perform one last poll so files published just before the close are not lost.
            closed = fds.isClosed();
            List<String> newFiles = fds.poll();
            for (String fileName : newFiles) {
                System.out.println("Sending " + fileName + " to a process task");
                if (pos == totalFiles.length) {
                    // Buffer is full: grow it instead of failing with an
                    // ArrayIndexOutOfBoundsException when more files than expected arrive.
                    Integer[] grown = new Integer[totalFiles.length * 2];
                    System.arraycopy(totalFiles, 0, grown, 0, pos);
                    totalFiles = grown;
                }
                totalFiles[pos] = Tasks.processFile(fileName);
                ++pos;
            }
            if (!closed) {
                try {
                    Thread.sleep(consumerSleep);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt status visible to callers; polling continues.
                    Thread.currentThread().interrupt();
                }
            }
        } while (!closed);
        int total = 0;
        for (int i = 0; i < pos; ++i) {
            Integer partial = totalFiles[i];
            if (partial == null) {
                continue;
            }
            total += partial;
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED FILES: " + total);
    }

    /**
     * Same layout as {@code produceConsume}, but producers and consumers use two distinct
     * stream handles created with the same {@link #ALIAS} over {@link #TEST_PATH_ALIAS}.
     *
     * @param numProducers number of {@code Tasks.writeFilesAlias} invocations
     * @param producerSleep sleep interval forwarded to each producer
     * @param numConsumers number of {@code Tasks.readFiles} invocations
     * @param consumerSleep sleep interval forwarded to each consumer
     */
    private static void byAlias(int numProducers, int producerSleep, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        Utils.removeDirectory(TEST_PATH_ALIAS);
        Utils.createDirectory(TEST_PATH_ALIAS);
        FileDistroStream fds = new FileDistroStream(ALIAS, TEST_PATH_ALIAS);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeFilesAlias(fds, producerSleep);
        }
        // Second handle built from the same alias and path; consumers read through it.
        FileDistroStream fds2 = new FileDistroStream(ALIAS, TEST_PATH_ALIAS);
        Main.consumeAndReport(fds2, numConsumers, consumerSleep);
    }

    /**
     * Launches {@code numConsumers} reader tasks on {@code fds}, adds up their results and
     * prints the total.
     *
     * <p>All readers are launched before any result is read back, matching the order in
     * which the scenarios originally issued these calls.
     *
     * @param fds stream the consumers read from
     * @param numConsumers number of {@code Tasks.readFiles} invocations
     * @param consumerSleep sleep interval forwarded to each consumer
     */
    private static void consumeAndReport(FileDistroStream fds, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        Integer[] totalFiles = new Integer[numConsumers];
        for (int i = 0; i < numConsumers; ++i) {
            totalFiles[i] = Tasks.readFiles(fds, consumerSleep);
        }
        int total = 0;
        for (Integer partial : totalFiles) {
            total += partial;
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED FILES: " + total);
    }
}

