/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.comm.stage;

import es.bsc.comm.CommProperties;
import es.bsc.comm.Connection;
import es.bsc.comm.EventManager;
import es.bsc.comm.InternalConnection;
import es.bsc.comm.exceptions.CommException;
import es.bsc.comm.exceptions.SerializerException;
import es.bsc.comm.exceptions.StageException;
import es.bsc.comm.exceptions.ViabilityException;
import es.bsc.comm.stage.Token;
import es.bsc.comm.stage.Transfer;
import es.bsc.comm.util.Serializer;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Submission
extends Transfer {
    private static final Logger LOGGER = LogManager.getLogger("Communication.Stage");
    private static final Object tokenLock = new Object();
    private static Integer tokens = CommProperties.getMaxSends();
    private static LinkedList<InternalConnection> pausedConnections = new LinkedList();
    protected InputStream streamIn;
    private boolean hasToken = false;
    protected Token token;

    public Submission(Transfer.Type type, byte[] array) {
        super(true);
        this.type = type;
        this.destination = Transfer.Destination.ARRAY;
        this.array = array;
        this.remainingSize = 0L;
    }

    public Submission(Transfer.Type type, Object o) {
        super(true);
        this.type = type;
        this.destination = Transfer.Destination.OBJECT;
        this.object = o;
        this.remainingSize = 0L;
    }

    public Submission(String name) {
        super(true);
        this.type = Transfer.Type.DATA;
        this.destination = Transfer.Destination.FILE;
        this.fileName = name;
        this.remainingSize = 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void increaseTokens() {
        Object object = tokenLock;
        synchronized (object) {
            tokens = tokens + 1;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void decreaseTokens() {
        Object object = tokenLock;
        synchronized (object) {
            tokens = tokens - 1;
        }
    }

    private void setSize() {
        long length = 0L;
        if (this.destination == Transfer.Destination.OBJECT || this.destination == Transfer.Destination.ARRAY) {
            length = this.array.length;
        } else if (this.destination == Transfer.Destination.FILE) {
            File f = new File(this.fileName);
            length = f.length();
        }
        this.setSize(length);
    }

    private void openStream() {
        try {
            if (this.destination == Transfer.Destination.FILE) {
                this.streamIn = new FileInputStream(this.fileName);
            } else {
                if (this.destination == Transfer.Destination.OBJECT) {
                    this.array = Serializer.serialize(this.object);
                }
                this.streamIn = new ByteArrayInputStream(this.array);
            }
        }
        catch (IOException e) {
            LOGGER.error("Exception openning stream for " + this, (Throwable)e);
        }
        catch (SerializerException se) {
            LOGGER.error("Exception serializing object for " + this, (Throwable)se);
        }
    }

    public void loadFirstToken() throws IOException {
        int available = this.streamIn.available();
        int tokenSize = (int)Math.min(Math.min(10240L, (long)available + 16L), this.remainingSize + 16L);
        byte[] content = new byte[tokenSize];
        ByteBuffer bb = ByteBuffer.wrap(content);
        bb.putLong(this.totalSize);
        bb.putInt(this.type.ordinal());
        bb.putInt(this.destination.ordinal());
        this.streamIn.read(content, 16, tokenSize - 16);
        this.remainingSize -= (long)(tokenSize - 16);
        if (this.remainingSize == 0L) {
            this.closeStream();
        }
        this.token = new Token(content);
    }

    public void loadNextToken() throws IOException {
        int available = this.streamIn.available();
        int tokenSize = (int)Math.min((long)Math.min(10240, available), this.remainingSize);
        byte[] content = new byte[tokenSize];
        this.streamIn.read(content, 0, tokenSize);
        this.remainingSize -= (long)tokenSize;
        if (this.remainingSize == 0L) {
            this.closeStream();
        }
        this.token = new Token(content);
    }

    private void closeStream() {
        try {
            this.streamIn.close();
        }
        catch (IOException e) {
            LOGGER.error("Error closing input stream on connection " + this, (Throwable)e);
        }
    }

    @Override
    public Transfer.Direction getDirection() {
        return Transfer.Direction.SEND;
    }

    @Override
    public void start(InternalConnection connection, List<ByteBuffer> received, List<ByteBuffer> transmit) throws StageException {
        Submission.decreaseTokens();
        this.hasToken = true;
        this.openStream();
        this.setSize();
        try {
            this.loadFirstToken();
        }
        catch (IOException ioe) {
            throw new StageException(ioe);
        }
        this.sendTokenPacket(this.token, transmit);
    }

    @Override
    public void progress(InternalConnection connection, List<ByteBuffer> received, List<ByteBuffer> transmit) throws StageException {
        this.sendToken(this.token, transmit);
        while (this.token.isCompletelyRead() && this.remainingSize > 0L) {
            try {
                this.loadNextToken();
            }
            catch (IOException ioe) {
                throw new StageException(ioe);
            }
            this.sendToken(this.token, transmit);
        }
    }

    @Override
    public boolean checkViability(boolean closedCommunication, List<ByteBuffer> received, List<ByteBuffer> transmit) throws ViabilityException {
        if (!closedCommunication) {
            return tokens > 0;
        }
        throw new ViabilityException("Channel was already closed for connection " + this);
    }

    @Override
    public void pause(InternalConnection ic) {
        pausedConnections.add(ic);
    }

    @Override
    public boolean isComplete(List<ByteBuffer> received, List<ByteBuffer> transmit) {
        return this.sizeInit && this.remainingSize == 0L && transmit.isEmpty() && this.token.isCompletelyRead();
    }

    @Override
    public void notifyCompletion(Connection c, EventManager<?> em) {
        em.writeFinished(c, this);
        Submission.increaseTokens();
        this.hasToken = false;
        if (!pausedConnections.isEmpty()) {
            InternalConnection ic = pausedConnections.removeFirst();
            ic.resume();
        }
    }

    @Override
    public void notifyError(Connection c, EventManager<?> em, CommException exc) {
        em.notifyError(c, this, exc);
        if (this.hasToken) {
            Submission.increaseTokens();
            this.hasToken = false;
            if (!pausedConnections.isEmpty()) {
                InternalConnection ic = pausedConnections.removeFirst();
                ic.resume();
            }
        }
    }
}

