/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.distrostreamlib.client;

import es.bsc.distrostreamlib.exceptions.DistroStreamClientInitException;
import es.bsc.distrostreamlib.requests.Request;
import es.bsc.distrostreamlib.requests.StopRequest;
import es.bsc.distrostreamlib.types.RequestType;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DistroStreamClient
extends Thread {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.distroStreamLib.client");
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    private static final int MAX_READ_RETRIES = 5;
    private static final int TIME_BETWEEN_READ_RETRIES = 10;
    private static DistroStreamClient client;
    private final LinkedBlockingDeque<Request> requests;
    private boolean keepRunning;
    private final String masterIP;
    private final int masterPort;

    private DistroStreamClient(String masterIP, int masterPort) throws DistroStreamClientInitException {
        LOGGER.info("DS Client initialized");
        this.keepRunning = true;
        this.requests = new LinkedBlockingDeque();
        this.masterIP = masterIP;
        this.masterPort = masterPort;
    }

    @Override
    public void run() {
        LOGGER.info("DS Client started");
        this.processRequests();
        LOGGER.info("DS Client stopped");
    }

    private void processRequests() {
        while (this.keepRunning) {
            Request cr = null;
            try {
                cr = this.requests.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (cr == null) continue;
            if (DEBUG) {
                LOGGER.debug("Processing request " + cr.getType());
            }
            if (cr.getType().equals((Object)RequestType.STOP)) {
                LOGGER.info("DS Client asked to stop");
                this.keepRunning = false;
                cr.setResponse("DONE");
            } else {
                this.processRequest(cr);
            }
            cr.setProcessed();
        }
    }

    private void processRequest(Request cr) {
        try (Socket socket = new Socket(this.masterIP, this.masterPort);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             Scanner in = new Scanner(socket.getInputStream());){
            String reqMsg = cr.getRequestMessage();
            if (DEBUG) {
                LOGGER.debug("Sending request to server: " + reqMsg);
            }
            out.println(reqMsg);
            LOGGER.debug("Receiving answer from server...");
            for (int numRetries = 0; !in.hasNextLine() && numRetries < 5; ++numRetries) {
                Thread.sleep(10L);
            }
            if (in.hasNextLine()) {
                String responseMessage = in.nextLine();
                if (DEBUG) {
                    LOGGER.debug("Received answer from server: " + responseMessage);
                }
                cr.setResponse(responseMessage);
            } else {
                LOGGER.error("Error receiving request answer (timeout)");
                cr.setError(2, "ERROR: Timeout on answer wait");
            }
        }
        catch (IOException ioe) {
            LOGGER.error("Error sending request", (Throwable)ioe);
            cr.setError(1, ioe.getMessage());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void addRequest(Request r) {
        boolean success;
        if (DEBUG) {
            LOGGER.debug("Adding new request to client queue: " + r.getType());
        }
        if (!(success = this.requests.offer(r))) {
            LOGGER.error("Unexpected error adding request to queue. Skipping request.");
        }
    }

    public static void initAndStart(String masterIP, int masterPort) throws DistroStreamClientInitException {
        try {
            client = new DistroStreamClient(masterIP, masterPort);
        }
        catch (DistroStreamClientInitException dcie) {
            LOGGER.error("ERROR: Cannot start DistroStream Client", (Throwable)dcie);
            throw dcie;
        }
        client.setName("DistroStreamClient");
        client.start();
    }

    public static void setStop() {
        StopRequest stopRequest = new StopRequest();
        client.addRequest((Request)stopRequest);
    }

    public static void request(Request r) {
        client.addRequest(r);
    }
}

