/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.components.impl;

import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.api.TaskMonitor;
import es.bsc.compss.checkpoint.CheckpointManager;
import es.bsc.compss.components.impl.DataInfoProvider;
import es.bsc.compss.components.impl.TaskAnalyser;
import es.bsc.compss.components.impl.TaskDispatcher;
import es.bsc.compss.components.monitor.impl.GraphHandler;
import es.bsc.compss.types.AbstractTask;
import es.bsc.compss.types.Application;
import es.bsc.compss.types.ReduceTask;
import es.bsc.compss.types.Task;
import es.bsc.compss.types.annotations.parameter.OnFailure;
import es.bsc.compss.types.data.DataAccessId;
import es.bsc.compss.types.data.DataInstanceId;
import es.bsc.compss.types.data.LogicalData;
import es.bsc.compss.types.data.ResultFile;
import es.bsc.compss.types.data.access.MainAccess;
import es.bsc.compss.types.data.accessparams.AccessParams;
import es.bsc.compss.types.data.params.DataParams;
import es.bsc.compss.types.parameter.impl.Parameter;
import es.bsc.compss.types.request.ap.APRequest;
import es.bsc.compss.types.request.ap.AlreadyAccessedRequest;
import es.bsc.compss.types.request.ap.BarrierGroupRequest;
import es.bsc.compss.types.request.ap.BarrierRequest;
import es.bsc.compss.types.request.ap.CancelApplicationTasksRequest;
import es.bsc.compss.types.request.ap.CancelTaskGroupRequest;
import es.bsc.compss.types.request.ap.CloseTaskGroupRequest;
import es.bsc.compss.types.request.ap.DataGetLastVersionRequest;
import es.bsc.compss.types.request.ap.DeleteAllApplicationDataRequest;
import es.bsc.compss.types.request.ap.DeleteDataRequest;
import es.bsc.compss.types.request.ap.EndOfAppRequest;
import es.bsc.compss.types.request.ap.FinishDataAccessRequest;
import es.bsc.compss.types.request.ap.GetResultFilesRequest;
import es.bsc.compss.types.request.ap.OpenTaskGroupRequest;
import es.bsc.compss.types.request.ap.RegisterDataAccessRequest;
import es.bsc.compss.types.request.ap.RegisterRemoteDataRequest;
import es.bsc.compss.types.request.ap.ShutdownNotificationRequest;
import es.bsc.compss.types.request.ap.ShutdownRequest;
import es.bsc.compss.types.request.ap.SnapshotRequest;
import es.bsc.compss.types.request.ap.TaskAnalysisRequest;
import es.bsc.compss.types.request.ap.TaskEndNotification;
import es.bsc.compss.types.request.ap.TasksStateRequest;
import es.bsc.compss.types.request.ap.UnblockResultFilesRequest;
import es.bsc.compss.types.request.ap.WaitForDataReadyToDeleteRequest;
import es.bsc.compss.types.request.exceptions.NonExistingValueException;
import es.bsc.compss.types.request.exceptions.ShutdownException;
import es.bsc.compss.types.request.exceptions.ValueUnawareRuntimeException;
import es.bsc.compss.types.tracing.TraceEvent;
import es.bsc.compss.types.tracing.TraceEventType;
import es.bsc.compss.util.Classpath;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.Tracer;
import es.bsc.compss.worker.COMPSsException;
import java.io.File;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Component that serialises the requests issued by the rest of the runtime.
 *
 * <p>Callers build an {@link APRequest} and put it on an internal queue; a dedicated thread (started by the
 * constructor and running {@link #run()}) takes the requests one by one and invokes
 * {@code request.process(...)} on each of them, handing over the task analyser, the data info provider and the
 * task dispatcher owned by this instance.
 */
public class AccessProcessor implements Runnable, CheckpointManager.User {
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.compss.Components.TaskProcessor");
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    private static final String CHECKPOINTER_REL_PATH = File.separator + "Runtime" + File.separator + "checkpointer";
    private static final String ERR_LOAD_CHECKPOINTER = "Error loading checkpoint manager";
    private static final String ERROR_QUEUE_OFFER = "ERROR: AccessProcessor queue offer error on ";
    private final TaskDispatcher taskDispatcher;
    private final TaskAnalyser taskAnalyser;
    private final DataInfoProvider dataInfoProvider;
    private final CheckpointManager checkpointManager;
    private static Thread processor;
    private static boolean keepGoing;
    // Only assigned by shutdown(); it is null until then.
    private Semaphore shutdownSemaphore;
    protected LinkedBlockingQueue<APRequest> requestQueue;

    /**
     * Creates the processor, builds its sub-components and starts the thread that consumes the request queue.
     *
     * @param td task dispatcher handed to every request when it is processed
     */
    public AccessProcessor(TaskDispatcher td) {
        this.taskDispatcher = td;
        this.taskAnalyser = new TaskAnalyser();
        this.dataInfoProvider = new DataInfoProvider();
        // The queue is created before the checkpoint manager because that constructor receives "this" and
        // could otherwise observe a null queue if it enqueued a request while being built.
        this.requestQueue = new LinkedBlockingQueue<>();
        AccessProcessor.loadCheckpointingPoliciesJars();
        this.checkpointManager = this.constructCheckpointManager();
        if (this.checkpointManager == null) {
            ErrorManager.fatal(ERR_LOAD_CHECKPOINTER);
        }
        this.taskAnalyser.setCoWorkers(this.dataInfoProvider, this.checkpointManager);
        keepGoing = true;
        processor = new Thread(this);
        processor.setName("Access Processor");
        if (Tracer.isActivated()) {
            Tracer.enablePThreads(1);
        }
        processor.start();
    }

    /**
     * Forwards the graph handler to the task analyser.
     *
     * @param gh graph handler to register
     */
    public void setGM(GraphHandler gh) {
        this.taskAnalyser.setGM(gh);
    }

    /**
     * Main loop of the processing thread: takes requests from the queue and processes them until a
     * {@link ShutdownException} is raised by a request (or {@code keepGoing} becomes false).
     *
     * <p>Any other exception raised while taking or processing a request is reported through
     * {@link ErrorManager#error} and the loop goes on with the next request.
     */
    @Override
    public void run() {
        if (Tracer.isActivated()) {
            Tracer.emitEvent(TraceEvent.AP_THREAD_ID);
            Tracer.disablePThreads(1);
        }
        while (keepGoing) {
            try {
                APRequest request = this.requestQueue.take();
                if (Tracer.isActivated()) {
                    Tracer.emitEvent(request.getEvent());
                }
                request.process(this, this.taskAnalyser, this.dataInfoProvider, this.taskDispatcher);
            } catch (ShutdownException se) {
                se.getSemaphore().release();
                break;
            } catch (Exception e) {
                ErrorManager.error("Exception", e);
            } finally {
                // No jump statement here: a "continue" inside finally would discard the pending "break" of
                // the shutdown handler and the thread would never leave the loop when tracing is disabled.
                if (Tracer.isActivated()) {
                    Tracer.emitEventEnd(TraceEventType.RUNTIME);
                }
            }
        }
        if (Tracer.isActivated()) {
            Tracer.emitEventEnd(TraceEvent.AP_THREAD_ID);
        }
        LOGGER.info("AccessProcessor shutdown");
    }

    /**
     * Creates a method task and queues a request for its analysis.
     *
     * <p>A {@link ReduceTask} is created only when {@code isReduce} is set and {@code reduceChunkSize} is at
     * least 2; a reduce request with a smaller chunk size is warned about and created as a plain {@link Task}.
     *
     * @return the id of the created task
     */
    public int newTask(Application app, TaskMonitor monitor, COMPSsConstants.Lang lang, String signature, boolean isPrioritary, int numNodes, boolean isReduce, int reduceChunkSize, boolean isReplicated, boolean isDistributed, boolean hasTarget, int numReturns, List<Parameter> parameters, OnFailure onFailure, long timeOut) {
        Task currentTask;
        if (isReduce && reduceChunkSize >= 2) {
            currentTask = new ReduceTask(app, lang, signature, isPrioritary, numNodes, isReduce, reduceChunkSize, isReplicated, isDistributed, hasTarget, numReturns, parameters, monitor, onFailure, timeOut);
        } else {
            if (isReduce) {
                ErrorManager.warn("Requesting to create task with chunk_size smaller than 2. Executing as simple task");
            }
            currentTask = new Task(app, lang, signature, isPrioritary, numNodes, isReduce, isReplicated, isDistributed, hasTarget, numReturns, parameters, monitor, onFailure, timeOut);
        }
        return this.submitForAnalysis(currentTask, "Requesting analysis of Task ", "new method task");
    }

    /**
     * Creates an HTTP task and queues a request for its analysis.
     *
     * <p>{@code isReduce} and {@code reduceChunkSize} are accepted but not used by this overload.
     *
     * @return the id of the created task
     */
    public int newTask(Application app, TaskMonitor monitor, String declareMethodFullyQualifiedName, boolean priority, boolean isReduce, int reduceChunkSize, boolean hasTarget, int numReturns, List<Parameter> parameters, OnFailure onFailure, long timeOut) {
        Task currentTask = new Task(app, declareMethodFullyQualifiedName, priority, hasTarget, numReturns, parameters, monitor, onFailure, timeOut);
        return this.submitForAnalysis(currentTask, "Requesting analysis of new HTTP Task ", "new HTTP task");
    }

    /**
     * Queues a notification that the given task has ended.
     *
     * @param task task that ended
     */
    public void notifyTaskEnd(AbstractTask task) {
        this.enqueue(new TaskEndNotification(task), "notify task end");
    }

    /**
     * Registers an access from the main code, fetches the accessed value and, when the access reports that it
     * is finished on registration, queues the request that finishes it.
     *
     * @param ma description of the access
     * @return the value obtained through {@code ma.fetch}, or {@code ma.getUnavailableValueResponse()} when the
     *         registration yields no access id
     * @throws ValueUnawareRuntimeException propagated from the registration of the access
     */
    public <T> T mainAccess(MainAccess<T, ?, ?> ma) throws ValueUnawareRuntimeException {
        AccessParams ap = (AccessParams) ma.getParameters();
        if (DEBUG) {
            LOGGER.debug("Requesting main access to " + ap.getDataDescription());
        }
        DataAccessId daId = this.registerDataAccess(ap);
        if (daId == null) {
            ErrorManager.warn("No version available. Returning null");
            return ma.getUnavailableValueResponse();
        }
        T oUpdated = ma.fetch(daId);
        if (ma.isAccessFinishedOnRegistration()) {
            DataInstanceId wId = null;
            if (daId.isWrite()) {
                wId = ((DataAccessId.WritingDataAccessId) daId).getWrittenDataInstance();
            }
            this.finishDataAccess(ap, wId);
        }
        return oUpdated;
    }

    /**
     * Queues the request that marks a data access as finished.
     *
     * @param ap parameters of the access being finished
     * @param generatedDaId data instance written by the access; {@code mainAccess} passes {@code null} for
     *        accesses that are not writes
     */
    public void finishDataAccess(AccessParams ap, DataInstanceId generatedDaId) {
        this.enqueue(new FinishDataAccessRequest(ap, generatedDaId), "finishing data access");
    }

    /**
     * Queues a query for the last version of a data and returns what the request reports through
     * {@code getData()}.
     *
     * @param data data being queried
     * @return value returned by the request's {@code getData()}
     */
    public LogicalData getDataLastVersion(DataParams data) {
        DataGetLastVersionRequest odr = new DataGetLastVersionRequest(data);
        this.enqueue(odr, "data version query");
        return odr.getData();
    }

    /**
     * Queues a barrier for a task group and waits for the completion of the request.
     *
     * @param app application owning the group
     * @param groupName name of the group
     * @throws COMPSsException propagated from the wait on the request
     */
    public void barrierGroup(Application app, String groupName) throws COMPSsException {
        BarrierGroupRequest bgr = new BarrierGroupRequest(app, groupName);
        this.enqueue(bgr, "wait for all tasks");
        bgr.waitForCompletion();
        LOGGER.info("Group barrier: End of tasks of group " + groupName);
    }

    /**
     * Queues a barrier for an application and waits for the completion of the request.
     *
     * <p>A {@link COMPSsException} raised by the wait is logged and not propagated.
     *
     * @param app application to wait for
     */
    public void barrier(Application app) {
        BarrierRequest br = new BarrierRequest(app);
        this.enqueue(br, "wait for all tasks");
        try {
            br.waitForCompletion();
        } catch (COMPSsException ce) {
            // The signature offers no way to report it; keep it visible instead of dropping it silently.
            LOGGER.warn("Ignoring COMPSsException raised while waiting on barrier", ce);
        }
        LOGGER.info("Barrier: End of waited all tasks");
    }

    /**
     * Queues an end-of-application request and waits for its completion.
     *
     * <p>A {@link COMPSsException} raised by the wait is logged and not propagated.
     *
     * @param app application that will submit no more tasks
     */
    public void noMoreTasks(Application app) {
        EndOfAppRequest eoar = new EndOfAppRequest(app);
        this.enqueue(eoar, "no more tasks");
        try {
            eoar.waitForCompletion();
        } catch (COMPSsException ce) {
            // The signature offers no way to report it; keep it visible instead of dropping it silently.
            LOGGER.warn("Ignoring COMPSsException raised while waiting for the end of the application", ce);
        }
        LOGGER.info("All tasks finished");
    }

    /**
     * Queues a query about whether a data has already been accessed and returns the request's response.
     *
     * @param data data being queried
     * @return value returned by the request's {@code getResponse()}
     */
    public boolean alreadyAccessed(DataParams data) {
        AlreadyAccessedRequest request = new AlreadyAccessedRequest(data);
        this.enqueue(request, "already accessed location");
        return request.getResponse();
    }

    /**
     * Queues the cancellation of the tasks of an application and blocks on a semaphore handed to the request.
     *
     * @param app application whose tasks are cancelled
     */
    public void cancelApplicationTasks(Application app) {
        Long appId = app.getId();
        LOGGER.info("Cancelled all remaining tasks for application with id " + appId);
        Semaphore sem = new Semaphore(0);
        this.enqueue(new CancelApplicationTasksRequest(app, sem), "wait for task");
        LOGGER.debug("Waiting for finishing tasks cancellation " + appId);
        sem.acquireUninterruptibly();
        LOGGER.info("Tasks cancelled for application with id " + appId);
    }

    /**
     * Queues the cancellation of the tasks of a group and blocks on a semaphore handed to the request.
     *
     * @param app application owning the group
     * @param groupName name of the group
     */
    public void cancelTaskGroup(Application app, String groupName) {
        Long appId = app.getId();
        LOGGER.info("Cancel remaining tasks for application " + appId + " and group " + groupName);
        Semaphore sem = new Semaphore(0);
        this.enqueue(new CancelTaskGroupRequest(app, groupName, sem), "wait for task");
        LOGGER.debug("Waiting for cancellation of tasks in group " + groupName);
        sem.acquireUninterruptibly();
        LOGGER.info("Tasks cancelled for group " + groupName);
    }

    /**
     * Queues the registration of a data access, waits for the request and returns the access id it produced.
     *
     * @param access parameters of the access
     * @return the access id reported by the request ({@code mainAccess} treats {@code null} as "no version")
     * @throws ValueUnawareRuntimeException declared for the wait on the request
     */
    private DataAccessId registerDataAccess(AccessParams access) throws ValueUnawareRuntimeException {
        RegisterDataAccessRequest request = new RegisterDataAccessRequest(access);
        this.enqueue(request, "register data access");
        request.waitForCompletion();
        return request.getAccessId();
    }

    /**
     * Queues the opening of a task group.
     *
     * @param groupName name of the group
     * @param app application owning the group
     */
    public void setCurrentTaskGroup(String groupName, Application app) {
        this.enqueue(new OpenTaskGroupRequest(groupName, app), "new task group");
    }

    /**
     * Queues the closure of the current task group of an application.
     *
     * @param app application owning the group
     */
    public void closeCurrentTaskGroup(Application app) {
        this.enqueue(new CloseTaskGroupRequest(app), "closure of task group");
    }

    /**
     * Queues a request to unblock the given result files.
     *
     * @param resFiles result files to unblock
     */
    public void unblockResultFiles(List<ResultFile> resFiles) {
        this.enqueue(new UnblockResultFilesRequest(resFiles), "unblock result files");
    }

    /**
     * Queues a shutdown request and blocks until the semaphore handed to it is released.
     */
    public void shutdown() {
        this.shutdownSemaphore = new Semaphore(0);
        this.enqueue(new ShutdownRequest(this.shutdownSemaphore), "shutdown");
        this.shutdownSemaphore.acquireUninterruptibly();
    }

    /**
     * Queues a task-state query, blocks on the semaphore handed to it and returns the request's response.
     *
     * @return value returned by the request's {@code getResponse()}
     */
    public String getCurrentTaskState() {
        Semaphore sem = new Semaphore(0);
        TasksStateRequest request = new TasksStateRequest(sem);
        this.enqueue(request, "get current task state");
        sem.acquireUninterruptibly();
        return request.getResponse();
    }

    /**
     * Queues the deletion of a data.
     *
     * <p>When {@code enableReuse} is set, a wait-for-readiness request is queued and waited on first:
     * <ul>
     * <li>on {@link ValueUnawareRuntimeException} only the local copy is deleted and no delete request is
     * queued;</li>
     * <li>on {@link NonExistingValueException} the delete request is queued but the local copy is left
     * untouched;</li>
     * <li>otherwise the delete request is queued and the local copy is deleted afterwards.</li>
     * </ul>
     *
     * @param data data to delete
     * @param enableReuse whether to wait for the data to be ready and delete its local copy
     * @param applicationDelete flag forwarded to the delete request
     */
    public void deleteData(DataParams data, boolean enableReuse, boolean applicationDelete) {
        LOGGER.debug("Marking data " + data.getDescription() + " for deletion");
        boolean deleteLocalCopy = enableReuse;
        if (enableReuse) {
            WaitForDataReadyToDeleteRequest request = new WaitForDataReadyToDeleteRequest(data);
            this.enqueue(request, "wait for data ready to delete");
            try {
                request.waitForDataReadiness();
            } catch (ValueUnawareRuntimeException vure) {
                AccessProcessor.deleteLocalCopy(data);
                return;
            } catch (NonExistingValueException ex) {
                deleteLocalCopy = false;
            }
        }
        LOGGER.debug("Sending delete request for " + data.getDescription());
        this.enqueue(new DeleteDataRequest(data, applicationDelete), "mark for deletion");
        if (deleteLocalCopy) {
            AccessProcessor.deleteLocalCopy(data);
        }
    }

    /**
     * Queues a request for the result files of an application, blocks on the semaphore handed to it and then
     * queues a second request to unblock the data the first one reports as blocked.
     *
     * @param app application whose result files are requested
     */
    public void getResultFiles(Application app) {
        Semaphore sem = new Semaphore(0);
        GetResultFilesRequest request = new GetResultFilesRequest(app, sem);
        this.enqueue(request, "get result files");
        sem.acquireUninterruptibly();
        this.enqueue(new UnblockResultFilesRequest(request.getBlockedData()), "unlock result files");
    }

    /**
     * Queues the registration of a remote data.
     *
     * @param accessedValue data being registered
     * @param dataId identifier forwarded to the request
     */
    public void registerRemoteData(DataParams accessedValue, String dataId) {
        this.enqueue(new RegisterRemoteDataRequest(accessedValue, dataId), "register data");
    }

    /**
     * Queues the deletion of all the data of an application and, if the request could be queued, waits for
     * its completion.
     *
     * @param app application whose data is deleted
     */
    public void deleteAllApplicationDataRequest(Application app) {
        Long appId = app.getId();
        DeleteAllApplicationDataRequest request = new DeleteAllApplicationDataRequest(app);
        if (this.enqueue(request, "delete all data from application " + appId)) {
            request.waitForCompletion();
        }
    }

    /**
     * Queues a snapshot request for an application.
     *
     * @param app application to snapshot
     */
    public void snapshot(Application app) {
        this.enqueue(new SnapshotRequest(app), "snapshot");
    }

    /**
     * Queues a request on behalf of the checkpoint manager.
     *
     * @param apRequest request to queue
     * @param errorMessage text appended to the error reported if the request cannot be queued
     */
    @Override
    public void addCheckpointRequest(APRequest apRequest, String errorMessage) {
        this.enqueue(apRequest, errorMessage);
    }

    /**
     * Queues a shutdown notification carrying the semaphore created by {@link #shutdown()}.
     *
     * <p>NOTE(review): the semaphore is {@code null} if this is invoked before {@code shutdown()} — confirm
     * that callers only invoke it during shutdown.
     */
    @Override
    public void allAvailableDataCheckpointed() {
        this.enqueue(new ShutdownNotificationRequest(this.shutdownSemaphore), "shutdown");
    }

    /**
     * Offers a request to the queue and reports an error when the offer is rejected.
     *
     * @param request request to queue
     * @param context text appended to {@code ERROR_QUEUE_OFFER} in the error message
     * @return {@code true} if the request was queued
     */
    private boolean enqueue(APRequest request, String context) {
        boolean accepted = this.requestQueue.offer(request);
        if (!accepted) {
            ErrorManager.error(ERROR_QUEUE_OFFER + context);
        }
        return accepted;
    }

    /**
     * Notifies the task's monitor of the creation, logs the submission and queues the analysis request.
     *
     * @param task task just created
     * @param logPrefix debug message printed followed by the task id
     * @param errorContext text appended to the error reported if the request cannot be queued
     * @return the id of the task
     */
    private int submitForAnalysis(Task task, String logPrefix, String errorContext) {
        TaskMonitor registeredMonitor = task.getTaskMonitor();
        registeredMonitor.onCreation();
        LOGGER.debug(logPrefix + task.getId());
        this.enqueue(new TaskAnalysisRequest(task), errorContext);
        return task.getId();
    }

    /**
     * Invokes {@code deleteLocal()} on the data and logs the outcome; failures are logged, not propagated.
     *
     * @param data data whose local copy is deleted
     */
    private static void deleteLocalCopy(DataParams data) {
        try {
            data.deleteLocal();
            LOGGER.info("[DeleteData] Data " + data.getDescription() + " deleted.");
        } catch (Exception e) {
            LOGGER.error("[DeleteData] Error on deleting " + data.getDescription(), (Throwable) e);
        }
    }

    /**
     * Loads the jars found under {@code $COMPSS_HOME/Runtime/checkpointer}; does nothing but warn when the
     * {@code COMPSS_HOME} environment variable is unset or empty.
     */
    private static void loadCheckpointingPoliciesJars() {
        LOGGER.info("Loading checkpointers...");
        String compssHome = System.getenv("COMPSS_HOME");
        if (compssHome == null || compssHome.isEmpty()) {
            LOGGER.warn("WARN: COMPSS_HOME not defined, no checkpointers loaded.");
            return;
        }
        Classpath.loadJarsInPath(compssHome + CHECKPOINTER_REL_PATH, LOGGER);
    }

    /**
     * Instantiates, by reflection, the checkpoint policy named by the {@code compss.checkpoint.policy} system
     * property (or {@code es.bsc.compss.checkpoint.policies.NoCheckpoint} when unset or empty), passing it the
     * parameters parsed from {@code compss.checkpoint.params} and this instance.
     *
     * @return the created manager, or {@code null} if {@link ErrorManager#fatal} returned after a failure
     */
    private CheckpointManager constructCheckpointManager() {
        CheckpointManager checkpointer = null;
        try {
            HashMap<String, String> paramsMap = AccessProcessor.parseCheckpointParams(System.getProperty("compss.checkpoint.params"));
            String cpFQN = System.getProperty("compss.checkpoint.policy");
            if (cpFQN == null || cpFQN.isEmpty()) {
                cpFQN = "es.bsc.compss.checkpoint.policies.NoCheckpoint";
            }
            Class<?> cpClass = Class.forName(cpFQN);
            Constructor<?> cpCnstr = cpClass.getConstructor(HashMap.class, AccessProcessor.class);
            checkpointer = (CheckpointManager) cpCnstr.newInstance(paramsMap, this);
            if (DEBUG) {
                LOGGER.debug("Loaded checkpointer " + checkpointer);
            }
        } catch (Exception e) {
            ErrorManager.fatal(ERR_LOAD_CHECKPOINTER, e);
        }
        return checkpointer;
    }

    /**
     * Parses the comma-separated {@code key:value} list of checkpoint parameters.
     *
     * <p>Everything from the first occurrence of {@code avoid.checkpoint} onwards is kept as a single entry,
     * so commas inside it are not treated as separators. The value of {@code period.time} is converted to
     * milliseconds. Empty entries are skipped.
     *
     * @param parameters raw parameter string; may be {@code null} or empty
     * @return map with the parsed parameters, empty when there are none
     * @throws IllegalArgumentException if a non-empty entry has no value after the colon
     * @throws NumberFormatException if the amount of {@code period.time} is not an integer
     */
    private static HashMap<String, String> parseCheckpointParams(String parameters) {
        HashMap<String, String> paramsMap = new HashMap<>();
        if (parameters == null || parameters.isEmpty()) {
            return paramsMap;
        }
        if (DEBUG) {
            LOGGER.debug("Reading Checkpointing policy parameters  " + parameters);
        }
        List<String> params;
        int index = parameters.indexOf("avoid.checkpoint");
        if (index != -1) {
            params = new ArrayList<>(Arrays.asList(parameters.substring(0, index).split(",")));
            params.add(parameters.substring(index));
        } else {
            params = new ArrayList<>(Arrays.asList(parameters.split(",")));
        }
        for (String param : params) {
            if (param.isEmpty()) {
                // "".split(",") yields a single empty entry, e.g. when the string starts with avoid.checkpoint.
                continue;
            }
            String[] values = param.split(":");
            if (values.length < 2) {
                throw new IllegalArgumentException("Malformed checkpoint parameter '" + param + "', expected key:value");
            }
            String value = values[1];
            if (values[0].equals("period.time")) {
                value = AccessProcessor.periodToMillis(value);
            }
            paramsMap.put(values[0], value);
        }
        return paramsMap;
    }

    /**
     * Converts a period such as {@code 2h}, {@code 30m}, {@code 45s} or {@code 10} to milliseconds; a value
     * without suffix is taken as minutes.
     *
     * @param period integer amount optionally followed by {@code h}, {@code m} or {@code s}
     * @return decimal representation of the period in milliseconds
     * @throws NumberFormatException if the amount is not a valid {@code int}
     */
    private static String periodToMillis(String period) {
        long unitMillis = 60L * 1000L;
        String amount = period;
        if (period.endsWith("h")) {
            unitMillis = 3600L * 1000L;
            amount = period.substring(0, period.length() - 1);
        } else if (period.endsWith("m")) {
            amount = period.substring(0, period.length() - 1);
        } else if (period.endsWith("s")) {
            unitMillis = 1000L;
            amount = period.substring(0, period.length() - 1);
        }
        // Multiply as long: an int product silently wraps around for large periods.
        return String.valueOf((long) Integer.parseInt(amount) * unitMillis);
    }
}

