/*
 * Decompiled with CFR 0.152.
 */
package filters;

import es.bsc.compss.api.COMPSs;
import es.bsc.distrostreamlib.api.objects.ObjectDistroStream;
import es.bsc.distrostreamlib.exceptions.BackendException;
import es.bsc.distrostreamlib.exceptions.RegistrationException;
import es.bsc.distrostreamlib.types.ConsumerMode;
import filters.BigFilterTasks;
import filters.FilterArguments;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import mains.MyElement;

public class BigFilter {
    private static final int TIME_BETWEEN_POLLS = 5000;

    private static void processObjects(ObjectDistroStream<MyElement> odsSensor, ObjectDistroStream<MyElement> odsFiltered, Queue<MyElement> pendingObjects, FilterArguments fargs, boolean forced) throws BackendException {
        LinkedList batchObjects;
        List newObjects = odsSensor.poll();
        pendingObjects.addAll(newObjects);
        while (pendingObjects.size() > fargs.getBatchSize()) {
            batchObjects = new LinkedList();
            for (int i = 0; i < fargs.getBatchSize(); ++i) {
                MyElement next = pendingObjects.poll();
                batchObjects.add(next);
            }
            System.out.println("[DEBUG] Launch filter task with " + batchObjects.size() + " elements");
            BigFilterTasks.filterObjects(batchObjects, odsFiltered, fargs.getSleepBaseTime(), fargs.getSleepRandomRange());
        }
        if (forced) {
            batchObjects = (LinkedList)pendingObjects;
            System.out.println("[DEBUG] Launch filter task with " + batchObjects.size() + " elements");
            BigFilterTasks.filterObjects(batchObjects, odsFiltered, fargs.getSleepBaseTime(), fargs.getSleepRandomRange());
            pendingObjects.clear();
        }
    }

    public static void main(String[] args) throws RegistrationException, BackendException, InterruptedException {
        System.out.println("[INFO] Starting application");
        long start = System.currentTimeMillis();
        System.out.println("[INFO] Parsing application arguments");
        FilterArguments fargs = new FilterArguments(args);
        System.out.println("[INFO] Initializing streams");
        ObjectDistroStream odsSensor = new ObjectDistroStream(fargs.getAliasSensor(), ConsumerMode.AT_MOST_ONCE);
        ObjectDistroStream odsFiltered = new ObjectDistroStream(fargs.getAliasFiltered(), ConsumerMode.AT_MOST_ONCE);
        System.out.println("[INFO] Processing input stream elements");
        LinkedList<MyElement> pendingObjects = new LinkedList<MyElement>();
        while (!odsSensor.isClosed()) {
            BigFilter.processObjects((ObjectDistroStream<MyElement>)odsSensor, (ObjectDistroStream<MyElement>)odsFiltered, pendingObjects, fargs, false);
            Thread.sleep(5000L);
        }
        BigFilter.processObjects((ObjectDistroStream<MyElement>)odsSensor, (ObjectDistroStream<MyElement>)odsFiltered, pendingObjects, fargs, true);
        System.out.println("[INFO] Waiting for all batch tasks to finish");
        COMPSs.barrier();
        odsFiltered.close();
        System.out.println("DONE");
        long end = System.currentTimeMillis();
        long elapsedTime = end - start;
        System.out.println("[TIME] TOTAL ELAPSED: " + elapsedTime);
    }
}

