/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.execution;

import es.bsc.compss.execution.ExecutionPlatformConfiguration;
import es.bsc.compss.execution.types.ExecutorContext;
import es.bsc.compss.execution.types.InvocationResources;
import es.bsc.compss.execution.utils.JobQueue;
import es.bsc.compss.execution.utils.ResourceManager;
import es.bsc.compss.executor.Executor;
import es.bsc.compss.executor.InvocationRunner;
import es.bsc.compss.executor.external.ExecutionPlatformMirror;
import es.bsc.compss.invokers.Invoker;
import es.bsc.compss.types.execution.Execution;
import es.bsc.compss.types.execution.Invocation;
import es.bsc.compss.types.execution.InvocationContext;
import es.bsc.compss.types.execution.exceptions.UnsufficientAvailableResourcesException;
import es.bsc.compss.types.resources.ResourceDescription;
import es.bsc.compss.util.TraceEvent;
import es.bsc.compss.util.Tracer;
import es.bsc.compss.utils.execution.ThreadedProperties;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Pool of executor threads that consume {@link Execution}s from a shared {@link JobQueue}.
 *
 * <p>Besides owning the worker threads, the platform implements {@link ExecutorContext} so that
 * the executors it creates can fetch jobs, acquire/release resources through the
 * {@link ResourceManager}, register running jobs (for cancellation and timeouts) and share
 * {@link ExecutionPlatformMirror}s.
 *
 * <p>The fields {@code started}, {@code nextThreadId}, {@code workerThreads} and
 * {@code finishedWorkerThreads} are guarded by the monitor of this instance.
 */
public class ExecutionPlatform implements ExecutorContext {
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.compss.Worker.ThreadPool");

    private final String platformName;
    private final InvocationContext context;
    private final ResourceManager rm;
    // When true, a blocked invocation gives its resources back and an extra worker thread is spawned.
    private final boolean reuseResourcesOnBlockedInvocation;
    private final JobQueue queue;

    private boolean started = false;
    private int nextThreadId = 0;
    // Live worker threads, ordered by thread id.
    private final TreeSet<Thread> workerThreads;
    // Threads whose executor has returned and that are waiting to be joined.
    private final LinkedList<Thread> finishedWorkerThreads;
    // Released once by every worker thread as soon as it begins running.
    private final Semaphore startSemaphore;
    // Released once by every worker thread right before it terminates.
    private final Semaphore stopSemaphore;

    private final Map<Class<?>, ExecutionPlatformMirror<?>> mirrors;

    private Timer timer;
    // Jobs whose cancellation was requested before they were registered as running.
    // NOTE(review): plain HashSet, while executingJobs is a ConcurrentHashMap; if cancelJob and
    // (un)registerRunningJob can run on different threads this set needs the same treatment — confirm.
    private final Set<Integer> toCancel;
    private final Map<Integer, Invoker> executingJobs;

    /**
     * Creates the platform and its initial worker threads; the threads are not started until
     * {@link #start()} is invoked.
     *
     * <p>As a side effect, the JVM-wide system properties object is replaced by a
     * {@link ThreadedProperties} wrapping the current properties.
     *
     * @param platformName name used in log messages and thread names
     * @param context invocation context handed to every executor
     * @param config supplies the initial pool size and the resource-reuse policy
     * @param resManager manager used to acquire and release job resources
     */
    public ExecutionPlatform(String platformName, InvocationContext context, ExecutionPlatformConfiguration config,
        ResourceManager resManager) {
        LOGGER.info("Initializing execution platform " + platformName);
        this.platformName = platformName;
        this.context = context;
        this.rm = resManager;
        this.mirrors = new HashMap<>();
        System.setProperties(new ThreadedProperties(System.getProperties()));
        this.queue = new JobQueue();
        this.startSemaphore = new Semaphore(0);
        this.stopSemaphore = new Semaphore(0);
        this.reuseResourcesOnBlockedInvocation = config.isReuseResourcesOnBlockedRunner();
        this.workerThreads = new TreeSet<Thread>(new Comparator<Thread>() {

            @Override
            public int compare(Thread t1, Thread t2) {
                return Long.compare(t1.getId(), t2.getId());
            }
        });
        this.finishedWorkerThreads = new LinkedList<>();
        this.toCancel = new HashSet<>();
        this.executingJobs = new ConcurrentHashMap<>();
        this.addWorkerThreads(config.getInitialSize());
    }

    /**
     * Enqueues an execution so that a worker thread can pick it up.
     *
     * @param exec execution to enqueue
     */
    public void execute(Execution exec) {
        this.queue.enqueue(exec);
    }

    /**
     * Starts the timer and every worker thread created so far, and blocks until each of those
     * threads has signalled that it is running.
     */
    public final synchronized void start() {
        LOGGER.info("Starting execution platform " + this.platformName);
        this.startTimer();
        for (Thread t : this.workerThreads.descendingSet()) {
            LOGGER.info("Starting Thread " + t.getName());
            t.start();
        }
        int size = this.workerThreads.size();
        // Every worker releases one permit at the beginning of its run().
        this.startSemaphore.acquireUninterruptibly(size);
        this.started = true;
        LOGGER.info("Started execution platform " + this.platformName + " with " + size);
    }

    /**
     * Stops all worker threads, then all registered mirrors, and finally the timer.
     *
     * <p>This method is intentionally not {@code synchronized}: terminating workers need the
     * instance monitor to register themselves as finished while this method waits for them.
     */
    public final void stop() {
        LOGGER.info("Stopping execution platform " + this.platformName);
        final int size;
        synchronized (this) {
            size = this.workerThreads.size();
        }
        this.removeWorkerThreads(size);
        LOGGER.info("Stopping mirrors for execution platform " + this.platformName);
        for (ExecutionPlatformMirror<?> mirror : this.mirrors.values()) {
            mirror.stop();
        }
        this.mirrors.clear();
        this.stopTimer();
        synchronized (this) {
            // Written under the same monitor that start() and addWorkerThreads() use to read it.
            this.started = false;
        }
        LOGGER.info("Stopped execution platform " + this.platformName);
    }

    /**
     * Creates the timer used for job timeouts and, when Extrae tracing is enabled, schedules an
     * immediate task that emits the timer-thread identification event from the timer thread.
     */
    private void startTimer() {
        if (Tracer.basicModeEnabled()) {
            Tracer.enablePThreads(1);
        }
        this.timer = new Timer(this.platformName + " deadline reapper");
        if (Tracer.extraeEnabled()) {
            this.timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    if (Tracer.basicModeEnabled()) {
                        Tracer.disablePThreads(1);
                    }
                    if (Tracer.extraeEnabled()) {
                        Tracer.emitEvent(TraceEvent.TIMER_THREAD_ID.getId(), TraceEvent.TIMER_THREAD_ID.getType());
                    }
                }
            }, 0L);
        }
    }

    /**
     * Cancels the timer. When Extrae tracing is enabled, first runs a task on the timer thread
     * that emits the closing trace event and waits for that task to complete.
     */
    private void stopTimer() {
        final Timer currentTimer = this.timer;
        if (currentTimer == null) {
            // start() was never invoked, so there is no timer to shut down.
            return;
        }
        if (Tracer.extraeEnabled()) {
            final Semaphore sem = new Semaphore(0);
            currentTimer.schedule(new TimerTask() {

                @Override
                public void run() {
                    if (Tracer.extraeEnabled()) {
                        Tracer.emitEvent(0L, TraceEvent.TIMER_THREAD_ID.getType());
                    }
                    sem.release();
                }
            }, 0L);
            try {
                sem.acquire();
            } catch (InterruptedException ie) {
                // Keep the interruption visible to the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }
        currentTimer.cancel();
    }

    /**
     * Adds worker threads to the pool.
     *
     * <p>If the platform is already started, the new threads are started immediately and this
     * method blocks until all of them are running. Otherwise the threads are only created;
     * {@link #start()} starts them later.
     *
     * @param numWorkerThreads number of threads to add
     */
    public final void addWorkerThreads(int numWorkerThreads) {
        final Semaphore startSem;
        final boolean wait;
        synchronized (this) {
            if (this.started) {
                // Starts with zero permits so the acquire below really waits for the new threads.
                startSem = new Semaphore(0);
                wait = true;
            } else {
                // Not started yet: start() waits on the shared semaphore.
                startSem = this.startSemaphore;
                wait = false;
            }
            if (Tracer.basicModeEnabled()) {
                Tracer.enablePThreads(numWorkerThreads);
            }
            for (int i = 0; i < numWorkerThreads; ++i) {
                final int id = this.nextThreadId++;
                Executor executor = new Executor(this.context, this, "executor" + id) {

                    @Override
                    public void run() {
                        startSem.release();
                        super.run();
                        synchronized (ExecutionPlatform.this) {
                            ExecutionPlatform.this.finishedWorkerThreads.add(Thread.currentThread());
                        }
                        // NOTE(review): not reached if super.run() throws; a waiting
                        // removeWorkerThreads() would then block — confirm Executor.run() never throws.
                        ExecutionPlatform.this.stopSemaphore.release();
                    }
                };
                Thread t = new Thread(executor);
                t.setName(this.platformName + " executor thread # " + id);
                this.workerThreads.add(t);
                if (this.started) {
                    t.start();
                }
            }
        }
        if (wait) {
            // Outside the monitor: the new threads do not need it to release their permit.
            startSem.acquireUninterruptibly(numWorkerThreads);
        }
    }

    /**
     * Removes worker threads from the pool and blocks until that many threads have finished and
     * been joined. Does nothing when {@code numWorkerThreads} is not positive.
     *
     * @param numWorkerThreads number of threads to stop
     */
    public final void removeWorkerThreads(int numWorkerThreads) {
        if (numWorkerThreads > 0) {
            LOGGER.info("Stopping " + numWorkerThreads + " executors from execution platform " + this.platformName);
            for (int i = 0; i < numWorkerThreads; ++i) {
                // Empty execution; presumably interpreted by Executor as a stop request — confirm.
                this.queue.enqueue(new Execution(null, null));
            }
            LOGGER.info("Waking up all locks");
            this.queue.wakeUpAll();
            LOGGER.info("Waiting for " + numWorkerThreads + " threads finished");
            this.stopSemaphore.acquireUninterruptibly(numWorkerThreads);
            this.joinThreads();
            LOGGER.info("Stopped " + numWorkerThreads + " executors from execution platform " + this.platformName);
        }
    }

    /**
     * Joins every thread registered as finished and drops it from the pool bookkeeping.
     *
     * <p>If the calling thread is interrupted while joining, its interrupt flag is restored and
     * the affected thread stays in the finished list.
     */
    private void joinThreads() {
        synchronized (this) {
            Iterator<Thread> iter = this.finishedWorkerThreads.iterator();
            while (iter.hasNext()) {
                Thread t = iter.next();
                if (t == null) {
                    continue;
                }
                try {
                    t.join();
                    iter.remove();
                    this.workerThreads.remove(t);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        // Explicit collection request kept from the original implementation.
        Runtime.getRuntime().gc();
    }

    /**
     * Cancels a job. If the job is currently registered as running its invoker is cancelled;
     * otherwise the id is remembered so the job is cancelled when it gets registered.
     *
     * @param jobId id of the job to cancel
     */
    public void cancelJob(int jobId) {
        Invoker invoker = this.executingJobs.get(jobId);
        if (invoker == null) {
            LOGGER.debug("Job " + jobId + " is to be cancelled");
            this.toCancel.add(jobId);
        } else {
            LOGGER.debug("Cancelling running job " + jobId);
            // Use the reference already fetched: a second lookup could return null if the job
            // was unregistered in between.
            invoker.cancel();
        }
    }

    /**
     * Registers a job as running. If its cancellation was already requested the job is cancelled
     * right away; otherwise, when the invocation declares a positive timeout, the handler is
     * scheduled on the platform timer.
     *
     * @param invocation invocation being executed
     * @param invoker invoker running the invocation
     * @param timeOutHandler task to run when the invocation timeout expires
     */
    @Override
    public void registerRunningJob(Invocation invocation, Invoker invoker, TimerTask timeOutHandler) {
        int jobId = invocation.getJobId();
        LOGGER.debug("Registering job " + jobId);
        this.executingJobs.put(jobId, invoker);
        if (this.toCancel.contains(jobId)) {
            this.cancelJob(jobId);
        } else {
            long timeout = invocation.getTimeOut();
            if (timeout > 0L) {
                this.timer.schedule(timeOutHandler, timeout);
            }
        }
    }

    /**
     * Delegates the resource acquisition for a job to the resource manager.
     *
     * @param jobId id of the job requesting resources
     * @param requirements resources required by the job
     * @param preferredAllocation allocation the job would like to obtain
     * @return resources assigned by the resource manager
     * @throws UnsufficientAvailableResourcesException propagated from the resource manager
     */
    @Override
    public InvocationResources acquireResources(int jobId, ResourceDescription requirements,
        InvocationResources preferredAllocation) throws UnsufficientAvailableResourcesException {
        return this.rm.acquireResources(jobId, requirements, preferredAllocation);
    }

    /**
     * Notifies that a runner is stalled. When resource reuse is enabled, the job's resources are
     * released, one extra worker thread is added and the context is told about the idle resources.
     *
     * @param invocation stalled invocation
     * @param runner runner executing the invocation
     * @param assignedResources resources currently assigned to the invocation
     */
    @Override
    public void blockedRunner(Invocation invocation, InvocationRunner runner, InvocationResources assignedResources) {
        LOGGER.debug("Stalled execution of job " + invocation.getJobId());
        if (this.reuseResourcesOnBlockedInvocation) {
            LOGGER.debug("Releasing resources assigned to job " + invocation.getJobId());
            int jobId = invocation.getJobId();
            this.rm.releaseResources(jobId);
            this.addWorkerThreads(1);
            this.context.idleReservedResourcesDetected(invocation.getRequirements());
        }
    }

    /**
     * Notifies that a stalled runner can continue. When resource reuse is enabled, one worker
     * thread is removed and the resources are re-requested from the resource manager, which
     * receives {@code sem}; otherwise {@code sem} is released immediately.
     *
     * @param invocation invocation ready to continue
     * @param runner runner executing the invocation
     * @param previousAllocation resources the invocation held before blocking
     * @param sem semaphore the runner waits on before resuming
     */
    @Override
    public void unblockedRunner(Invocation invocation, InvocationRunner runner, InvocationResources previousAllocation,
        Semaphore sem) {
        LOGGER.debug("Execution of job " + invocation.getJobId() + " ready to continue");
        if (this.reuseResourcesOnBlockedInvocation) {
            LOGGER.debug("Reacquiring resources assigned to job " + invocation.getJobId());
            this.removeWorkerThreads(1);
            int jobId = invocation.getJobId();
            this.rm.reacquireResources(jobId, invocation.getRequirements(), previousAllocation, sem);
            this.context.reactivatedReservedResourcesDetected(invocation.getRequirements());
        } else {
            sem.release();
        }
    }

    /**
     * Releases the resources held by a job.
     *
     * @param jobId id of the job
     */
    @Override
    public void releaseResources(int jobId) {
        this.rm.releaseResources(jobId);
    }

    /**
     * Removes a job from the running-jobs registry and from the pending-cancellation set.
     *
     * @param jobId id of the job
     */
    @Override
    public void unregisterRunningJob(int jobId) {
        LOGGER.debug("Unregistering job " + jobId);
        this.executingJobs.remove(jobId);
        this.toCancel.remove(jobId);
    }

    /**
     * Returns the number of worker threads currently tracked by the platform.
     *
     * @return current number of worker threads
     */
    @Override
    public int getSize() {
        return this.workerThreads.size();
    }

    /**
     * Takes the next execution from the queue.
     *
     * @return value returned by the queue's {@code dequeue()}
     */
    @Override
    public Execution getJob() {
        return this.queue.dequeue();
    }

    /**
     * Looks up the mirror registered for an invoker class.
     *
     * @param invoker invoker class used as key
     * @return registered mirror, or {@code null} if none is registered for that class
     */
    @Override
    public ExecutionPlatformMirror<?> getMirror(Class<?> invoker) {
        return this.mirrors.get(invoker);
    }

    /**
     * Registers a mirror for an invoker class, replacing any previous registration.
     *
     * @param invoker invoker class used as key
     * @param mirror mirror to register
     */
    @Override
    public void registerMirror(Class<?> invoker, ExecutionPlatformMirror<?> mirror) {
        this.mirrors.put(invoker, mirror);
    }

    /**
     * Returns a live view of the registered mirrors.
     *
     * @return collection view backed by the internal mirror map
     */
    @Override
    public Collection<ExecutionPlatformMirror<?>> getMirrors() {
        return this.mirrors.values();
    }
}

