/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.invokers.external;

import es.bsc.compss.COMPSsConstants;
import es.bsc.compss.exceptions.InvokeExecutionException;
import es.bsc.compss.exceptions.StreamCloseException;
import es.bsc.compss.execution.types.InvocationResources;
import es.bsc.compss.executor.external.commands.ExecuteTaskExternalCommand;
import es.bsc.compss.executor.external.piped.commands.ExecuteTaskPipeCommand;
import es.bsc.compss.invokers.external.ExternalInvoker;
import es.bsc.compss.invokers.types.PythonParams;
import es.bsc.compss.invokers.types.StdIOStream;
import es.bsc.compss.invokers.util.BinaryRunner;
import es.bsc.compss.types.CollectionLayout;
import es.bsc.compss.types.execution.Invocation;
import es.bsc.compss.types.execution.InvocationContext;
import es.bsc.compss.types.execution.LanguageParams;
import es.bsc.compss.types.execution.exceptions.JobExecutionException;
import es.bsc.compss.types.implementations.definition.PythonMPIDefinition;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;

public class PythonMPIInvoker
extends ExternalInvoker {
    private static final int NUM_BASE_PYTHON_MPI_ARGS = 8;
    private final PythonMPIDefinition mpiDef;
    private BinaryRunner br;

    public PythonMPIInvoker(InvocationContext context, Invocation invocation, File taskSandboxWorkingDir, InvocationResources assignedResources) throws JobExecutionException {
        super(context, invocation, taskSandboxWorkingDir, assignedResources);
        try {
            this.mpiDef = (PythonMPIDefinition)invocation.getMethodImplementation().getDefinition();
            this.mpiDef.setRunnerProperties(context.getInstallDir());
        }
        catch (Exception e) {
            throw new JobExecutionException("Incorrect method definition for task of type " + (Object)((Object)invocation.getMethodImplementation().getMethodType()), e);
        }
        super.appendOtherExecutionCommandArguments();
        this.br = null;
    }

    @Override
    protected ExecuteTaskExternalCommand getTaskExecutionCommand(InvocationContext context, Invocation invocation, String sandBox, InvocationResources assignedResources) {
        ExecuteTaskPipeCommand taskExecution = new ExecuteTaskPipeCommand(invocation.getJobId());
        return taskExecution;
    }

    @Override
    protected void invokeMethod() throws JobExecutionException {
        Object retObj;
        try {
            this.mpiDef.checkArguments();
        }
        catch (IllegalArgumentException e) {
            throw new JobExecutionException(e);
        }
        try {
            retObj = this.runPythonMPIInvocation();
        }
        catch (InvokeExecutionException e1) {
            throw new JobExecutionException(e1);
        }
        try {
            if (this.br != null) {
                String pythonInterpreter = null;
                LanguageParams lp = this.context.getLanguageParams(COMPSsConstants.Lang.PYTHON);
                if (lp instanceof PythonParams) {
                    PythonParams pp = (PythonParams)lp;
                    pythonInterpreter = pp.checkCoverageAndGetPythonInterpreter();
                }
                this.br.closeStreams(this.invocation.getParams(), pythonInterpreter);
            }
        }
        catch (StreamCloseException se) {
            LOGGER.error("Exception closing binary streams", (Throwable)se);
            throw new JobExecutionException(se);
        }
        if (this.invocation.isDebugEnabled()) {
            LOGGER.debug("Exit value of MPI executor of job " + this.invocation.getJobId() + " of task " + this.invocation.getTaskId() + ": " + retObj.toString());
        }
        if (retObj.toString().compareTo("0") != 0) {
            throw new JobExecutionException("Received non-zero exit value: " + retObj.toString() + " for job " + this.invocation.getJobId() + " of task " + this.invocation.getTaskId());
        }
    }

    private Object runPythonMPIInvocation() throws InvokeExecutionException {
        int i;
        String taskCMD = this.command.getAsString();
        String pythonInterpreter = null;
        LanguageParams lp = this.context.getLanguageParams(COMPSsConstants.Lang.PYTHON);
        if (!(lp instanceof PythonParams)) {
            LOGGER.error("Incorrect language parameters for PYTHON MPI task. No Python language parameters");
            throw new InvokeExecutionException("Incorrect language parameters for PYTHON MPI task. No Python language parameters");
        }
        PythonParams pp = (PythonParams)lp;
        pythonInterpreter = pp.checkCoverageAndGetPythonInterpreter();
        int numMPIFlags = 0;
        String mpiFlags = this.mpiDef.getMpiFlags();
        String[] mpiflagsArray = null;
        if (mpiFlags == null || mpiFlags.isEmpty()) {
            mpiflagsArray = mpiFlags.split(" ");
            numMPIFlags = mpiflagsArray.length;
        }
        String[] pythonInterpreterArray = null;
        int interpreterflags = 0;
        if (pythonInterpreter != null) {
            pythonInterpreterArray = pythonInterpreter.split(" ");
            interpreterflags = pythonInterpreterArray.length - 1;
        }
        int numBasePythonMpiArgs = 8;
        numBasePythonMpiArgs = numBasePythonMpiArgs + numMPIFlags + interpreterflags;
        StdIOStream streamValues = new StdIOStream();
        ArrayList<String> binaryParams = BinaryRunner.createCMDParametersFromValues(this.invocation.getParams(), this.invocation.getTarget(), streamValues, pythonInterpreter);
        String[] cmd = new String[numBasePythonMpiArgs + binaryParams.size()];
        cmd[0] = this.mpiDef.getMpiRunner();
        cmd[1] = this.mpiDef.getHostsFlag();
        try {
            cmd[2] = this.mpiDef.generateHostsDefinition(this.taskSandboxWorkingDir, this.hostnames, this.computingUnits);
        }
        catch (IOException ioe) {
            throw new InvokeExecutionException("ERROR: writting hostfile", ioe);
        }
        cmd[3] = "-n";
        cmd[4] = this.mpiDef.generateNumberOfProcesses(this.numWorkers, this.computingUnits);
        for (i = 0; i < numMPIFlags; ++i) {
            cmd[5 + i] = mpiflagsArray[i];
        }
        if (pythonInterpreterArray != null || pythonInterpreterArray.length > 0) {
            for (i = 0; i < pythonInterpreterArray.length; ++i) {
                cmd[numBasePythonMpiArgs - (2 + pythonInterpreterArray.length - i)] = pythonInterpreterArray[i];
            }
        } else {
            LOGGER.error("Incorrect python interpreter parameter");
            throw new InvokeExecutionException("Incorrect python interpreter parameter");
        }
        String installDir = this.context.getInstallDir();
        String pycompssRelativePath = File.separator + "Bindings" + File.separator + "python";
        String pythonVersion = ((PythonParams)this.context.getLanguageParams(COMPSsConstants.Lang.PYTHON)).getPythonVersion();
        String pyCOMPSsHome = installDir + pycompssRelativePath + File.separator + pythonVersion;
        cmd[numBasePythonMpiArgs - 2] = pyCOMPSsHome + File.separator + "pycompss" + File.separator + "worker" + File.separator + "external" + File.separator + "mpi_executor.py";
        CollectionLayout[] cls = this.mpiDef.getCollectionLayouts();
        int collectionLayoutNum = cls == null ? 0 : cls.length;
        StringBuilder collectionLayoutParams = new StringBuilder(" ");
        for (CollectionLayout cl : cls) {
            collectionLayoutParams.append(cl.getParamName()).append(" ");
            collectionLayoutParams.append(Integer.toString(cl.getBlockCount())).append(" ");
            collectionLayoutParams.append(Integer.toString(cl.getBlockLen())).append(" ");
            collectionLayoutParams.append(Integer.toString(cl.getBlockStride())).append(" ");
        }
        collectionLayoutParams.append(Integer.toString(collectionLayoutNum));
        cmd[numBasePythonMpiArgs - 1] = taskCMD + collectionLayoutParams.toString();
        String pythonPath = System.getenv("PYTHONPATH");
        pythonPath = pyCOMPSsHome + ":" + pythonPath;
        for (int i2 = 0; i2 < binaryParams.size(); ++i2) {
            cmd[numBasePythonMpiArgs + i2] = binaryParams.get(i2);
        }
        if (this.invocation.isDebugEnabled()) {
            PrintStream outLog = this.context.getThreadOutStream();
            outLog.println("");
            outLog.println("[Python MPI INVOKER] Begin MPI call to " + this.mpiDef.getDeclaringClass() + "." + this.mpiDef.getAlternativeMethodName());
            outLog.println("[Python MPI INVOKER] On WorkingDir : " + this.taskSandboxWorkingDir.getAbsolutePath());
            outLog.print("[Python MPI INVOKER] MPI CMD: ");
            for (int i3 = 0; i3 < cmd.length; ++i3) {
                outLog.print(cmd[i3] + " ");
            }
            outLog.println("");
        }
        this.br = new BinaryRunner();
        return this.br.executeCMD(cmd, streamValues, this.taskSandboxWorkingDir, this.context.getThreadOutStream(), this.context.getThreadErrStream(), pythonPath, this.mpiDef.isFailByEV());
    }

    @Override
    public void cancelMethod() {
        LOGGER.debug("Cancelling PythonMPI process");
        if (this.br != null) {
            this.br.cancelProcess();
        }
    }

    @Override
    protected int getNumThreads(InvocationContext context, Invocation invocation) {
        int ppn = this.mpiDef.getPPN();
        if (ppn > 1) {
            return this.computingUnits / ppn;
        }
        return this.computingUnits;
    }

    @Override
    protected void setEnvironmentVariables() {
        super.setEnvironmentVariables();
        int ppn = this.mpiDef.getPPN();
        if (LOGGER.isDebugEnabled()) {
            System.out.println("[PYTHON MPI INVOKER] OVERWRITING COMPSS_NUM_PROCS: " + this.computingUnits);
        }
        System.setProperty("COMPSS_NUM_PROCS", String.valueOf(this.computingUnits));
        if (ppn > 1) {
            int threads = this.computingUnits / ppn;
            System.setProperty("COMPSS_NUM_THREADS", String.valueOf(threads));
            System.setProperty("OMP_NUM_THREADS", String.valueOf(threads));
            if (LOGGER.isDebugEnabled()) {
                System.out.println("[INVOKER] OVEWRITING COMPSS_NUM_THREADS: " + threads);
            }
        }
    }
}

