/*
 * Decompiled with CFR 0.152.
 */
package objectStreamTest;

import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import es.bsc.distrostreamlib.exceptions.BackendException;
import es.bsc.distrostreamlib.exceptions.RegistrationException;
import es.bsc.distrostreamlib.types.ConsumerMode;
import java.io.IOException;
import java.util.List;
import objectStreamTest.MyObject;
import objectStreamTest.Tasks;

public class Main {
    public static final int NUM_BATCHES = 2;
    public static final int NUM_OBJECTS = 10;
    private static final int SLEEP_TIME1 = 400;
    private static final int SLEEP_TIME2 = 1000;
    private static final int SLEEP_TIME3 = 2000;
    private static final int MAX_ENTRIES = 100;
    private static final long TIMEOUT = 1000L;
    private static final String ALIAS = "custom-stream";

    public static void main(String[] args) throws RegistrationException, IOException, BackendException {
        Main.produceConsume(1, 1000, 1, 400);
        Main.produceListConsume(1000, 1, 400);
        Main.produceConsumeTimeout(1, 1000, 1, 400);
        Main.produceConsume(1, 1000, 1, 2000);
        Main.produceConsume(2, 1000, 1, 400);
        Main.produceConsume(1, 400, 2, 1000);
        Main.produceConsume(2, 1000, 2, 400);
        Main.produceSpawnConsumers(1, 400, 1000);
        Main.byAlias(1, 1000, 1, 400);
    }

    private static void produceConsume(int numProducers, int producerSleep, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        ObjectDistroStream<MyObject> ods = new ObjectDistroStream<MyObject>(ConsumerMode.AT_MOST_ONCE);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeObjects(ods, producerSleep);
            try {
                Thread.sleep(producerSleep / numProducers);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Integer[] partialObjects = new Integer[numConsumers];
        for (int i = 0; i < numConsumers; ++i) {
            partialObjects[i] = Tasks.readObjects(ods, consumerSleep);
            try {
                Thread.sleep(consumerSleep / numConsumers);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Integer totalObjects = 0;
        for (int i = 0; i < numConsumers; ++i) {
            totalObjects = totalObjects + partialObjects[i];
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED OBJECTS: " + totalObjects);
    }

    private static void produceListConsume(int producerSleep, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        ObjectDistroStream<MyObject> ods = new ObjectDistroStream<MyObject>(ConsumerMode.AT_MOST_ONCE);
        Tasks.writeObjectList(ods, producerSleep);
        Integer[] partialObjects = new Integer[numConsumers];
        for (int i = 0; i < numConsumers; ++i) {
            partialObjects[i] = Tasks.readObjects(ods, consumerSleep);
        }
        Integer totalObjects = 0;
        for (int i = 0; i < numConsumers; ++i) {
            totalObjects = totalObjects + partialObjects[i];
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED OBJECTS: " + totalObjects);
    }

    private static void produceConsumeTimeout(int numProducers, int producerSleep, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        ObjectDistroStream<MyObject> ods = new ObjectDistroStream<MyObject>(ConsumerMode.AT_MOST_ONCE);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeObjects(ods, producerSleep);
            try {
                Thread.sleep(producerSleep / numProducers);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Integer[] partialObjects = new Integer[numConsumers];
        for (int i = 0; i < numConsumers; ++i) {
            partialObjects[i] = Tasks.readObjectsTimeout(ods, 1000L, consumerSleep);
            try {
                Thread.sleep(consumerSleep / numConsumers);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Integer totalObjects = 0;
        for (int i = 0; i < numConsumers; ++i) {
            totalObjects = totalObjects + partialObjects[i];
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED OBJECTS: " + totalObjects);
    }

    private static void produceSpawnConsumers(int numProducers, int producerSleep, int consumerSleep) throws RegistrationException, IOException, BackendException {
        List<MyObject> newObjects;
        ObjectDistroStream<MyObject> ods = new ObjectDistroStream<MyObject>(ConsumerMode.AT_MOST_ONCE);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeObjects(ods, producerSleep);
        }
        Integer[] partialObjects = new Integer[100];
        int pos = 0;
        while (!ods.isClosed()) {
            newObjects = ods.poll();
            for (MyObject obj : newObjects) {
                System.out.println("Sending " + obj.hashCode() + ":" + obj.getName() + ":" + obj.getValue() + " to a process task");
                partialObjects[pos] = Tasks.processObject(obj);
                ++pos;
            }
            try {
                Thread.sleep(consumerSleep);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        newObjects = ods.poll();
        for (MyObject obj : newObjects) {
            System.out.println("Sending " + obj.hashCode() + ":" + obj.getName() + ":" + obj.getValue() + " to a process task");
            partialObjects[pos] = Tasks.processObject(obj);
            ++pos;
        }
        Integer totalObjects = 0;
        for (int i = 0; i < pos; ++i) {
            if (partialObjects[i] == null) continue;
            totalObjects = totalObjects + partialObjects[i];
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED OBJECTS: " + totalObjects);
    }

    private static void byAlias(int numProducers, int producerSleep, int numConsumers, int consumerSleep) throws RegistrationException, IOException, BackendException {
        ObjectDistroStream<MyObject> ods = new ObjectDistroStream<MyObject>(ALIAS, ConsumerMode.AT_MOST_ONCE);
        for (int i = 0; i < numProducers; ++i) {
            Tasks.writeObjects(ods, producerSleep);
            try {
                Thread.sleep(producerSleep / numProducers);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        ObjectDistroStream<MyObject> ods2 = new ObjectDistroStream<MyObject>(ALIAS, ConsumerMode.AT_MOST_ONCE);
        Integer[] partialObjects = new Integer[numConsumers];
        for (int i = 0; i < numConsumers; ++i) {
            partialObjects[i] = Tasks.readObjects(ods2, consumerSleep);
            try {
                Thread.sleep(consumerSleep / numConsumers);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        Integer totalObjects = 0;
        for (int i = 0; i < numConsumers; ++i) {
            totalObjects = totalObjects + partialObjects[i];
        }
        System.out.println("[LOG] TOTAL NUMBER OF PROCESSED OBJECTS: " + totalObjects);
    }
}

