/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.types.allocatableactions;

import es.bsc.compss.api.TaskMonitor;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.components.impl.ResourceScheduler;
import es.bsc.compss.components.impl.TaskProducer;
import es.bsc.compss.scheduler.exceptions.BlockedActionException;
import es.bsc.compss.scheduler.exceptions.FailedActionException;
import es.bsc.compss.scheduler.exceptions.UnassignedActionException;
import es.bsc.compss.scheduler.types.ActionOrchestrator;
import es.bsc.compss.scheduler.types.AllocatableAction;
import es.bsc.compss.scheduler.types.SchedulingInformation;
import es.bsc.compss.scheduler.types.Score;
import es.bsc.compss.types.Task;
import es.bsc.compss.types.TaskDescription;
import es.bsc.compss.types.annotations.parameter.DataType;
import es.bsc.compss.types.annotations.parameter.Direction;
import es.bsc.compss.types.annotations.parameter.OnFailure;
import es.bsc.compss.types.data.DataAccessId;
import es.bsc.compss.types.data.DataInstanceId;
import es.bsc.compss.types.data.LogicalData;
import es.bsc.compss.types.data.Transferable;
import es.bsc.compss.types.data.accessid.RAccessId;
import es.bsc.compss.types.data.accessid.RWAccessId;
import es.bsc.compss.types.data.accessid.WAccessId;
import es.bsc.compss.types.data.listener.EventListener;
import es.bsc.compss.types.data.location.DataLocation;
import es.bsc.compss.types.data.operation.JobTransfersListener;
import es.bsc.compss.types.implementations.Implementation;
import es.bsc.compss.types.job.Job;
import es.bsc.compss.types.job.JobListener;
import es.bsc.compss.types.job.JobStatusListener;
import es.bsc.compss.types.parameter.CollectionParameter;
import es.bsc.compss.types.parameter.DependencyParameter;
import es.bsc.compss.types.parameter.ExternalPSCOParameter;
import es.bsc.compss.types.parameter.Parameter;
import es.bsc.compss.types.resources.Worker;
import es.bsc.compss.types.resources.WorkerResourceDescription;
import es.bsc.compss.types.uri.SimpleURI;
import es.bsc.compss.util.CoreManager;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.JobDispatcher;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ExecutionAction
extends AllocatableAction {
    private static final int TRANSFER_CHANCES = 2;
    private static final int SUBMISSION_CHANCES = 2;
    private static final int SCHEDULING_CHANCES = 2;
    private static final Logger JOB_LOGGER = LogManager.getLogger("es.bsc.compss.Components.TaskDispatcher.JobManager");
    protected final TaskProducer producer;
    protected final Task task;
    private final LinkedList<Integer> jobs;
    private int transferErrors = 0;
    protected int executionErrors = 0;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ExecutionAction(SchedulingInformation schedulingInformation, ActionOrchestrator orchestrator, TaskProducer producer, Task task) {
        super(schedulingInformation, orchestrator);
        this.producer = producer;
        this.task = task;
        this.jobs = new LinkedList();
        this.transferErrors = 0;
        this.executionErrors = 0;
        this.task.addExecution(this);
        Task task2 = this.task;
        synchronized (task2) {
            for (Task predecessor : this.task.getPredecessors()) {
                for (ExecutionAction e : predecessor.getExecutions()) {
                    if (e == null || !e.isPending()) continue;
                    this.addDataPredecessor(e);
                }
            }
        }
        Task resourceConstraintTask = this.task.getEnforcingTask();
        if (resourceConstraintTask != null) {
            for (ExecutionAction e : resourceConstraintTask.getExecutions()) {
                this.addResourceConstraint(e);
            }
        }
    }

    public final Task getTask() {
        return this.task;
    }

    @Override
    public boolean isToReserveResources() {
        return true;
    }

    @Override
    public boolean isToReleaseResources() {
        return true;
    }

    @Override
    public boolean isToStopResource() {
        return false;
    }

    @Override
    protected void doAction() {
        JOB_LOGGER.info("Ordering transfers to " + this.getAssignedResource() + " to run task: " + this.task.getId());
        this.transferErrors = 0;
        this.executionErrors = 0;
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onSubmission();
        this.doInputTransfers();
    }

    private void doInputTransfers() {
        JobTransfersListener listener = new JobTransfersListener(this);
        this.transferInputData(listener);
        listener.enable();
    }

    private void transferInputData(JobTransfersListener listener) {
        TaskDescription taskDescription = this.task.getTaskDescription();
        block4: for (Parameter p : taskDescription.getParameters()) {
            JOB_LOGGER.debug("    * " + p);
            if (!(p instanceof DependencyParameter)) continue;
            DependencyParameter dp = (DependencyParameter)p;
            switch (taskDescription.getType()) {
                case METHOD: {
                    this.transferJobData(dp, listener);
                    continue block4;
                }
                case SERVICE: {
                    if (dp.getDirection() == Direction.INOUT) continue block4;
                    this.transferJobData(dp, listener);
                }
            }
        }
    }

    private void transferJobData(DependencyParameter param, JobTransfersListener listener) {
        String srcName;
        if (param.getType() == DataType.COLLECTION_T) {
            CollectionParameter cp = (CollectionParameter)param;
            JOB_LOGGER.debug("Detected CollectionParameter " + cp);
            for (Parameter p : cp.getParameters()) {
                DependencyParameter dp = (DependencyParameter)p;
                this.transferJobData(dp, listener);
            }
        }
        Worker<? extends WorkerResourceDescription> w = this.getAssignedResource().getResource();
        DataAccessId access = param.getDataAccessId();
        if (access instanceof WAccessId) {
            String tgtName = ((WAccessId)access).getWrittenDataInstance().getRenaming();
            if (param instanceof ExternalPSCOParameter) {
                ExternalPSCOParameter epp = (ExternalPSCOParameter)param;
                tgtName = epp.getId();
            }
            if (DEBUG) {
                JOB_LOGGER.debug("Setting data target job transfer: " + w.getCompleteRemotePath(param.getType(), tgtName));
            }
            JOB_LOGGER.debug("Setting data target job transfer: " + w.getCompleteRemotePath(param.getType(), tgtName));
            param.setDataTarget(w.getCompleteRemotePath(param.getType(), tgtName).getPath());
            return;
        }
        listener.addOperation();
        if (access instanceof RAccessId) {
            srcName = ((RAccessId)access).getReadDataInstance().getRenaming();
            w.getData(srcName, srcName, (Transferable)param, (EventListener)listener);
        } else {
            srcName = ((RWAccessId)access).getReadDataInstance().getRenaming();
            String tgtName = ((RWAccessId)access).getWrittenDataInstance().getRenaming();
            w.getData(srcName, tgtName, (LogicalData)null, (Transferable)param, (EventListener)listener);
        }
    }

    public final void failedTransfers(int failedtransfers) {
        JOB_LOGGER.debug("Received a notification for the transfers for task " + this.task.getId() + " with state FAILED");
        ++this.transferErrors;
        if (this.transferErrors < 2 && this.task.getOnFailure() == OnFailure.RETRY) {
            JOB_LOGGER.debug("Resubmitting input files for task " + this.task.getId() + " to host " + this.getAssignedResource().getName() + " since " + failedtransfers + " transfers failed.");
            this.doInputTransfers();
        } else {
            ErrorManager.warn("Transfers for running task " + this.task.getId() + " on worker " + this.getAssignedResource().getName() + " have failed.");
            this.notifyError();
        }
    }

    public final void doSubmit(int transferGroupId) {
        JOB_LOGGER.debug("Received a notification for the transfers of task " + this.task.getId() + " with state DONE");
        JobStatusListener listener = new JobStatusListener(this);
        Job<?> job = this.submitJob(transferGroupId, listener);
        this.jobs.add(job.getJobId());
        JOB_LOGGER.info((this.getExecutingResources().size() > 1 ? "Rescheduled" : "New") + " Job " + job.getJobId() + " (Task: " + this.task.getId() + ")");
        JOB_LOGGER.info("  * Method name: " + this.task.getTaskDescription().getName());
        JOB_LOGGER.info("  * Target host: " + this.getAssignedResource().getName());
        this.profile.start();
        JobDispatcher.dispatch(job);
    }

    protected Job<?> submitJob(int transferGroupId, JobStatusListener listener) {
        if (DEBUG) {
            LOGGER.debug(this.toString() + " starts job creation");
        }
        Worker<? extends WorkerResourceDescription> w = this.getAssignedResource().getResource();
        ArrayList<String> slaveNames = new ArrayList<String>();
        Job<?> job = w.newJob(this.task.getId(), this.task.getTaskDescription(), this.getAssignedImplementation(), slaveNames, listener);
        job.setTransferGroupId(transferGroupId);
        job.setHistory(Job.JobHistory.NEW);
        return job;
    }

    public final void failedJob(Job<?> job, JobListener.JobEndStatus endStatus) {
        this.profile.end();
        int jobId = job.getJobId();
        JOB_LOGGER.error("Received a notification for job " + jobId + " with state FAILED");
        ++this.executionErrors;
        if (this.transferErrors + this.executionErrors < 2 && this.task.getOnFailure() == OnFailure.RETRY) {
            JOB_LOGGER.error("Job " + job.getJobId() + " for running task " + this.task.getId() + " on worker " + this.getAssignedResource().getName() + " has failed; resubmitting task to the same worker.");
            ErrorManager.warn("Job " + job.getJobId() + " for running task " + this.task.getId() + " on worker " + this.getAssignedResource().getName() + " has failed; resubmitting task to the same worker.");
            job.setHistory(Job.JobHistory.RESUBMITTED);
            this.profile.start();
            JobDispatcher.dispatch(job);
        } else {
            if (this.task.getOnFailure() == OnFailure.IGNORE) {
                this.doOutputTransfers(job);
            }
            this.notifyError();
        }
    }

    public final void completedJob(Job<?> job) {
        this.profile.end();
        int jobId = job.getJobId();
        JOB_LOGGER.info("Received a notification for job " + jobId + " with state OK (avg. duration: " + this.profile.getAverageExecutionTime() + ")");
        this.doOutputTransfers(job);
        this.notifyCompleted();
    }

    private final DataLocation storeOutputParameter(Job<?> job, Parameter p) {
        Worker<? extends WorkerResourceDescription> w = this.getAssignedResource().getResource();
        if (p instanceof DependencyParameter) {
            DataInstanceId dId = null;
            DependencyParameter dp = (DependencyParameter)p;
            switch (p.getDirection()) {
                case CONCURRENT: 
                case IN: {
                    return null;
                }
                case OUT: {
                    dId = ((WAccessId)dp.getDataAccessId()).getWrittenDataInstance();
                    break;
                }
                case INOUT: {
                    dId = ((RWAccessId)dp.getDataAccessId()).getWrittenDataInstance();
                    if (job.getType() != Implementation.TaskType.SERVICE) break;
                    return null;
                }
            }
            String name = dId.getRenaming();
            if (job.getType() == Implementation.TaskType.METHOD) {
                String targetProtocol = null;
                switch (dp.getType()) {
                    case FILE_T: {
                        targetProtocol = DataLocation.Protocol.FILE_URI.getSchema();
                        break;
                    }
                    case OBJECT_T: {
                        targetProtocol = DataLocation.Protocol.OBJECT_URI.getSchema();
                        break;
                    }
                    case COLLECTION_T: {
                        targetProtocol = DataLocation.Protocol.OBJECT_URI.getSchema();
                        CollectionParameter cp = (CollectionParameter)p;
                        for (Parameter elem : cp.getParameters()) {
                            this.storeOutputParameter(job, elem);
                        }
                        break;
                    }
                    case PSCO_T: {
                        targetProtocol = DataLocation.Protocol.PERSISTENT_URI.getSchema();
                        break;
                    }
                    case EXTERNAL_PSCO_T: {
                        targetProtocol = DataLocation.Protocol.PERSISTENT_URI.getSchema();
                        break;
                    }
                    case BINDING_OBJECT_T: {
                        targetProtocol = DataLocation.Protocol.BINDING_URI.getSchema();
                        break;
                    }
                    default: {
                        targetProtocol = DataLocation.Protocol.ANY_URI.getSchema();
                    }
                }
                DataLocation outLoc = null;
                try {
                    SimpleURI targetURI = new SimpleURI(targetProtocol + dp.getDataTarget());
                    outLoc = DataLocation.createLocation(w, targetURI);
                }
                catch (Exception e) {
                    ErrorManager.error("ERROR: Invalid location URI " + dp.getDataTarget(), e);
                }
                Comm.registerLocation(name, outLoc);
                return outLoc;
            }
            Object value = job.getReturnValue();
            LogicalData ld = Comm.registerValue(name, value);
            Iterator<DataLocation> iterator = ld.getLocations().iterator();
            if (iterator.hasNext()) {
                DataLocation loc = iterator.next();
                return loc;
            }
        }
        return null;
    }

    private final void doOutputTransfers(Job<?> job) {
        Parameter[] params = job.getTaskParams().getParameters();
        for (int i = 0; i < params.length; ++i) {
            Parameter p = params[i];
            DataLocation outLoc = this.storeOutputParameter(job, p);
            TaskMonitor monitor = this.task.getTaskMonitor();
            if (outLoc == null) continue;
            monitor.valueGenerated(i, p.getType(), p.getName(), outLoc);
        }
    }

    @Override
    protected void doCompleted() {
        this.getAssignedResource().profiledExecution(this.getAssignedImplementation(), this.profile);
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onSuccesfulExecution();
        this.task.decreaseExecutionCount();
        this.task.setStatus(Task.TaskState.FINISHED);
        this.producer.notifyTaskEnd(this.task);
    }

    @Override
    protected void doError() throws FailedActionException {
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onErrorExecution();
        if (this.task.getOnFailure() == OnFailure.RETRY) {
            if (this.getExecutingResources().size() >= 2) {
                LOGGER.warn("Task " + this.task.getId() + " has already been rescheduled; notifying task failure.");
                ErrorManager.warn("Task " + this.task.getId() + " has already been rescheduled; notifying task failure.");
                throw new FailedActionException();
            }
        } else {
            LOGGER.warn("Notifying task " + this.task.getId() + " failure");
            ErrorManager.warn("Notifying task " + this.task.getId() + " failure");
            throw new FailedActionException();
        }
        ErrorManager.warn("Task " + this.task.getId() + " execution on worker " + this.getAssignedResource().getName() + " has failed; rescheduling task execution. (changing worker)");
        LOGGER.warn("Task " + this.task.getId() + " execution on worker " + this.getAssignedResource().getName() + " has failed; rescheduling task execution. (changing worker)");
    }

    @Override
    protected void doAbort() {
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onAbortedExecution();
    }

    @Override
    protected void doFailed() {
        String taskName = this.task.getTaskDescription().getName();
        StringBuilder sb = new StringBuilder();
        sb.append("Task '").append(taskName).append("' TOTALLY FAILED.\n");
        sb.append("Possible causes:\n");
        sb.append("     -Exception thrown by task '").append(taskName).append("'.\n");
        sb.append("     -Expected output files not generated by task '").append(taskName).append("'.\n");
        sb.append("     -Could not provide nor retrieve needed data between master and worker.\n");
        sb.append("\n");
        sb.append("Check files '").append(Comm.getAppHost().getJobsDirPath()).append("job[");
        Iterator j = this.jobs.iterator();
        while (j.hasNext()) {
            sb.append(j.next());
            if (!j.hasNext()) break;
            sb.append("|");
        }
        sb.append("'] to find out the error.\n");
        sb.append(" \n");
        ErrorManager.warn(sb.toString());
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onFailedExecution();
        this.task.decreaseExecutionCount();
        this.task.setStatus(Task.TaskState.FAILED);
        this.producer.notifyTaskEnd(this.task);
    }

    @Override
    protected void doCanceled() {
        String taskName = this.task.getTaskDescription().getName();
        ErrorManager.warn("Task " + taskName + " has been cancelled.");
        this.task.decreaseExecutionCount();
        this.task.setStatus(Task.TaskState.CANCELED);
        this.producer.notifyTaskEnd(this.task);
    }

    @Override
    protected void doFailIgnored() {
        String taskName = this.task.getTaskDescription().getName();
        StringBuilder sb = new StringBuilder();
        sb.append("Task failure: Task ").append(taskName).append(" has failed. Successors keep running.\n");
        sb.append("\n");
        ErrorManager.warn(sb.toString());
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onFailedExecution();
        this.task.decreaseExecutionCount();
        this.task.setStatus(Task.TaskState.FINISHED);
        this.producer.notifyTaskEnd(this.task);
    }

    @Override
    public final List<ResourceScheduler<? extends WorkerResourceDescription>> getCompatibleWorkers() {
        return this.getCoreElementExecutors(this.task.getTaskDescription().getId());
    }

    @Override
    public final Implementation[] getImplementations() {
        List<Implementation> coreImpls = CoreManager.getCoreImplementations(this.task.getTaskDescription().getId());
        int coreImplsSize = coreImpls.size();
        Implementation[] impls = new Implementation[coreImplsSize];
        for (int i = 0; i < coreImplsSize; ++i) {
            impls[i] = coreImpls.get(i);
        }
        return impls;
    }

    @Override
    public <W extends WorkerResourceDescription> boolean isCompatible(Worker<W> r) {
        return r.canRun(this.task.getTaskDescription().getId());
    }

    @Override
    public final <T extends WorkerResourceDescription> List<Implementation> getCompatibleImplementations(ResourceScheduler<T> r) {
        return r.getExecutableImpls(this.task.getTaskDescription().getId());
    }

    @Override
    public final Integer getCoreId() {
        return this.task.getTaskDescription().getId();
    }

    @Override
    public final int getPriority() {
        return this.task.getTaskDescription().hasPriority() ? 1 : 0;
    }

    @Override
    public OnFailure getOnFailure() {
        return this.task.getOnFailure();
    }

    @Override
    public final <T extends WorkerResourceDescription> Score schedulingScore(ResourceScheduler<T> targetWorker, Score actionScore) {
        Score computedScore = targetWorker.generateResourceScore(this, this.task.getTaskDescription(), actionScore);
        return computedScore;
    }

    @Override
    public final void schedule(Score actionScore) throws BlockedActionException, UnassignedActionException {
        LinkedList<ResourceScheduler<? extends WorkerResourceDescription>> candidates = new LinkedList<ResourceScheduler<? extends WorkerResourceDescription>>();
        if (this.isTargetResourceEnforced()) {
            candidates.add(this.getEnforcedTargetResource());
        } else if (this.isSchedulingConstrained()) {
            for (AllocatableAction a : this.getConstrainingPredecessors()) {
                candidates.add(a.getAssignedResource());
            }
        } else {
            candidates = this.getCompatibleWorkers();
        }
        this.schedule(actionScore, candidates);
    }

    private <T extends WorkerResourceDescription> void schedule(Score actionScore, List<ResourceScheduler<? extends WorkerResourceDescription>> candidates) throws BlockedActionException, UnassignedActionException {
        StringBuilder debugString = new StringBuilder("Scheduling " + this + " execution:\n");
        ResourceScheduler<? extends WorkerResourceDescription> bestWorker = null;
        Implementation bestImpl = null;
        Score bestScore = null;
        int usefulResources = 0;
        for (ResourceScheduler<? extends WorkerResourceDescription> worker : candidates) {
            if (this.getExecutingResources().contains(worker)) {
                if (DEBUG) {
                    LOGGER.debug("Task " + this.task.getId() + " already ran on worker " + worker.getName());
                }
                if (candidates.size() > 1) continue;
                LOGGER.debug("No more candidate resources for task " + this.task.getId() + ". Trying to use worker " + worker.getName() + " again ... ");
            }
            Score resourceScore = worker.generateResourceScore(this, this.task.getTaskDescription(), actionScore);
            ++usefulResources;
            if (resourceScore == null) continue;
            for (Implementation impl : this.getCompatibleImplementations(worker)) {
                Score implScore = worker.generateImplementationScore(this, this.task.getTaskDescription(), impl, resourceScore);
                if (DEBUG) {
                    debugString.append(" Resource ").append(worker.getName()).append(" ").append(" Implementation ").append(impl.getImplementationId()).append(" ").append(" Score ").append(implScore).append("\n");
                }
                if (!Score.isBetter(implScore, bestScore)) continue;
                bestWorker = worker;
                bestImpl = impl;
                bestScore = implScore;
            }
        }
        if (DEBUG) {
            LOGGER.debug(debugString.toString());
        }
        if (bestWorker == null) {
            if (usefulResources == 0) {
                LOGGER.warn("No worker can run " + this);
                throw new BlockedActionException();
            }
            throw new UnassignedActionException();
        }
        this.schedule(bestWorker, bestImpl);
    }

    @Override
    public final <T extends WorkerResourceDescription> void schedule(ResourceScheduler<T> targetWorker, Score actionScore) throws BlockedActionException, UnassignedActionException {
        if (targetWorker == null || !targetWorker.getResource().canRun(this.task.getTaskDescription().getId()) || this.getExecutingResources().contains(targetWorker)) {
            String message = "Worker " + (targetWorker == null ? "null" : targetWorker.getName()) + " has not available resources to run " + this;
            LOGGER.warn(message);
            throw new UnassignedActionException();
        }
        Implementation bestImpl = null;
        Score bestScore = null;
        Score resourceScore = targetWorker.generateResourceScore(this, this.task.getTaskDescription(), actionScore);
        for (Implementation impl : this.getCompatibleImplementations(targetWorker)) {
            Score implScore = targetWorker.generateImplementationScore(this, this.task.getTaskDescription(), impl, resourceScore);
            if (!Score.isBetter(implScore, bestScore)) continue;
            bestImpl = impl;
            bestScore = implScore;
        }
        this.schedule(targetWorker, bestImpl);
    }

    @Override
    public final <T extends WorkerResourceDescription> void schedule(ResourceScheduler<T> targetWorker, Implementation impl) throws BlockedActionException, UnassignedActionException {
        if (targetWorker == null || impl == null) {
            throw new UnassignedActionException();
        }
        if (DEBUG) {
            LOGGER.debug("Scheduling " + this + " on worker " + (targetWorker == null ? "null" : targetWorker.getName()) + " with implementation " + (impl == null ? "null" : impl.getImplementationId()));
        }
        if (!targetWorker.getResource().canRun(impl) || this.getExecutingResources().contains(targetWorker)) {
            LOGGER.debug("Worker " + targetWorker.getName() + " has not available resources to run " + this);
            throw new UnassignedActionException();
        }
        LOGGER.info("Assigning action " + this + " to worker " + targetWorker.getName() + " with implementation " + impl.getImplementationId());
        this.assignImplementation(impl);
        this.assignResource(targetWorker);
        targetWorker.scheduleAction(this);
        TaskMonitor monitor = this.task.getTaskMonitor();
        monitor.onSchedule();
    }

    @Override
    public String toString() {
        return "ExecutionAction ( Task " + this.task.getId() + ", CE name " + this.task.getTaskDescription().getName() + ")";
    }
}

