/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.components.impl;

import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.api.TaskMonitor;
import es.bsc.compss.checkpoint.CheckpointManager;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.components.impl.DataInfoProvider;
import es.bsc.compss.components.impl.TaskAnalyser;
import es.bsc.compss.components.impl.TaskDispatcher;
import es.bsc.compss.components.monitor.impl.GraphGenerator;
import es.bsc.compss.exceptions.CannotLoadException;
import es.bsc.compss.types.AbstractTask;
import es.bsc.compss.types.Application;
import es.bsc.compss.types.BindingObject;
import es.bsc.compss.types.ReduceTask;
import es.bsc.compss.types.Task;
import es.bsc.compss.types.annotations.parameter.OnFailure;
import es.bsc.compss.types.data.DataAccessId;
import es.bsc.compss.types.data.DataInstanceId;
import es.bsc.compss.types.data.LogicalData;
import es.bsc.compss.types.data.ResultFile;
import es.bsc.compss.types.data.accessid.RAccessId;
import es.bsc.compss.types.data.accessid.RWAccessId;
import es.bsc.compss.types.data.accessid.WAccessId;
import es.bsc.compss.types.data.accessparams.AccessParams;
import es.bsc.compss.types.data.accessparams.BindingObjectAccessParams;
import es.bsc.compss.types.data.accessparams.DataParams;
import es.bsc.compss.types.data.accessparams.FileAccessParams;
import es.bsc.compss.types.data.accessparams.ObjectAccessParams;
import es.bsc.compss.types.data.location.DataLocation;
import es.bsc.compss.types.data.location.ProtocolType;
import es.bsc.compss.types.parameter.Parameter;
import es.bsc.compss.types.request.ap.APRequest;
import es.bsc.compss.types.request.ap.AlreadyAccessedRequest;
import es.bsc.compss.types.request.ap.BarrierGroupRequest;
import es.bsc.compss.types.request.ap.BarrierRequest;
import es.bsc.compss.types.request.ap.CancelApplicationTasksRequest;
import es.bsc.compss.types.request.ap.CancelTaskGroupRequest;
import es.bsc.compss.types.request.ap.CloseTaskGroupRequest;
import es.bsc.compss.types.request.ap.DataGetLastVersionRequest;
import es.bsc.compss.types.request.ap.DeleteAllApplicationDataRequest;
import es.bsc.compss.types.request.ap.DeleteBindingObjectRequest;
import es.bsc.compss.types.request.ap.DeleteFileRequest;
import es.bsc.compss.types.request.ap.DeregisterObject;
import es.bsc.compss.types.request.ap.EndOfAppRequest;
import es.bsc.compss.types.request.ap.FinishDataAccessRequest;
import es.bsc.compss.types.request.ap.GetLastRenamingRequest;
import es.bsc.compss.types.request.ap.GetResultFilesRequest;
import es.bsc.compss.types.request.ap.IsObjectHereRequest;
import es.bsc.compss.types.request.ap.OpenTaskGroupRequest;
import es.bsc.compss.types.request.ap.RegisterDataAccessRequest;
import es.bsc.compss.types.request.ap.RegisterRemoteDataRequest;
import es.bsc.compss.types.request.ap.SetObjectVersionValueRequest;
import es.bsc.compss.types.request.ap.ShutdownNotificationRequest;
import es.bsc.compss.types.request.ap.ShutdownRequest;
import es.bsc.compss.types.request.ap.SnapshotRequest;
import es.bsc.compss.types.request.ap.TaskAnalysisRequest;
import es.bsc.compss.types.request.ap.TaskEndNotification;
import es.bsc.compss.types.request.ap.TasksStateRequest;
import es.bsc.compss.types.request.ap.TransferBindingObjectRequest;
import es.bsc.compss.types.request.ap.TransferObjectRequest;
import es.bsc.compss.types.request.ap.TransferOpenDirectoryRequest;
import es.bsc.compss.types.request.ap.TransferOpenFileRequest;
import es.bsc.compss.types.request.ap.TransferRawFileRequest;
import es.bsc.compss.types.request.ap.UnblockResultFilesRequest;
import es.bsc.compss.types.request.ap.WaitForDataReadyToDeleteRequest;
import es.bsc.compss.types.request.exceptions.ShutdownException;
import es.bsc.compss.types.tracing.TraceEvent;
import es.bsc.compss.types.tracing.TraceEventType;
import es.bsc.compss.types.uri.SimpleURI;
import es.bsc.compss.util.Classpath;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.Tracer;
import es.bsc.compss.worker.COMPSsException;
import java.io.File;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AccessProcessor
implements Runnable,
CheckpointManager.User {
    // Logger of this component, obtained under the "es.bsc.compss.Components.TaskProcessor" name.
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.compss.Components.TaskProcessor");
    // Debug flag, evaluated a single time when the class is initialised.
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    // Relative path "<sep>Runtime<sep>checkpointer".
    // NOTE(review): presumably resolved against the installation directory when loading the checkpointing
    // policies; its use is not visible in this part of the file - confirm.
    private static final String CHECKPOINTER_REL_PATH = File.separator + "Runtime" + File.separator + "checkpointer";
    // Fatal message raised by the constructor when no checkpoint manager could be built.
    private static final String ERR_LOAD_CHECKPOINTER = "Error loading checkpoint manager";
    // Message prefixes for storage-loading and queue-offer failures.
    private static final String ERROR_OBJECT_LOAD_FROM_STORAGE = "ERROR: Cannot load object from storage (file or PSCO)";
    private static final String ERROR_QUEUE_OFFER = "ERROR: AccessProcessor queue offer error on ";
    // Collaborating components; handed to every request processed by run().
    private final TaskDispatcher taskDispatcher;
    private final TaskAnalyser taskAnalyser;
    private final DataInfoProvider dataInfoProvider;
    private final CheckpointManager checkpointManager;
    // Thread executing run(); created and started by the constructor.
    private static Thread processor;
    // Loop condition of run(); set to true by the constructor.
    // NOTE(review): static and not volatile although it is read by the processing thread - confirm that
    // every write happens-before the read.
    private static boolean keepGoing;
    // Semaphore created by shutdown() and handed to the ShutdownRequest it enqueues.
    private Semaphore shutdownSemaphore;
    // Pending requests: the public methods offer into it, run() takes from it.
    protected LinkedBlockingQueue<APRequest> requestQueue;

    /**
     * Builds the Access Processor, wires its collaborating components and starts the thread that
     * processes the queued requests.
     *
     * A fatal error is reported through the ErrorManager when no checkpoint manager can be constructed.
     *
     * @param td Task dispatcher handed to every processed request.
     */
    public AccessProcessor(TaskDispatcher td) {
        this.taskDispatcher = td;
        this.taskAnalyser = new TaskAnalyser();
        this.dataInfoProvider = new DataInfoProvider();

        // The checkpointing policies must be loaded before the manager is constructed
        AccessProcessor.loadCheckpointingPoliciesJars();
        this.checkpointManager = this.constructCheckpointManager();
        if (this.checkpointManager == null) {
            ErrorManager.fatal(ERR_LOAD_CHECKPOINTER);
        }
        this.taskAnalyser.setCoWorkers(this.dataInfoProvider, this.checkpointManager);

        // Explicitly typed queue: avoids the raw-type (unchecked) instantiation
        this.requestQueue = new LinkedBlockingQueue<APRequest>();

        keepGoing = true;
        processor = new Thread(this);
        processor.setName("Access Processor");
        if (Tracer.isActivated()) {
            Tracer.enablePThreads(1);
        }
        processor.start();
    }

    /**
     * Forwards the graph generator to the task analyser.
     *
     * @param gm Graph generator to register.
     */
    public void setGM(GraphGenerator gm) {
        final TaskAnalyser analyser = this.taskAnalyser;
        analyser.setGM(gm);
    }

    /**
     * Main loop of the processing thread: takes requests from the queue one at a time and processes them
     * until a {@link ShutdownException} is raised or the keep-going flag is cleared.
     *
     * Any other exception raised while taking or processing a request is reported through the
     * ErrorManager and the loop moves on to the next request.
     */
    @Override
    public void run() {
        if (Tracer.isActivated()) {
            Tracer.emitEvent(TraceEvent.AP_THREAD_ID);
            Tracer.disablePThreads(1);
        }
        while (keepGoing) {
            try {
                APRequest request = this.requestQueue.take();
                if (Tracer.isActivated()) {
                    Tracer.emitEvent(request.getEvent());
                }
                request.process(this, this.taskAnalyser, this.dataInfoProvider, this.taskDispatcher);
            }
            catch (ShutdownException se) {
                // Release whoever is waiting for the shutdown and leave the loop
                se.getSemaphore().release();
                break;
            }
            catch (Exception e) {
                ErrorManager.error("Exception", e);
            }
            finally {
                // This block has to complete normally: a "continue" here would discard the pending
                // "break" of the shutdown handler and the thread would block again on the queue.
                if (Tracer.isActivated()) {
                    Tracer.emitEventEnd(TraceEventType.RUNTIME);
                }
            }
        }
        if (Tracer.isActivated()) {
            Tracer.emitEventEnd(TraceEvent.AP_THREAD_ID);
        }
        LOGGER.info("AccessProcessor shutdown");
    }

    /**
     * Creates a new method task, notifies its monitor of the creation and queues the task for analysis.
     *
     * A reduce task is only built when {@code isReduce} is set and {@code reduceChunkSize} is at least 2;
     * a reduce request with a smaller chunk size raises a warning and is created as a plain task.
     *
     * @param app Application the task belongs to.
     * @param monitor Monitor handed to the task.
     * @param lang Language handed to the task.
     * @param signature Signature handed to the task.
     * @param isPrioritary Priority flag handed to the task.
     * @param numNodes Number of nodes handed to the task.
     * @param isReduce Whether a reduce task was requested.
     * @param reduceChunkSize Chunk size of the reduce.
     * @param isReplicated Replication flag handed to the task.
     * @param isDistributed Distribution flag handed to the task.
     * @param hasTarget Target flag handed to the task.
     * @param numReturns Number of returns handed to the task.
     * @param parameters Parameters handed to the task.
     * @param onFailure On-failure behaviour handed to the task.
     * @param timeOut Time out handed to the task.
     * @return Id of the created task.
     */
    public int newTask(Application app, TaskMonitor monitor, COMPSsConstants.Lang lang, String signature, boolean isPrioritary, int numNodes, boolean isReduce, int reduceChunkSize, boolean isReplicated, boolean isDistributed, boolean hasTarget, int numReturns, List<Parameter> parameters, OnFailure onFailure, long timeOut) {
        final boolean buildReduce = isReduce && reduceChunkSize >= 2;

        Task currentTask;
        if (buildReduce) {
            currentTask = new ReduceTask(app, lang, signature, isPrioritary, numNodes, isReduce, reduceChunkSize, isReplicated, isDistributed, hasTarget, numReturns, parameters, monitor, onFailure, timeOut);
        } else {
            if (isReduce) {
                // Reduce requested with an unusable chunk size: degrade to a simple task
                ErrorManager.warn("Requesting to create task with chunk_size smaller than 2. Executing as simple task");
            }
            currentTask = new Task(app, lang, signature, isPrioritary, numNodes, isReduce, isReplicated, isDistributed, hasTarget, numReturns, parameters, monitor, onFailure, timeOut);
        }

        currentTask.getTaskMonitor().onCreation();

        LOGGER.debug("Requesting analysis of Task " + currentTask.getId());
        boolean queued = this.requestQueue.offer(new TaskAnalysisRequest(currentTask));
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on new method task");
        }
        return currentTask.getId();
    }

    /**
     * Creates a new HTTP task, notifies its monitor of the creation and queues the task for analysis.
     *
     * {@code isReduce} and {@code reduceChunkSize} are accepted but not used by this method.
     *
     * @param app Application the task belongs to.
     * @param monitor Monitor handed to the task.
     * @param declareMethodFullyQualifiedName Fully qualified name handed to the task.
     * @param priority Priority flag handed to the task.
     * @param isReduce Unused.
     * @param reduceChunkSize Unused.
     * @param hasTarget Target flag handed to the task.
     * @param numReturns Number of returns handed to the task.
     * @param parameters Parameters handed to the task.
     * @param onFailure On-failure behaviour handed to the task.
     * @param timeOut Time out handed to the task.
     * @return Id of the created task.
     */
    public int newTask(Application app, TaskMonitor monitor, String declareMethodFullyQualifiedName, boolean priority, boolean isReduce, int reduceChunkSize, boolean hasTarget, int numReturns, List<Parameter> parameters, OnFailure onFailure, long timeOut) {
        Task httpTask = new Task(app, declareMethodFullyQualifiedName, priority, hasTarget, numReturns, parameters, monitor, onFailure, timeOut);
        httpTask.getTaskMonitor().onCreation();

        LOGGER.debug("Requesting analysis of new HTTP Task " + httpTask.getId());
        boolean queued = this.requestQueue.offer(new TaskAnalysisRequest(httpTask));
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on new HTTP task");
        }
        return httpTask.getId();
    }

    /**
     * Queues a notification about the end of the given task.
     *
     * @param task Task that has ended.
     */
    public void notifyTaskEnd(AbstractTask task) {
        TaskEndNotification notification = new TaskEndNotification(task);
        boolean queued = this.requestQueue.offer(notification);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on notify task end");
        }
    }

    /**
     * Finishes the access described by {@code fap}, provided that the location had been accessed before;
     * otherwise nothing is done.
     *
     * @param sourceLocation Location of the file.
     * @param fap Description of the access; also provides the application.
     * @param destDir Not used by this method.
     */
    public void finishAccessToFile(DataLocation sourceLocation, FileAccessParams fap, String destDir) {
        if (this.alreadyAccessed(fap.getApp(), sourceLocation)) {
            this.finishDataAccess(fap);
            return;
        }
        LOGGER.debug("File not accessed before. Nothing to do");
    }

    /**
     * Queues a request to finish the given data access.
     *
     * @param fap Description of the access to finish.
     */
    private void finishDataAccess(AccessParams fap) {
        FinishDataAccessRequest finishRequest = new FinishDataAccessRequest(fap);
        boolean queued = this.requestQueue.offer(finishRequest);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on finishing data access");
        }
    }

    /**
     * Queries the last version of the file at the given location.
     *
     * @param app Application accessing the file.
     * @param sourceLocation Location of the file.
     * @return Data reported by the version request, or {@code null} when the location was never accessed.
     */
    public LogicalData getFileLastVersion(Application app, DataLocation sourceLocation) {
        if (!this.alreadyAccessed(app, sourceLocation)) {
            LOGGER.debug("File not accessed before, returning the same location");
            return null;
        }

        DataParams.FileData fileData = new DataParams.FileData(app, sourceLocation);
        DataGetLastVersionRequest versionRequest = new DataGetLastVersionRequest(fileData);
        boolean queued = this.requestQueue.offer(versionRequest);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on data version query");
        }
        return versionRequest.getData();
    }

    /**
     * Handles an access to a file and returns the location to be used for it.
     *
     * If the location was never accessed, it is returned untouched. Otherwise the access is registered
     * (and registered again if the obtained version is not valid). When no version is available, a
     * location pointing to "null" is produced. For accesses that read, the file is either opened
     * ({@code destDir == null}) or transferred raw into {@code destDir}; for accesses that write, a new
     * location inside the application host working directory is created and registered.
     *
     * @param app Application accessing the file.
     * @param sourceLocation Original location of the file.
     * @param fap Description of the access.
     * @param destDir Directory receiving the raw copy, or {@code null} to open the file.
     * @return Location resulting from the access.
     */
    public DataLocation mainAccessToFile(Application app, DataLocation sourceLocation, FileAccessParams fap, String destDir) {
        if (!this.alreadyAccessed(app, sourceLocation)) {
            LOGGER.debug("File not accessed before, returning the same location");
            return sourceLocation;
        }

        DataAccessId faId = this.registerDataAccess(fap, AccessParams.AccessMode.R);
        if (faId != null && !faId.isValidVersion()) {
            ErrorManager.warn("The version " + ((RAccessId)faId).getRVersionId() + " of " + sourceLocation + " has been cancelled. Trying to access the latest version");
            faId = this.registerDataAccess(fap, AccessParams.AccessMode.R);
        }

        DataLocation tgtLocation = sourceLocation;

        // No version at all: answer with a "null" file location
        if (faId == null) {
            ErrorManager.warn("No version available. Returning null");
            try {
                tgtLocation = DataLocation.createLocation(Comm.getAppHost(), new SimpleURI(ProtocolType.FILE_URI.getSchema() + "null"));
            }
            catch (Exception e) {
                ErrorManager.error("ERROR: Invalid location URI", e);
            }
            return tgtLocation;
        }

        final AccessParams.AccessMode mode = fap.getMode();

        // Reading side of the access
        if (mode != AccessParams.AccessMode.W) {
            if (destDir == null) {
                tgtLocation = this.transferFileOpen(faId);
            } else {
                DataInstanceId readInstance = mode == AccessParams.AccessMode.R
                    ? ((RAccessId)faId).getReadDataInstance()
                    : ((RWAccessId)faId).getReadDataInstance();
                String readRenaming = readInstance.getRenaming();
                String readPath = ProtocolType.FILE_URI.getSchema() + destDir + readRenaming;
                try {
                    SimpleURI readUri = new SimpleURI(readPath);
                    tgtLocation = DataLocation.createLocation(Comm.getAppHost(), readUri);
                }
                catch (Exception e) {
                    ErrorManager.error("ERROR: Invalid location URI " + readPath, e);
                }
                this.transferFileRaw(faId, tgtLocation);
            }
        }

        // Writing side of the access
        boolean readOnly = mode == AccessParams.AccessMode.R || mode == AccessParams.AccessMode.C;
        if (!readOnly) {
            LOGGER.debug("File " + faId.getDataId() + " mode contains W, register new writer");
            boolean readsAndWrites = mode == AccessParams.AccessMode.RW || mode == AccessParams.AccessMode.CV;
            DataInstanceId writtenInstance = readsAndWrites
                ? ((RWAccessId)faId).getWrittenDataInstance()
                : ((WAccessId)faId).getWrittenDataInstance();
            String writtenRenaming = writtenInstance.getRenaming();
            String writtenPath = ProtocolType.FILE_URI.getSchema() + Comm.getAppHost().getWorkingDirectory() + writtenRenaming;
            try {
                SimpleURI writtenUri = new SimpleURI(writtenPath);
                tgtLocation = DataLocation.createLocation(Comm.getAppHost(), writtenUri);
            }
            catch (Exception e) {
                ErrorManager.error("ERROR: Invalid location URI " + writtenPath, e);
            }
            Comm.registerLocation(writtenRenaming, tgtLocation);
        }

        if (DEBUG) {
            String where = tgtLocation != null ? tgtLocation.toString() : "null";
            LOGGER.debug("File " + faId.getDataId() + " located on " + where);
        }
        return tgtLocation;
    }

    /**
     * Handles an access to a directory and returns the location to be used for it.
     *
     * If the location was never accessed, it is returned untouched. Otherwise the access is registered
     * (and registered again if the obtained version is not valid). When no version is available, a
     * location pointing to {@code destDir + "null"} is produced. For accesses that read, the directory is
     * opened; for accesses that write, a new location inside the application host working directory is
     * created and registered.
     *
     * @param app Application accessing the directory.
     * @param sourceLocation Original location of the directory.
     * @param fap Description of the access.
     * @param destDir Directory used to build the location returned when no version is available.
     * @return Location resulting from the access.
     */
    public DataLocation mainAccessToDirectory(Application app, DataLocation sourceLocation, FileAccessParams fap, String destDir) {
        boolean alreadyAccessed = this.alreadyAccessed(app, sourceLocation);
        if (!alreadyAccessed) {
            LOGGER.debug("Directory not accessed before, returning the same location");
            return sourceLocation;
        }
        DataAccessId faId = this.registerDataAccess(fap, AccessParams.AccessMode.R);
        if (faId != null && !faId.isValidVersion()) {
            ErrorManager.warn("The version " + faId.getDataId() + " of " + sourceLocation + " has been cancelled. Trying to access the latest version");
            faId = this.registerDataAccess(fap, AccessParams.AccessMode.R);
        }
        DataLocation tgtLocation = sourceLocation;
        if (faId == null) {
            ErrorManager.warn("No version available. Returning null");
            try {
                tgtLocation = DataLocation.createLocation(Comm.getAppHost(), new SimpleURI(ProtocolType.FILE_URI.getSchema() + destDir + "null"));
            }
            catch (Exception e) {
                ErrorManager.error("ERROR: Invalid location URI", e);
            }
        } else {
            // Reading side of the access
            if (fap.getMode() != AccessParams.AccessMode.W) {
                tgtLocation = this.transferDirectoryOpen(faId);
            }
            // Writing side of the access
            if (fap.getMode() != AccessParams.AccessMode.R && fap.getMode() != AccessParams.AccessMode.C) {
                DataInstanceId daId;
                LOGGER.debug("File " + faId.getDataId() + " mode contains W, register new writer");
                if (fap.getMode() == AccessParams.AccessMode.RW || fap.getMode() == AccessParams.AccessMode.CV) {
                    daId = ((RWAccessId)faId).getWrittenDataInstance();
                } else {
                    daId = ((WAccessId)faId).getWrittenDataInstance();
                }
                String rename = daId.getRenaming();
                String path = ProtocolType.DIR_URI.getSchema() + Comm.getAppHost().getWorkingDirectory() + rename;
                try {
                    SimpleURI uri = new SimpleURI(path);
                    tgtLocation = DataLocation.createLocation(Comm.getAppHost(), uri);
                }
                catch (Exception e) {
                    ErrorManager.error("ERROR: Invalid location URI " + path, e);
                }
                Comm.registerLocation(rename, tgtLocation);
            }
            if (DEBUG) {
                // Plain concatenation prints "null" for a missing location instead of throwing a
                // NullPointerException from the debug trace (same guarantee as mainAccessToFile)
                LOGGER.debug("Directory " + faId.getDataId() + " located on " + tgtLocation);
            }
        }
        return tgtLocation;
    }

    /**
     * Asks the processing thread whether the value of the object with the given hash code is valid and
     * blocks until the answer is available.
     *
     * @param hashCode Hash code identifying the object.
     * @return Response of the request.
     */
    public boolean isCurrentRegisterValueValid(int hashCode) {
        LOGGER.debug("Checking if value of object with hashcode " + hashCode + " is valid");

        Semaphore answered = new Semaphore(0);
        IsObjectHereRequest query = new IsObjectHereRequest(hashCode, answered);
        boolean queued = this.requestQueue.offer(query);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on valid object value");
        }
        answered.acquireUninterruptibly();

        boolean isValid = query.getResponse();
        if (DEBUG) {
            String outcome = isValid
                ? "Value of object with hashcode " + hashCode + " is valid"
                : "Value of object with hashcode " + hashCode + " is NOT valid";
            LOGGER.debug(outcome);
        }
        return isValid;
    }

    /**
     * Queries the last version of the object identified by the given hash code.
     *
     * @param app Application accessing the object.
     * @param obj Not used by this method.
     * @param hashCode Hash code identifying the object.
     * @return Data reported by the version request.
     */
    public LogicalData getObjectLastVersion(Application app, Object obj, int hashCode) {
        DataParams.ObjectData objectData = new DataParams.ObjectData(app, hashCode);
        DataGetLastVersionRequest versionRequest = new DataGetLastVersionRequest(objectData);
        boolean queued = this.requestQueue.offer(versionRequest);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on data version query");
        }
        return versionRequest.getData();
    }

    /**
     * Handles a read-write access to an object: registers the access (again if the obtained version is
     * not valid), obtains the object, sets it as the value of the newly written version and finishes the
     * access.
     *
     * @param app Application accessing the object.
     * @param obj Object being accessed.
     * @param hashCode Hash code identifying the object.
     * @return Object obtained for the access.
     */
    public Object mainAccessToObject(Application app, Object obj, int hashCode) {
        if (DEBUG) {
            LOGGER.debug("Requesting main access to object with hash code " + hashCode);
        }

        ObjectAccessParams<Object> oap = new ObjectAccessParams<Object>(app, AccessParams.AccessMode.RW, obj, hashCode);
        DataAccessId oaId = this.registerDataAccess(oap, AccessParams.AccessMode.RW);
        if (!oaId.isValidVersion()) {
            ErrorManager.warn("The version " + oaId.getDataId() + " of " + hashCode + " has been cancelled. Trying to access the latest version");
            oaId = this.registerDataAccess(oap, AccessParams.AccessMode.RW);
        }

        // Renaming of the version produced by this access
        String wRename = ((RWAccessId)oaId).getWrittenDataInstance().getRenaming();
        if (DEBUG) {
            LOGGER.debug("Request object transfer " + oaId.getDataId() + " with renaming " + wRename);
        }

        Object oUpdated = this.obtainObject(oaId);
        if (DEBUG) {
            LOGGER.debug("Object retrieved. Set new version to: " + wRename);
        }
        this.setObjectVersionValue(wRename, oUpdated);
        this.finishDataAccess(oap);
        return oUpdated;
    }

    /**
     * Handles a read-write access to an external PSCO: registers the access (again if the obtained
     * version is not valid) and builds the persistent URI from the PSCO id of the data read.
     *
     * @param app Application accessing the object.
     * @param id Id of the external object.
     * @param hashCode Hash code identifying the object.
     * @return Persistent schema followed by the PSCO id of the read version.
     */
    public String mainAccessToExternalPSCO(Application app, String id, int hashCode) {
        if (DEBUG) {
            LOGGER.debug("Requesting main access to external object with hash code " + hashCode);
        }

        ObjectAccessParams<String> oap = new ObjectAccessParams<String>(app, AccessParams.AccessMode.RW, id, hashCode);
        DataAccessId oaId = this.registerDataAccess(oap, AccessParams.AccessMode.RW);
        if (!oaId.isValidVersion()) {
            ErrorManager.warn("The version " + oaId.getDataId() + " of " + hashCode + " has been cancelled. Trying to access the latest version");
            oaId = this.registerDataAccess(oap, AccessParams.AccessMode.RW);
        }

        DataInstanceId readInstance = ((RWAccessId)oaId).getReadDataInstance();
        String lastRenaming = readInstance.getRenaming();
        String newId = Comm.getData(lastRenaming).getPscoId();
        return ProtocolType.PERSISTENT_URI.getSchema() + newId;
    }

    /**
     * Queues the transfer of a binding object, blocks until the request releases its semaphore and
     * returns the name of the binding object generated from the target name of the transfer.
     *
     * @param oaId Read access describing the binding object.
     * @return Name of the generated binding object.
     */
    private String obtainBindingObject(RAccessId oaId) {
        LOGGER.debug("[AccessProcessor] Obtaining binding object with id " + oaId);

        Semaphore transferred = new Semaphore(0);
        TransferBindingObjectRequest transfer = new TransferBindingObjectRequest(oaId, transferred);
        boolean queued = this.requestQueue.offer(transfer);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on obtain object");
        }
        transferred.acquireUninterruptibly();

        return BindingObject.generate(transfer.getTargetName()).getName();
    }

    /**
     * Handles an access to a binding object: registers the access (again if the obtained version is not
     * valid), obtains the binding object and finishes the access.
     *
     * @param app Application accessing the binding object.
     * @param bo Binding object being accessed.
     * @param hashCode Hash code identifying the binding object.
     * @return Name of the obtained binding object.
     */
    public String mainAccessToBindingObject(Application app, BindingObject bo, int hashCode) {
        if (DEBUG) {
            LOGGER.debug("Requesting main access to binding object with bo " + bo.toString() + " and hash code " + hashCode);
        }

        // The access parameters are built in R mode while the registration is requested in RW mode
        BindingObjectAccessParams oap = new BindingObjectAccessParams(app, AccessParams.AccessMode.R, bo, hashCode);
        DataAccessId oaId = this.registerDataAccess(oap, AccessParams.AccessMode.RW);
        if (!oaId.isValidVersion()) {
            ErrorManager.warn("The version " + oaId.getDataId() + " of " + hashCode + " has been cancelled. Trying to access the latest version");
            oaId = this.registerDataAccess(oap, AccessParams.AccessMode.RW);
        }

        String bindingObjectID = this.obtainBindingObject((RAccessId)oaId);
        this.finishDataAccess(oap);
        return bindingObjectID;
    }

    /**
     * Queues a barrier for the given task group and waits for its completion.
     *
     * @param app Application owning the group.
     * @param groupName Name of the task group.
     * @throws COMPSsException Raised while waiting for the completion of the barrier request.
     */
    public void barrierGroup(Application app, String groupName) throws COMPSsException {
        BarrierGroupRequest groupBarrier = new BarrierGroupRequest(app, groupName);
        boolean queued = this.requestQueue.offer(groupBarrier);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on wait for all tasks");
        }

        groupBarrier.waitForCompletion();
        LOGGER.info("Group barrier: End of tasks of group " + groupName);
    }

    /**
     * Queues a barrier for the given application and waits for its completion.
     *
     * A {@link COMPSsException} raised while waiting is not propagated (the signature does not allow it);
     * it is logged as a warning instead of being silently discarded.
     *
     * @param app Application to wait for.
     */
    public void barrier(Application app) {
        BarrierRequest br = new BarrierRequest(app);
        if (!this.requestQueue.offer(br)) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on wait for all tasks");
        }
        try {
            br.waitForCompletion();
        }
        catch (COMPSsException ce) {
            // Keep the best-effort behaviour, but leave a trace of the problem
            LOGGER.warn("COMPSsException raised while waiting for the barrier", ce);
        }
        LOGGER.info("Barrier: End of waited all tasks");
    }

    /**
     * Queues an end-of-application request for the given application and waits for its completion.
     *
     * A {@link COMPSsException} raised while waiting is not propagated (the signature does not allow it);
     * it is logged as a warning instead of being silently discarded.
     *
     * @param app Application that will not submit more tasks.
     */
    public void noMoreTasks(Application app) {
        EndOfAppRequest eoar = new EndOfAppRequest(app);
        if (!this.requestQueue.offer(eoar)) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on no more tasks");
        }
        try {
            eoar.waitForCompletion();
        }
        catch (COMPSsException ce) {
            // Keep the best-effort behaviour, but leave a trace of the problem
            LOGGER.warn("COMPSsException raised while waiting for the end of the application", ce);
        }
        LOGGER.info("All tasks finished");
    }

    /**
     * Asks the processing thread whether the given location has already been accessed and blocks until
     * the answer is available.
     *
     * @param app Application performing the query.
     * @param loc Location to check.
     * @return Response of the request.
     */
    public boolean alreadyAccessed(Application app, DataLocation loc) {
        Semaphore answered = new Semaphore(0);
        AlreadyAccessedRequest query = new AlreadyAccessedRequest(app, loc, answered);
        boolean queued = this.requestQueue.offer(query);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on already accessed location");
        }
        answered.acquireUninterruptibly();
        return query.getResponse();
    }

    /**
     * Queues the cancellation of the tasks of the given application and blocks until the request
     * releases its semaphore.
     *
     * @param app Application whose tasks are cancelled.
     */
    public void cancelApplicationTasks(Application app) {
        Long appId = app.getId();
        LOGGER.info("Cancelled all remaining tasks for application with id " + appId);

        Semaphore cancelled = new Semaphore(0);
        CancelApplicationTasksRequest cancellation = new CancelApplicationTasksRequest(app, cancelled);
        boolean queued = this.requestQueue.offer(cancellation);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on wait for task");
        }

        LOGGER.debug("Waiting for finishing tasks cancellation " + appId);
        cancelled.acquireUninterruptibly();
        LOGGER.info("Tasks cancelled for application with id " + appId);
    }

    /**
     * Queues the cancellation of the tasks of the given group and blocks until the request releases its
     * semaphore.
     *
     * @param app Application owning the group.
     * @param groupName Name of the task group.
     */
    public void cancelTaskGroup(Application app, String groupName) {
        Long appId = app.getId();
        LOGGER.info("Cancel remaining tasks for application " + appId + " and group " + groupName);

        Semaphore cancelled = new Semaphore(0);
        CancelTaskGroupRequest cancellation = new CancelTaskGroupRequest(app, groupName, cancelled);
        boolean queued = this.requestQueue.offer(cancellation);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on wait for task");
        }

        LOGGER.debug("Waiting for cancellation of tasks in group " + groupName);
        cancelled.acquireUninterruptibly();
        LOGGER.info("Tasks cancelled for group " + groupName);
    }

    /**
     * Queues the registration of a data access, waits for the request to complete and returns the
     * access id it produced.
     *
     * @param access Description of the access.
     * @param taskMode Access mode handed to the registration request.
     * @return Access id reported by the request.
     */
    private DataAccessId registerDataAccess(AccessParams access, AccessParams.AccessMode taskMode) {
        RegisterDataAccessRequest registration = new RegisterDataAccessRequest(access, taskMode);
        boolean queued = this.requestQueue.offer(registration);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on register data access");
        }
        registration.waitForCompletion();
        return registration.getAccessId();
    }

    /**
     * Queues a request that sets the value of the object version identified by the given renaming.
     *
     * @param renaming Renaming identifying the version.
     * @param value Value to set.
     */
    public void setObjectVersionValue(String renaming, Object value) {
        boolean queued = this.requestQueue.offer(new SetObjectVersionValueRequest(renaming, value));
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on new object version value");
        }
    }

    /**
     * Queues a request that opens a task group.
     *
     * @param groupName Name of the group.
     * @param implicitBarrier Implicit-barrier flag handed to the request.
     * @param app Application owning the group.
     */
    public void setCurrentTaskGroup(String groupName, boolean implicitBarrier, Application app) {
        boolean queued = this.requestQueue.offer(new OpenTaskGroupRequest(groupName, implicitBarrier, app));
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on new task group");
        }
    }

    /**
     * Queues a request that closes the current task group of the given application.
     *
     * @param app Application owning the group.
     */
    public void closeCurrentTaskGroup(Application app) {
        boolean queued = this.requestQueue.offer(new CloseTaskGroupRequest(app));
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on closure of task group");
        }
    }

    /**
     * Asks the processing thread for the last renaming associated with the given code and blocks until
     * the answer is available.
     *
     * @param code Code identifying the data.
     * @return Response of the request.
     */
    public String getLastRenaming(int code) {
        Semaphore answered = new Semaphore(0);
        GetLastRenamingRequest query = new GetLastRenamingRequest(code, answered);
        boolean queued = this.requestQueue.offer(query);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on get last renaming");
        }
        answered.acquireUninterruptibly();
        return query.getResponse();
    }

    /**
     * Queues a request that unblocks the given result files.
     *
     * @param resFiles Result files to unblock.
     */
    public void unblockResultFiles(List<ResultFile> resFiles) {
        boolean queued = this.requestQueue.offer(new UnblockResultFilesRequest(resFiles));
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on unblock result files");
        }
    }

    /**
     * Queues a shutdown request and blocks until the shutdown semaphore is released.
     */
    public void shutdown() {
        this.shutdownSemaphore = new Semaphore(0);

        ShutdownRequest stop = new ShutdownRequest(this.shutdownSemaphore);
        boolean queued = this.requestQueue.offer(stop);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on shutdown");
        }

        this.shutdownSemaphore.acquireUninterruptibly();
    }

    /**
     * Asks the processing thread for the current state of the tasks and blocks until the answer is
     * available.
     *
     * @return Response of the request.
     */
    public String getCurrentTaskState() {
        Semaphore answered = new Semaphore(0);
        TasksStateRequest query = new TasksStateRequest(answered);
        boolean queued = this.requestQueue.offer(query);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on get current task state");
        }
        answered.acquireUninterruptibly();
        return query.getResponse();
    }

    /**
     * Marks the data at the given location for deletion.
     *
     * With {@code enableReuse} the method first waits until the data is ready to be deleted (including
     * as many permits as the wait request reports), then queues the deletion and waits for it. Without
     * it, the deletion is queued and the method returns without waiting.
     *
     * @param app Application owning the data.
     * @param loc Location of the data.
     * @param enableReuse Whether to wait for the data to be ready and for the deletion to be done.
     * @param applicationDelete Flag handed to the delete request.
     */
    public void markForDeletion(Application app, DataLocation loc, boolean enableReuse, boolean applicationDelete) {
        LOGGER.debug("Marking data " + loc + " for deletion");
        Semaphore sem = new Semaphore(0);

        if (enableReuse) {
            Semaphore semWait = new Semaphore(0);
            WaitForDataReadyToDeleteRequest readiness = new WaitForDataReadyToDeleteRequest(app, loc, sem, semWait);
            boolean readinessQueued = this.requestQueue.offer(readiness);
            if (!readinessQueued) {
                ErrorManager.error("ERROR: AccessProcessor queue offer error on wait for data ready to delete");
            }
            LOGGER.debug("Waiting for ready to delete request response...");
            sem.acquireUninterruptibly();

            int nPermits = readiness.getNumPermits();
            if (nPermits > 0) {
                LOGGER.debug("Waiting for " + nPermits + " tasks to finish...");
                semWait.acquireUninterruptibly(nPermits);
            }
        }

        LOGGER.debug("Sending delete request response for " + loc);
        DeleteFileRequest deletion = new DeleteFileRequest(app, loc, sem, !enableReuse, applicationDelete);
        boolean deletionQueued = this.requestQueue.offer(deletion);
        if (!deletionQueued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on mark for deletion");
        }

        // Without reuse nobody waits for the deletion to complete
        if (!enableReuse) {
            return;
        }
        LOGGER.debug("Waiting for delete request response...");
        sem.acquireUninterruptibly();
        LOGGER.debug("Data " + loc + " deleted.");
    }

    /**
     * Queues the deletion of the binding object identified by the given code.
     *
     * @param app Application owning the binding object.
     * @param code Code identifying the binding object.
     */
    public void markForBindingObjectDeletion(Application app, int code) {
        DeleteBindingObjectRequest deletion = new DeleteBindingObjectRequest(app, code);
        boolean queued = this.requestQueue.offer(deletion);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on mark for deletion");
        }
    }

    /**
     * Queues a raw file transfer towards the given location and blocks until the request releases its
     * semaphore.
     *
     * @param faId Access id of the file; it is cast to a read access.
     * @param location Target location of the transfer.
     */
    private void transferFileRaw(DataAccessId faId, DataLocation location) {
        RAccessId readAccess = (RAccessId)faId;
        Semaphore transferred = new Semaphore(0);
        TransferRawFileRequest transfer = new TransferRawFileRequest(readAccess, location, transferred);
        boolean queued = this.requestQueue.offer(transfer);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on transfer file raw");
        }
        transferred.acquireUninterruptibly();
        LOGGER.debug("Raw file transferred");
    }

    /**
     * Queues an open-file transfer, blocks until the request releases its semaphore and returns the
     * location reported by the request.
     *
     * @param faId Access id of the file.
     * @return Location reported by the transfer request.
     */
    private DataLocation transferFileOpen(DataAccessId faId) {
        Semaphore transferred = new Semaphore(0);
        TransferOpenFileRequest transfer = new TransferOpenFileRequest(faId, transferred);
        boolean queued = this.requestQueue.offer(transfer);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on transfer file open");
        }
        transferred.acquireUninterruptibly();
        LOGGER.debug("Open file transferred");
        return transfer.getLocation();
    }

    /**
     * Queues an open-directory transfer, blocks until the request releases its semaphore and returns the
     * location reported by the request.
     *
     * @param faId Access id of the directory.
     * @return Location reported by the transfer request.
     */
    private DataLocation transferDirectoryOpen(DataAccessId faId) {
        Semaphore transferred = new Semaphore(0);
        TransferOpenDirectoryRequest transfer = new TransferOpenDirectoryRequest(faId, transferred);
        boolean queued = this.requestQueue.offer(transfer);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on transfer directory open");
        }
        transferred.acquireUninterruptibly();
        LOGGER.debug("Open directory transferred");
        return transfer.getLocation();
    }

    /**
     * Queues the transfer of an object and blocks until the request releases its semaphore.
     *
     * When the request carries no response, the value is loaded from storage through the target data of
     * the request; a failure to load is logged and reported as a fatal error.
     *
     * @param oaId Access id of the object.
     * @return Object obtained from the request or loaded from storage.
     */
    private Object obtainObject(DataAccessId oaId) {
        Semaphore transferred = new Semaphore(0);
        TransferObjectRequest tor = new TransferObjectRequest(oaId, transferred);
        boolean queued = this.requestQueue.offer(tor);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on obtain object");
        }
        transferred.acquireUninterruptibly();

        Object oUpdated = tor.getResponse();
        if (oUpdated != null) {
            return oUpdated;
        }

        // No in-memory response: fall back to the stored value
        LogicalData ld = tor.getTargetData();
        try {
            ld.loadFromStorage();
            oUpdated = ld.getValue();
        }
        catch (CannotLoadException e) {
            String dataName = ld == null ? "null" : ld.getName();
            LOGGER.fatal("ERROR: Cannot load object from storage (file or PSCO): " + dataName, (Throwable)e);
            dataName = ld == null ? "null" : ld.getName();
            ErrorManager.fatal("ERROR: Cannot load object from storage (file or PSCO): " + dataName, e);
        }
        return oUpdated;
    }

    /**
     * Enqueues a {@code GetResultFilesRequest} for the application, blocks until its semaphore
     * is released, and then enqueues an {@code UnblockResultFilesRequest} built from the blocked
     * data reported by the first request.
     *
     * @param app application handed to the result files request
     */
    public void getResultFiles(Application app) {
        final Semaphore done = new Semaphore(0);
        final GetResultFilesRequest resultFiles = new GetResultFilesRequest(app, done);

        boolean resultFilesQueued = this.requestQueue.offer(resultFiles);
        if (!resultFilesQueued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on get result files");
        }

        // Wait for a permit on the semaphore given to the first request.
        done.acquireUninterruptibly();

        // Second step: hand back the data the first request reported as blocked.
        final UnblockResultFilesRequest unblock = new UnblockResultFilesRequest(resultFiles.getBlockedData());
        boolean unblockQueued = this.requestQueue.offer(unblock);
        if (!unblockQueued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on unlock result files");
        }
    }

    /**
     * Enqueues a {@code DeregisterObject} request for the given object. When {@code DEBUG} is
     * set, the hash code is logged first.
     *
     * @param app application handed to the request
     * @param o object handed to the request
     * @param hashcode hash code handed to the request and used in the debug message
     */
    public void deregisterObject(Application app, Object o, int hashcode) {
        if (DEBUG) {
            LOGGER.debug("Deregistering object " + hashcode);
        }

        DeregisterObject deregistration = new DeregisterObject(app, o, hashcode);
        boolean queued = this.requestQueue.offer(deregistration);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on deregister object");
        }
    }

    /**
     * Enqueues a {@code RegisterRemoteDataRequest} built from the given value and data id.
     * If the request queue rejects the request, the failure is reported through
     * {@code ErrorManager.error}.
     *
     * @param accessedValue data parameters handed to the request
     * @param dataId data identifier handed to the request
     */
    public void registerRemoteData(DataParams accessedValue, String dataId) {
        final RegisterRemoteDataRequest registration = new RegisterRemoteDataRequest(accessedValue, dataId);
        boolean queued = this.requestQueue.offer(registration);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on register data");
        }
    }

    /**
     * Enqueues a {@code DeleteAllApplicationDataRequest} for the application and, when the
     * queue accepts it, waits on the request via {@code waitForCompletion()}. A rejected offer
     * is reported through {@code ErrorManager.error} together with the application id, and no
     * wait takes place.
     *
     * @param app application whose id is read and which is handed to the request
     */
    public void deleteAllApplicationDataRequest(Application app) {
        // Read the id up front, before the request is built, as it is only needed for the error text.
        Long appId = app.getId();
        final DeleteAllApplicationDataRequest deletion = new DeleteAllApplicationDataRequest(app);

        boolean queued = this.requestQueue.offer(deletion);
        if (queued) {
            deletion.waitForCompletion();
            return;
        }
        ErrorManager.error("ERROR: AccessProcessor queue offer error on delete all data from application " + appId);
    }

    /**
     * Enqueues a {@code SnapshotRequest} for the given application. If the request queue
     * rejects the request, the failure is reported through {@code ErrorManager.error}.
     *
     * @param app application handed to the snapshot request
     */
    public void snapshot(Application app) {
        SnapshotRequest snapshotRequest = new SnapshotRequest(app);
        boolean queued = this.requestQueue.offer(snapshotRequest);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on snapshot");
        }
    }

    /**
     * Enqueues the given request. If the request queue rejects it, the failure is reported
     * through {@code ErrorManager.error} using {@code ERROR_QUEUE_OFFER} followed by the
     * supplied message.
     *
     * @param apRequest request to enqueue
     * @param errorMessage text appended to {@code ERROR_QUEUE_OFFER} when the offer fails
     */
    @Override
    public void addCheckpointRequest(APRequest apRequest, String errorMessage) {
        boolean queued = this.requestQueue.offer(apRequest);
        if (queued) {
            return;
        }
        ErrorManager.error(ERROR_QUEUE_OFFER + errorMessage);
    }

    /**
     * Enqueues a {@code ShutdownNotificationRequest} carrying this instance's shutdown
     * semaphore. If the request queue rejects the request, the failure is reported through
     * {@code ErrorManager.error}.
     */
    @Override
    public void allAvailableDataCheckpointed() {
        ShutdownNotificationRequest notification = new ShutdownNotificationRequest(this.shutdownSemaphore);
        boolean queued = this.requestQueue.offer(notification);
        if (!queued) {
            ErrorManager.error("ERROR: AccessProcessor queue offer error on shutdown");
        }
    }

    /**
     * Loads the jars found under {@code CHECKPOINTER_REL_PATH} relative to the directory named
     * by the {@code COMPSS_HOME} environment variable. When that variable is unset or empty, a
     * warning is logged and nothing is loaded.
     */
    private static void loadCheckpointingPoliciesJars() {
        LOGGER.info("Loading checkpointers...");

        final String home = System.getenv("COMPSS_HOME");
        final boolean homeDefined = home != null && !home.isEmpty();
        if (homeDefined) {
            Classpath.loadJarsInPath(home + CHECKPOINTER_REL_PATH, LOGGER);
        } else {
            LOGGER.warn("WARN: COMPSS_HOME not defined, no checkpointers loaded.");
        }
    }

    /**
     * Instantiates the checkpoint manager selected through system properties.
     *
     * <p>The policy class name is read from {@code compss.checkpoint.policy}; when that property
     * is unset or empty, {@code es.bsc.compss.checkpoint.policies.NoCheckpoint} is used. The
     * class is loaded reflectively and must expose a constructor taking
     * {@code (HashMap, AccessProcessor)}. The policy parameters are read from
     * {@code compss.checkpoint.params} and parsed into the map passed to that constructor.
     *
     * <p>Any exception raised while parsing the parameters or instantiating the policy is
     * reported through {@code ErrorManager.fatal} with {@code ERR_LOAD_CHECKPOINTER}.
     *
     * @return the instantiated checkpoint manager, or {@code null} if instantiation failed and
     *         {@code ErrorManager.fatal} returned
     */
    private CheckpointManager constructCheckpointManager() {
        CheckpointManager checkpointer = null;
        try {
            String parameters = System.getProperty("compss.checkpoint.params");
            HashMap<String, String> paramsMap = new HashMap<String, String>();
            if (parameters != null && !parameters.isEmpty()) {
                if (DEBUG) {
                    LOGGER.debug("Reading Checkpointing policy parameters  " + parameters);
                }
                paramsMap = parseCheckpointParams(parameters);
            }

            String cpFQN = System.getProperty("compss.checkpoint.policy");
            if (cpFQN == null || cpFQN.isEmpty()) {
                cpFQN = "es.bsc.compss.checkpoint.policies.NoCheckpoint";
            }
            Class<?> cpClass = Class.forName(cpFQN);
            Constructor<?> cpCnstr = cpClass.getConstructor(HashMap.class, AccessProcessor.class);
            checkpointer = (CheckpointManager) cpCnstr.newInstance(paramsMap, this);
            if (DEBUG) {
                LOGGER.debug("Loaded checkpointer " + checkpointer);
            }
        } catch (Exception e) {
            ErrorManager.fatal(ERR_LOAD_CHECKPOINTER, e);
        }
        return checkpointer;
    }

    /**
     * Splits a non-empty checkpoint parameter string into name/value pairs.
     *
     * <p>Entries are separated by commas and each entry has the form {@code name:value}.
     * Everything from the first occurrence of {@code avoid.checkpoint} onwards is kept as a
     * single entry and is not split on commas. The value of {@code period.time} is converted to
     * milliseconds.
     *
     * @param parameters raw parameter string; must not be {@code null}
     * @return map from parameter name to parameter value
     * @throws IllegalArgumentException if a non-empty entry has no {@code :value} part, or the
     *         {@code period.time} value does not fit in an {@code int} once converted
     * @throws NumberFormatException if the {@code period.time} amount is not an integer
     */
    private static HashMap<String, String> parseCheckpointParams(String parameters) {
        ArrayList<String> entries;
        int avoidIndex = parameters.indexOf("avoid.checkpoint");
        if (avoidIndex != -1) {
            entries = new ArrayList<String>(Arrays.asList(parameters.substring(0, avoidIndex).split(",")));
            entries.add(parameters.substring(avoidIndex));
        } else {
            entries = new ArrayList<String>(Arrays.asList(parameters.split(",")));
        }

        HashMap<String, String> paramsMap = new HashMap<String, String>();
        for (String entry : entries) {
            // String.split yields an empty token when nothing precedes "avoid.checkpoint" or when
            // the string has a leading/doubled comma; such tokens carry no parameter.
            if (entry.trim().isEmpty()) {
                continue;
            }
            String[] values = entry.split(":");
            if (values.length < 2) {
                throw new IllegalArgumentException(
                    "Malformed checkpoint parameter '" + entry + "': expected <name>:<value>");
            }
            String value = values[1];
            if (values[0].equals("period.time")) {
                value = periodToMillis(value);
            }
            paramsMap.put(values[0], value);
        }
        return paramsMap;
    }

    /**
     * Converts a period such as {@code 2h}, {@code 30m}, {@code 45s} or {@code 10} into
     * milliseconds. A value without a suffix is taken as minutes.
     *
     * @param period integer amount, optionally followed by {@code h}, {@code m} or {@code s}
     * @return the period in milliseconds, as a decimal string
     * @throws NumberFormatException if the amount is not an integer
     * @throws IllegalArgumentException if the result does not fit in an {@code int}
     */
    private static String periodToMillis(String period) {
        int factor = 60 * 1000;
        String amount = period;
        if (period.endsWith("h")) {
            factor = 3600 * 1000;
            amount = period.substring(0, period.length() - 1);
        } else if (period.endsWith("m")) {
            amount = period.substring(0, period.length() - 1);
        } else if (period.endsWith("s")) {
            factor = 1000;
            amount = period.substring(0, period.length() - 1);
        }

        // Multiply in long so an out-of-range period is detected instead of silently wrapping.
        long millis = (long) Integer.parseInt(amount) * factor;
        if (millis > Integer.MAX_VALUE || millis < Integer.MIN_VALUE) {
            throw new IllegalArgumentException(
                "Checkpoint period.time '" + period + "' is out of range once converted to milliseconds");
        }
        return String.valueOf(millis);
    }
}

