/*
 * Decompiled with CFR 0.152.
 */
package streams.components;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import streams.components.COMPSsStream;
import streams.components.exceptions.InvalidCredentialsException;
import streams.components.exceptions.SubscribeException;
import streams.components.utils.RegistrationId;
import streams.exceptions.ConsumerException;
import streams.types.Result;
import streams.types.Topics;

public class Consumer {
    private static final boolean DEBUG = false;
    private static final int MESSAGE_TIMEOUT = 200;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Result receiveMessages(COMPSsStream stream) {
        RegistrationId id;
        try {
            id = stream.subscribe(Topics.ALL_TOPICS);
        }
        catch (SubscribeException se) {
            System.err.println("ERROR: Cannot subscribe to stream");
            se.printStackTrace();
            return Result.generate(-1);
        }
        try {
            boolean end = false;
            while (!end) {
                ConsumerRecords<String, String> records = stream.poll(id, 200);
                end = Consumer.processRecords(records);
            }
        }
        catch (InvalidCredentialsException ice) {
            System.err.println("ERROR: Cannot receive messages from stream");
            ice.printStackTrace();
            Result result = Result.generate(-2);
            return result;
        }
        catch (ConsumerException ce) {
            System.err.println("ERROR: Cannot process received records");
            ce.printStackTrace();
            Result result = Result.generate(-3);
            return result;
        }
        finally {
            try {
                stream.unsubscribe(id);
            }
            catch (InvalidCredentialsException ice) {
                System.err.println("ERROR: Cannot unsubscribe from stream");
                ice.printStackTrace();
                return Result.generate(-4);
            }
        }
        return Result.generate(0);
    }

    private static boolean processRecords(ConsumerRecords<String, String> records) throws ConsumerException {
        boolean endRecordReceived = false;
        block8: for (ConsumerRecord<String, String> consumerRecord : records) {
            switch (consumerRecord.topic()) {
                case "regular-messages": {
                    Consumer.processRegularRecord(consumerRecord.value());
                    continue block8;
                }
                case "system-messages": {
                    endRecordReceived = Consumer.processSystemRecord(consumerRecord.value());
                    continue block8;
                }
            }
            throw new ConsumerException("ERROR: Unrecognised topic " + consumerRecord.topic());
        }
        return endRecordReceived;
    }

    private static void processRegularRecord(String value) throws ConsumerException {
        JsonNode msg = null;
        try {
            msg = new ObjectMapper().readTree(value);
        }
        catch (JsonProcessingException jpe) {
            jpe.printStackTrace();
            throw new ConsumerException("ERROR: Cannot read regular message", jpe);
        }
        catch (IOException ioe) {
            ioe.printStackTrace();
            throw new ConsumerException("ERROR: Cannot read regular message", ioe);
        }
        switch (msg.get("type").asText()) {
            case "message": {
                long latency = (long)(((double)System.nanoTime() * 1.0E-9 - msg.get("t").asDouble()) * 1000.0);
                Result.addValue(latency);
                break;
            }
            case "stats": {
                break;
            }
            default: {
                throw new ConsumerException("ERROR: Unrecognised message type: " + msg.get("type").asText());
            }
        }
    }

    private static boolean processSystemRecord(String value) throws ConsumerException {
        JsonNode msg = null;
        try {
            msg = new ObjectMapper().readTree(value);
        }
        catch (JsonProcessingException jpe) {
            throw new ConsumerException("ERROR: Cannot read system message", jpe);
        }
        catch (IOException ioe) {
            throw new ConsumerException("ERROR: Cannot read system message", ioe);
        }
        switch (msg.get("type").asText()) {
            case "end": {
                return true;
            }
        }
        throw new ConsumerException("ERROR: Unrecognised message type: " + msg.get("type").asText());
    }
}

