/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.compss.executor.external.piped;

import es.bsc.compss.executor.external.ExternalExecutor;
import es.bsc.compss.executor.external.ExternalExecutorException;
import es.bsc.compss.executor.external.commands.ExternalCommand;
import es.bsc.compss.executor.external.piped.PipedMirror;
import es.bsc.compss.executor.external.piped.commands.AddedExecutorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.AliveReplyPipeCommand;
import es.bsc.compss.executor.external.piped.commands.ChannelCreatedPipeCommand;
import es.bsc.compss.executor.external.piped.commands.CompssExceptionPipeCommand;
import es.bsc.compss.executor.external.piped.commands.EndTaskPipeCommand;
import es.bsc.compss.executor.external.piped.commands.ErrorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.ExecutorPIDReplyPipeCommand;
import es.bsc.compss.executor.external.piped.commands.PipeCommand;
import es.bsc.compss.executor.external.piped.commands.PongPipeCommand;
import es.bsc.compss.executor.external.piped.commands.QuitPipeCommand;
import es.bsc.compss.executor.external.piped.commands.RemovedExecutorPipeCommand;
import es.bsc.compss.executor.external.piped.commands.WorkerStartedPipeCommand;
import es.bsc.compss.executor.external.piped.exceptions.ClosedPipeException;
import es.bsc.compss.executor.external.piped.exceptions.UnknownCommandException;
import es.bsc.compss.util.ErrorManager;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PipePair
implements ExternalExecutor<PipeCommand> {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.compss.Worker.Executor");
    private static final String ERROR_PIPE_CLOSE = "Error on closing pipe ";
    private static final String WRITE_PIPE_CLOSING_NOT_EXISTS = "Deleted pipe being written blocks the execution";
    private static final String READ_PIPE_CLOSING_NOT_EXISTS = "Deleted pipe read written blocks the execution";
    private static final String CLOSED_PIPE_ERROR = "CLOSED_PIPE_ERROR";
    protected static final String TOKEN_NEW_LINE = "\n";
    protected static final String TOKEN_SEP = " ";
    private static final int MAX_WRITE_PIPE_RETRIES = 3;
    private static final int PIPE_ERROR_WAIT_TIME = 50;
    private static final long PIPE_READ_COMMAND_PERIOD = 20L;
    private final String pipePath;
    private BufferedReader reader;
    private final Lock sendMutex = new ReentrantLock();
    private int senders;
    private int readers;
    private boolean closed = false;
    private final PipedMirror mirror;

    public PipePair(String basePipePath, String id, PipedMirror mirror) {
        this.pipePath = basePipePath + id;
        this.mirror = mirror;
    }

    public final String getPipesLocation() {
        return this.pipePath;
    }

    public final String getInboundPipe() {
        return this.pipePath + ".inbound";
    }

    public final String getOutboundPipe() {
        return this.pipePath + ".outbound";
    }

    public PipedMirror getMirror() {
        return this.mirror;
    }

    public final void delete() {
        File f = new File(this.pipePath + ".inbound");
        if (f.exists()) {
            f.delete();
        }
        if ((f = new File(this.pipePath + ".outbound")).exists()) {
            f.delete();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean sendCommand(PipeCommand command) {
        boolean done = false;
        int retries = 0;
        String taskCMD = command.getAsString();
        String writePipe = this.pipePath + ".outbound";
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("EXECUTOR COMMAND: " + taskCMD + " @ " + writePipe);
        }
        taskCMD = taskCMD + TOKEN_NEW_LINE;
        while (!done && retries < 3) {
            if (!new File(writePipe).exists()) {
                LOGGER.debug("Warn pipe doesn't exist. Retry");
                ++retries;
            } else {
                FileOutputStream output = null;
                PipePair pipePair = this;
                synchronized (pipePair) {
                    ++this.senders;
                }
                this.sendMutex.lock();
                try {
                    output = new FileOutputStream(writePipe, true);
                    ((OutputStream)output).write(taskCMD.getBytes());
                    output.flush();
                    pipePair = this;
                    synchronized (pipePair) {
                        --this.senders;
                        done = !this.closed;
                    }
                    LOGGER.debug("Written " + taskCMD + " into " + writePipe);
                }
                catch (IOException e) {
                    PipePair pipePair2 = this;
                    synchronized (pipePair2) {
                        --this.senders;
                    }
                    LOGGER.debug("Error on pipe write. Retry");
                    ++retries;
                }
                finally {
                    if (output != null) {
                        try {
                            ((OutputStream)output).close();
                        }
                        catch (Exception e) {
                            ErrorManager.error((String)(ERROR_PIPE_CLOSE + writePipe), (Exception)e);
                        }
                    }
                    this.sendMutex.unlock();
                }
            }
            if (done) continue;
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                LOGGER.debug("Pipe error wait time for message " + command + " on pipe " + this.getPipesLocation());
            }
        }
        if (!done) {
            LOGGER.debug("Failed to send " + command + " on pipe " + this.getPipesLocation());
        }
        return done;
    }

    public String toString() {
        return "READ pipe: " + this.pipePath + ".inbound  WRITE pipe:" + this.pipePath + ".outbound";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public PipeCommand readCommand() throws ExternalExecutorException {
        PipeCommand readCommand = null;
        PipePair pipePair = this;
        synchronized (pipePair) {
            if (this.closed) {
                throw new ClosedPipeException();
            }
            ++this.readers;
        }
        if (this.reader == null) {
            try {
                String readPipe = this.getInboundPipe();
                FileInputStream input = new FileInputStream(readPipe);
                this.reader = new BufferedReader(new InputStreamReader(input));
            }
            catch (FileNotFoundException fnfe) {
                throw new ExternalExecutorException(fnfe);
            }
        }
        PipePair fnfe = this;
        synchronized (fnfe) {
            --this.readers;
        }
        try {
            String line = null;
            while (line == null || line.length() == 0) {
                if (this.closed) {
                    throw new ClosedPipeException();
                }
                line = this.reader.readLine();
                if (line != null) continue;
                try {
                    Thread.sleep(20L);
                }
                catch (InterruptedException interruptedException) {}
            }
            LOGGER.debug(Thread.currentThread().getName() + " READS -" + line + "-(" + line.length() + ")");
            readCommand = this.readCommand(line, line.split(TOKEN_SEP));
        }
        catch (IOException ioe) {
            throw new ExternalExecutorException(ioe);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("EXECUTOR COMMAND: " + readCommand + " @ " + this.getInboundPipe());
        }
        return readCommand;
    }

    private PipeCommand readCommand(String cmd, String[] command) throws ClosedPipeException, UnknownCommandException {
        PipeCommand readCommand = null;
        String commandTypeTag = command[0].toUpperCase();
        if (commandTypeTag.compareTo(CLOSED_PIPE_ERROR) == 0) {
            throw new ClosedPipeException();
        }
        ExternalCommand.CommandType commandType = ExternalCommand.CommandType.valueOf(commandTypeTag);
        switch (commandType) {
            case PONG: {
                readCommand = new PongPipeCommand();
                break;
            }
            case WORKER_STARTED: {
                readCommand = new WorkerStartedPipeCommand(command);
                break;
            }
            case ALIVE_REPLY: {
                readCommand = new AliveReplyPipeCommand(command);
                break;
            }
            case CHANNEL_CREATED: {
                readCommand = new ChannelCreatedPipeCommand(command);
                break;
            }
            case REPLY_EXECUTOR_ID: {
                readCommand = new ExecutorPIDReplyPipeCommand(command);
                break;
            }
            case ADDED_EXECUTOR: {
                readCommand = new AddedExecutorPipeCommand(command);
                break;
            }
            case REMOVED_EXECUTOR: {
                readCommand = new RemovedExecutorPipeCommand(command);
                break;
            }
            case QUIT: {
                LOGGER.debug("Received quit message");
                readCommand = new QuitPipeCommand();
                break;
            }
            case END_TASK: {
                if (command.length < 3) {
                    LOGGER.warn("WARN: Skipping endTask line because is malformed");
                    break;
                }
                readCommand = new EndTaskPipeCommand(command);
                LOGGER.debug("Received endTask message: " + readCommand.getAsString());
                break;
            }
            case COMPSS_EXCEPTION: {
                if (command.length < 2) {
                    LOGGER.warn("WARN: Skipping endTask line because is malformed");
                    break;
                }
                readCommand = new CompssExceptionPipeCommand(command);
                LOGGER.debug("Received compssException message: " + ((CompssExceptionPipeCommand)readCommand).getMessage());
                break;
            }
            case ERROR: {
                String[] expected = Arrays.copyOfRange(command, 1, command.length);
                PipeCommand expectedCommand = this.readCommand(cmd, expected);
                readCommand = new ErrorPipeCommand(expectedCommand);
                break;
            }
            default: {
                throw new UnknownCommandException(cmd);
            }
        }
        return readCommand;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void noLongerExists() {
        int readers;
        int senders;
        PipePair pipePair = this;
        synchronized (pipePair) {
            this.closed = true;
            senders = this.senders;
            readers = this.readers;
        }
        String readPipe = this.pipePath + ".outbound";
        while (senders > 0) {
            try (FileInputStream input = new FileInputStream(readPipe);){
                input.read();
            }
            catch (FileNotFoundException fnfe) {
                ErrorManager.fatal((String)WRITE_PIPE_CLOSING_NOT_EXISTS, (Exception)fnfe);
            }
            catch (IOException ioe) {
                ErrorManager.fatal((String)WRITE_PIPE_CLOSING_NOT_EXISTS, (Exception)ioe);
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException ioe) {
                // empty catch block
            }
            PipePair ioe = this;
            synchronized (ioe) {
                senders = this.senders;
            }
        }
        String writePipe = this.pipePath + ".inbound";
        if (readers > 0) {
            File pipe = new File(writePipe);
            if (!pipe.exists()) {
                ErrorManager.fatal((String)READ_PIPE_CLOSING_NOT_EXISTS);
            } else {
                try (FileOutputStream fos = new FileOutputStream(writePipe);){
                    String message = "CLOSED_PIPE_ERROR\n";
                    fos.write(message.getBytes());
                    fos.flush();
                }
                catch (IOException ioe) {
                    ioe.printStackTrace();
                }
            }
        }
        if (this.reader != null) {
            try {
                this.reader.close();
                this.reader = null;
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }
}

