/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.executor.utils;

import es.bsc.compss.executor.Executor;
import es.bsc.compss.executor.ExecutorContext;
import es.bsc.compss.executor.external.ExecutionPlatformMirror;
import es.bsc.compss.executor.types.Execution;
import es.bsc.compss.executor.utils.ResourceManager;
import es.bsc.compss.executor.utils.ThreadedProperties;
import es.bsc.compss.invokers.util.JobQueue;
import es.bsc.compss.types.execution.InvocationContext;
import es.bsc.compss.types.execution.exceptions.UnsufficientAvailableComputingUnitsException;
import es.bsc.compss.types.resources.ResourceDescription;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Pool of worker threads that run {@link Executor} instances fed from a shared {@link JobQueue}.
 *
 * <p>The platform also implements {@link ExecutorContext}: it hands jobs to the executors, forwards resource
 * acquisition/release requests to the {@link ResourceManager} received at construction time and keeps the registry
 * of {@link ExecutionPlatformMirror} instances keyed by invoker class.
 *
 * <p>Life-cycle operations ({@link #start()}, {@link #stop()}, {@link #addWorkerThreads(int)} and
 * {@link #removeWorkerThreads(int)}) are synchronized on the platform instance.
 */
public class ExecutionPlatform
implements ExecutorContext {
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.compss.Worker.ThreadPool");
    private final String platformName;
    private final InvocationContext context;
    private final ResourceManager rm;
    private final JobQueue queue;
    // Whether start() has completed and stop() has not been called since.
    private boolean started = false;
    // Numeric suffix given to the next worker thread that gets created.
    private int nextThreadId = 0;
    // Every worker thread created and not yet joined, ordered by thread id.
    private final TreeSet<Thread> workerThreads;
    // Threads whose executor has returned and that are waiting to be joined; guarded by its own monitor.
    private final LinkedList<Thread> finishedWorkerThreads;
    // Released once by every worker thread created before the platform is started.
    private final Semaphore startSemaphore;
    // Released once by every worker thread when its executor finishes.
    private final Semaphore stopSemaphore;
    private final Map<Class<?>, ExecutionPlatformMirror<?>> mirrors;

    /**
     * Creates a new, not yet started, execution platform with {@code initialSize} worker threads.
     *
     * <p>As a side effect, the JVM-wide system properties are replaced by a {@link ThreadedProperties} wrapper
     * around the current ones.
     *
     * @param platformName name of the platform, used in log messages and thread names
     * @param context invocation context handed to every executor
     * @param initialSize number of worker threads to create
     * @param resManager resource manager that serves the acquire/release requests
     */
    public ExecutionPlatform(String platformName, InvocationContext context, int initialSize, ResourceManager resManager) {
        LOGGER.info("Initializing execution platform " + platformName);
        this.platformName = platformName;
        this.context = context;
        this.rm = resManager;
        this.mirrors = new HashMap<Class<?>, ExecutionPlatformMirror<?>>();
        System.setProperties(new ThreadedProperties(System.getProperties()));
        this.queue = new JobQueue();
        this.startSemaphore = new Semaphore(0);
        this.stopSemaphore = new Semaphore(0);
        this.workerThreads = new TreeSet<Thread>(new Comparator<Thread>(){

            @Override
            public int compare(Thread t1, Thread t2) {
                return Long.compare(t1.getId(), t2.getId());
            }
        });
        this.finishedWorkerThreads = new LinkedList<Thread>();
        this.addWorkerThreads(initialSize);
    }

    /**
     * Enqueues an execution so that one of the worker threads picks it up.
     *
     * @param exec execution to enqueue
     */
    public void execute(Execution exec) {
        this.queue.enqueue(exec);
    }

    /**
     * Starts every registered worker thread and blocks until each of them has begun running.
     */
    public final synchronized void start() {
        LOGGER.info("Starting execution platform " + this.platformName);
        for (Thread t : this.workerThreads.descendingSet()) {
            LOGGER.info("Starting Thread " + t.getName());
            t.start();
        }
        int size = this.workerThreads.size();
        // Each worker releases one permit as the first action of its run() method.
        this.startSemaphore.acquireUninterruptibly(size);
        this.started = true;
        LOGGER.info("Started execution platform " + this.platformName + " with " + size);
    }

    /**
     * Stops all the worker threads, then stops and unregisters every mirror.
     */
    public final synchronized void stop() {
        LOGGER.info("Stopping execution platform " + this.platformName);
        int size = this.workerThreads.size();
        this.removeWorkerThreads(size);
        LOGGER.info("Stopping mirrors for execution platform " + this.platformName);
        for (ExecutionPlatformMirror<?> mirror : this.mirrors.values()) {
            mirror.stop();
        }
        this.mirrors.clear();
        this.started = false;
        LOGGER.info("Stopped execution platform " + this.platformName);
    }

    /**
     * Creates {@code numWorkerThreads} new worker threads.
     *
     * <p>If the platform is already started, the new threads are started right away and the call blocks until all
     * of them are running. Otherwise they are only registered and will be launched by {@link #start()}.
     *
     * @param numWorkerThreads number of worker threads to add
     */
    public final synchronized void addWorkerThreads(int numWorkerThreads) {
        // On a running platform use a private semaphore with NO initial permits, so that the acquire at the end of
        // this method really waits for the new threads; before start() the shared one is used and start() waits.
        final Semaphore startSem = this.started ? new Semaphore(0) : this.startSemaphore;
        for (int i = 0; i < numWorkerThreads; ++i) {
            final int id = this.nextThreadId++;
            Executor executor = new Executor(this.context, this, "compute" + id){

                @Override
                public void run() {
                    startSem.release();
                    try {
                        super.run();
                    } finally {
                        // Always register the thread as finished and signal it, even if the executor ended with
                        // an exception; otherwise whoever waits on the stop semaphore would block forever.
                        synchronized (ExecutionPlatform.this.finishedWorkerThreads) {
                            ExecutionPlatform.this.finishedWorkerThreads.add(Thread.currentThread());
                        }
                        ExecutionPlatform.this.stopSemaphore.release();
                    }
                }
            };
            Thread t = new Thread(executor);
            t.setName(this.platformName + " compute thread # " + id);
            this.workerThreads.add(t);
            if (this.started) {
                t.start();
            }
        }
        if (this.started) {
            startSem.acquireUninterruptibly(numWorkerThreads);
        }
    }

    /**
     * Stops {@code numWorkerThreads} worker threads and waits for them to finish.
     *
     * <p>One {@code new Execution(null, null)} is enqueued per thread to stop, all the waiters of the queue are
     * woken up, and the call blocks until that many threads have signalled their termination. Finished threads are
     * then joined. Nothing is done when {@code numWorkerThreads} is not positive.
     *
     * @param numWorkerThreads number of worker threads to stop
     */
    public final synchronized void removeWorkerThreads(int numWorkerThreads) {
        if (numWorkerThreads > 0) {
            LOGGER.info("Stopping " + numWorkerThreads + " executors from execution platform " + this.platformName);
            for (int i = 0; i < numWorkerThreads; ++i) {
                this.queue.enqueue(new Execution(null, null));
            }
            LOGGER.info("Waking up all locks");
            this.queue.wakeUpAll();
            LOGGER.info("Waiting for " + numWorkerThreads + " threads finished");
            this.stopSemaphore.acquireUninterruptibly(numWorkerThreads);
            this.joinThreads();
            LOGGER.info("Stopped " + numWorkerThreads + " executors from execution platform " + this.platformName);
        }
    }

    /**
     * Joins every thread registered as finished and drops it from the platform bookkeeping.
     *
     * <p>If the calling thread is interrupted while joining, the interrupt flag is restored and the thread being
     * joined stays registered so that a later call can retry.
     */
    private void joinThreads() {
        // Same monitor the workers take to register themselves. A thread found in the list has already left its
        // own synchronized block, so joining it while holding the monitor cannot deadlock.
        synchronized (this.finishedWorkerThreads) {
            Iterator<Thread> iter = this.finishedWorkerThreads.iterator();
            while (iter.hasNext()) {
                Thread t = iter.next();
                if (t == null) {
                    continue;
                }
                try {
                    t.join();
                    iter.remove();
                    this.workerThreads.remove(t);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        Runtime.getRuntime().gc();
    }

    /**
     * Returns the number of worker threads currently registered in the platform.
     *
     * @return number of registered worker threads
     */
    @Override
    public int getSize() {
        return this.workerThreads.size();
    }

    /**
     * Takes the next execution from the job queue.
     *
     * @return execution obtained from the queue
     */
    @Override
    public Execution getJob() {
        return this.queue.dequeue();
    }

    /**
     * Delegates the acquisition of the resources required by a job to the resource manager.
     *
     * @param jobId identifier of the job requesting the resources
     * @param requirements description of the resources required
     * @return resources assigned by the resource manager
     * @throws UnsufficientAvailableComputingUnitsException propagated from the resource manager
     */
    @Override
    public ResourceManager.InvocationResources acquireResources(int jobId, ResourceDescription requirements) throws UnsufficientAvailableComputingUnitsException {
        return this.rm.acquireResources(jobId, requirements);
    }

    /**
     * Delegates the release of the resources held by a job to the resource manager.
     *
     * @param jobId identifier of the job whose resources are released
     */
    @Override
    public void releaseResources(int jobId) {
        this.rm.releaseResources(jobId);
    }

    /**
     * Returns the mirror registered for the given invoker class.
     *
     * @param invoker invoker class used as registry key
     * @return the registered mirror, or {@code null} if none is registered for {@code invoker}
     */
    @Override
    public ExecutionPlatformMirror<?> getMirror(Class<?> invoker) {
        return this.mirrors.get(invoker);
    }

    /**
     * Registers a mirror for the given invoker class, replacing any previous registration for it.
     *
     * @param invoker invoker class used as registry key
     * @param mirror mirror to register
     */
    @Override
    public void registerMirror(Class<?> invoker, ExecutionPlatformMirror<?> mirror) {
        this.mirrors.put(invoker, mirror);
    }

    /**
     * Returns the mirrors currently registered.
     *
     * @return live view of the registered mirrors, backed by the internal registry
     */
    @Override
    public Collection<ExecutionPlatformMirror<?>> getMirrors() {
        return this.mirrors.values();
    }
}

