/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.executor.external.piped;

import es.bsc.compss.executor.external.ExecutionPlatformMirror;
import es.bsc.compss.executor.external.ExternalExecutorException;
import es.bsc.compss.executor.external.commands.ExternalCommand;
import es.bsc.compss.executor.external.piped.ControlPipePair;
import es.bsc.compss.executor.external.piped.MirrorMonitor;
import es.bsc.compss.executor.external.piped.PipePair;
import es.bsc.compss.executor.external.piped.commands.AddExecutorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.AddedExecutorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.ChannelCreatedPipeCommand;
import es.bsc.compss.executor.external.piped.commands.CreateChannelPipeCommand;
import es.bsc.compss.executor.external.piped.commands.ExecutorPIDQueryPipeCommand;
import es.bsc.compss.executor.external.piped.commands.ExecutorPIDReplyPipeCommand;
import es.bsc.compss.executor.external.piped.commands.PingPipeCommand;
import es.bsc.compss.executor.external.piped.commands.PipeCommand;
import es.bsc.compss.executor.external.piped.commands.PongPipeCommand;
import es.bsc.compss.executor.external.piped.commands.QuitPipeCommand;
import es.bsc.compss.executor.external.piped.commands.RemoveExecutorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.RemovedExecutorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.StartWorkerPipeCommand;
import es.bsc.compss.executor.external.piped.commands.WorkerStartedPipeCommand;
import es.bsc.compss.executor.external.piped.exceptions.ClosedPipeException;
import es.bsc.compss.types.execution.InvocationContext;
import es.bsc.compss.util.ErrorManager;
import es.bsc.compss.util.StreamGobbler;
import es.bsc.compss.util.Tracer;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.LockSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class PipedMirror
implements ExecutionPlatformMirror<PipePair> {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.compss.Worker.Executor");
    private static final String ERROR_PB_START = "Error starting ProcessBuilder";
    private static final String ERROR_W_START = "Error starting Worker";
    private static final String ERROR_W_PIPE = "Error on Worker pipe";
    protected static final String TOKEN_NEW_LINE = "\n";
    protected static final String TOKEN_SEP = " ";
    protected static final String PIPER_SCRIPT_RELATIVE_PATH = "Runtime" + File.separator + "scripts" + File.separator + "system" + File.separator + "adaptors" + File.separator + "nio" + File.separator + "pipers" + File.separator;
    private static final String PIPE_SCRIPT_NAME = "bindings_piper.sh";
    private static final String PIPE_FILE_BASENAME = "pipe_";
    protected final String mirrorId = String.valueOf(UUID.randomUUID().hashCode());
    protected final int size;
    private final HashMap<String, PipePair> pipePool;
    protected final String basePipePath;
    private final MirrorMonitor monitor;
    private Process pipeBuilderProcess;
    private ControlPipePair pipeBuilderPipe;
    private StreamGobbler pipeBuildeOutGobbler;
    private StreamGobbler pipeBuildeErrGobbler;
    private ControlPipePair pipeWorkerPipe;

    public PipedMirror(InvocationContext context, int size) {
        String workingDir = context.getWorkingDir();
        if (!workingDir.endsWith(File.separator)) {
            workingDir = workingDir + File.separator;
        }
        this.basePipePath = workingDir + PIPE_FILE_BASENAME + this.mirrorId + "_";
        this.size = size;
        this.pipePool = new HashMap();
        this.monitor = new MirrorMonitor();
    }

    public String getMirrorId() {
        return this.mirrorId;
    }

    protected final void init(InvocationContext context) {
        this.monitor.start();
        this.startPipeBuilder(context);
        this.startWorker(context);
    }

    private void startPipeBuilder(InvocationContext context) {
        String installDir = context.getInstallDir();
        String piperScript = installDir + PIPER_SCRIPT_RELATIVE_PATH + PIPE_SCRIPT_NAME;
        LOGGER.debug("PIPE Script: " + piperScript);
        String args = this.constructPipeBuilderArgs(context);
        LOGGER.info("Init piper PipeBuilder");
        ProcessBuilder pb = new ProcessBuilder(piperScript, args);
        try {
            Map<String, String> env = this.getEnvironment(context);
            env.put("COMPSS_WORKING_DIR", context.getWorkingDir());
            env.put("COMPSS_APP_DIR", context.getAppDir());
            pb.directory(new File(this.getPBWorkingDir(context)));
            pb.environment().putAll(env);
            pb.environment().remove("LD_PRELOAD");
            pb.environment().remove("EXTRAE_CONFIG_FILE");
            if (Tracer.extraeEnabled()) {
                long tracingHostId = context.getTracingHostID();
                Tracer.emitEvent((long)tracingHostId, (int)Tracer.getSyncType());
            }
            this.pipeBuilderProcess = pb.start();
            LOGGER.debug("Starting stdout/stderr gobblers ...");
            try {
                this.pipeBuilderProcess.getOutputStream().close();
            }
            catch (IOException tracingHostId) {
                // empty catch block
            }
            while (this.pipeBuilderProcess.isAlive() && !new File(this.pipeBuilderPipe.getOutboundPipe()).exists()) {
            }
            if (!this.pipeBuilderProcess.isAlive()) {
                ErrorManager.fatal((String)ERROR_PB_START);
            }
            this.pipeBuildeOutGobbler = new StreamGobbler(this.pipeBuilderProcess.getInputStream(), null, LOGGER);
            this.pipeBuildeErrGobbler = new StreamGobbler(this.pipeBuilderProcess.getErrorStream(), null, LOGGER);
            this.pipeBuildeOutGobbler.start();
            this.pipeBuildeErrGobbler.start();
            this.monitor.mainProcess(this.pipeBuilderProcess, this.pipeBuilderPipe);
            if (this.pipeBuilderPipe.sendCommand(new PingPipeCommand())) {
                try {
                    this.pipeBuilderPipe.waitForCommand(new PongPipeCommand());
                }
                catch (ClosedPipeException ie) {
                    ErrorManager.fatal((String)ERROR_PB_START);
                }
            } else {
                ErrorManager.fatal((String)ERROR_PB_START);
            }
        }
        catch (IOException e) {
            ErrorManager.error((String)ERROR_PB_START, (Exception)e);
        }
    }

    private String constructPipeBuilderArgs(InvocationContext context) {
        StringBuilder cmd = new StringBuilder();
        this.pipeBuilderPipe = new ControlPipePair(this.basePipePath, "control");
        cmd.append(this.pipeBuilderPipe.getOutboundPipe()).append(TOKEN_SEP);
        cmd.append(this.pipeBuilderPipe.getInboundPipe()).append(TOKEN_SEP);
        StringBuilder writePipes = new StringBuilder();
        StringBuilder readPipes = new StringBuilder();
        for (int i = 0; i < this.size; ++i) {
            String pipeName = "compute" + i;
            PipePair computePipe = new PipePair(this.basePipePath, pipeName);
            this.pipePool.put(pipeName, computePipe);
            writePipes.append(computePipe.getOutboundPipe()).append(TOKEN_SEP);
            readPipes.append(computePipe.getInboundPipe()).append(TOKEN_SEP);
        }
        cmd.append(this.size).append(TOKEN_SEP);
        cmd.append(writePipes.toString());
        cmd.append(this.size).append(TOKEN_SEP);
        cmd.append(readPipes.toString());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("WRITE PIPE Files: " + writePipes.toString() + TOKEN_NEW_LINE);
            LOGGER.debug("READ PIPE Files: " + readPipes.toString() + TOKEN_NEW_LINE);
            LOGGER.debug("WRITE DATA PIPE: " + this.pipeBuilderPipe + ".outbound");
            LOGGER.debug("READ DATA PIPE: " + this.pipeBuilderPipe + ".inbound");
        }
        cmd.append(Tracer.getLevel()).append(TOKEN_SEP);
        cmd.append(this.getPipeBuilderContext());
        return cmd.toString();
    }

    public abstract String getPipeBuilderContext();

    private void startWorker(InvocationContext context) {
        this.pipeWorkerPipe = new ControlPipePair(this.basePipePath, "control_worker");
        String cmd = this.getLaunchWorkerCommand(context, this.pipeWorkerPipe);
        if (this.pipeBuilderPipe.sendCommand(new StartWorkerPipeCommand(cmd, this.pipeWorkerPipe))) {
            WorkerStartedPipeCommand startedCMD = new WorkerStartedPipeCommand();
            try {
                this.pipeBuilderPipe.waitForCommand(startedCMD);
            }
            catch (ClosedPipeException ie) {
                ErrorManager.fatal((String)ERROR_W_START);
            }
            int workerPID = startedCMD.getPid();
            this.monitor.registerWorker(this.mirrorId, workerPID, this.pipeWorkerPipe);
        } else {
            ErrorManager.fatal((String)ERROR_W_START);
        }
    }

    public abstract String getLaunchWorkerCommand(InvocationContext var1, ControlPipePair var2);

    public abstract Map<String, String> getEnvironment(InvocationContext var1);

    protected String getPBWorkingDir(InvocationContext context) {
        return context.getWorkingDir();
    }

    @Override
    public void stop() {
        this.stopWorker();
        this.stopPiper();
        this.monitor.stop();
    }

    private void stopExecutors() {
        LOGGER.info("Stopping compute pipes for mirror " + this.mirrorId);
        for (String executorId : new LinkedList<String>(this.pipePool.keySet())) {
            this.unregisterExecutor(executorId);
        }
    }

    private void stopWorker() {
        this.stopExecutors();
        LOGGER.info("Stopping mirror " + this.mirrorId);
        if (this.pipeWorkerPipe.sendCommand(new QuitPipeCommand())) {
            try {
                this.pipeWorkerPipe.waitForCommand(new QuitPipeCommand());
            }
            catch (ClosedPipeException closedPipeException) {
                // empty catch block
            }
        }
        this.monitor.unregisterWorker(this.mirrorId);
        this.pipeWorkerPipe.delete();
    }

    private void stopPiper() {
        LOGGER.info("Stopping piper process");
        if (this.pipeBuilderPipe.sendCommand(new QuitPipeCommand())) {
            try {
                this.pipeBuilderPipe.waitForCommand(new QuitPipeCommand());
            }
            catch (ClosedPipeException cpe) {
                ErrorManager.fatal((String)ERROR_W_PIPE);
            }
        } else {
            ErrorManager.fatal((String)ERROR_W_PIPE);
        }
        try {
            LOGGER.info("Waiting for finishing piper process");
            int exitCode = this.pipeBuilderProcess.waitFor();
            this.pipeBuildeOutGobbler.join();
            this.pipeBuildeErrGobbler.join();
            if (exitCode != 0) {
                ErrorManager.error((String)("ExternalExecutor piper ended with " + exitCode + " status"));
            }
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            if (Tracer.extraeEnabled()) {
                Tracer.emitEvent((long)0L, (int)Tracer.getSyncType());
            }
            if (this.pipeBuilderProcess != null) {
                if (this.pipeBuilderProcess.getInputStream() != null) {
                    try {
                        this.pipeBuilderProcess.getInputStream().close();
                    }
                    catch (IOException iOException) {}
                }
                if (this.pipeBuilderProcess.getErrorStream() != null) {
                    try {
                        this.pipeBuilderProcess.getErrorStream().close();
                    }
                    catch (IOException iOException) {}
                }
            }
        }
        this.pipeBuilderPipe.delete();
        LOGGER.info("ExternalThreadPool finished");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public PipePair registerExecutor(String executorId) {
        PipePair pp;
        boolean createExecutor = false;
        HashMap<String, PipePair> hashMap = this.pipePool;
        synchronized (hashMap) {
            pp = this.pipePool.get(executorId);
            if (pp == null) {
                pp = new PipePair(this.basePipePath, executorId);
                createExecutor = true;
                this.pipePool.put(executorId, pp);
            }
        }
        int executorPID = -1;
        if (createExecutor) {
            if (!this.pipeBuilderPipe.sendCommand(new CreateChannelPipeCommand(pp))) throw new UnsupportedOperationException("Not yet implemented. Specific exception should be raised");
            ChannelCreatedPipeCommand createdPipe = new ChannelCreatedPipeCommand(pp);
            try {
                this.pipeBuilderPipe.waitForCommand(createdPipe);
            }
            catch (ClosedPipeException ie) {
                throw new UnsupportedOperationException("Not yet implemented. Specific exception should be raised");
            }
            if (!this.pipeWorkerPipe.sendCommand(new AddExecutorPipeCommand(pp))) throw new UnsupportedOperationException("Not yet implemented. Specific exception should be raised");
            try {
                AddedExecutorPipeCommand reply = new AddedExecutorPipeCommand(pp);
                this.pipeWorkerPipe.waitForCommand(reply);
                executorPID = reply.getPid();
            }
            catch (ClosedPipeException ie) {
                throw new UnsupportedOperationException("Not yet implemented. Specific exception should be raised");
            }
        }
        if (!this.pipeWorkerPipe.sendCommand(new ExecutorPIDQueryPipeCommand(pp))) throw new UnsupportedOperationException("Not yet implemented. Specific exception should be raised");
        try {
            ExecutorPIDReplyPipeCommand reply = new ExecutorPIDReplyPipeCommand(pp);
            this.pipeWorkerPipe.waitForCommand(reply);
            executorPID = reply.getPids().get(0);
        }
        catch (ClosedPipeException ie) {
            throw new UnsupportedOperationException("Not yet implemented. Specific exception should be raised");
        }
        this.monitor.registerExecutor(this, executorId, executorPID, pp);
        return pp;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unregisterExecutor(String executorId) {
        PipePair pp;
        HashMap<String, PipePair> hashMap = this.pipePool;
        synchronized (hashMap) {
            pp = this.pipePool.remove(executorId);
        }
        if (pp == null) {
            return;
        }
        if (pp.sendCommand(new QuitPipeCommand())) {
            try {
                PipeCommand command = null;
                while (command == null && (command = pp.readCommand()).getType() == ExternalCommand.CommandType.QUIT) {
                }
            }
            catch (ExternalExecutorException command) {
                // empty catch block
            }
        }
        this.monitor.unregisterExecutor(this, executorId);
        if (this.pipeWorkerPipe.sendCommand(new RemoveExecutorPipeCommand(pp))) {
            try {
                this.pipeWorkerPipe.waitForCommand(new RemovedExecutorPipeCommand(pp));
            }
            catch (ClosedPipeException cpe) {
                ErrorManager.fatal((String)ERROR_W_PIPE);
            }
        }
        pp.delete();
        LOGGER.debug("EXECUTOR " + executorId + " shut down!");
    }
}

