/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.nio.master;

import es.bsc.comm.Connection;
import es.bsc.comm.Node;
import es.bsc.comm.nio.NIONode;
import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.comm.Comm;
import es.bsc.compss.exceptions.InitNodeException;
import es.bsc.compss.nio.NIOTracer;
import es.bsc.compss.nio.commands.CommandCheckWorker;
import es.bsc.compss.nio.master.NIOAdaptor;
import es.bsc.compss.nio.master.NIOWorkerNode;
import es.bsc.compss.nio.master.handlers.Ender;
import es.bsc.compss.nio.master.handlers.ProcessOut;
import es.bsc.compss.types.COMPSsNode;
import es.bsc.compss.util.Tracer;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.TreeMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class WorkerStarter {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.compss.Communication");
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    private static final String LIB_SEPARATOR = ":";
    private static final String CLASSPATH_FROM_ENVIRONMENT = System.getProperty("compss.worker.cp") != null && !System.getProperty("compss.worker.cp").isEmpty() ? System.getProperty("compss.worker.cp") : "";
    private static final String PYTHONPATH_FROM_ENVIRONMENT = System.getProperty("compss.worker.pythonpath") != null && !System.getProperty("compss.worker.pythonpath").isEmpty() ? System.getProperty("compss.worker.pythonpath") : "";
    private static final String LIBPATH_FROM_ENVIRONMENT = System.getenv("LD_LIBRARY_PATH") != null && !System.getenv("LD_LIBRARY_PATH").isEmpty() ? System.getenv("LD_LIBRARY_PATH") : "";
    private static final boolean IS_CPU_AFFINITY_DEFINED = System.getProperty("compss.worker.cpu_affinity") != null && !System.getProperty("compss.worker.cpu_affinity").isEmpty();
    private static final String CPU_AFFINITY = IS_CPU_AFFINITY_DEFINED ? System.getProperty("compss.worker.cpu_affinity") : "disabled";
    private static final boolean IS_GPU_AFFINITY_DEFINED = System.getProperty("compss.worker.gpu_affinity") != null && !System.getProperty("compss.worker.gpu_affinity").isEmpty();
    private static final String GPU_AFFINITY = IS_GPU_AFFINITY_DEFINED ? System.getProperty("compss.worker.gpu_affinity") : "disabled";
    private static final boolean IS_FPGA_AFFINITY_DEFINED = System.getProperty("compss.worker.gpu_affinity") != null && !System.getProperty("compss.worker.gpu_affinity").isEmpty();
    private static final String FPGA_AFFINITY = IS_FPGA_AFFINITY_DEFINED ? System.getProperty("compss.worker.gpu_affinity") : "disabled";
    private static final String WORKER_APPDIR_FROM_ENVIRONMENT = System.getProperty("compss.worker.appdir") != null && !System.getProperty("compss.worker.appdir").isEmpty() ? System.getProperty("compss.worker.appdir") : "";
    private static final String DEPLOYMENT_ID = System.getProperty("compss.uuid");
    private static final String STARTER_SCRIPT_PATH = "Runtime" + File.separator + "scripts" + File.separator + "system" + File.separator + "adaptors" + File.separator + "nio" + File.separator;
    private static final String STARTER_SCRIPT_NAME = "persistent_worker.sh";
    private static final String CLEAN_SCRIPT_PATH = "Runtime" + File.separator + "scripts" + File.separator + "system" + File.separator + "adaptors" + File.separator + "nio" + File.separator;
    private static final String CLEAN_SCRIPT_NAME = "persistent_worker_clean.sh";
    private static final long START_WORKER_INITIAL_WAIT = 100L;
    private static final long WAIT_TIME_UNIT = 500L;
    private static final long MAX_WAIT_FOR_SSH = 160000L;
    private static final long MAX_WAIT_FOR_INIT = 20000L;
    private static final String ERROR_SHUTTING_DOWN_RETRY = "ERROR: Cannot shutdown failed worker PID process";
    private static final Map<String, WorkerStarter> ADDRESS_TO_WORKER_STARTER = new TreeMap<String, WorkerStarter>();
    private boolean workerIsReady = false;
    private boolean toStop = false;
    private final NIOWorkerNode nw;

    public WorkerStarter(NIOWorkerNode nw) {
        this.nw = nw;
    }

    public static WorkerStarter getWorkerStarter(String address) {
        return ADDRESS_TO_WORKER_STARTER.get(address);
    }

    public void setWorkerIsReady() {
        LOGGER.debug("[WorkerStarter] Worker " + this.nw.getName() + " set to ready.");
        this.workerIsReady = true;
    }

    public void setToStop() {
        this.toStop = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public NIONode startWorker() throws InitNodeException {
        String name = this.nw.getName();
        String user = this.nw.getUser();
        int minPort = this.nw.getConfiguration().getMinPort();
        int maxPort = this.nw.getConfiguration().getMaxPort();
        String masterName = COMPSsNode.getMasterName();
        Map<String, WorkerStarter> map = ADDRESS_TO_WORKER_STARTER;
        synchronized (map) {
            ADDRESS_TO_WORKER_STARTER.put(name, this);
            LOGGER.debug("[WorkerStarter] Worker starter for " + name + " registers in the hashmap");
        }
        NIONode n = null;
        int pid = -1;
        for (int port = minPort; port <= maxPort && !this.toStop; ++port) {
            this.killPreviousWorker(user, name, pid);
            n = new NIONode(name, port);
            pid = this.startWorker(user, name, port, masterName);
            LOGGER.info("[WorkerStarter] Worker process started. Checking connectivity...");
            this.checkWorker(n, name);
            LOGGER.debug("[WorkerStarter] Retries for " + name + " have finished.");
            if (!this.workerIsReady) {
                continue;
            }
            try {
                Runtime.getRuntime().addShutdownHook(new Ender(this, this.nw, pid));
            }
            catch (IllegalStateException e) {
                LOGGER.warn("Tried to shutdown vm while it was already being shutdown", (Throwable)e);
            }
            return n;
        }
        if (this.toStop) {
            String msg = "[STOP]: Worker " + name + " stopped during creation because application is stopped";
            LOGGER.warn(msg);
            this.killPreviousWorker(user, name, pid);
            throw new InitNodeException(msg);
        }
        if (!this.workerIsReady) {
            String msg = "[TIMEOUT]: Could not start the NIO worker on resource " + name + " through user " + user + ".";
            LOGGER.warn(msg);
            this.killPreviousWorker(user, name, pid);
            throw new InitNodeException(msg);
        }
        String msg = "[UNKNOWN]: Could not start the NIO worker on resource " + name + " through user " + user + ".";
        LOGGER.warn(msg);
        this.killPreviousWorker(user, name, pid);
        throw new InitNodeException(msg);
    }

    private int startWorker(String user, String name, int port, String masterName) throws InitNodeException {
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        long timer = 100L;
        int pid = -1;
        String[] command = this.getStartCommand(port, masterName);
        do {
            ProcessOut po;
            if ((po = this.executeCommand(user, name, command)) == null) {
                LOGGER.debug("Worker process started in resource " + name + " by queue system.");
                pid = 0;
            } else if (po.getExitValue() == 0) {
                String output = po.getOutput();
                String[] lines = output.split("\n");
                pid = Integer.parseInt(lines[lines.length - 1]);
            } else {
                if (timer > 160000L) {
                    throw new InitNodeException("[START_CMD_ERROR]: Could not start the NIO worker in resource " + name + " through user " + user + ".\n" + "OUTPUT:" + po.getOutput() + "\n" + "ERROR:" + po.getError() + "\n");
                }
                LOGGER.warn(" Worker process failed to start in resource " + name + ". Retrying...");
            }
            try {
                Thread.sleep(2000L);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            timer += 2000L;
        } while (pid < 0);
        return pid;
    }

    private void killPreviousWorker(String user, String name, int pid) throws InitNodeException {
        if (pid != -1) {
            String[] command = this.getStopCommand(pid);
            ProcessOut po = this.executeCommand(user, name, command);
            if (po == null) {
                LOGGER.error("[START_CMD_ERROR]: An Error has occurred when queue system started NIO worker in resource " + name + ". Retries not available in this option.");
                throw new InitNodeException("[START_CMD_ERROR]: An Error has occurred when queue system started NIO worker in resource " + name + ". Retries not available in this option.");
            }
            if (po.getExitValue() != 0) {
                LOGGER.error(ERROR_SHUTTING_DOWN_RETRY);
            }
        }
    }

    private void checkWorker(NIONode n, String name) {
        long delay = 500L;
        long totalWait = 0L;
        CommandCheckWorker cmd = new CommandCheckWorker(DEPLOYMENT_ID, name);
        do {
            if (DEBUG) {
                LOGGER.debug("[WorkerStarter] Sending check command to worker " + name);
            }
            Connection c = NIOAdaptor.getTransferManager().startConnection((Node)n);
            c.sendCommand((Object)cmd);
            c.receive();
            c.finishConnection();
            try {
                LOGGER.debug("[WorkerStarter] Waiting to send next check worker command with delay " + delay);
                Thread.sleep(delay);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            long l = delay = delay < 3900L ? delay * 2L : 4000L;
        } while (!this.workerIsReady && (totalWait += delay) < 20000L && !this.toStop);
    }

    private String[] getStartCommand(int workerPort, String masterName) throws InitNodeException {
        String pythonMpiWorker;
        String pythonPropagateVirtualEnvironment;
        String pythonVirtualEnvironment;
        String pythonVersion;
        String pythonInterpreter;
        String workerPersistentC;
        String executionType;
        String workingDir = this.nw.getWorkingDir();
        String installDir = this.nw.getInstallDir();
        String appDir = this.nw.getAppDir();
        String workerClasspath = "";
        String classpathFromFile = this.nw.getClasspath();
        workerClasspath = !classpathFromFile.isEmpty() ? (!CLASSPATH_FROM_ENVIRONMENT.isEmpty() ? classpathFromFile + LIB_SEPARATOR + CLASSPATH_FROM_ENVIRONMENT : classpathFromFile) : CLASSPATH_FROM_ENVIRONMENT;
        String workerPythonpath = "";
        String pythonpathFromFile = this.nw.getPythonpath();
        workerPythonpath = !pythonpathFromFile.isEmpty() ? (!PYTHONPATH_FROM_ENVIRONMENT.isEmpty() ? pythonpathFromFile + LIB_SEPARATOR + PYTHONPATH_FROM_ENVIRONMENT : pythonpathFromFile) : PYTHONPATH_FROM_ENVIRONMENT;
        String workerLibPath = "";
        String libPathFromFile = this.nw.getLibPath();
        workerLibPath = !libPathFromFile.isEmpty() ? (!LIBPATH_FROM_ENVIRONMENT.isEmpty() ? libPathFromFile + LIB_SEPARATOR + LIBPATH_FROM_ENVIRONMENT : libPathFromFile) : LIBPATH_FROM_ENVIRONMENT;
        String workerJVMflags = System.getProperty("compss.worker.jvm_opts");
        String[] jvmFlags = new String[]{};
        if (workerJVMflags != null && !workerJVMflags.isEmpty()) {
            jvmFlags = workerJVMflags.split(",");
        }
        String workerFPGAargs = System.getProperty("compss.worker.fpga_reprogram");
        String[] fpgaArgs = new String[]{};
        if (workerFPGAargs != null && !workerFPGAargs.isEmpty()) {
            fpgaArgs = workerFPGAargs.split(" ");
        }
        String workerDebug = Boolean.toString(LogManager.getLogger((String)"es.bsc.compss.Worker").isDebugEnabled());
        String storageConf = System.getProperty("compss.storage.conf");
        if (storageConf == null || storageConf.equals("") || storageConf.equals("null")) {
            storageConf = "null";
        }
        if ((executionType = System.getProperty("compss.task.execution")) == null || executionType.equals("") || executionType.equals("null")) {
            executionType = COMPSsConstants.TaskExecution.COMPSS.toString();
        }
        if ((workerPersistentC = System.getProperty("compss.worker.persistent.c")) == null || workerPersistentC.isEmpty() || workerPersistentC.equals("null")) {
            workerPersistentC = "false";
        }
        if ((pythonInterpreter = System.getProperty("compss.python.interpreter")) == null || pythonInterpreter.isEmpty() || pythonInterpreter.equals("null")) {
            pythonInterpreter = "python";
        }
        if ((pythonVersion = System.getProperty("compss.python.version")) == null || pythonVersion.isEmpty() || pythonVersion.equals("null")) {
            pythonVersion = "2";
        }
        if ((pythonVirtualEnvironment = System.getProperty("compss.python.virtualenvironment")) == null || pythonVirtualEnvironment.isEmpty() || pythonVirtualEnvironment.equals("null")) {
            pythonVirtualEnvironment = "null";
        }
        if ((pythonPropagateVirtualEnvironment = System.getProperty("compss.python.propagate_virtualenvironment")) == null || pythonPropagateVirtualEnvironment.isEmpty() || pythonPropagateVirtualEnvironment.equals("null")) {
            pythonPropagateVirtualEnvironment = "true";
        }
        if ((pythonMpiWorker = System.getProperty("compss.python.mpi_worker")) == null || pythonMpiWorker.isEmpty() || pythonMpiWorker.equals("null")) {
            pythonMpiWorker = "false";
        }
        String[] cmd = new String[40 + jvmFlags.length + 1 + fpgaArgs.length];
        cmd[0] = installDir + (installDir.endsWith(File.separator) ? "" : File.separator) + STARTER_SCRIPT_PATH + STARTER_SCRIPT_NAME;
        String string = cmd[1] = workerLibPath.isEmpty() ? "null" : workerLibPath;
        if (WORKER_APPDIR_FROM_ENVIRONMENT.isEmpty() && appDir.isEmpty()) {
            LOGGER.warn("No path passed via appdir option neither xml AppDir field");
            cmd[2] = "null";
        } else if (!appDir.isEmpty()) {
            if (!WORKER_APPDIR_FROM_ENVIRONMENT.isEmpty()) {
                LOGGER.warn("Path passed via appdir option and xml AppDir field.The path provided by the xml will be used");
            }
            cmd[2] = appDir;
        } else if (!WORKER_APPDIR_FROM_ENVIRONMENT.isEmpty()) {
            cmd[2] = WORKER_APPDIR_FROM_ENVIRONMENT;
        }
        cmd[3] = workerClasspath.isEmpty() ? "null" : workerClasspath;
        cmd[4] = Comm.getStreamingBackend().name();
        cmd[5] = String.valueOf(jvmFlags.length);
        for (int i = 0; i < jvmFlags.length; ++i) {
            cmd[6 + i] = jvmFlags[i];
        }
        int nextPosition = 6 + jvmFlags.length;
        cmd[nextPosition++] = String.valueOf(fpgaArgs.length);
        for (String fpgaArg : fpgaArgs) {
            cmd[nextPosition++] = fpgaArg;
        }
        cmd[nextPosition++] = workerDebug;
        cmd[nextPosition++] = String.valueOf(5);
        cmd[nextPosition++] = String.valueOf(5);
        cmd[nextPosition++] = this.nw.getName();
        cmd[nextPosition++] = String.valueOf(workerPort);
        cmd[nextPosition++] = masterName;
        cmd[nextPosition++] = String.valueOf(NIOAdaptor.MASTER_PORT);
        cmd[nextPosition++] = String.valueOf(Comm.getStreamingPort());
        cmd[nextPosition++] = String.valueOf(this.nw.getTotalComputingUnits());
        cmd[nextPosition++] = String.valueOf(this.nw.getTotalGPUs());
        cmd[nextPosition++] = String.valueOf(this.nw.getTotalFPGAs());
        String cpuAffinity = this.nw.getConfiguration().getProperty("cpu_affinity");
        cmd[nextPosition++] = cpuAffinity != null ? String.valueOf(CPU_AFFINITY) : String.valueOf(CPU_AFFINITY);
        cmd[nextPosition++] = String.valueOf(GPU_AFFINITY);
        cmd[nextPosition++] = String.valueOf(FPGA_AFFINITY);
        cmd[nextPosition++] = String.valueOf(this.nw.getLimitOfTasks());
        cmd[nextPosition++] = DEPLOYMENT_ID;
        cmd[nextPosition++] = System.getProperty("compss.lang");
        cmd[nextPosition++] = workingDir;
        cmd[nextPosition++] = this.nw.getInstallDir();
        cmd[nextPosition++] = cmd[2];
        cmd[nextPosition++] = workerLibPath.isEmpty() ? "null" : workerLibPath;
        cmd[nextPosition++] = workerClasspath.isEmpty() ? "null" : workerClasspath;
        cmd[nextPosition++] = workerPythonpath.isEmpty() ? "null" : workerPythonpath;
        cmd[nextPosition++] = String.valueOf(NIOTracer.getLevel());
        cmd[nextPosition++] = NIOTracer.getExtraeFile();
        if (Tracer.extraeEnabled()) {
            Integer hostId = NIOTracer.registerHost((String)this.nw.getName(), (int)0);
            cmd[nextPosition++] = String.valueOf(hostId.toString());
        } else {
            cmd[nextPosition++] = "NoTracinghostID";
        }
        cmd[nextPosition++] = storageConf;
        cmd[nextPosition++] = executionType;
        cmd[nextPosition++] = workerPersistentC;
        cmd[nextPosition++] = pythonInterpreter;
        cmd[nextPosition++] = pythonVersion;
        cmd[nextPosition++] = pythonVirtualEnvironment;
        cmd[nextPosition++] = pythonPropagateVirtualEnvironment;
        cmd[nextPosition++] = pythonMpiWorker;
        if (cmd.length != nextPosition) {
            throw new InitNodeException("ERROR: Incorrect number of parameters. Expected: " + cmd.length + ". Got: " + nextPosition);
        }
        return cmd;
    }

    private String[] getCleanWorkerWorkingDir(String workingDir) {
        String[] cmd = new String[]{"rm", "-rf", workingDir};
        return cmd;
    }

    private String[] getStopCommand(int pid) {
        String[] cmd = new String[2];
        String installDir = this.nw.getInstallDir();
        cmd[0] = installDir + (installDir.endsWith(File.separator) ? "" : File.separator) + CLEAN_SCRIPT_PATH + CLEAN_SCRIPT_NAME;
        cmd[1] = String.valueOf(pid);
        return cmd;
    }

    private ProcessOut executeCommand(String user, String resource, String[] command) {
        ProcessOut processOut = new ProcessOut();
        String[] cmd = this.nw.getConfiguration().getRemoteExecutionCommand(user, resource, command);
        if (cmd == null) {
            LOGGER.warn("Worker configured to be sarted by queue system.");
            return null;
        }
        StringBuilder sb = new StringBuilder("");
        for (String param : cmd) {
            sb.append(param).append(" ");
        }
        LOGGER.debug("COMM CMD: " + sb.toString());
        try {
            String line;
            ProcessBuilder pb = new ProcessBuilder(new String[0]);
            pb.environment().remove("LD_PRELOAD");
            pb.command(cmd);
            Process process = pb.start();
            InputStream stderr = process.getErrorStream();
            InputStream stdout = process.getInputStream();
            process.getOutputStream().close();
            process.waitFor();
            processOut.setExitValue(process.exitValue());
            BufferedReader reader = new BufferedReader(new InputStreamReader(stdout));
            while ((line = reader.readLine()) != null) {
                processOut.appendOutput(line);
                LOGGER.debug("COMM CMD OUT: " + line);
            }
            reader = new BufferedReader(new InputStreamReader(stderr));
            while ((line = reader.readLine()) != null) {
                processOut.appendError(line);
                LOGGER.debug("COMM CMD ERR: " + line);
            }
        }
        catch (Exception e) {
            LOGGER.error("Exception initializing worker ", (Throwable)e);
        }
        return processOut;
    }

    public void ender(NIOWorkerNode node, int pid) {
        if (pid > 0) {
            String user = node.getUser();
            String jvmWorkerOpts = System.getProperty("compss.worker.jvm_opts");
            String removeWDFlagDisabled = "compss.worker.removeWD=false";
            if (jvmWorkerOpts != null && jvmWorkerOpts.contains(removeWDFlagDisabled)) {
                LOGGER.warn("RemoveWD set to false. Not Cleaning " + node.getName() + " working directory");
            } else {
                String sandboxWorkingDir = node.getWorkingDir();
                String[] command = this.getCleanWorkerWorkingDir(sandboxWorkingDir);
                if (command != null) {
                    this.executeCommand(user, node.getName(), command);
                }
            }
            String[] command = this.getStopCommand(pid);
            LOGGER.info("getStopCommand generated this: " + command);
            if (command != null) {
                this.executeCommand(user, node.getName(), command);
            }
        }
    }
}

