/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.types.job;

import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.types.COMPSsWorker;
import es.bsc.compss.types.TaskDescription;
import es.bsc.compss.types.annotations.parameter.DataType;
import es.bsc.compss.types.annotations.parameter.OnFailure;
import es.bsc.compss.types.data.DataAccessId;
import es.bsc.compss.types.data.DataInstanceId;
import es.bsc.compss.types.data.LogicalData;
import es.bsc.compss.types.data.Transferable;
import es.bsc.compss.types.data.listener.EventListener;
import es.bsc.compss.types.data.location.DataLocation;
import es.bsc.compss.types.implementations.Implementation;
import es.bsc.compss.types.implementations.TaskType;
import es.bsc.compss.types.job.Job;
import es.bsc.compss.types.job.JobEndStatus;
import es.bsc.compss.types.job.JobHistory;
import es.bsc.compss.types.job.JobListener;
import es.bsc.compss.types.job.JobTransfersListener;
import es.bsc.compss.types.parameter.CollectiveParameter;
import es.bsc.compss.types.parameter.DependencyParameter;
import es.bsc.compss.types.parameter.Parameter;
import es.bsc.compss.types.resources.Resource;
import es.bsc.compss.types.uri.SimpleURI;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.JobDispatcher;
import es.bsc.compss.worker.COMPSsException;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class JobImpl<T extends COMPSsWorker>
implements Job<T> {
    protected static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.compss.Communication");
    private static final Logger JOB_LOGGER = LogManager.getLogger((String)"es.bsc.compss.Components.TaskDispatcher.JobManager");
    protected static final boolean DEBUG = LOGGER.isDebugEnabled();
    protected static final boolean JOB_DEBUG = JOB_LOGGER.isDebugEnabled();
    private static final int TRANSFER_CHANCES = 2;
    private static final int SUBMISSION_CHANCES = 2;
    protected static final int FIRST_JOB_ID = 1;
    protected static int nextJobId = 1;
    private static final String CLASSPATH_FROM_ENV = System.getProperty("compss.worker.cp") != null && !System.getProperty("compss.worker.cp").equals("") ? System.getProperty("compss.worker.cp") : "\"\"";
    private final String workerClasspath;
    private static final String PYTHONPATH_FROM_ENV = System.getProperty("compss.worker.pythonpath") != null && !System.getProperty("compss.worker.pythonpath").equals("") ? System.getProperty("compss.worker.pythonpath") : "\"\"";
    private final String workerPythonpath;
    protected int jobId = nextJobId++;
    protected final int taskId;
    protected final TaskDescription<Parameter> taskParams;
    protected final Implementation impl;
    protected final Resource worker;
    private final JobListener listener;
    protected final List<Integer> predecessors;
    protected final Integer numSuccessors;
    private boolean cancelling;
    protected JobHistory history;
    protected int transferId;
    private int transferErrors;
    private int executionErrors;

    public JobImpl(int taskId, TaskDescription task, Implementation impl, Resource res, JobListener listener, List<Integer> predecessors, Integer numSuccessors) {
        this.taskId = taskId;
        this.cancelling = false;
        this.history = JobHistory.NEW;
        this.transferErrors = 0;
        this.executionErrors = 0;
        this.taskParams = task;
        this.impl = impl;
        this.worker = res;
        this.listener = listener;
        this.predecessors = predecessors;
        this.numSuccessors = numSuccessors;
        String classpathFromFile = ((COMPSsWorker)this.getResourceNode()).getClasspath();
        this.workerClasspath = !classpathFromFile.equals("") ? (!CLASSPATH_FROM_ENV.equals("") ? CLASSPATH_FROM_ENV + ":" + classpathFromFile : classpathFromFile) : CLASSPATH_FROM_ENV;
        String pythonpathFromFile = ((COMPSsWorker)this.getResourceNode()).getPythonpath();
        this.workerPythonpath = !pythonpathFromFile.equals("") ? (!PYTHONPATH_FROM_ENV.equals("") ? PYTHONPATH_FROM_ENV + ":" + pythonpathFromFile : pythonpathFromFile) : PYTHONPATH_FROM_ENV;
    }

    public COMPSsConstants.Lang getLang() {
        return this.taskParams.getLang();
    }

    @Override
    public int getJobId() {
        return this.jobId;
    }

    @Override
    public int getTaskId() {
        return this.taskId;
    }

    public TaskDescription getTaskParams() {
        return this.taskParams;
    }

    public OnFailure getOnFailure() {
        return this.taskParams.getOnFailure();
    }

    public long getTimeOut() {
        return this.taskParams.getTimeOut();
    }

    public JobHistory getHistory() {
        return this.history;
    }

    public boolean isBeingCancelled() {
        return this.cancelling;
    }

    public Resource getResource() {
        return this.worker;
    }

    public T getResourceNode() {
        return (T)((COMPSsWorker)this.worker.getNode());
    }

    public String getClasspath() {
        return this.workerClasspath;
    }

    public String getPythonpath() {
        return this.workerPythonpath;
    }

    public Implementation getImplementation() {
        return this.impl;
    }

    public int getTransferGroupId() {
        return this.transferId;
    }

    public abstract TaskType getType();

    public List<Integer> getPredecessors() {
        return this.predecessors;
    }

    public Integer getNumSuccessors() {
        return this.numSuccessors;
    }

    public abstract String toString();

    @Override
    public void stageIn() {
        if (this.isBeingCancelled()) {
            this.cancelled();
        } else {
            JOB_LOGGER.info("Ordering transfers to " + this.worker.getName() + " to run task: " + this.taskId);
            JobTransfersListener stageInListener = new JobTransfersListener(){

                @Override
                public void stageInCompleted() {
                    if (JobImpl.this.isBeingCancelled()) {
                        JobImpl.this.cancelled();
                    } else {
                        JOB_LOGGER.debug("Received a notification for the transfers of task " + JobImpl.this.taskId + " with state DONE");
                        JobImpl.this.listener.stageInCompleted();
                    }
                }

                @Override
                public void stageInFailed(int numErrors) {
                    if (JobImpl.this.isBeingCancelled()) {
                        JobImpl.this.cancelled();
                    } else {
                        JOB_LOGGER.debug("Received a notification for the transfers for task " + JobImpl.this.taskId + " with state FAILED");
                        JobImpl.this.removeTmpData();
                        JobImpl.this.transferErrors++;
                        if (JobImpl.this.transferErrors < 2 && JobImpl.this.taskParams.getOnFailure() == OnFailure.RETRY) {
                            JOB_LOGGER.debug("Resubmitting input files for task " + JobImpl.this.taskId + " to host " + JobImpl.this.worker.getName() + " since " + numErrors + " transfers failed.");
                            JobImpl.this.stageIn();
                        } else {
                            JobImpl.this.listener.stageInFailed(numErrors);
                        }
                    }
                }
            };
            this.transferId = stageInListener.getId();
            this.transferInputData(stageInListener);
            stageInListener.enable();
        }
    }

    private void transferInputData(JobTransfersListener listener) {
        for (Parameter p : this.taskParams.getParameters()) {
            if (DEBUG) {
                JOB_LOGGER.debug("    * " + p);
            }
            if (!p.isPotentialDependency()) continue;
            DependencyParameter dp = (DependencyParameter)p;
            this.transferJobData(dp, listener);
        }
    }

    private void transferJobData(DependencyParameter param, JobTransfersListener listener) {
        switch (param.getType()) {
            case COLLECTION_T: 
            case DICT_COLLECTION_T: {
                CollectiveParameter cp = (CollectiveParameter)param;
                JOB_LOGGER.debug("Detected CollectiveParameter " + cp);
                for (Parameter p : cp.getElements()) {
                    if (!p.isPotentialDependency()) continue;
                    DependencyParameter dp = (DependencyParameter)p;
                    this.transferJobData(dp, listener);
                }
                this.transferSingleParameter(param, listener);
                break;
            }
            case STREAM_T: 
            case EXTERNAL_STREAM_T: {
                this.transferStreamParameter(param, listener);
                break;
            }
            default: {
                this.transferSingleParameter(param, listener);
            }
        }
    }

    private void transferSingleParameter(DependencyParameter param, JobTransfersListener listener) {
        DataAccessId access = param.getDataAccessId();
        if (access != null) {
            if (!access.isRead()) {
                String outRename = ((DataAccessId.WritingDataAccessId)access).getWrittenDataInstance().getRenaming();
                String dataTarget = this.worker.getNode().getOutputDataTarget(outRename, param);
                param.setDataTarget(dataTarget);
            } else {
                listener.addOperation();
                DataInstanceId dId = ((DataAccessId.ReadingDataAccessId)access).getReadDataInstance();
                if (dId == null) {
                    ErrorManager.warn((String)("Read Data Instance for Param: " + param.getName() + " Task: " + this.taskId + " Job: " + this.jobId + " Method: " + this.taskParams.getName() + " is not defined (null)"));
                    listener.notifyFailure(null, new Exception("Read Data Instance for Param: " + param.getName() + " Task: " + this.taskId + " Job: " + this.jobId + " Method: " + this.taskParams.getName() + " is not defined (null)"));
                }
                LogicalData srcData = dId.getData();
                if (access.isWrite()) {
                    String tgtName = ((DataAccessId.WritingDataAccessId)access).getWrittenDataInstance().getRenaming();
                    LogicalData tmpData = Comm.registerData("tmp" + tgtName);
                    this.worker.getData(srcData, tgtName, tmpData, (Transferable)param, (EventListener)listener);
                } else {
                    this.worker.getData(srcData, param, listener);
                }
            }
        } else {
            listener.addOperation();
            ErrorManager.warn((String)("Access for Param: " + param.getName() + " Task: " + this.taskId + " Job: " + this.jobId + " Method: " + this.taskParams.getName() + " is not defined (null)"));
            listener.notifyFailure(null, new Exception("Access for Param: " + param.getName() + " Task: " + this.taskId + " Job: " + this.jobId + " Method: " + this.taskParams.getName() + " is not defined (null)"));
        }
    }

    private void transferStreamParameter(DependencyParameter param, JobTransfersListener listener) {
        LogicalData target;
        LogicalData source;
        DataAccessId access = param.getDataAccessId();
        if (access.isRead()) {
            source = ((DataAccessId.ReadingDataAccessId)access).getReadDataInstance().getData();
            target = access.isWrite() ? ((DataAccessId.WritingDataAccessId)access).getWrittenDataInstance().getData() : source;
        } else {
            source = target = ((DataAccessId.WritingDataAccessId)access).getWrittenDataInstance().getData();
        }
        if (DEBUG) {
            JOB_LOGGER.debug("Requesting stream transfer from " + source + " to " + target + " at " + this.worker.getName());
        }
        listener.addOperation();
        this.worker.getData(source, target, (Transferable)param, (EventListener)listener);
    }

    protected void removeTmpData() {
        for (Parameter p : this.taskParams.getParameters()) {
            if (DEBUG) {
                JOB_LOGGER.debug("    * " + p);
            }
            if (!p.isPotentialDependency()) continue;
            DependencyParameter dp = (DependencyParameter)p;
            this.removeTmpData(dp);
        }
    }

    private void removeTmpData(DependencyParameter param) {
        if (param.getType() != DataType.STREAM_T && param.getType() != DataType.EXTERNAL_STREAM_T) {
            DataAccessId access;
            if (param.isCollective()) {
                CollectiveParameter cp = (CollectiveParameter)param;
                JOB_LOGGER.debug("Detected CollectiveParameter " + cp);
                for (Parameter p : cp.getElements()) {
                    if (!p.isPotentialDependency()) continue;
                    DependencyParameter dp = (DependencyParameter)p;
                    this.removeTmpData(dp);
                }
            }
            if ((access = param.getDataAccessId()).isRead() && access.isWrite()) {
                String tgtName = "tmp" + ((DataAccessId.WritingDataAccessId)access).getWrittenDataInstance().getRenaming();
                Comm.removeDataKeepingValue(tgtName);
            }
        }
    }

    @Override
    public final void submit() {
        this.listener.submitted(this);
        JobDispatcher.dispatch(this);
        JOB_LOGGER.info("Submitted Task: " + this.taskId + " Job: " + this.jobId + " Method: " + this.taskParams.getName() + " Resource: " + this.worker.getName());
    }

    public abstract void submitJob() throws Exception;

    @Override
    public void cancel() throws Exception {
        this.cancelling = true;
        this.cancelJob();
        this.registerAllJobOutputsAsExpected();
    }

    public abstract void cancelJob() throws Exception;

    public void profileArrival() {
        this.listener.arrived(this);
    }

    public void profileArrivalAt(long ts) {
        this.listener.arrivedAt(this, ts);
    }

    public void fetchedAllInputData() {
        this.listener.allInputDataOnWorker(this);
    }

    public void fetchedAllInputDataAt(long ts) {
        this.listener.allInputDataOnWorkerAt(this, ts);
    }

    public void executionStarts() {
        this.listener.startingExecution(this);
    }

    public void executionStartedAt(long ts) {
        this.listener.startingExecutionAt(this, ts);
    }

    public void executionEnds() {
        this.listener.endedExecution(this);
    }

    public void executionEndsAt(long ts) {
        this.listener.endedExecutionAt(this, ts);
    }

    public void profileEndNotification() {
        this.listener.endNotified(this);
    }

    public void profileEndNotificationAt(long ts) {
        this.listener.endNotifiedAt(this, ts);
    }

    private void cancelled() {
        this.history = JobHistory.CANCELLED;
        this.cancelling = false;
        this.listener.jobCancelled(this);
    }

    public void completed() {
        JOB_LOGGER.info("Received a notification for job " + this.jobId + " with state OK");
        if (this.isBeingCancelled()) {
            this.cancelled();
        } else {
            this.listener.jobCompleted(this);
        }
    }

    public void failed(JobEndStatus status) {
        JOB_LOGGER.error("Received a notification for job " + this.jobId + " with state FAILED");
        if (this.isBeingCancelled()) {
            this.cancelled();
        } else {
            String errMsg = "Job " + this.jobId + ", running Task " + this.taskId + " on worker " + this.worker.getName() + ", has failed.";
            JOB_LOGGER.error(errMsg);
            ErrorManager.warn((String)errMsg);
            ++this.executionErrors;
            if (this.taskParams.getOnFailure() == OnFailure.RETRY && this.transferErrors + this.executionErrors < 2) {
                String resubmitMsg = "Resubmitting job to the same worker.";
                JOB_LOGGER.error("Resubmitting job to the same worker.");
                ErrorManager.warn((String)"Resubmitting job to the same worker.");
                this.history = JobHistory.RESUBMITTED;
                this.submit();
            } else {
                switch (this.taskParams.getOnFailure()) {
                    case IGNORE: {
                        ErrorManager.warn((String)"Ignoring failure.");
                        break;
                    }
                    case CANCEL_SUCCESSORS: {
                        ErrorManager.warn((String)"Cancelling successors.");
                        break;
                    }
                }
                this.listener.jobFailed(this, status);
            }
        }
    }

    public void exception(COMPSsException exception) {
        JOB_LOGGER.error("Received an exception notification for job " + this.jobId);
        if (this.isBeingCancelled()) {
            this.cancelled();
        } else {
            this.listener.jobException(this, exception);
        }
    }

    protected void registerAllJobOutputsAsExpected() {
        List<Parameter> params = this.taskParams.getParameters();
        for (Parameter p : params) {
            this.registerJobOutputAsExpected(p);
        }
    }

    private void registerJobOutputAsExpected(Parameter p) {
        if (!p.isPotentialDependency()) {
            return;
        }
        DependencyParameter dp = (DependencyParameter)p;
        String dataName = this.getOutputRename(p);
        if (dp.isCollective()) {
            CollectiveParameter cp = (CollectiveParameter)dp;
            for (Parameter elem : cp.getElements()) {
                if (!elem.isPotentialDependency()) continue;
                this.registerJobOutputAsExpected(elem);
            }
        } else if (dataName != null) {
            this.registerResultLocation(dp.getDataTarget(), dataName, this.worker);
        }
        if (dataName != null) {
            this.notifyResultAvailability(dp, dataName);
        }
    }

    protected void notifyResultAvailability(DependencyParameter dp, String dataName) {
        this.listener.resultAvailable(dp, dataName);
        DataAccessId access = dp.getDataAccessId();
        if (access.isRead() && access.isWrite()) {
            String tgtName = "tmp" + ((DataAccessId.WritingDataAccessId)access).getWrittenDataInstance().getRenaming();
            Comm.removeDataKeepingValue(tgtName);
        }
    }

    protected String getOutputRename(Parameter p) {
        String name = null;
        if (p.isPotentialDependency()) {
            DependencyParameter dp = (DependencyParameter)p;
            DataInstanceId dId = null;
            DataAccessId daId = dp.getDataAccessId();
            if (!daId.isWrite()) {
                return null;
            }
            dId = ((DataAccessId.WritingDataAccessId)dp.getDataAccessId()).getWrittenDataInstance();
            name = dId.getRenaming();
        }
        return name;
    }

    protected DataLocation registerResultLocation(String dataLocation, String dataName, Resource res) {
        DataLocation outLoc = null;
        try {
            if (DEBUG) {
                JOB_LOGGER.debug("Proposed URI for storing output param: " + dataLocation);
            }
            SimpleURI resultURI = new SimpleURI(dataLocation);
            SimpleURI targetURI = new SimpleURI(resultURI.getSchema() + resultURI.getPath());
            outLoc = DataLocation.createLocation(res, targetURI);
            Comm.registerLocation(dataName, outLoc);
        }
        catch (Exception e) {
            ErrorManager.error((String)("ERROR: Invalid location URI " + dataLocation), (Exception)e);
        }
        return outLoc;
    }
}

