/*
 * Decompiled with CFR 0.152.
 */
package objectStreamTest;

import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import es.bsc.distrostreamlib.exceptions.BackendException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import objectStreamTest.MyObject;

public class Tasks {
    private static final String BASE_NAME = "CUSTOM_OBJECT_";

    public static void writeObjects(ObjectDistroStream<MyObject> ods, int sleepTime) throws BackendException {
        for (int i = 0; i < 10; ++i) {
            try {
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            String name = BASE_NAME + UUID.randomUUID();
            MyObject obj = new MyObject(name, i);
            System.out.println("SENDING OBJECT: " + obj.hashCode());
            System.out.println(" - Name: " + obj.getName());
            System.out.println(" - Value: " + obj.getValue());
            ods.publish(obj);
        }
        System.out.println("Requesting stream closure");
        try {
            Thread.sleep(sleepTime * 2);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ods.close();
    }

    public static void writeObjectList(ObjectDistroStream<MyObject> ods, int sleepTime) throws BackendException {
        for (int i = 0; i < 2; ++i) {
            try {
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ArrayList<MyObject> objects = new ArrayList<MyObject>();
            for (int j = 0; j < 10; ++j) {
                String name = BASE_NAME + UUID.randomUUID();
                MyObject obj = new MyObject(name, i * 10 + j);
                objects.add(obj);
            }
            System.out.println("SENDING OBJECTS: " + objects.hashCode());
            for (MyObject obj : objects) {
                System.out.println(" - Name: " + obj.getName());
                System.out.println(" - Value: " + obj.getValue());
            }
            ods.publish(objects);
        }
        try {
            Thread.sleep(sleepTime);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Requesting stream closure");
        ods.close();
    }

    public static Integer readObjects(ObjectDistroStream<MyObject> ods, int sleepTime) throws IOException, BackendException {
        Integer numNewObjects;
        Integer totalObjects = 0;
        while (!ods.isClosed()) {
            try {
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Polling new objects");
            numNewObjects = Tasks.pollObjects(ods);
            totalObjects = totalObjects + numNewObjects;
        }
        try {
            Thread.sleep(sleepTime);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        numNewObjects = Tasks.pollObjects(ods);
        totalObjects = totalObjects + numNewObjects;
        return totalObjects;
    }

    public static Integer readObjectsTimeout(ObjectDistroStream<MyObject> ods, long timeout, int sleepTime) throws IOException, BackendException {
        Integer numNewObjects;
        Integer totalObjects = 0;
        while (!ods.isClosed()) {
            try {
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Polling new objects");
            numNewObjects = Tasks.pollWithTimeout(ods, timeout);
            totalObjects = totalObjects + numNewObjects;
        }
        try {
            Thread.sleep(sleepTime);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        numNewObjects = Tasks.pollWithTimeout(ods, timeout);
        totalObjects = totalObjects + numNewObjects;
        return totalObjects;
    }

    private static Integer pollObjects(ObjectDistroStream<MyObject> ods) throws IOException, BackendException {
        List<MyObject> newObjects = ods.poll();
        for (MyObject obj : newObjects) {
            System.out.println("RECEIVED OBJECT: " + obj.hashCode());
            System.out.println(" - Name: " + obj.getName());
            System.out.println(" - Value: " + obj.getValue());
        }
        return newObjects.size();
    }

    private static Integer pollWithTimeout(ObjectDistroStream<MyObject> ods, long timeout) throws IOException, BackendException {
        List<MyObject> newObjects = ods.poll(timeout);
        for (MyObject obj : newObjects) {
            System.out.println("RECEIVED OBJECT: " + obj.hashCode());
            System.out.println(" - Name: " + obj.getName());
            System.out.println(" - Value: " + obj.getValue());
        }
        return newObjects.size();
    }

    public static Integer processObject(MyObject obj) throws IOException {
        System.out.println("RECEIVED OBJECT: " + obj.hashCode());
        System.out.println(" - Name: " + obj.getName());
        System.out.println(" - Value: " + obj.getValue());
        return 1;
    }
}

