/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.nio.master;

import es.bsc.comm.Connection;
import es.bsc.comm.exceptions.CommException;
import es.bsc.comm.nio.NIONode;
import es.bsc.comm.stage.Transfer;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.comm.CommAdaptor;
import es.bsc.compss.exceptions.ConstructConfigurationException;
import es.bsc.compss.nio.NIOAgent;
import es.bsc.compss.nio.NIOMessageHandler;
import es.bsc.compss.nio.NIOTask;
import es.bsc.compss.nio.NIOTaskResult;
import es.bsc.compss.nio.NIOTracer;
import es.bsc.compss.nio.NIOURI;
import es.bsc.compss.nio.commands.NIOData;
import es.bsc.compss.nio.commands.workerFiles.CommandWorkerDebugFilesDone;
import es.bsc.compss.nio.dataRequest.DataRequest;
import es.bsc.compss.nio.dataRequest.MasterDataRequest;
import es.bsc.compss.nio.exceptions.SerializedObjectException;
import es.bsc.compss.nio.master.NIOJob;
import es.bsc.compss.nio.master.NIOWorkerNode;
import es.bsc.compss.nio.master.WorkerStarter;
import es.bsc.compss.nio.master.configuration.NIOConfiguration;
import es.bsc.compss.types.BindingObject;
import es.bsc.compss.types.annotations.parameter.DataType;
import es.bsc.compss.types.data.LogicalData;
import es.bsc.compss.types.data.listener.EventListener;
import es.bsc.compss.types.data.location.DataLocation;
import es.bsc.compss.types.data.operation.DataOperation;
import es.bsc.compss.types.data.operation.copy.Copy;
import es.bsc.compss.types.job.Job;
import es.bsc.compss.types.parameter.DependencyParameter;
import es.bsc.compss.types.project.jaxb.NIOAdaptorProperties;
import es.bsc.compss.types.resources.ExecutorShutdownListener;
import es.bsc.compss.types.resources.Resource;
import es.bsc.compss.types.resources.ShutdownListener;
import es.bsc.compss.types.resources.configuration.Configuration;
import es.bsc.compss.types.resources.jaxb.ResourcesNIOAdaptorProperties;
import es.bsc.compss.types.uri.MultiURI;
import es.bsc.compss.util.BindingDataManager;
import es.bsc.compss.util.ErrorManager;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NIOAdaptor
extends NIOAgent
implements CommAdaptor {
    public static final int MAX_SEND = 1000;
    public static final int MAX_RECEIVE = 1000;
    public static final int MAX_SEND_WORKER = 5;
    public static final int MAX_RECEIVE_WORKER = 5;
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.compss.Communication");
    private static final boolean WORKER_DEBUG = LogManager.getLogger("es.bsc.compss.Worker").isDebugEnabled();
    private static final int BASE_MASTER_PORT = 43000;
    private static final int MAX_RANDOM_VALUE = 1000;
    private static final int RANDOM_VALUE = new Random().nextInt(1000);
    private static final int MASTER_PORT_CALCULATED = 43000 + RANDOM_VALUE;
    private static final String MASTER_PORT_PROPERTY = System.getProperty("compss.masterPort");
    public static final int MASTER_PORT = MASTER_PORT_PROPERTY != null && !MASTER_PORT_PROPERTY.isEmpty() ? Integer.valueOf(MASTER_PORT_PROPERTY) : MASTER_PORT_CALCULATED;
    private static final String JOBS_DIR = System.getProperty("compss.appLogDir") + "jobs" + File.separator;
    private static final String TERM_ERR = "Error terminating";
    private static final Set<NIOWorkerNode> NODES = new HashSet<NIOWorkerNode>();
    private static final ConcurrentMap<Integer, NIOJob> RUNNING_JOBS = new ConcurrentHashMap<Integer, NIOJob>();
    private static final Map<Integer, LinkedList<Copy>> GROUP_TO_COPY = new HashMap<Integer, LinkedList<Copy>>();
    private static final Map<Connection, ClosingWorker> STOPPING_NODES = new HashMap<Connection, ClosingWorker>();
    private static final Map<Connection, ClosingExecutor> STOPPING_EXECUTORS = new HashMap<Connection, ClosingExecutor>();
    private Semaphore tracingGeneration = new Semaphore(0);
    private Semaphore workersDebugInfo = new Semaphore(0);

    public NIOAdaptor() {
        super(1000, 1000, MASTER_PORT);
        String persistentCStr = System.getProperty("compss.worker.persistent.c");
        if (persistentCStr == null || persistentCStr.isEmpty() || persistentCStr.equals("null")) {
            persistentCStr = "false";
        }
        this.setPersistent(Boolean.parseBoolean(persistentCStr));
        File file = new File(JOBS_DIR);
        if (!file.exists()) {
            file.mkdir();
        }
    }

    @Override
    public void init() {
        String errMsg;
        LOGGER.info("Initializing NIO Adaptor...");
        this.masterNode = new NIONode(null, MASTER_PORT);
        NIOMessageHandler mhm = new NIOMessageHandler(this);
        LOGGER.debug("  Initializing the TransferManager structures...");
        try {
            TM.init(NIO_EVENT_MANAGER_CLASS, null, mhm);
        }
        catch (CommException ce) {
            errMsg = "Error initializing the TransferManager";
            ErrorManager.error(errMsg, ce);
        }
        tracing = System.getProperty("compss.tracing") != null && Integer.parseInt(System.getProperty("compss.tracing")) > 0;
        tracing_level = Integer.parseInt(System.getProperty("compss.tracing"));
        LOGGER.debug("  Starting transfer server...");
        try {
            TM.startServer(this.masterNode);
        }
        catch (CommException ce) {
            errMsg = "Error starting transfer server";
            ErrorManager.error(errMsg, ce);
        }
        LOGGER.debug("  Starting TransferManager Thread");
        TM.start();
    }

    @Override
    public Configuration constructConfiguration(Object project_properties, Object resources_properties) throws ConstructConfigurationException {
        NIOConfiguration config = new NIOConfiguration(this.getClass().getName());
        NIOAdaptorProperties props_project = (NIOAdaptorProperties)project_properties;
        ResourcesNIOAdaptorProperties props_resources = (ResourcesNIOAdaptorProperties)resources_properties;
        int min_project = props_project != null ? props_project.getMinPort() : -1;
        int min_resources = -1;
        if (props_resources == null) {
            throw new ConstructConfigurationException("Resources file doesn't contain a minimum port value");
        }
        min_resources = props_resources.getMinPort();
        int max_project = props_project != null ? props_project.getMaxPort() : -1;
        int max_resources = props_resources != null ? props_resources.getMaxPort() : -1;
        int min_final = -1;
        if (min_project < 0) {
            min_final = min_resources;
        } else if (min_project < min_resources) {
            LOGGER.warn("resources.xml MinPort is more restrictive than project.xml. Loading resources.xml values");
            min_final = min_resources;
        } else {
            min_final = min_project;
        }
        int max_final = -1;
        if (max_project < 0) {
            if (max_resources < 0) {
                LOGGER.warn("MaxPort not defined in resources.xml/project.xml. Loading no limit");
            } else {
                LOGGER.warn("resources.xml MaxPort is more restrictive than project.xml. Loading resources.xml values");
                max_final = max_resources;
            }
        } else if (max_resources < 0) {
            max_final = max_project;
        } else if (max_project < max_resources) {
            max_final = max_project;
        } else {
            LOGGER.warn("resources.xml MaxPort is more restrictive than project.xml. Loading resources.xml values");
            max_final = max_resources;
        }
        LOGGER.info("NIO Min Port: " + min_final);
        LOGGER.info("NIO MAX Port: " + max_final);
        config.setMinPort(min_final);
        config.setMaxPort(max_final);
        String remoteExecutionCommand = props_resources.getRemoteExecutionCommand();
        if (remoteExecutionCommand == null || remoteExecutionCommand.isEmpty()) {
            remoteExecutionCommand = "ssh";
        }
        if (!NIOConfiguration.AVAILABLE_REMOTE_EXECUTION_COMMANDS.contains(remoteExecutionCommand)) {
            throw new ConstructConfigurationException("Invalid remote execution command on resources file");
        }
        config.setRemoteExecutionCommand(remoteExecutionCommand);
        return config;
    }

    @Override
    public NIOWorkerNode initWorker(String workerName, Configuration config) {
        LOGGER.debug("Init NIO Worker Node named " + workerName);
        NIOWorkerNode worker = new NIOWorkerNode(workerName, (NIOConfiguration)config, this);
        NODES.add(worker);
        return worker;
    }

    public void removedNode(NIOWorkerNode worker) {
        LOGGER.debug("Remove worker " + worker.getName());
        NODES.remove(worker);
    }

    @Override
    public void stop() {
        LOGGER.debug("NIO Adaptor stopping workers...");
        HashSet<NIOWorkerNode> workers = new HashSet<NIOWorkerNode>();
        workers.addAll(NODES);
        Semaphore sem = new Semaphore(0);
        ShutdownListener sl = new ShutdownListener(sem);
        for (NIOWorkerNode worker : workers) {
            LOGGER.debug("- Stopping worker " + worker.getName());
            sl.addOperation();
            worker.stop(sl);
        }
        LOGGER.debug("- Waiting for workers to shutdown...");
        sl.enable();
        try {
            sem.acquire();
        }
        catch (Exception e) {
            LOGGER.error("ERROR: Exception raised on worker shutdown");
        }
        LOGGER.debug("- Workers stopped");
        LOGGER.debug("- Shutting down TM...");
        TM.shutdown(null);
        LOGGER.debug("NIO Adaptor stop completed!");
    }

    protected static void submitTask(NIOJob job) throws Exception {
        LOGGER.debug("NIO submitting new job " + job.getJobId());
        Resource res = job.getResource();
        NIOWorkerNode worker = (NIOWorkerNode)res.getNode();
        LogicalData[] obsoletes = res.pollObsoletes();
        LinkedList<String> obsoleteRenamings = new LinkedList<String>();
        for (LogicalData ld : obsoletes) {
            obsoleteRenamings.add(worker.getWorkingDir() + File.separator + ld.getName());
        }
        RUNNING_JOBS.put(job.getJobId(), job);
        worker.submitTask(job, obsoleteRenamings);
    }

    @Override
    public void receivedNewTask(NIONode master, NIOTask t, List<String> obsoleteFiles) {
    }

    @Override
    public void setMaster(NIONode master) {
    }

    @Override
    public boolean isMyUuid(String uuid, String nodeName) {
        return false;
    }

    @Override
    public void setWorkerIsReady(String nodeName) {
        LOGGER.info("Notifying that worker is ready '" + nodeName + "'");
        WorkerStarter ws = WorkerStarter.getWorkerStarter(nodeName);
        if (ws != null) {
            ws.setWorkerIsReady();
        } else {
            LOGGER.warn("WARN: worker starter for worker " + nodeName + " is null.");
        }
    }

    @Override
    public void receivedNIOTaskDone(Connection c, NIOTaskResult tr, boolean successful) {
        int jobId = tr.getJobId();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Received Task done message for Task " + jobId);
        }
        NIOJob nj = (NIOJob)RUNNING_JOBS.remove(jobId);
        int taskId = nj.getTaskId();
        List<DataType> taskResultTypes = tr.getParamTypes();
        block3: for (int i = 0; i < taskResultTypes.size(); ++i) {
            DataType newType = taskResultTypes.get(i);
            switch (newType) {
                case PSCO_T: 
                case EXTERNAL_PSCO_T: {
                    String pscoId = (String)tr.getParamValue(i);
                    DependencyParameter dp = (DependencyParameter)nj.getTaskParams().getParameters()[i];
                    this.updateParameter(newType, pscoId, dp);
                    continue block3;
                }
            }
        }
        if (nj != null) {
            Job.JobHistory prevJobHistory = nj.getHistory();
            nj.taskFinished(successful);
            if (WORKER_DEBUG || !successful) {
                String jobOut = JOBS_DIR + "job" + jobId + "_" + (Object)((Object)prevJobHistory) + ".out";
                String jobErr = JOBS_DIR + "job" + jobId + "_" + (Object)((Object)prevJobHistory) + ".err";
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Requesting JobOut " + jobOut + " for Task " + taskId);
                    LOGGER.debug("Requesting JobErr " + jobErr + " for Task " + taskId);
                }
                c.receiveDataFile(jobOut);
                c.receiveDataFile(jobErr);
            }
        }
        c.finishConnection();
    }

    private void updateParameter(DataType newType, String pscoId, DependencyParameter dp) {
        DataType previousType = dp.getType();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Updating parameter " + dp.getDataTarget() + " from type " + (Object)((Object)previousType) + " to type " + (Object)((Object)newType) + " with id " + pscoId);
        }
        switch (previousType) {
            case PSCO_T: 
            case EXTERNAL_PSCO_T: {
                if (previousType.equals((Object)newType)) {
                    dp.setDataTarget(pscoId);
                    break;
                }
                LOGGER.warn("WARN: Cannot update parameter " + dp.getDataTarget() + " because types are not compatible");
                break;
            }
            default: {
                this.registerUpdatedParameter(newType, pscoId, dp);
            }
        }
    }

    private void registerUpdatedParameter(DataType newType, String pscoId, DependencyParameter dp) {
        String renaming = dp.getDataTarget();
        switch (newType) {
            case PSCO_T: {
                Comm.registerPSCO(renaming, pscoId);
                break;
            }
            case EXTERNAL_PSCO_T: {
                if (renaming.contains("/")) {
                    renaming = renaming.substring(renaming.lastIndexOf(47) + 1);
                }
                Comm.registerExternalPSCO(renaming, pscoId);
                break;
            }
            default: {
                LOGGER.warn("WARN: Invalid new type " + (Object)((Object)newType) + " for parameter " + renaming);
            }
        }
        dp.setType(newType);
        dp.setDataTarget(pscoId);
    }

    public void registerCopy(Copy c) {
        for (EventListener el : c.getEventListeners()) {
            Integer groupId = el.getId();
            LinkedList<Copy> copies = GROUP_TO_COPY.get(groupId);
            if (copies == null) {
                copies = new LinkedList();
                GROUP_TO_COPY.put(groupId, copies);
            }
            copies.add(c);
        }
    }

    @Override
    protected void handleDataToSendNotAvailable(Connection c, NIOData d) {
        c.finishConnection();
    }

    @Override
    public void handleRequestedDataNotAvailableError(List<DataRequest> failedRequests, String dataId) {
        for (DataRequest dr : failedRequests) {
            MasterDataRequest mdr = (MasterDataRequest)dr;
            Copy c = (Copy)mdr.getOperation();
            c.getSourceData().finishedCopy(c);
            c.end(DataOperation.OpEndState.OP_FAILED);
        }
    }

    @Override
    public void receivedValue(Transfer.Destination type, String dataId, Object object, List<DataRequest> achievedRequests) {
        for (DataRequest dr : achievedRequests) {
            MasterDataRequest mdr = (MasterDataRequest)dr;
            Copy c = (Copy)mdr.getOperation();
            DataLocation actualLocation = c.getSourceData().finishedCopy(c);
            LogicalData tgtData = c.getTargetData();
            if (tgtData != null) {
                tgtData.addLocation(actualLocation);
                if (object != null) {
                    tgtData.setValue(object);
                }
            }
            c.end(DataOperation.OpEndState.OP_OK);
        }
        if (NIOTracer.isActivated()) {
            NIOTracer.emitDataTransferEvent("0");
        }
    }

    @Override
    public void copiedData(int transferGroupId) {
        LOGGER.debug("Notifying copied Data to master");
        LinkedList<Copy> copies = GROUP_TO_COPY.remove(transferGroupId);
        if (copies == null) {
            LOGGER.debug("No copies to process");
            return;
        }
        for (Copy c : copies) {
            LOGGER.debug("Treating copy " + c.getName());
            if (!c.isRegistered()) {
                LOGGER.debug("No registered copy " + c.getName());
                continue;
            }
            DataLocation actualLocation = c.getSourceData().finishedCopy(c);
            if (actualLocation != null) {
                LOGGER.debug("Actual Location " + actualLocation.getPath());
            } else {
                LOGGER.debug("Actual Location is null");
            }
            LogicalData tgtData = c.getTargetData();
            if (tgtData != null) {
                LOGGER.debug("targetData is not null");
                switch (actualLocation.getType()) {
                    case PERSISTENT: {
                        LOGGER.debug("Persistent location no need to update location for " + tgtData.getName());
                        break;
                    }
                    case BINDING: 
                    case PRIVATE: {
                        LOGGER.debug("Adding location:" + actualLocation.getPath() + " to " + tgtData.getName());
                        tgtData.addLocation(actualLocation);
                        break;
                    }
                    case SHARED: {
                        LOGGER.debug("Shared location no need to update location for " + tgtData.getName());
                    }
                }
                LOGGER.debug("Locations for " + tgtData.getName() + " are: " + tgtData.getURIs());
                continue;
            }
            LOGGER.warn("No target Data defined for copy " + c.getName());
        }
    }

    @Override
    public List<DataOperation> getPending() {
        return new LinkedList<DataOperation>();
    }

    public boolean checkData(NIOData d) {
        boolean data = false;
        return data;
    }

    @Override
    public Object getObject(String name) throws SerializedObjectException {
        LogicalData ld = Comm.getData(name);
        Object o = ld.getValue();
        if (o == null) {
            for (MultiURI loc : ld.getURIs()) {
                if (loc.getProtocol().equals((Object)DataLocation.Protocol.OBJECT_URI) || !loc.getHost().equals(Comm.getAppHost())) continue;
                throw new SerializedObjectException(name);
            }
        }
        return o;
    }

    @Override
    public String getObjectAsFile(String name) {
        LogicalData ld = Comm.getData(name);
        for (MultiURI loc : ld.getURIs()) {
            if (loc.getProtocol().equals((Object)DataLocation.Protocol.OBJECT_URI) || !loc.getHost().equals(Comm.getAppHost())) continue;
            return loc.getPath();
        }
        return null;
    }

    @Override
    public String getWorkingDir() {
        return "";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stopSubmittedJobs() {
        ConcurrentMap<Integer, NIOJob> concurrentMap = RUNNING_JOBS;
        synchronized (concurrentMap) {
            for (Job job : RUNNING_JOBS.values()) {
                try {
                    job.stop();
                }
                catch (Exception e) {
                    LOGGER.error(TERM_ERR, (Throwable)e);
                }
            }
        }
    }

    @Override
    public void completeMasterURI(MultiURI u) {
        u.setInternalURI(ID, new NIOURI(this.masterNode, u.getPath(), u.getProtocol()));
    }

    public void requestData(Copy c, DataType paramType, NIOData d, String path) {
        MasterDataRequest dr = new MasterDataRequest(c, paramType, d, path);
        this.addTransferRequest(dr);
        this.requestTransfers();
    }

    public void shuttingDown(NIOWorkerNode worker, Connection c, ShutdownListener listener) {
        STOPPING_NODES.put(c, new ClosingWorker(worker, listener));
    }

    public void shuttingDownEM(NIOWorkerNode worker, Connection c, ExecutorShutdownListener listener) {
        STOPPING_EXECUTORS.put(c, new ClosingExecutor(listener));
    }

    @Override
    public void shutdownNotification(Connection c) {
        ClosingWorker closing = STOPPING_NODES.remove(c);
        NIOWorkerNode worker = closing.worker;
        this.removedNode(worker);
        ShutdownListener listener = closing.listener;
        listener.notifyEnd();
    }

    @Override
    public void shutdown(Connection closingConnection) {
    }

    @Override
    public void shutdownExecutionManager(Connection closingConnection) {
    }

    @Override
    public void shutdownExecutionManagerNotification(Connection c) {
        ClosingExecutor closing = STOPPING_EXECUTORS.remove(c);
        ExecutorShutdownListener listener = closing.listener;
        listener.notifyEnd();
    }

    @Override
    public void waitUntilTracingPackageGenerated() {
        try {
            this.tracingGeneration.acquire();
        }
        catch (InterruptedException ex) {
            LOGGER.error("Error waiting for package generation");
        }
    }

    @Override
    public void notifyTracingPackageGeneration() {
        this.tracingGeneration.release();
    }

    @Override
    public void waitUntilWorkersDebugInfoGenerated() {
        try {
            this.workersDebugInfo.acquire();
        }
        catch (InterruptedException ex) {
            LOGGER.error("Error waiting for package generation");
        }
    }

    @Override
    public void notifyWorkersDebugInfoGeneration() {
        this.workersDebugInfo.release();
    }

    @Override
    public void generateWorkersDebugInfo(Connection c) {
        c.sendCommand(new CommandWorkerDebugFilesDone());
        c.finishConnection();
    }

    @Override
    public void receivedBindingObjectAsFile(String filename, String target) {
        if (filename.contains("#")) {
            filename = BindingObject.generate(filename).getId();
        }
        if (target.contains("#")) {
            BindingObject bo = BindingObject.generate(target);
            BindingDataManager.loadFromFile(bo.getName(), filename, bo.getType(), bo.getElements());
        } else {
            ErrorManager.error("Incorrect target format for binding object.(" + target + ")");
        }
    }

    @Override
    protected boolean isMaster() {
        return true;
    }

    private class ClosingExecutor {
        private final ExecutorShutdownListener listener;

        public ClosingExecutor(ExecutorShutdownListener l) {
            this.listener = l;
        }
    }

    private class ClosingWorker {
        private final NIOWorkerNode worker;
        private final ShutdownListener listener;

        public ClosingWorker(NIOWorkerNode w, ShutdownListener l) {
            this.worker = w;
            this.listener = l;
        }
    }
}

