/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.components.impl;

import es.bsc.compss.components.ResourceUser;
import es.bsc.compss.components.impl.AccessProcessor;
import es.bsc.compss.components.impl.TaskScheduler;
import es.bsc.compss.scheduler.types.ActionOrchestrator;
import es.bsc.compss.scheduler.types.AllocatableAction;
import es.bsc.compss.types.AbstractTask;
import es.bsc.compss.types.CoreElementDefinition;
import es.bsc.compss.types.Task;
import es.bsc.compss.types.request.exceptions.ShutdownException;
import es.bsc.compss.types.request.listener.RequestListener;
import es.bsc.compss.types.request.td.ActionUpdate;
import es.bsc.compss.types.request.td.CERegistration;
import es.bsc.compss.types.request.td.CancelTaskRequest;
import es.bsc.compss.types.request.td.ExecuteTasksRequest;
import es.bsc.compss.types.request.td.MonitoringDataRequest;
import es.bsc.compss.types.request.td.PrintCurrentGraphRequest;
import es.bsc.compss.types.request.td.PrintCurrentLoadRequest;
import es.bsc.compss.types.request.td.ShutdownRequest;
import es.bsc.compss.types.request.td.TDRequest;
import es.bsc.compss.types.request.td.TaskSummaryRequest;
import es.bsc.compss.types.request.td.UpdateLocalCEIRequest;
import es.bsc.compss.types.request.td.WorkerUpdateRequest;
import es.bsc.compss.types.resources.Worker;
import es.bsc.compss.types.resources.WorkerResourceDescription;
import es.bsc.compss.types.resources.updates.PerformedIncrease;
import es.bsc.compss.types.resources.updates.ResourceUpdate;
import es.bsc.compss.util.CEIParser;
import es.bsc.compss.util.Classpath;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.ResourceManager;
import es.bsc.compss.util.Tracer;
import es.bsc.compss.worker.COMPSsException;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.lang.reflect.Constructor;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Serialises scheduling work onto a single dispatcher thread.
 *
 * <p>Public methods wrap their arguments into a {@link TDRequest} and put it on {@link #requestQueue};
 * the thread held in {@link #dispatcher} takes requests one at a time and calls
 * {@code request.process(scheduler)} on them. Methods that need to wait for their request hand it a
 * {@link Semaphore} and block on it after enqueuing.
 */
public class TaskDispatcher
implements Runnable,
ResourceUser,
ActionOrchestrator {
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.compss.Components.TaskDispatcher");
    // Sampled once at class initialisation; later logger reconfiguration is not picked up.
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    private static final String SCHEDULERS_REL_PATH = File.separator + "Runtime" + File.separator + "scheduler";
    private static final String ERR_LOAD_SCHEDULER = "Error loading scheduler";
    private static final String ERROR_QUEUE_OFFER = "ERROR: TaskDispatcher queue offer error on ";

    /** Scheduler that every dequeued request is processed against. */
    protected TaskScheduler scheduler;
    /** Pending requests; regular requests go to the tail, prioritary ones to the head. */
    protected LinkedBlockingDeque<TDRequest> requestQueue = new LinkedBlockingDeque<>();
    /** Thread that executes {@link #run()}; started at the end of the constructor. */
    protected Thread dispatcher = new Thread(this);
    /** Loop condition of {@link #run()}; set to {@code true} before the thread is started. */
    protected boolean keepGoing;

    /**
     * Builds the dispatcher and starts its thread.
     *
     * <p>In order: loads the scheduler jars, parses the CEI, registers this instance with the
     * {@link ResourceManager}, instantiates the configured scheduler, announces every static worker to
     * it and finally starts the dispatcher thread.
     *
     * @param <A> resource-description type used to give each static worker and its initial
     *            {@link PerformedIncrease} the same type argument
     */
    public <A extends WorkerResourceDescription> TaskDispatcher() {
        this.dispatcher.setName("Task Dispatcher");
        TaskDispatcher.loadSchedulerJars();
        CEIParser.parse();
        ResourceManager.load(this);
        this.scheduler = this.constructScheduler();
        if (this.scheduler == null) {
            ErrorManager.fatal(ERR_LOAD_SCHEDULER);
        }
        this.scheduler.setOrchestrator(this);
        Iterator<Worker<? extends WorkerResourceDescription>> workers = ResourceManager.getStaticResources().iterator();
        while (workers.hasNext()) {
            // Unchecked by necessity: the wildcard is bound to A so that the worker and the update
            // passed to the scheduler agree on one type argument.
            @SuppressWarnings("unchecked")
            Worker<A> w = (Worker<A>) workers.next();
            this.scheduler.updateWorker(w, new PerformedIncrease<A>(w.getDescription()));
        }
        this.keepGoing = true;
        // PThread tracing is switched on only around the start of the dispatcher thread.
        if (Tracer.basicModeEnabled()) {
            Tracer.enablePThreads();
        }
        this.dispatcher.start();
        if (Tracer.basicModeEnabled()) {
            Tracer.disablePThreads();
        }
        LOGGER.info("Initialization finished");
    }

    /**
     * Dispatcher loop: takes requests from the queue and processes them against the scheduler until
     * {@link #keepGoing} is {@code false} or a {@link ShutdownException} is raised by a request.
     *
     * <p>When Extrae tracing is enabled, a start event is emitted after a request is dequeued and an
     * end event is emitted on every exit path of the iteration.
     */
    @Override
    public void run() {
        while (this.keepGoing) {
            String requestType = "Not defined";
            try {
                TDRequest request = this.requestQueue.take();
                requestType = request.getType().toString();
                if (Tracer.extraeEnabled()) {
                    Tracer.emitEvent(Tracer.getTaskDispatcherRequestEvent(request.getType().name()).getId(), Tracer.getRuntimeEventsType());
                }
                request.process(this.scheduler);
                emitRequestEndEvent();
            } catch (InterruptedException ie) {
                emitRequestEndEvent();
                // NOTE(review): the interrupt flag is restored and the loop keeps running. With the
                // flag set, take() on an empty queue throws again straight away, so this path can
                // busy-spin. Confirm whether the loop is expected to exit here instead.
                Thread.currentThread().interrupt();
            } catch (ShutdownException se) {
                LOGGER.debug("Exiting dispatcher because of shutting down");
                emitRequestEndEvent();
                // Release whoever is waiting on the semaphore carried by the exception, then stop.
                se.getSemaphore().release();
                break;
            } catch (Exception e) {
                ErrorManager.error("Error in TaskDispatcher request " + requestType, e);
                emitRequestEndEvent();
            }
        }
    }

    /** Emits the request-end trace event (value {@code 0}) when Extrae tracing is enabled. */
    private static void emitRequestEndEvent() {
        if (Tracer.extraeEnabled()) {
            Tracer.emitEvent(0L, Tracer.getRuntimeEventsType());
        }
    }

    /**
     * Blocks until a permit of {@code sem} becomes available. If the calling thread is interrupted
     * while waiting, the interrupt flag is restored and the method returns without the permit.
     *
     * @param sem semaphore previously handed to an enqueued request
     */
    private static void awaitCompletion(Semaphore sem) {
        try {
            sem.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Appends a request at the tail of the queue, reporting an error if the queue rejects it.
     *
     * @param request request to enqueue
     */
    private void addRequest(TDRequest request) {
        if (!this.requestQueue.offer(request)) {
            ErrorManager.error(ERROR_QUEUE_OFFER + "add request");
        }
    }

    /**
     * Inserts a request at the head of the queue so it is taken before the ones already pending,
     * reporting an error if the queue rejects it.
     *
     * @param request request to enqueue ahead of the others
     */
    private void addPrioritaryRequest(TDRequest request) {
        if (!this.requestQueue.offerFirst(request)) {
            ErrorManager.error(ERROR_QUEUE_OFFER + "add prioritary request");
        }
    }

    /**
     * Enqueues the execution of a task. Only instances of {@link Task} are enqueued; any other
     * {@link AbstractTask} is ignored.
     *
     * @param ap   access processor passed on to the execution request
     * @param task task to schedule
     */
    public void executeTask(AccessProcessor ap, AbstractTask task) {
        if (!(task instanceof Task)) {
            return;
        }
        Task executable = (Task) task;
        if (DEBUG) {
            StringBuilder sb = new StringBuilder("Schedule task: ");
            sb.append(executable.getTaskDescription().getName()).append("(").append(task.getId()).append(") ");
            LOGGER.debug(sb);
        }
        this.addRequest(new ExecuteTasksRequest(ap, executable));
    }

    /**
     * Enqueues a {@link CancelTaskRequest} for the given task. Does not wait for it to be processed.
     *
     * @param task     task to cancel
     * @param listener listener handed to the cancellation request
     */
    public void cancelTasks(Task task, RequestListener listener) {
        this.addRequest(new CancelTaskRequest(task, listener));
    }

    /**
     * Enqueues a {@code RUNNING} update for the action.
     *
     * @param action action whose state changed
     */
    @Override
    public void actionRunning(AllocatableAction action) {
        this.addRequest(new ActionUpdate(action, ActionUpdate.Update.RUNNING));
    }

    /**
     * Enqueues a {@code COMPLETED} update for the action.
     *
     * @param action action whose state changed
     */
    @Override
    public void actionCompletion(AllocatableAction action) {
        this.addRequest(new ActionUpdate(action, ActionUpdate.Update.COMPLETED));
    }

    /**
     * Enqueues an {@code ERROR} update for the action.
     *
     * @param action action whose state changed
     */
    @Override
    public void actionError(AllocatableAction action) {
        this.addRequest(new ActionUpdate(action, ActionUpdate.Update.ERROR));
    }

    /**
     * Enqueues an {@code EXCEPTION} update for the action, carrying the exception with it.
     *
     * @param action action whose state changed
     * @param e      exception attached to the update
     */
    @Override
    public void actionException(AllocatableAction action, COMPSsException e) {
        ActionUpdate update = new ActionUpdate(action, ActionUpdate.Update.EXCEPTION);
        update.setCOMPSsException(e);
        this.addRequest(update);
    }

    /**
     * Enqueues a {@link TaskSummaryRequest} and blocks until the semaphore given to it is released
     * or the calling thread is interrupted.
     *
     * @param logger logger handed to the summary request
     */
    public void getTaskSummary(Logger logger) {
        Semaphore done = new Semaphore(0);
        this.addRequest(new TaskSummaryRequest(logger, done));
        awaitCompletion(done);
    }

    /**
     * Enqueues a {@link MonitoringDataRequest}, blocks until the semaphore given to it is released
     * or the calling thread is interrupted, and returns the request's response.
     *
     * @return whatever {@code getResponse()} yields on the request once waiting ends
     */
    public String getCurrentMonitoringData() {
        Semaphore done = new Semaphore(0);
        MonitoringDataRequest request = new MonitoringDataRequest(done);
        this.addRequest(request);
        awaitCompletion(done);
        return request.getResponse();
    }

    /** Enqueues a {@link PrintCurrentLoadRequest}. Does not wait for it to be processed. */
    public void printCurrentState() {
        this.addRequest(new PrintCurrentLoadRequest());
    }

    /**
     * Enqueues a {@link PrintCurrentGraphRequest} and blocks until the semaphore given to it is
     * released or the calling thread is interrupted.
     *
     * @param graph writer handed to the request
     */
    public void printCurrentGraph(BufferedWriter graph) {
        Semaphore done = new Semaphore(0);
        this.addRequest(new PrintCurrentGraphRequest(done, graph));
        awaitCompletion(done);
    }

    /**
     * Enqueues a {@link WorkerUpdateRequest} at the head of the queue so it is taken before the
     * requests already pending.
     *
     * @param r            worker that was modified
     * @param modification update applied to the worker
     * @param <T>          resource-description type shared by the worker and the update
     */
    @Override
    public <T extends WorkerResourceDescription> void updatedResource(Worker<T> r, ResourceUpdate<T> modification) {
        this.addPrioritaryRequest(new WorkerUpdateRequest<T>(r, modification));
    }

    /**
     * Enqueues an {@link UpdateLocalCEIRequest} for the given interface class and blocks until the
     * semaphore given to it is released or the calling thread is interrupted.
     *
     * @param forName interface class handed to the request
     */
    public void addInterface(Class<?> forName) {
        if (DEBUG) {
            LOGGER.debug("Updating CEI " + forName.getName());
        }
        Semaphore done = new Semaphore(0);
        this.addRequest(new UpdateLocalCEIRequest(forName, done));
        awaitCompletion(done);
        if (DEBUG) {
            LOGGER.debug("Updated CEI " + forName.getName());
        }
    }

    /**
     * Enqueues a {@link CERegistration} for the given definition and blocks until the semaphore
     * given to it is released or the calling thread is interrupted.
     *
     * @param ced core element definition handed to the request
     */
    public void registerNewCoreElement(CoreElementDefinition ced) {
        if (DEBUG) {
            LOGGER.debug("Registering new CoreElement");
        }
        Semaphore done = new Semaphore(0);
        this.addRequest(new CERegistration(ced, done));
        awaitCompletion(done);
        if (DEBUG) {
            LOGGER.debug("Registered new CoreElement");
        }
    }

    /**
     * Enqueues a {@link ShutdownRequest} at the tail of the queue and blocks until the semaphore
     * given to it is released or the calling thread is interrupted.
     */
    public void shutdown() {
        Semaphore done = new Semaphore(0);
        this.addRequest(new ShutdownRequest(done));
        awaitCompletion(done);
    }

    /**
     * Returns a short identifier built from this instance's hash code.
     *
     * @return {@code "TaskDispatcher[Instance<hashCode>]"}
     */
    @Override
    public String toString() {
        return "TaskDispatcher[Instance" + this.hashCode() + "]";
    }

    /**
     * Adds the contents of {@code $COMPSS_HOME/Runtime/scheduler} to the classpath. Logs a warning
     * and loads nothing when {@code COMPSS_HOME} is unset or empty, or when the path is not found.
     */
    private static void loadSchedulerJars() {
        LOGGER.info("Loading schedulers...");
        String compssHome = System.getenv("COMPSS_HOME");
        if (compssHome == null || compssHome.isEmpty()) {
            LOGGER.warn("WARN: COMPSS_HOME not defined, no schedulers loaded.");
            return;
        }
        try {
            Classpath.loadPath(compssHome + SCHEDULERS_REL_PATH, LOGGER);
        } catch (FileNotFoundException ex) {
            LOGGER.warn("WARN: Schedulers folder not defined, no schedulers loaded.");
        }
    }

    /**
     * Instantiates the scheduler named by the {@code compss.scheduler} system property through its
     * no-argument constructor.
     *
     * <p>Any failure (property missing, class not found, class not a {@link TaskScheduler}, no
     * no-argument constructor, constructor failure) is reported through
     * {@link ErrorManager#fatal(String, Exception)}.
     *
     * @return the new scheduler, or {@code null} if it could not be created
     */
    private TaskScheduler constructScheduler() {
        TaskScheduler loaded = null;
        try {
            String schedFQN = System.getProperty("compss.scheduler");
            if (schedFQN == null || schedFQN.isEmpty()) {
                // Fail with a descriptive cause instead of the bare NPE Class.forName(null) would raise.
                throw new IllegalStateException("System property compss.scheduler is not set");
            }
            // Ask for the no-argument constructor explicitly: getDeclaredConstructors() returns its
            // elements in no particular order, so taking index 0 could pick a different constructor.
            Constructor<? extends TaskScheduler> schedCnstr =
                Class.forName(schedFQN).asSubclass(TaskScheduler.class).getDeclaredConstructor();
            loaded = schedCnstr.newInstance();
            if (DEBUG) {
                LOGGER.debug("Loaded scheduler " + loaded);
            }
        } catch (Exception e) {
            ErrorManager.fatal(ERR_LOAD_SCHEDULER, e);
        }
        return loaded;
    }
}

