/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.types.allocatableactions;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import es.bsc.compss.api.TaskMonitor;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.components.impl.AccessProcessor;
import es.bsc.compss.components.impl.ResourceScheduler;
import es.bsc.compss.scheduler.exceptions.BlockedActionException;
import es.bsc.compss.scheduler.exceptions.FailedActionException;
import es.bsc.compss.scheduler.exceptions.UnassignedActionException;
import es.bsc.compss.scheduler.types.ActionGroup;
import es.bsc.compss.scheduler.types.ActionOrchestrator;
import es.bsc.compss.scheduler.types.AllocatableAction;
import es.bsc.compss.scheduler.types.SchedulingInformation;
import es.bsc.compss.scheduler.types.Score;
import es.bsc.compss.types.AbstractTask;
import es.bsc.compss.types.CommutativeGroupTask;
import es.bsc.compss.types.CoreElement;
import es.bsc.compss.types.Task;
import es.bsc.compss.types.TaskDescription;
import es.bsc.compss.types.TaskGroup;
import es.bsc.compss.types.TaskState;
import es.bsc.compss.types.annotations.parameter.DataType;
import es.bsc.compss.types.annotations.parameter.Direction;
import es.bsc.compss.types.annotations.parameter.OnFailure;
import es.bsc.compss.types.data.DataAccessId;
import es.bsc.compss.types.data.DataInstanceId;
import es.bsc.compss.types.data.LogicalData;
import es.bsc.compss.types.data.Transferable;
import es.bsc.compss.types.data.accessid.RAccessId;
import es.bsc.compss.types.data.accessid.RWAccessId;
import es.bsc.compss.types.data.accessid.WAccessId;
import es.bsc.compss.types.data.listener.EventListener;
import es.bsc.compss.types.data.location.DataLocation;
import es.bsc.compss.types.data.operation.JobTransfersListener;
import es.bsc.compss.types.implementations.Implementation;
import es.bsc.compss.types.job.Job;
import es.bsc.compss.types.job.JobEndStatus;
import es.bsc.compss.types.job.JobHistory;
import es.bsc.compss.types.job.JobListener;
import es.bsc.compss.types.parameter.CollectionParameter;
import es.bsc.compss.types.parameter.DependencyParameter;
import es.bsc.compss.types.parameter.DictCollectionParameter;
import es.bsc.compss.types.parameter.Parameter;
import es.bsc.compss.types.resources.Worker;
import es.bsc.compss.types.resources.WorkerResourceDescription;
import es.bsc.compss.types.uri.SimpleURI;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.JobDispatcher;
import es.bsc.compss.util.Tracer;
import es.bsc.compss.worker.COMPSsException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ExecutionAction
extends AllocatableAction
implements JobListener {
    // Retry budgets. NOTE(review): the visible methods never reference these by name; failedTransfers
    // and jobFailed compare against the literal 2, presumably these constants inlined by the compiler
    // — confirm against the original sources.
    private static final int TRANSFER_CHANCES = 2;
    private static final int SUBMISSION_CHANCES = 2;
    private static final int SCHEDULING_CHANCES = 2;
    // Logger used for every job-related message emitted by this class.
    private static final Logger JOB_LOGGER = LogManager.getLogger("es.bsc.compss.Components.TaskDispatcher.JobManager");
    // Access processor handed over at construction time.
    protected final AccessProcessor ap;
    // Task whose execution this action represents; the action registers itself on it in the constructor.
    protected final Task task;
    // Ids of the jobs that doSubmit has actually dispatched for this action.
    private final LinkedList<Integer> jobs;
    // Number of failed transfer rounds; reset in doAction, incremented in failedTransfers.
    private int transferErrors = 0;
    // Number of failed job executions; reset in doAction, incremented in jobFailed.
    protected int executionErrors = 0;
    // Last job created by submitJob; null until a job has been created.
    protected Job<?> currentJob;
    // Set by stopAction when the action is stopped before any job exists; checked by doSubmit.
    boolean cancelledBeforeSubmit = false;
    // NOTE(review): not read or written in the visible part of the class — purpose to be confirmed.
    boolean extraResubmit = false;

    /**
     * Creates the action that executes {@code task} and registers its scheduling relations.
     *
     * <p>The action registers itself as an execution of the task and then, while holding the task's
     * monitor, registers its data dependencies, stream producers and mutex groups. Finally, if the task
     * has an enforcing task, every execution of that task is added as a resource constraint.
     *
     * @param schedulingInformation scheduling information forwarded to the parent constructor
     * @param orchestrator orchestrator forwarded to the parent constructor
     * @param ap access processor kept for later use
     * @param task task to execute
     */
    public ExecutionAction(SchedulingInformation schedulingInformation, ActionOrchestrator orchestrator, AccessProcessor ap, Task task) {
        super(schedulingInformation, orchestrator);
        this.ap = ap;
        this.task = task;
        // Typed instantiation: the field is a LinkedList<Integer>, so avoid the raw type.
        this.jobs = new LinkedList<Integer>();
        this.transferErrors = 0;
        this.executionErrors = 0;
        this.task.addExecution(this);
        // The three registrations are performed atomically with respect to the task's monitor.
        synchronized (this.task) {
            this.registerDataDependencies();
            this.registerStreamProducers();
            this.registerMutex();
        }
        Task resourceConstraintTask = this.task.getEnforcingTask();
        if (resourceConstraintTask != null) {
            for (AllocatableAction e : resourceConstraintTask.getExecutions()) {
                this.addResourceConstraint(e);
            }
        }
    }

    /**
     * Adds this action to the mutex group of every commutative group listed by the task.
     */
    private void registerMutex() {
        for (CommutativeGroupTask commutativeGroup : this.task.getCommutativeGroupList()) {
            this.addToMutexGroup(commutativeGroup.getActions());
        }
    }

    /**
     * Registers as stream producer every non-null, still pending execution of the task's stream
     * producer tasks.
     */
    private void registerStreamProducers() {
        for (AbstractTask producer : this.task.getStreamProducers()) {
            // Stream producers are handled as plain tasks; the cast fails for any other kind.
            Task producerTask = (Task) producer;
            for (AllocatableAction execution : producerTask.getExecutions()) {
                if (execution != null && execution.isPending()) {
                    this.addStreamProducer(execution);
                }
            }
        }
    }

    /**
     * Registers the data dependencies with every predecessor of the task, dispatching on whether the
     * predecessor is a commutative group or a standard task.
     */
    private void registerDataDependencies() {
        for (AbstractTask predecessor : this.task.getPredecessors()) {
            if (predecessor instanceof CommutativeGroupTask) {
                this.treatCommutativePredecessor((CommutativeGroupTask) predecessor);
            } else {
                this.treatStandardPredecessor(predecessor);
            }
        }
    }

    /**
     * Adds as data predecessor every execution of the tasks of a commutative group, except those
     * that already have this action among their own data predecessors.
     *
     * @param predecessor commutative group preceding the task
     */
    private void treatCommutativePredecessor(CommutativeGroupTask predecessor) {
        if (DEBUG) {
            LOGGER.debug("Task has a commutative group as a predecessor");
        }
        for (Task commutativeTask : predecessor.getCommutativeTasks()) {
            for (AllocatableAction execution : commutativeTask.getExecutions()) {
                boolean dependsOnThis = execution.getDataPredecessors().contains(this);
                if (!dependsOnThis) {
                    this.addDataPredecessor(execution);
                }
            }
        }
    }

    /**
     * Classifies each execution of a standard predecessor: pending executions become data
     * predecessors, any other one (including a null entry) is reported as already done.
     *
     * @param predecessor task preceding the task of this action
     */
    private void treatStandardPredecessor(AbstractTask predecessor) {
        for (AllocatableAction execution : predecessor.getExecutions()) {
            boolean stillPending = execution != null && execution.isPending();
            if (stillPending) {
                this.addDataPredecessor(execution);
            } else {
                this.addAlreadyDoneAction(execution);
            }
        }
    }

    /**
     * Returns the task executed by this action.
     *
     * @return the task given at construction time
     */
    public final Task getTask() {
        return task;
    }

    /**
     * Execution actions always report that resources are to be reserved.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isToReserveResources() {
        return true;
    }

    /**
     * Execution actions always report that resources are to be released.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isToReleaseResources() {
        return true;
    }

    /**
     * Execution actions never request stopping the resource.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isToStopResource() {
        return false;
    }

    /**
     * Starts the action: resets the error counters, notifies the task monitor about the submission
     * and orders the input transfers towards the assigned resource.
     */
    @Override
    protected void doAction() {
        JOB_LOGGER.info("Ordering transfers to " + this.getAssignedResource() + " to run task: " + this.task.getId());
        // A fresh run starts with a clean failure history.
        this.executionErrors = 0;
        this.transferErrors = 0;
        this.task.getTaskMonitor().onSubmission();
        this.doInputTransfers();
    }

    /**
     * Tells whether the given action is an execution action whose task is in CANCELED state.
     *
     * @param aa action to inspect
     * @return {@code true} only for an {@code ExecutionAction} whose task status is CANCELED
     */
    @Override
    public boolean checkIfCanceled(AllocatableAction aa) {
        if (!(aa instanceof ExecutionAction)) {
            return false;
        }
        ExecutionAction other = (ExecutionAction) aa;
        return other.getTask().getStatus() == TaskState.CANCELED;
    }

    /**
     * Orders the transfer of all the input data of the task, tracked by a fresh transfers listener.
     */
    private void doInputTransfers() {
        JobTransfersListener transfersListener = new JobTransfersListener(this);
        this.transferInputData(transfersListener);
        // The listener is enabled only once every transfer has been ordered.
        transfersListener.enable();
    }

    /**
     * Orders the transfers for every potential-dependency parameter of the task.
     *
     * <p>For HTTP and METHOD tasks every such parameter is transferred; for SERVICE tasks the INOUT
     * parameters are skipped. Any other task type orders nothing.
     *
     * @param listener listener tracking the ordered transfers
     */
    private void transferInputData(JobTransfersListener listener) {
        TaskDescription description = this.task.getTaskDescription();
        for (Parameter param : description.getParameters()) {
            if (DEBUG) {
                JOB_LOGGER.debug("    * " + param);
            }
            if (param.isPotentialDependency()) {
                DependencyParameter depParam = (DependencyParameter) param;
                switch (description.getType()) {
                    case HTTP:
                    case METHOD:
                        this.transferJobData(depParam, listener);
                        break;
                    case SERVICE:
                        if (depParam.getDirection() != Direction.INOUT) {
                            this.transferJobData(depParam, listener);
                        }
                        break;
                    default:
                        // Nothing to transfer for other task types.
                        break;
                }
            }
        }
    }

    /**
     * Orders the transfer of one parameter according to its data type.
     *
     * <p>Collections and dictionary collections first recurse into their dependency elements (keys
     * before values for dictionaries) and then transfer the container itself; streams use the stream
     * transfer; every other type is transferred as a single parameter.
     *
     * @param param parameter to transfer
     * @param listener listener tracking the ordered transfers
     */
    private void transferJobData(DependencyParameter param, JobTransfersListener listener) {
        switch (param.getType()) {
            case COLLECTION_T: {
                CollectionParameter collection = (CollectionParameter) param;
                JOB_LOGGER.debug("Detected CollectionParameter " + collection);
                for (Parameter element : collection.getParameters()) {
                    if (element.isPotentialDependency()) {
                        this.transferJobData((DependencyParameter) element, listener);
                    }
                }
                this.transferSingleParameter(param, listener);
                break;
            }
            case DICT_COLLECTION_T: {
                DictCollectionParameter dictionary = (DictCollectionParameter) param;
                JOB_LOGGER.debug("Detected DictCollectionParameter " + dictionary);
                for (Map.Entry<Parameter, Parameter> entry : dictionary.getParameters().entrySet()) {
                    Parameter key = entry.getKey();
                    if (key.isPotentialDependency()) {
                        this.transferJobData((DependencyParameter) key, listener);
                    }
                    Parameter value = entry.getValue();
                    if (value.isPotentialDependency()) {
                        this.transferJobData((DependencyParameter) value, listener);
                    }
                }
                this.transferSingleParameter(param, listener);
                break;
            }
            case STREAM_T:
            case EXTERNAL_STREAM_T:
                this.transferStreamParameter(param, listener);
                break;
            default:
                this.transferSingleParameter(param, listener);
                break;
        }
    }

    /**
     * Orders the transfer of a single (non-stream) parameter to the assigned worker.
     *
     * <p>Write accesses only get their data target set; read accesses request the read data; any
     * other access is treated as read-write and is fetched through a temporary data registered as
     * {@code "tmp" + <written renaming>}.
     *
     * @param param parameter to transfer
     * @param listener listener notified about the ordered operation
     */
    private void transferSingleParameter(DependencyParameter param, JobTransfersListener listener) {
        Worker<? extends WorkerResourceDescription> worker = this.getAssignedResource().getResource();
        DataAccessId access = param.getDataAccessId();
        if (access instanceof WAccessId) {
            // Output-only data: nothing to bring, just decide where the worker will write it.
            String renaming = ((WAccessId) access).getWrittenDataInstance().getRenaming();
            param.setDataTarget(worker.getOutputDataTargetPath(renaming, param));
            return;
        }
        // Both remaining kinds of access order exactly one operation on the listener.
        listener.addOperation();
        if (access instanceof RAccessId) {
            LogicalData readData = ((RAccessId) access).getReadDataInstance().getData();
            worker.getData(readData, param, listener);
            return;
        }
        RWAccessId rwAccess = (RWAccessId) access;
        LogicalData sourceData = rwAccess.getReadDataInstance().getData();
        String targetName = rwAccess.getWrittenDataInstance().getRenaming();
        LogicalData temporaryData = Comm.registerData("tmp" + targetName);
        worker.getData(sourceData, targetName, temporaryData, (Transferable) param, (EventListener) listener);
    }

    /**
     * Orders the transfer of a stream parameter to the assigned worker.
     *
     * <p>For write and read accesses source and target are the same data (written or read instance
     * respectively); otherwise the access is treated as read-write, with the read instance as source
     * and the written instance as target.
     *
     * @param param stream parameter to transfer
     * @param listener listener notified about the ordered operation
     */
    private void transferStreamParameter(DependencyParameter param, JobTransfersListener listener) {
        DataAccessId access = param.getDataAccessId();
        LogicalData source;
        LogicalData target;
        if (access instanceof WAccessId) {
            source = ((WAccessId) access).getWrittenDataInstance().getData();
            target = source;
        } else if (access instanceof RAccessId) {
            source = ((RAccessId) access).getReadDataInstance().getData();
            target = source;
        } else {
            RWAccessId readWrite = (RWAccessId) access;
            source = readWrite.getReadDataInstance().getData();
            target = readWrite.getWrittenDataInstance().getData();
        }
        Worker<? extends WorkerResourceDescription> worker = this.getAssignedResource().getResource();
        if (DEBUG) {
            JOB_LOGGER.debug("Requesting stream transfer from " + source + " to " + target + " at " + worker.getName());
        }
        listener.addOperation();
        worker.getData(source, target, (Transferable) param, (EventListener) listener);
    }

    /**
     * Handles the notification that some input transfers failed.
     *
     * <p>The transfers are ordered again while fewer than 2 transfer rounds have failed and the task's
     * on-failure policy is RETRY; otherwise a warning is raised, the error is notified and the
     * temporary data of the job is removed.
     *
     * @param failedtransfers number of failed transfers, only used for logging
     */
    public final void failedTransfers(int failedtransfers) {
        JOB_LOGGER.debug("Received a notification for the transfers for task " + this.task.getId() + " with state FAILED");
        ++this.transferErrors;
        // NOTE(review): the literal 2 is presumably the inlined TRANSFER_CHANCES constant.
        boolean retry = this.transferErrors < 2 && this.task.getOnFailure() == OnFailure.RETRY;
        if (!retry) {
            ErrorManager.warn("Transfers for running task " + this.task.getId() + " on worker " + this.getAssignedResource().getName() + " have failed.");
            this.notifyError();
            this.removeJobTempData();
            return;
        }
        JOB_LOGGER.debug("Resubmitting input files for task " + this.task.getId() + " to host " + this.getAssignedResource().getName() + " since " + failedtransfers + " transfers failed.");
        this.doInputTransfers();
    }

    /**
     * Handles the notification that all input transfers finished: creates the job and, unless the
     * action was cancelled before submission, records and dispatches it.
     *
     * @param transferGroupId identifier of the transfer group, handed to the created job
     */
    public final void doSubmit(int transferGroupId) {
        JOB_LOGGER.debug("Received a notification for the transfers of task " + this.task.getId() + " with state DONE");
        // The job is created even when the action has been cancelled; it is just never dispatched.
        Job<?> job = this.submitJob(transferGroupId);
        if (this.cancelledBeforeSubmit) {
            JOB_LOGGER.info("Job" + job.getJobId() + " cancelled before submission.");
            return;
        }
        this.jobs.add(job.getJobId());
        String kind = this.getExecutingResources().size() > 1 ? "Rescheduled" : "New";
        JOB_LOGGER.info(kind + " Job " + job.getJobId() + " (Task: " + this.task.getId() + ")");
        JOB_LOGGER.info("  * Method name: " + this.task.getTaskDescription().getName());
        JOB_LOGGER.info("  * Target host: " + this.getAssignedResource().getName());
        this.profile.setSubmissionTime(System.currentTimeMillis());
        JobDispatcher.dispatch(job);
        JOB_LOGGER.info("Submitted Task: " + this.task.getId() + " Job: " + job.getJobId() + " Method: " + this.task.getTaskDescription().getName() + " Resource: " + this.getAssignedResource().getName());
    }

    /**
     * Records the arrival of the job using the current time.
     *
     * @param job job that arrived
     */
    @Override
    public void arrived(Job<?> job) {
        long now = System.currentTimeMillis();
        this.arrivedAt(job, now);
    }

    /**
     * Stores the given timestamp as the arrival time in the action profile.
     *
     * @param job job that arrived (not used here)
     * @param ts arrival timestamp
     */
    @Override
    public void arrivedAt(Job<?> job, long ts) {
        profile.setArrivalTime(ts);
    }

    /**
     * Removes the temporary data of every potential-dependency parameter of the task.
     *
     * <p>For HTTP and METHOD tasks every such parameter is processed; for SERVICE tasks the INOUT
     * parameters are skipped. Any other task type removes nothing.
     */
    private void removeJobTempData() {
        TaskDescription description = this.task.getTaskDescription();
        for (Parameter param : description.getParameters()) {
            if (DEBUG) {
                JOB_LOGGER.debug("    * " + param);
            }
            if (param.isPotentialDependency()) {
                DependencyParameter depParam = (DependencyParameter) param;
                switch (description.getType()) {
                    case HTTP:
                    case METHOD:
                        this.removeTmpData(depParam);
                        break;
                    case SERVICE:
                        if (depParam.getDirection() != Direction.INOUT) {
                            this.removeTmpData(depParam);
                        }
                        break;
                    default:
                        // Nothing to clean for other task types.
                        break;
                }
            }
        }
    }

    /**
     * Removes the temporary data registered for a parameter, recursing into collections.
     *
     * <p>Stream parameters are ignored. For collections and dictionary collections the dependency
     * elements (keys before values) are processed first. Finally, if the parameter itself has a
     * read-write access, the data named {@code "tmp" + <written renaming>} is removed through
     * {@code Comm.removeDataKeepingValue}.
     *
     * @param param parameter whose temporary data must be removed
     */
    private void removeTmpData(DependencyParameter param) {
        if (param.getType() == DataType.STREAM_T || param.getType() == DataType.EXTERNAL_STREAM_T) {
            // Streams never get a temporary copy.
            return;
        }
        if (param.getType() == DataType.COLLECTION_T) {
            CollectionParameter cp = (CollectionParameter) param;
            JOB_LOGGER.debug("Detected CollectionParameter " + cp);
            for (Parameter element : cp.getParameters()) {
                if (element.isPotentialDependency()) {
                    this.removeTmpData((DependencyParameter) element);
                }
            }
        }
        if (param.getType() == DataType.DICT_COLLECTION_T) {
            DictCollectionParameter dcp = (DictCollectionParameter) param;
            JOB_LOGGER.debug("Detected DictCollectionParameter " + dcp);
            // Typed entries, as in transferJobData: no raw Map.Entry nor unchecked casts.
            for (Map.Entry<Parameter, Parameter> entry : dcp.getParameters().entrySet()) {
                Parameter key = entry.getKey();
                if (key.isPotentialDependency()) {
                    this.removeTmpData((DependencyParameter) key);
                }
                Parameter value = entry.getValue();
                if (value.isPotentialDependency()) {
                    this.removeTmpData((DependencyParameter) value);
                }
            }
        }
        DataAccessId access = param.getDataAccessId();
        if (access instanceof RWAccessId) {
            String tgtName = "tmp" + ((RWAccessId) access).getWrittenDataInstance().getRenaming();
            Comm.removeDataKeepingValue(tgtName);
        }
    }

    /**
     * Creates the job that runs the task on the assigned worker and remembers it as the current job.
     *
     * <p>When tracing of task dependencies is active, the predecessors known by the tracer are passed
     * to the new job and then removed from the tracer. The job is tagged with the transfer group and
     * with history NEW. The job is not dispatched here.
     *
     * @param transferGroupId identifier of the transfer group to set on the job
     * @return the newly created job
     */
    protected Job<?> submitJob(int transferGroupId) {
        if (DEBUG) {
            LOGGER.debug(this + " starts job creation");
        }
        Worker<? extends WorkerResourceDescription> worker = this.getAssignedResource().getResource();
        ArrayList<String> slaveNames = new ArrayList<String>();
        ArrayList<Integer> tracedPredecessors = null;
        if (Tracer.isActivated() && Tracer.isTracingTaskDependencies()) {
            tracedPredecessors = Tracer.getPredecessors(this.task.getId());
        }
        Job<?> createdJob = worker.newJob(this.task.getId(), this.task.getTaskDescription(), this.getAssignedImplementation(), slaveNames, this, tracedPredecessors, this.task.getSuccessors().size());
        if (Tracer.isActivated() && Tracer.isTracingTaskDependencies()) {
            Tracer.removePredecessor(this.task.getId());
        }
        this.currentJob = createdJob;
        createdJob.setTransferGroupId(transferGroupId);
        createdJob.setHistory(JobHistory.NEW);
        return createdJob;
    }

    /**
     * Records, using the current time, that all the input data of the job is on the worker.
     *
     * @param job job whose data has been fetched
     */
    @Override
    public void allInputDataOnWorker(Job<?> job) {
        long now = System.currentTimeMillis();
        this.allInputDataOnWorkerAt(job, now);
    }

    /**
     * Stores the data-fetching time in the profile and notifies the task monitor about the data
     * reception.
     *
     * @param job job whose data has been fetched (not used here)
     * @param ts timestamp of the event
     */
    @Override
    public void allInputDataOnWorkerAt(Job<?> job, long ts) {
        this.profile.setDataFetchingTime(ts);
        this.task.getTaskMonitor().onDataReception();
    }

    /**
     * Records the execution start with the current time and additionally notifies the task monitor
     * through {@code onExecutionStart()}.
     *
     * @param job job starting its execution
     */
    @Override
    public void startingExecution(Job<?> job) {
        long now = System.currentTimeMillis();
        this.startingExecutionAt(job, now);
        this.task.getTaskMonitor().onExecutionStart();
    }

    /**
     * Stores the execution start time in the profile and forwards the timestamp to the task monitor.
     *
     * @param job job starting its execution (not used here)
     * @param ts timestamp of the execution start
     */
    @Override
    public void startingExecutionAt(Job<?> job, long ts) {
        this.profile.setExecutionStartTime(ts);
        this.task.getTaskMonitor().onExecutionStartAt(ts);
    }

    /**
     * Records the execution end with the current time and additionally notifies the task monitor
     * through {@code onExecutionEnd()}.
     *
     * @param job job that ended its execution
     */
    @Override
    public void endedExecution(Job<?> job) {
        long now = System.currentTimeMillis();
        this.endedExecutionAt(job, now);
        this.task.getTaskMonitor().onExecutionEnd();
    }

    /**
     * Stores the execution end time in the profile and forwards the timestamp to the task monitor.
     *
     * @param job job that ended its execution (not used here)
     * @param ts timestamp of the execution end
     */
    @Override
    public void endedExecutionAt(Job<?> job, long ts) {
        this.profile.setExecutionEndTime(ts);
        this.task.getTaskMonitor().onExecutionEndAt(ts);
    }

    /**
     * Records the end notification of the job using the current time.
     *
     * @param job job whose end has been notified
     */
    @Override
    public void endNotified(Job<?> job) {
        long now = System.currentTimeMillis();
        this.endNotifiedAt(job, now);
    }

    /**
     * Stores the given timestamp as the end-notification time in the action profile.
     *
     * @param job job whose end has been notified (not used here)
     * @param ts timestamp of the notification
     */
    @Override
    public void endNotifiedAt(Job<?> job, long ts) {
        profile.setEndNotificationTime(ts);
    }

    /**
     * Stops the action. If a job already exists it is cancelled and its output transfers are
     * performed; otherwise the action is flagged as cancelled before submission.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void stopAction() throws Exception {
        if (DEBUG) {
            LOGGER.debug("Task " + this.task.getId() + " starts cancelling running job");
        }
        if (this.currentJob == null) {
            // No job yet: doSubmit checks this flag and will not dispatch.
            this.cancelledBeforeSubmit = true;
            return;
        }
        this.currentJob.cancelJob();
        this.doOutputTransfers(this.currentJob);
    }

    /**
     * Handles a job that ended raising a COMPSs exception.
     *
     * <p>The profile is closed and the temporary data removed. If the task is already cancelled the
     * notification is ignored with a warning; otherwise a non-null exception is propagated to the
     * task groups of the task (if any), the output transfers are performed and the exception is
     * notified.
     *
     * @param job job that raised the exception
     * @param e exception raised by the job
     */
    @Override
    public final void jobException(Job<?> job, COMPSsException e) {
        this.profile.end(System.currentTimeMillis());
        this.removeJobTempData();
        int jobId = job.getJobId();
        JOB_LOGGER.error("Received an exception notification for job " + jobId);
        if (this.task.getStatus() == TaskState.CANCELED) {
            ErrorManager.warn("Ingoring notification for job " + jobId + ". Task " + this.task.getId() + " already cancelled");
            return;
        }
        // With a parameter statically typed as COMPSsException, instanceof reduces to a null check.
        if (e != null && this.task.hasTaskGroups()) {
            for (TaskGroup group : this.task.getTaskGroupList()) {
                group.setException(e);
            }
        }
        this.doOutputTransfers(job);
        this.notifyException(e);
    }

    /**
     * Handles a job that ended with a failure.
     *
     * <p>The profile is closed and the temporary data removed. Then:
     * <ul>
     * <li>if the task is cancelled, the notification is ignored;</li>
     * <li>if the action is being cancelled, output transfers are performed and the error notified;</li>
     * <li>otherwise the failure is logged and counted; the same job is dispatched again while the sum
     * of transfer and execution errors is below 2 and the on-failure policy is RETRY; if not, the
     * error is notified, performing the output transfers first when the policy is IGNORE.</li>
     * </ul>
     *
     * @param job failed job
     * @param status end status of the job (not used here)
     */
    @Override
    public final void jobFailed(Job<?> job, JobEndStatus status) {
        this.profile.end(System.currentTimeMillis());
        this.removeJobTempData();
        if (this.task.getStatus() == TaskState.CANCELED) {
            JOB_LOGGER.debug("Ignoring notification for cancelled job " + job.getJobId());
            return;
        }
        if (this.isCancelling()) {
            JOB_LOGGER.debug("Received a notification for cancelled job " + job.getJobId());
            this.doOutputTransfers(job);
            this.notifyError();
            return;
        }

        int jobId = job.getJobId();
        JOB_LOGGER.error("Received a notification for job " + jobId + " with state FAILED");
        String failureMessage = "Job " + job.getJobId() + ", running Task " + this.task.getId() + " on worker " + this.getAssignedResource().getName() + ", has failed.";
        JOB_LOGGER.error(failureMessage);
        ErrorManager.warn(failureMessage);
        ++this.executionErrors;

        // NOTE(review): the literal 2 is presumably the inlined SUBMISSION_CHANCES constant.
        boolean retry = this.transferErrors + this.executionErrors < 2 && this.task.getOnFailure() == OnFailure.RETRY;
        if (retry) {
            JOB_LOGGER.error("Resubmitting job to the same worker.");
            ErrorManager.warn("Resubmitting job to the same worker.");
            job.setHistory(JobHistory.RESUBMITTED);
            this.profile.setSubmissionTime(System.currentTimeMillis());
            JobDispatcher.dispatch(job);
            return;
        }
        if (this.task.getOnFailure() == OnFailure.IGNORE) {
            ErrorManager.warn("Ignoring failure.");
            this.doOutputTransfers(job);
        }
        this.notifyError();
    }

    /**
     * Handles a job that ended successfully: closes the profile, removes the temporary data and,
     * unless the task is already cancelled, performs the output transfers and notifies completion.
     *
     * @param job completed job
     */
    @Override
    public final void jobCompleted(Job<?> job) {
        this.profile.end(System.currentTimeMillis());
        this.removeJobTempData();
        int jobId = job.getJobId();
        JOB_LOGGER.info("Received a notification for job " + jobId + " with state OK (avg. duration: " + this.profile.getAverageExecutionTime() + ")");
        if (this.task.getStatus() == TaskState.CANCELED) {
            ErrorManager.warn("Ingoring notification for job " + jobId + ". Task " + this.task.getId() + " already cancelled");
            return;
        }
        this.doOutputTransfers(job);
        this.notifyCompleted();
    }

    /**
     * Commits the commutative accesses of the task and then registers the outputs of the job with
     * the handler matching the job type (METHOD, HTTP or SERVICE). Other types register nothing.
     *
     * @param job job whose outputs are processed
     */
    private final void doOutputTransfers(Job<?> job) {
        this.commitCommutativeAccesses(job);
        switch (job.getType()) {
            case METHOD:
                this.doMethodOutputTransfers(job);
                break;
            case HTTP:
                this.doHttpOutputTransfers(job);
                break;
            case SERVICE:
                this.doServiceOutputTransfers(job);
                break;
            default:
                // No output handling for other job types.
                break;
        }
    }

    /**
     * Commits the commutative accesses of every potential-dependency parameter of the task.
     *
     * @param job job that triggered the commit (not used here)
     */
    private void commitCommutativeAccesses(Job<?> job) {
        for (Parameter param : this.task.getParameters()) {
            if (param.isPotentialDependency()) {
                this.commitCommutativeAccesses((DependencyParameter) param);
            }
        }
    }

    /**
     * Commits the commutative access of one parameter, recursing into collection elements and into
     * dictionary keys and values.
     *
     * <p>For a non-collection parameter with COMMUTATIVE direction, its current access id is used to
     * look up the commutative group of the task, and the parameter's access id is replaced by the
     * next access handed out by that group.
     *
     * @param dp parameter to process
     */
    private void commitCommutativeAccesses(DependencyParameter dp) {
        switch (dp.getType()) {
            case COLLECTION_T: {
                CollectionParameter collection = (CollectionParameter) dp;
                for (Parameter element : collection.getParameters()) {
                    if (element.isPotentialDependency()) {
                        this.commitCommutativeAccesses((DependencyParameter) element);
                    }
                }
                break;
            }
            case DICT_COLLECTION_T: {
                DictCollectionParameter dictionary = (DictCollectionParameter) dp;
                for (Map.Entry<Parameter, Parameter> entry : dictionary.getParameters().entrySet()) {
                    Parameter key = entry.getKey();
                    if (key.isPotentialDependency()) {
                        this.commitCommutativeAccesses((DependencyParameter) key);
                    }
                    Parameter value = entry.getValue();
                    if (value.isPotentialDependency()) {
                        this.commitCommutativeAccesses((DependencyParameter) value);
                    }
                }
                break;
            }
            default: {
                if (dp.getDirection() == Direction.COMMUTATIVE) {
                    DataAccessId placeholder = dp.getDataAccessId();
                    CommutativeGroupTask group = this.getTask().getCommutativeGroup(placeholder.getDataId());
                    DataAccessId nextAccess = group.nextAccess();
                    dp.setDataAccessId(nextAccess);
                }
                break;
            }
        }
    }

    /**
     * Registers the outputs of a METHOD job: every parameter with an output renaming is stored on
     * the assigned worker and reported to the task monitor together with its position.
     *
     * @param job finished job
     */
    private void doMethodOutputTransfers(Job<?> job) {
        Worker<? extends WorkerResourceDescription> worker = this.getAssignedResource().getResource();
        TaskMonitor monitor = this.task.getTaskMonitor();
        List<Parameter> params = job.getTaskParams().getParameters();
        for (int index = 0; index < params.size(); ++index) {
            Parameter param = params.get(index);
            String outputName = this.getOuputRename(param);
            if (outputName != null) {
                this.storeOutputParameter(job, worker, outputName, (DependencyParameter) param);
                TaskMonitor.TaskResult generated = this.buildMonitorParameter(param, outputName);
                monitor.valueGenerated(index, generated);
            }
        }
    }

    /**
     * Builds the result description reported to the task monitor for a parameter.
     *
     * <p>Collections produce a {@code CollectionTaskResult} holding one result per element (built
     * recursively with the element's output renaming); any other type produces a plain result.
     * NOTE(review): {@code p} is cast to {@code DependencyParameter} unconditionally, also for
     * collection elements — confirm that callers only pass dependency parameters.
     *
     * @param p parameter to describe
     * @param dataName name of the generated data
     * @return description of the generated value
     */
    private TaskMonitor.TaskResult buildMonitorParameter(Parameter p, String dataName) {
        String location = ((DependencyParameter) p).getDataTarget();
        if (p.getType() != DataType.COLLECTION_T) {
            return new TaskMonitor.TaskResult(p.getType(), dataName, location);
        }
        List<Parameter> elements = ((CollectionParameter) p).getParameters();
        TaskMonitor.TaskResult[] elementResults = new TaskMonitor.TaskResult[elements.size()];
        for (int index = 0; index < elements.size(); ++index) {
            elementResults[index] = this.buildMonitorParameter(elements.get(index), this.getOuputRename(elements.get(index)));
        }
        return new TaskMonitor.CollectionTaskResult(p.getType(), dataName, location, elementResults);
    }

    /**
     * Returns the renaming of the data instance written through a parameter.
     *
     * <p>Parameters that are not potential dependencies, and those with direction CONCURRENT,
     * IN_DELETE or IN, yield {@code null}. For INOUT parameters the data named {@code "tmp"} plus
     * the written instance is additionally removed through {@code Comm.removeDataKeepingValue}.
     * NOTE(review): that name is built from the instance itself, whereas the temporary data is
     * registered elsewhere in this class with {@code "tmp" + getRenaming()} — confirm both match.
     *
     * @param p parameter to inspect
     * @return renaming of the written data instance, or {@code null} when nothing is written
     */
    private String getOuputRename(Parameter p) {
        if (!p.isPotentialDependency()) {
            return null;
        }
        DependencyParameter dp = (DependencyParameter) p;
        DataInstanceId written = null;
        switch (p.getDirection()) {
            case CONCURRENT:
            case IN_DELETE:
            case IN:
                // Read-only accesses generate no new data.
                return null;
            case OUT:
                written = ((WAccessId) dp.getDataAccessId()).getWrittenDataInstance();
                break;
            case COMMUTATIVE:
                written = ((RWAccessId) dp.getDataAccessId()).getWrittenDataInstance();
                break;
            case INOUT:
                written = ((RWAccessId) dp.getDataAccessId()).getWrittenDataInstance();
                Comm.removeDataKeepingValue("tmp" + written);
                break;
        }
        return written.getRenaming();
    }

    /**
     * Registers the location of an output parameter generated on a worker, recursing into
     * collections and dictionary collections first.
     *
     * <p>Elements (and dictionary keys, then values) with an output renaming are stored before the
     * container itself. The location is created from the schema and path of the parameter's data
     * target; the data target is then replaced by the path of that location. An invalid URI is
     * reported through {@code ErrorManager.error}; in that case {@code null} is registered and
     * returned.
     *
     * @param job job that generated the value
     * @param w worker hosting the value
     * @param dataName name under which the location is registered
     * @param p parameter describing the output
     * @return the registered location, or {@code null} if it could not be created
     */
    private DataLocation storeOutputParameter(Job<?> job, Worker<? extends WorkerResourceDescription> w, String dataName, DependencyParameter p) {
        if (p.getType() == DataType.COLLECTION_T) {
            CollectionParameter cp = (CollectionParameter) p;
            for (Parameter element : cp.getParameters()) {
                String elemOutRename = this.getOuputRename(element);
                if (elemOutRename != null) {
                    this.storeOutputParameter(job, w, elemOutRename, (DependencyParameter) element);
                }
            }
        }
        if (p.getType() == DataType.DICT_COLLECTION_T) {
            DictCollectionParameter dcp = (DictCollectionParameter) p;
            // Typed entries, as in transferJobData: no raw Map.Entry nor unchecked casts.
            for (Map.Entry<Parameter, Parameter> entry : dcp.getParameters().entrySet()) {
                Parameter key = entry.getKey();
                String elemKeyOutRename = this.getOuputRename(key);
                if (elemKeyOutRename != null) {
                    this.storeOutputParameter(job, w, elemKeyOutRename, (DependencyParameter) key);
                }
                Parameter value = entry.getValue();
                String elemValueOutRename = this.getOuputRename(value);
                if (elemValueOutRename != null) {
                    this.storeOutputParameter(job, w, elemValueOutRename, (DependencyParameter) value);
                }
            }
        }
        DataLocation outLoc = null;
        try {
            String dataTarget = p.getDataTarget();
            if (DEBUG) {
                JOB_LOGGER.debug("Proposed URI for storing output param: " + dataTarget);
            }
            SimpleURI proposedURI = new SimpleURI(dataTarget);
            SimpleURI targetURI = new SimpleURI(proposedURI.getSchema() + proposedURI.getPath());
            outLoc = DataLocation.createLocation(w, targetURI);
            p.setDataTarget(outLoc.getPath());
        } catch (Exception e) {
            ErrorManager.error("ERROR: Invalid location URI " + p.getDataTarget(), e);
        }
        Comm.registerLocation(dataName, outLoc);
        return outLoc;
    }

    /**
     * Registers the location of an output parameter generated by an HTTP job, placing it on the
     * application host.
     *
     * <p>The location is created from the schema and path of the parameter's data target; the data
     * target is then replaced by the path of that location. An invalid URI is reported through
     * {@code ErrorManager.error}; in that case {@code null} is registered and returned.
     *
     * @param dataName name under which the location is registered
     * @param p parameter describing the output
     * @return the registered location, or {@code null} if it could not be created
     */
    private DataLocation storeHttpOutputParameter(String dataName, DependencyParameter p) {
        DataLocation location = null;
        try {
            String proposedTarget = p.getDataTarget();
            if (DEBUG) {
                JOB_LOGGER.debug("Proposed URI for storing HTTP output param: " + proposedTarget);
            }
            SimpleURI proposedUri = new SimpleURI(proposedTarget);
            SimpleURI cleanUri = new SimpleURI(proposedUri.getSchema() + proposedUri.getPath());
            location = DataLocation.createLocation(Comm.getAppHost(), cleanUri);
            p.setDataTarget(location.getPath());
        } catch (Exception e) {
            ErrorManager.error("ERROR: Invalid location URI " + p.getDataTarget(), e);
        }
        Comm.registerLocation(dataName, location);
        return location;
    }

    /**
     * Registers the output of a SERVICE job.
     *
     * <p>Parameters are scanned from last to first. Non-dependency parameters and those with
     * direction CONCURRENT, IN_DELETE, IN, COMMUTATIVE or INOUT are skipped. The first remaining
     * parameter gets the return value of the job registered under the renaming of its written
     * instance; if the registered data has locations, the monitor is notified once per non-null
     * location. The scan stops after that parameter.
     *
     * @param job finished job
     */
    private final void doServiceOutputTransfers(Job<?> job) {
        TaskMonitor monitor = this.task.getTaskMonitor();
        List<Parameter> params = job.getTaskParams().getParameters();
        for (int index = params.size() - 1; index >= 0; --index) {
            Parameter param = params.get(index);
            if (!param.isPotentialDependency()) {
                continue;
            }
            DataInstanceId written = null;
            DependencyParameter depParam = (DependencyParameter) param;
            switch (param.getDirection()) {
                case CONCURRENT:
                case IN_DELETE:
                case IN:
                case COMMUTATIVE:
                case INOUT:
                    // Not a pure output: keep looking at earlier parameters.
                    continue;
                case OUT:
                    written = ((WAccessId) depParam.getDataAccessId()).getWrittenDataInstance();
                    break;
                default:
                    break;
            }
            String name = written.getRenaming();
            Object value = job.getReturnValue();
            LogicalData registered = Comm.registerValue(name, value);
            Set<DataLocation> locations = registered.getLocations();
            if (!locations.isEmpty()) {
                TaskMonitor.TaskResult generated = this.buildMonitorParameter(param, this.getOuputRename(param));
                for (DataLocation location : registered.getLocations()) {
                    if (location != null) {
                        monitor.valueGenerated(index, generated);
                    }
                }
            }
            // Only one parameter receives the return value.
            return;
        }
    }

    /**
     * Stores the values returned by an HTTP job into the task's INOUT and OUT dependency parameters.
     *
     * <p>For each such parameter a location is created through {@code storeHttpOutputParameter}. File parameters
     * have the JSON member named after the parameter written to the parameter's data target and the location is
     * registered; any other type is decoded from the {@code "$return_0"} member of the returned JSON object and
     * registered as a value. The task monitor is then notified once per non-null location of the registered data.
     *
     * @param job finished HTTP job; its return value is cast to {@code JsonObject}
     */
    private void doHttpOutputTransfers(Job<?> job) {
        TaskMonitor monitor = this.task.getTaskMonitor();
        List<Parameter> params = job.getTaskParams().getParameters();
        // A single Gson instance is enough for the whole call
        Gson gson = new Gson();
        for (int i = 0; i < params.size(); ++i) {
            Parameter p = params.get(i);
            if (!p.isPotentialDependency()) {
                continue;
            }
            DependencyParameter dp = (DependencyParameter) p;
            DataInstanceId dId;
            if (p.getDirection() == Direction.INOUT) {
                dId = ((RWAccessId) dp.getDataAccessId()).getWrittenDataInstance();
            } else if (p.getDirection() == Direction.OUT) {
                dId = ((WAccessId) dp.getDataAccessId()).getWrittenDataInstance();
            } else {
                // Only written parameters receive output
                continue;
            }

            String dataName = this.getOuputRename(p);
            DataLocation dl = this.storeHttpOutputParameter(dataName, dp);
            String name = dId.getRenaming();
            JsonObject retValue = (JsonObject) job.getReturnValue();

            LogicalData ld;
            if (dp.getType() == DataType.FILE_T) {
                String content = retValue.get(p.getName()).toString();
                // try-with-resources closes the writer even when a write fails
                try (FileWriter file = new FileWriter(dp.getDataTarget())) {
                    // NOTE(review): "0004" looks like a format marker expected by whoever reads the file -- confirm
                    file.write("0004");
                    file.write(content);
                } catch (IOException e) {
                    // Best effort, as before: the failure is reported and the location is still registered
                    JOB_LOGGER.error("Error writing HTTP output to file " + dp.getDataTarget(), e);
                }
                ld = Comm.registerLocation(name, dl);
                // NOTE(review): the monitor is notified here and again in the per-location loop below
                TaskMonitor.TaskResult fileResult = this.buildMonitorParameter(p, dataName);
                monitor.valueGenerated(i, fileResult);
            } else {
                JsonPrimitive primValue = retValue.getAsJsonPrimitive("$return_0");
                Object value = parseHttpReturnValue(gson, primValue, dp);
                ld = Comm.registerValue(name, value);
            }

            Set<DataLocation> locations = ld.getLocations();
            if (locations.isEmpty()) {
                continue;
            }
            TaskMonitor.TaskResult mp = this.buildMonitorParameter(p, this.getOuputRename(p));
            for (DataLocation loc : ld.getLocations()) {
                if (loc != null) {
                    monitor.valueGenerated(i, mp);
                }
            }
        }
    }

    /**
     * Decodes the JSON primitive returned by an HTTP job according to the type declared by the parameter.
     *
     * @param gson decoder to use
     * @param primValue primitive taken from the job's return value
     * @param dp parameter providing the declared type and, for objects, the content type
     * @return the decoded value; {@code null} for types other than INT_T, LONG_T, STRING_T, STRING_64_T and
     *         OBJECT_T
     */
    private static Object parseHttpReturnValue(Gson gson, JsonPrimitive primValue, DependencyParameter dp) {
        switch (dp.getType()) {
            case INT_T:
                return gson.fromJson(primValue, Integer.TYPE);
            case LONG_T:
                return gson.fromJson(primValue, Long.TYPE);
            case STRING_T:
                return gson.fromJson(primValue, String.class);
            case STRING_64_T: {
                // The decoded text is Base64-encoded before being stored
                String plain = gson.fromJson(primValue, String.class);
                byte[] encoded = Base64.getEncoder().encode(plain.getBytes());
                return new String(encoded);
            }
            case OBJECT_T:
                if (dp.getContentType().equals("int")) {
                    return gson.fromJson(primValue, Integer.TYPE);
                }
                if (dp.getContentType().equals("long")) {
                    return gson.fromJson(primValue, Long.TYPE);
                }
                if (dp.getContentType().equals("String")) {
                    return gson.fromJson(primValue, String.class);
                }
                return gson.fromJson(primValue, Object.class);
            default:
                return null;
        }
    }

    /**
     * Handles the successful end of the action: records the execution profile on the assigned resource, notifies
     * the task monitor, marks the task as FINISHED and reports the task end to the access processor.
     */
    @Override
    protected void doCompleted() {
        final ResourceScheduler<? extends WorkerResourceDescription> executor = this.getAssignedResource();
        executor.profiledExecution(this.getAssignedImplementation(), this.profile);

        this.task.getTaskMonitor().onSuccesfulExecution();

        this.task.decreaseExecutionCount();
        this.task.setStatus(TaskState.FINISHED);
        this.ap.notifyTaskEnd(this.task);
    }

    /**
     * Handles an execution error of the action.
     *
     * <p>The task monitor is always notified. The action fails definitively when the task's on-failure policy is
     * not RETRY, or when it is RETRY but the action already has two or more executing resources. Otherwise a
     * warning announcing the rescheduling is emitted and the method returns normally.
     *
     * @throws FailedActionException if the task must not be rescheduled
     */
    @Override
    protected void doError() throws FailedActionException {
        this.task.getTaskMonitor().onErrorExecution();

        if (this.task.getOnFailure() != OnFailure.RETRY) {
            final String failureMsg = "Notifying task " + this.task.getId() + " failure";
            LOGGER.warn(failureMsg);
            ErrorManager.warn(failureMsg);
            throw new FailedActionException();
        }

        if (this.getExecutingResources().size() >= 2) {
            final String alreadyRetriedMsg = "Task " + this.task.getId() + " has already been rescheduled; notifying task failure.";
            LOGGER.warn(alreadyRetriedMsg);
            ErrorManager.warn(alreadyRetriedMsg);
            throw new FailedActionException();
        }

        // First failure under the RETRY policy: announce the rescheduling
        final String rescheduleMsg = "Task " + this.task.getId() + " execution on worker " + this.getAssignedResource().getName() + " has failed; rescheduling task execution. (changing worker)";
        ErrorManager.warn(rescheduleMsg);
        LOGGER.warn(rescheduleMsg);
    }

    /**
     * Handles the abortion of the action: the currently assigned resource, if any, is removed from the list of
     * executing resources and the task monitor is notified.
     */
    @Override
    protected void doAbort() {
        final ResourceScheduler<? extends WorkerResourceDescription> assigned = this.getAssignedResource();
        if (assigned != null) {
            this.getExecutingResources().remove(assigned);
        }
        this.task.getTaskMonitor().onAbortedExecution();
    }

    /**
     * Handles the definitive failure of the action: emits a warning listing the possible causes and the job files
     * to inspect, notifies the task monitor, marks the task as FAILED and reports the task end to the access
     * processor.
     */
    @Override
    protected void doFailed() {
        final String name = this.task.getTaskDescription().getName();
        final StringBuilder report = new StringBuilder();
        report.append("Task '").append(name).append("' TOTALLY FAILED.\n");
        report.append("Possible causes:\n");
        report.append("     -Exception thrown by task '").append(name).append("'.\n");
        report.append("     -Expected output files not generated by task '").append(name).append("'.\n");
        report.append("     -Could not provide nor retrieve needed data between master and worker.\n");
        report.append("\n");
        report.append("Check files '").append(Comm.getAppHost().getJobsDirPath()).append("job[");
        // Job identifiers joined by '|'
        String separator = "";
        for (Object jobId : this.jobs) {
            report.append(separator).append(jobId);
            separator = "|";
        }
        report.append("'] to find out the error.\n");
        report.append(" \n");
        ErrorManager.warn(report.toString());

        this.task.getTaskMonitor().onFailedExecution();

        this.task.decreaseExecutionCount();
        this.task.setStatus(TaskState.FAILED);
        this.ap.notifyTaskEnd(this.task);
    }

    /**
     * Handles a COMPSs exception raised by the task.
     *
     * <p>The exception is set on every task group of the task except the one named {@code "App" + application id},
     * and the actions those groups contribute for tasks other than this one are collected. A warning is emitted,
     * the task monitor is notified, and the task is marked as FINISHED and reported as ended.
     *
     * @param e exception raised by the task
     * @return the actions collected from the task's groups
     */
    @Override
    protected Collection<AllocatableAction> doException(COMPSsException e) {
        final LinkedList<TaskGroup> groups = this.task.getTaskGroupList();
        final LinkedList<AllocatableAction> groupActions = new LinkedList<AllocatableAction>();
        for (TaskGroup group : groups) {
            final boolean isApplicationGroup = group.getName().equals("App" + this.task.getApplication().getId());
            if (!isApplicationGroup) {
                group.setException(e);
                group.addToCollectionExecutionForTasksOtherThan(groupActions, this.getTask());
            }
        }

        final String name = this.task.getTaskDescription().getName();
        final String warning = "COMPSs Exception raised : Task " + this.task.getId() + " (" + name
            + ") has raised an exception with message " + e.getMessage()
            + ". Members of the containing groups will be cancelled.\n" + "\n";
        ErrorManager.warn(warning);

        this.task.getTaskMonitor().onException(e);

        this.task.decreaseExecutionCount();
        this.task.setStatus(TaskState.FINISHED);
        this.ap.notifyTaskEnd(this.task);
        return groupActions;
    }

    /**
     * Handles the cancellation of the action: emits a warning, marks the task as CANCELED and reports the task end
     * to the access processor.
     */
    @Override
    protected void doCanceled() {
        final String name = this.task.getTaskDescription().getName();
        final StringBuilder warning = new StringBuilder();
        warning.append("Task ").append(this.task.getId());
        warning.append("(Action: ").append(this.getId());
        warning.append(") with name ").append(name);
        warning.append(" has been cancelled.");
        ErrorManager.warn(warning.toString());

        this.task.decreaseExecutionCount();
        this.task.setStatus(TaskState.CANCELED);
        this.ap.notifyTaskEnd(this.task);
    }

    /**
     * Handles a failure that is ignored: emits a warning, notifies the task monitor of the failed execution, and
     * still marks the task as FINISHED before reporting the task end to the access processor.
     */
    @Override
    protected void doFailIgnored() {
        final String name = this.task.getTaskDescription().getName();
        final String warning = "Task failure: Task " + this.task.getId() + " (" + name
            + ") has failed. Successors keep running.\n" + "\n";
        ErrorManager.warn(warning);

        this.task.getTaskMonitor().onFailedExecution();

        this.task.decreaseExecutionCount();
        this.task.setStatus(TaskState.FINISHED);
        this.ap.notifyTaskEnd(this.task);
    }

    /**
     * Returns the executors of the core element of the task.
     *
     * @return result of {@code getCoreElementExecutors} for the task's core id
     */
    @Override
    public final List<ResourceScheduler<? extends WorkerResourceDescription>> getCompatibleWorkers() {
        final CoreElement coreElement = this.task.getTaskDescription().getCoreElement();
        return this.getCoreElementExecutors(coreElement.getCoreId());
    }

    /**
     * Returns the implementations of the task's core element as an array.
     *
     * @return a new array holding the core element's implementations in list order
     */
    @Override
    public final Implementation[] getImplementations() {
        final List<Implementation> declared = this.task.getTaskDescription().getCoreElement().getImplementations();
        // An exactly-sized array is filled in place and returned by toArray
        return declared.toArray(new Implementation[declared.size()]);
    }

    /**
     * Tells whether a worker can run the core element of the task.
     *
     * @param r worker to check
     * @return the result of {@code r.canRun} for the task's core id
     */
    @Override
    public <W extends WorkerResourceDescription> boolean isCompatible(Worker<W> r) {
        final CoreElement coreElement = this.task.getTaskDescription().getCoreElement();
        return r.canRun(coreElement.getCoreId());
    }

    /**
     * Returns the implementations of the task's core element that a resource can execute.
     *
     * @param r resource scheduler to query
     * @return the result of {@code r.getExecutableImpls} for the task's core id
     */
    @Override
    public final <T extends WorkerResourceDescription> List<Implementation> getCompatibleImplementations(ResourceScheduler<T> r) {
        final CoreElement coreElement = this.task.getTaskDescription().getCoreElement();
        return r.getExecutableImpls(coreElement.getCoreId());
    }

    /**
     * Returns the identifier of the task's core element.
     *
     * @return core id taken from the task description's core element
     */
    @Override
    public final Integer getCoreId() {
        final CoreElement coreElement = this.task.getTaskDescription().getCoreElement();
        return coreElement.getCoreId();
    }

    /**
     * Returns the priority of the action.
     *
     * @return {@code 1} if the task description has priority, {@code 0} otherwise
     */
    @Override
    public final int getPriority() {
        if (this.task.getTaskDescription().hasPriority()) {
            return 1;
        }
        return 0;
    }

    /**
     * Returns the group priority of the action.
     *
     * @return always {@link Long#MAX_VALUE}
     */
    @Override
    public long getGroupPriority() {
        final long groupPriority = Long.MAX_VALUE;
        return groupPriority;
    }

    /**
     * Returns the on-failure policy of the action.
     *
     * @return the on-failure policy reported by the task
     */
    @Override
    public OnFailure getOnFailure() {
        final OnFailure policy = this.task.getOnFailure();
        return policy;
    }

    /**
     * Computes the score of running this action on a given worker.
     *
     * @param targetWorker resource scheduler asked to produce the score
     * @param actionScore score of the action passed on to the worker
     * @return the resource score generated by {@code targetWorker} for this action and its task description
     */
    @Override
    public final <T extends WorkerResourceDescription> Score schedulingScore(ResourceScheduler<T> targetWorker, Score actionScore) {
        final Score resourceScore = targetWorker.generateResourceScore(this, this.task.getTaskDescription(), actionScore);
        return resourceScore;
    }

    /**
     * Schedules the action on the best-scored worker among the ones it may run on.
     *
     * <p>Candidates are chosen as follows: the enforced target resource when one is enforced (it must be
     * compatible); otherwise, when scheduling is constrained, the compatible resources assigned to the
     * constraining predecessors; otherwise every compatible worker. Resources that already executed the action are
     * discarded when there are more candidates than previous executors.
     *
     * <p>The candidate list is always a private copy, so filtering it never alters the list returned by
     * {@code getCompatibleWorkers()}.
     *
     * @param actionScore score of the action used to score the candidates
     * @throws BlockedActionException if there is no candidate at all
     * @throws UnassignedActionException if the enforced target is not compatible, or no candidate could be chosen
     */
    @Override
    public final void schedule(Score actionScore) throws BlockedActionException, UnassignedActionException {
        List<ResourceScheduler<? extends WorkerResourceDescription>> compatibleWorkers = this.getCompatibleWorkers();
        List<ResourceScheduler<? extends WorkerResourceDescription>> candidates = new LinkedList<ResourceScheduler<? extends WorkerResourceDescription>>();
        if (this.isTargetResourceEnforced()) {
            ResourceScheduler<? extends WorkerResourceDescription> target = this.getEnforcedTargetResource();
            if (!compatibleWorkers.contains(target)) {
                throw new UnassignedActionException();
            }
            candidates.add(target);
        } else if (this.isSchedulingConstrained()) {
            for (AllocatableAction a : this.getConstrainingPredecessors()) {
                ResourceScheduler<? extends WorkerResourceDescription> target = a.getAssignedResource();
                if (compatibleWorkers.contains(target)) {
                    candidates.add(target);
                }
            }
        } else {
            // Copy instead of aliasing: the removeAll below must not touch the compatible-workers list
            candidates.addAll(compatibleWorkers);
        }
        if (candidates.isEmpty()) {
            throw new BlockedActionException();
        }
        List<ResourceScheduler<? extends WorkerResourceDescription>> prevExecutors = this.getExecutingResources();
        // Avoid previous executors only when more candidates exist than previous executors
        if (candidates.size() > prevExecutors.size() && prevExecutors.size() > 0) {
            candidates.removeAll(prevExecutors);
        }
        this.scheduleSecuredCandidates(actionScore, candidates);
    }

    /**
     * Schedules the action on the best-scored worker among a given set of candidates.
     *
     * <p>Only candidates that are also compatible workers are considered. When a target resource is enforced it
     * must be among them; when scheduling is constrained, only resources assigned to the constraining predecessors
     * are kept.
     *
     * @param candidates resources proposed for hosting the action
     * @param actionScore score of the action used to score the candidates
     * @throws UnassignedActionException if no proposed candidate is valid, or none could be chosen
     */
    @Override
    public void schedule(Collection<ResourceScheduler<? extends WorkerResourceDescription>> candidates, Score actionScore) throws UnassignedActionException {
        List<ResourceScheduler<? extends WorkerResourceDescription>> compatibleWorkers = this.getCompatibleWorkers();
        LinkedList<ResourceScheduler<? extends WorkerResourceDescription>> verifiedCandidates = new LinkedList<ResourceScheduler<? extends WorkerResourceDescription>>();
        if (this.isTargetResourceEnforced()) {
            ResourceScheduler<? extends WorkerResourceDescription> target = this.getEnforcedTargetResource();
            if (!candidates.contains(target) || !compatibleWorkers.contains(target)) {
                throw new UnassignedActionException();
            }
            verifiedCandidates.add(target);
        } else if (this.isSchedulingConstrained()) {
            for (AllocatableAction a : this.getConstrainingPredecessors()) {
                ResourceScheduler<? extends WorkerResourceDescription> target = a.getAssignedResource();
                if (candidates.contains(target) && compatibleWorkers.contains(target)) {
                    verifiedCandidates.add(target);
                }
            }
        } else {
            for (ResourceScheduler<? extends WorkerResourceDescription> candidate : candidates) {
                // A single compatibility lookup is enough (it used to be evaluated twice)
                if (compatibleWorkers.contains(candidate)) {
                    verifiedCandidates.add(candidate);
                }
            }
        }
        // With an enforced target the list is never empty here, so this covers the other two branches
        if (verifiedCandidates.isEmpty()) {
            throw new UnassignedActionException();
        }
        this.scheduleSecuredCandidates(actionScore, verifiedCandidates);
    }

    /**
     * Schedules the action on a specific worker, using the best-scored implementation that worker offers.
     *
     * @param targetWorker worker that should host the action
     * @param actionScore score of the action used to score the worker and its implementations
     * @throws UnassignedActionException if the worker is {@code null}, fails validation, produces no resource score
     *         or offers no implementation that scores better than none
     */
    @Override
    public final void schedule(ResourceScheduler<? extends WorkerResourceDescription> targetWorker, Score actionScore) throws UnassignedActionException {
        if (targetWorker == null) {
            throw new UnassignedActionException();
        }
        if (!this.validateWorker(targetWorker)) {
            throw new UnassignedActionException();
        }

        Implementation chosenImpl = null;
        Score topScore = null;
        final Score workerScore = targetWorker.generateResourceScore(this, this.task.getTaskDescription(), actionScore);
        if (workerScore != null) {
            for (Implementation option : this.getCompatibleImplementations(targetWorker)) {
                final Score optionScore = targetWorker.generateImplementationScore(this, this.task.getTaskDescription(), option, workerScore);
                if (Score.isBetter(optionScore, topScore)) {
                    chosenImpl = option;
                    topScore = optionScore;
                }
            }
        }

        if (chosenImpl == null) {
            throw new UnassignedActionException();
        }
        this.assignWorkerAndImpl(targetWorker, chosenImpl);
    }

    /**
     * Schedules the action on a specific worker with a specific implementation.
     *
     * @param targetWorker worker that should host the action
     * @param impl implementation to run
     * @throws UnassignedActionException if either argument is {@code null} (the assigned resource is cleared
     *         first), if the worker fails validation, or if the worker's resource cannot run the implementation
     */
    @Override
    public final void schedule(ResourceScheduler<? extends WorkerResourceDescription> targetWorker, Implementation impl) throws UnassignedActionException {
        final boolean missingArgument = targetWorker == null || impl == null;
        if (missingArgument) {
            this.assignResource(null);
            throw new UnassignedActionException();
        }

        if (DEBUG) {
            LOGGER.debug("Scheduling " + this + " on worker " + targetWorker.getName() + " with implementation " + impl.getImplementationId());
        }

        if (!this.validateWorker(targetWorker)) {
            throw new UnassignedActionException();
        }

        final boolean runnable = targetWorker.getResource().canRun(impl);
        if (!runnable) {
            LOGGER.warn("Worker " + targetWorker.getName() + " is not compatible with " + impl);
            throw new UnassignedActionException();
        }

        this.assignWorkerAndImpl(targetWorker, impl);
    }

    /**
     * Checks whether a resource may host this action.
     *
     * <p>With an enforced target resource, the candidate must be that very resource. Otherwise, when scheduling is
     * constrained, the candidate must be the resource assigned to at least one constraining predecessor. In every
     * case a resource that already executed the action is rejected when more than one compatible worker exists.
     * Each rejection is logged as a warning.
     *
     * @param targetCandidate resource to validate
     * @return {@code true} if the candidate passes all checks, {@code false} otherwise
     */
    private boolean validateWorker(ResourceScheduler targetCandidate) {
        if (this.isTargetResourceEnforced()) {
            final ResourceScheduler<? extends WorkerResourceDescription> enforced = this.getEnforcedTargetResource();
            if (enforced != targetCandidate) {
                LOGGER.warn("Task " + this.getTask().getId() + " is enforced to run on " + enforced.getName());
                return false;
            }
        } else if (this.isSchedulingConstrained()) {
            boolean hostedPredecessor = false;
            for (AllocatableAction predecessor : this.getConstrainingPredecessors()) {
                final ResourceScheduler<? extends WorkerResourceDescription> host = predecessor.getAssignedResource();
                if (host == targetCandidate) {
                    hostedPredecessor = true;
                }
            }
            if (!hostedPredecessor) {
                LOGGER.warn(targetCandidate.getName() + " did not host the execution of any constraining predecessor");
                return false;
            }
        }

        // A previous executor is only acceptable when it is the sole compatible worker
        final boolean alreadyUsed = this.getExecutingResources().contains(targetCandidate);
        if (alreadyUsed && this.getCompatibleWorkers().size() > 1) {
            LOGGER.warn("Task " + this.getTask().getId() + " was already scheduled on " + targetCandidate.getName());
            return false;
        }
        return true;
    }

    /**
     * Picks the best-scored (worker, implementation) pair among already-verified candidates and assigns it.
     *
     * <p>Workers that produce no resource score are ignored. When {@code DEBUG} is set, every evaluated pair is
     * written to the debug log.
     *
     * @param actionScore score of the action used to score workers and implementations
     * @param candidates workers to evaluate
     * @throws UnassignedActionException if no pair was selected
     */
    private void scheduleSecuredCandidates(Score actionScore, List<ResourceScheduler<? extends WorkerResourceDescription>> candidates) throws UnassignedActionException {
        final StringBuilder trace = new StringBuilder("Scheduling " + this + " execution:\n");
        ResourceScheduler<? extends WorkerResourceDescription> chosenWorker = null;
        Implementation chosenImpl = null;
        Score topScore = null;

        for (ResourceScheduler<? extends WorkerResourceDescription> candidate : candidates) {
            final Score workerScore = candidate.generateResourceScore(this, this.task.getTaskDescription(), actionScore);
            if (workerScore != null) {
                for (Implementation option : this.getCompatibleImplementations(candidate)) {
                    final Score optionScore = candidate.generateImplementationScore(this, this.task.getTaskDescription(), option, workerScore);
                    if (DEBUG) {
                        trace.append("[Task ").append(this.task.getId());
                        trace.append("] Resource ").append(candidate.getName()).append(" ");
                        trace.append(" Implementation ").append(option.getImplementationId()).append(" ");
                        trace.append(" Score ").append(optionScore).append("\n");
                    }
                    if (Score.isBetter(optionScore, topScore)) {
                        chosenWorker = candidate;
                        chosenImpl = option;
                        topScore = optionScore;
                    }
                }
            }
        }

        if (DEBUG) {
            LOGGER.debug(trace.toString());
        }
        if (chosenWorker == null) {
            throw new UnassignedActionException();
        }
        this.assignWorkerAndImpl(chosenWorker, chosenImpl);
    }

    /**
     * Binds the action to a worker and an implementation: logs the assignment, stores the implementation and the
     * resource on the action, asks the worker to schedule the action and notifies the task monitor.
     *
     * @param worker resource scheduler that will host the action
     * @param impl implementation to run
     */
    private void assignWorkerAndImpl(ResourceScheduler worker, Implementation impl) {
        final String assignment = "Assigning action " + this + " to worker " + worker.getName() + " with implementation " + impl.getImplementationId();
        LOGGER.info(assignment);

        this.assignImplementation(impl);
        this.assignResource(worker);
        worker.scheduleAction(this);

        this.task.getTaskMonitor().onSchedule();
    }

    /**
     * Describes the action by the id of its task and the name of the task's description.
     *
     * @return text of the form {@code ExecutionAction (Task <id>, CE name <name>)}
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("ExecutionAction (Task ");
        description.append(this.task.getId());
        description.append(", CE name ");
        description.append(this.task.getTaskDescription().getName());
        description.append(")");
        return description.toString();
    }
}

