/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.distrostreamlib.server;

import es.bsc.distrostreamlib.requests.StopRequest;
import es.bsc.distrostreamlib.server.types.StreamBackend;
import es.bsc.distrostreamlib.server.types.StreamInfo;
import es.bsc.distrostreamlib.types.ConsumerMode;
import es.bsc.distrostreamlib.types.RequestType;
import es.bsc.distrostreamlib.types.StreamType;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DistroStreamServer
extends Thread {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.distroStreamLib.server");
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    private static final int DEFAULT_SERVER_PORT = 49049;
    private static final int DEFAULT_BOOTSTRAP_PORT = 49001;
    private static DistroStreamServer server;
    private final String serverName;
    private final int serverPort;
    private final StreamBackend streamBackend;
    private boolean keepRunning;
    private List<String> registeredClients;
    private Map<UUID, StreamInfo> registeredStreams;
    private Map<String, UUID> registeredStreamAlias;

    public DistroStreamServer(String serverName, Integer serverPort, StreamBackend streamBackend) {
        this.serverName = serverName;
        this.serverPort = serverPort == null ? 49049 : serverPort;
        this.streamBackend = streamBackend;
        this.keepRunning = true;
        this.registeredClients = new LinkedList<String>();
        this.registeredStreams = new HashMap<UUID, StreamInfo>();
        this.registeredStreamAlias = new HashMap<String, UUID>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LOGGER.info("DS Server started");
        if (DEBUG) {
            LOGGER.debug("DS Server connecting to socket " + this.serverPort + " ...");
        }
        try (ServerSocket listener = new ServerSocket(this.serverPort);){
            LOGGER.debug("DS Server connected to socket...");
            this.processRequests(listener);
        }
        catch (IOException ioe) {
            LOGGER.error("DS Server raised an exception", (Throwable)ioe);
        }
        finally {
            LOGGER.debug("DS Server disconnected from socket...");
        }
        LOGGER.info("DS Server stopped");
    }

    public StreamBackend getStreamBackend() {
        return this.streamBackend;
    }

    private void processRequests(ServerSocket listener) throws IOException {
        LOGGER.debug("Processing requests...");
        while (this.keepRunning) {
            Socket socket = listener.accept();
            Throwable throwable = null;
            try {
                Scanner in = new Scanner(socket.getInputStream());
                Throwable throwable2 = null;
                try {
                    PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                    Throwable throwable3 = null;
                    try {
                        while (in.hasNextLine()) {
                            String info = in.nextLine();
                            String answer = this.processMessage(info);
                            out.println(answer);
                            out.flush();
                        }
                    }
                    catch (Throwable throwable4) {
                        throwable3 = throwable4;
                        throw throwable4;
                    }
                    finally {
                        if (out == null) continue;
                        if (throwable3 != null) {
                            try {
                                out.close();
                            }
                            catch (Throwable throwable5) {
                                throwable3.addSuppressed(throwable5);
                            }
                            continue;
                        }
                        out.close();
                    }
                }
                catch (Throwable throwable6) {
                    throwable2 = throwable6;
                    throw throwable6;
                }
                finally {
                    if (in == null) continue;
                    if (throwable2 != null) {
                        try {
                            in.close();
                        }
                        catch (Throwable throwable7) {
                            throwable2.addSuppressed(throwable7);
                        }
                        continue;
                    }
                    in.close();
                }
            }
            catch (Throwable throwable8) {
                throwable = throwable8;
                throw throwable8;
            }
            finally {
                if (socket == null) continue;
                if (throwable != null) {
                    try {
                        socket.close();
                    }
                    catch (Throwable throwable9) {
                        throwable.addSuppressed(throwable9);
                    }
                    continue;
                }
                socket.close();
            }
        }
    }

    private String processMessage(String message) {
        if (DEBUG) {
            LOGGER.debug("Processing message: " + message);
        }
        String answer = null;
        String[] content = message.split(" ");
        RequestType token = RequestType.valueOf((String)content[0].trim().toUpperCase());
        switch (token) {
            case REGISTER_CLIENT: {
                assert (content.length == 2);
                String regClientIP = content[1].trim();
                this.registeredClients.add(regClientIP);
                answer = "DONE";
                break;
            }
            case UNREGISTER_CLIENT: {
                assert (content.length == 2);
                String unregClientIP = content[1].trim();
                this.registeredClients.remove(unregClientIP);
                answer = "DONE";
                break;
            }
            case BOOTSTRAP_SERVER: {
                assert (content.length == 1);
                answer = this.serverName + ":" + 49001;
                break;
            }
            case REGISTER_STREAM: {
                assert (content.length >= 4);
                StreamType streamType = StreamType.valueOf((String)content[1].trim().toUpperCase());
                ConsumerMode accessMode = ConsumerMode.valueOf((String)content[2].trim().toUpperCase());
                String alias = content[3].trim();
                LinkedList<String> internalStreamInfo = new LinkedList<String>();
                for (int i = 4; i < content.length; ++i) {
                    internalStreamInfo.add(content[i].trim());
                }
                answer = this.registerStream(alias, streamType, accessMode, internalStreamInfo);
                break;
            }
            case STREAM_STATUS: {
                assert (content.length == 2);
                UUID statusStreamId = UUID.fromString(content[1].trim());
                answer = this.getStreamStatus(statusStreamId);
                break;
            }
            case CLOSE_STREAM: {
                assert (content.length == 2);
                UUID closeStreamId = UUID.fromString(content[1].trim());
                answer = this.closeStream(closeStreamId);
                break;
            }
            case POLL: {
                assert (content.length == 2);
                UUID pollStreamId = UUID.fromString(content[1].trim());
                answer = this.pollFromStream(pollStreamId);
                break;
            }
            case STOP: {
                assert (content.length == 1);
                LOGGER.info("Received STOP request, stopping...");
                answer = null;
            }
        }
        return answer;
    }

    private String registerStream(String alias, StreamType streamType, ConsumerMode accessMode, List<String> internalStreamInfo) {
        if (!(alias == null || alias.isEmpty() || "null".equals(alias) || "None".equals(alias))) {
            UUID id = this.registeredStreamAlias.get(alias);
            if (id != null) {
                if (DEBUG) {
                    LOGGER.debug("Stream " + alias + " was already registered.");
                    LOGGER.debug("Retrieving stream from registered streams with id = " + id);
                }
                return id.toString();
            }
            id = this.addNewStream(alias, streamType, accessMode, internalStreamInfo);
            this.registeredStreamAlias.put(alias, id);
            return id.toString();
        }
        UUID id = this.addNewStream(alias, streamType, accessMode, internalStreamInfo);
        return id.toString();
    }

    private UUID addNewStream(String alias, StreamType streamType, ConsumerMode accessMode, List<String> internalStreamInfo) {
        UUID id = UUID.randomUUID();
        if (DEBUG) {
            LOGGER.debug("Registering new Stream with alias = " + alias + " and id " + id.toString());
        }
        StreamInfo streamInfo = new StreamInfo(id, alias, streamType, accessMode, internalStreamInfo);
        this.registeredStreams.put(id, streamInfo);
        return id;
    }

    private String getStreamStatus(UUID streamId) {
        StreamInfo streamInfo = this.registeredStreams.get(streamId);
        if (streamInfo == null) {
            LOGGER.warn("Skipping status on unregistered stream with ID = " + streamId.toString());
            return "";
        }
        return String.valueOf(streamInfo.isStreamClosed());
    }

    private String closeStream(UUID streamId) {
        StreamInfo streamInfo = this.registeredStreams.get(streamId);
        if (streamInfo == null) {
            LOGGER.warn("Skipping close on unregistered stream with ID = " + streamId.toString());
            return "";
        }
        streamInfo.markAsClosed();
        return "";
    }

    private String pollFromStream(UUID streamId) {
        StreamInfo streamInfo = this.registeredStreams.get(streamId);
        if (streamInfo == null) {
            LOGGER.warn("Skipping poll on unregistered stream with ID = " + streamId.toString());
            return "";
        }
        switch (streamInfo.getStreamType()) {
            case FILE: {
                return this.pollFileStream(streamInfo);
            }
            case OBJECT: {
                LOGGER.warn("Skipping poll on OBJECT stream with ID = " + streamId.toString());
                return "";
            }
            case PSCO: {
                LOGGER.warn("Skipping poll on PSCO stream with ID = " + streamId.toString());
                return "";
            }
        }
        return "";
    }

    private String pollFileStream(StreamInfo streamInfo) {
        long lastPollTimestamp;
        List<String> internalStreamInfo = streamInfo.getInternalStreamInfo();
        String baseFolderPath = internalStreamInfo.get(0);
        if (DEBUG) {
            LOGGER.debug("Polling new files at " + baseFolderPath);
        }
        Path baseFolder = Paths.get(baseFolderPath, new String[0]);
        List<Object> filePaths = new ArrayList();
        try {
            filePaths = Files.list(baseFolder).collect(Collectors.toList());
        }
        catch (IOException ioe) {
            LOGGER.warn("Cannot list files in baseFolder. Returning empty file list.", (Throwable)ioe);
        }
        ArrayList<Path> newFilePaths = new ArrayList<Path>();
        long lastTimestamp = lastPollTimestamp = streamInfo.getLastPollTimestamp();
        for (Path path : filePaths) {
            long fileModificationTimestamp = lastPollTimestamp;
            try {
                fileModificationTimestamp = Files.getLastModifiedTime(path, new LinkOption[0]).toMillis();
            }
            catch (IOException ioe) {
                LOGGER.warn("Cannot retrieve modification date from " + path.getFileName() + ". Skipping file...", (Throwable)ioe);
            }
            if (fileModificationTimestamp <= lastPollTimestamp) continue;
            newFilePaths.add(path);
            if (fileModificationTimestamp < lastTimestamp) continue;
            lastTimestamp = fileModificationTimestamp;
        }
        streamInfo.setPollTimestamp(lastTimestamp);
        if (DEBUG) {
            LOGGER.debug("Registered new files: " + newFilePaths);
        }
        return newFilePaths.stream().map(s -> String.valueOf(s)).collect(Collectors.joining(" ", "", ""));
    }

    private void setStopFlag() {
        LOGGER.info("DS Server marked to stop");
        this.keepRunning = false;
    }

    private void autoSendStopRequest() {
        StopRequest sr = new StopRequest();
        try (Socket socket = new Socket(this.serverName, this.serverPort);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);){
            String reqMsg = sr.getRequestMessage();
            out.println(reqMsg);
        }
        catch (IOException ioe) {
            LOGGER.error("Error auto-sending stop request", (Throwable)ioe);
            sr.setError(1, ioe.getMessage());
        }
    }

    public static void initAndStart(String serverName, Integer serverPort, StreamBackend streamBackend) {
        server = new DistroStreamServer(serverName, serverPort, streamBackend);
        server.setName("DistroStreamServer");
        server.start();
    }

    public static void setStop() {
        server.setStopFlag();
        server.autoSendStopRequest();
    }
}

