/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.distrostreamlib.api.objects;

import es.bsc.distrostreamlib.api.objects.ODSProperties;
import es.bsc.distrostreamlib.exceptions.BackendException;
import es.bsc.distrostreamlib.types.ConsumerMode;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ODSConsumer<T> {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.distroStreamLib.stream.ObjectDistroStream.Consumer");
    private static final boolean DEBUG = LOGGER.isDebugEnabled();
    private final String topicName;
    private final ConsumerMode mode;
    private final AdminClient adminClient;
    private final KafkaConsumer<String, T> kafkaConsumer;

    public ODSConsumer(String bootstrapServer, String topicName, ConsumerMode mode) throws BackendException {
        LOGGER.debug("Creating Consumer...");
        this.topicName = topicName;
        this.mode = mode;
        this.adminClient = this.registerAdminClient(bootstrapServer);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServer);
        properties.put("group.id", "compss" + UUID.randomUUID());
        properties.putAll(ODSProperties.DEFAULT_CONSUMER_PROPERTIES);
        this.kafkaConsumer = new KafkaConsumer(properties);
        List<String> allTopics = Arrays.asList(this.topicName, "system-messages");
        this.kafkaConsumer.subscribe(allTopics);
        LOGGER.debug("DONE Creating Consumer");
    }

    private AdminClient registerAdminClient(String bootstrapServer) {
        HashMap<String, String> adminConf = new HashMap<String, String>();
        adminConf.put("bootstrap.servers", bootstrapServer);
        adminConf.put("request.timeout.ms", "5000");
        return AdminClient.create(adminConf);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final List<T> pollMessages(long timeout) {
        ConsumerRecords records;
        LOGGER.debug("Polling Messages from " + this.topicName + " ...");
        KafkaConsumer<String, T> kafkaConsumer = this.kafkaConsumer;
        synchronized (kafkaConsumer) {
            records = this.kafkaConsumer.poll(Duration.ofMillis(timeout));
        }
        LinkedList<Object> messages = new LinkedList<Object>();
        HashMap<TopicPartition, RecordsToDelete> toDelete = new HashMap<TopicPartition, RecordsToDelete>();
        for (ConsumerRecord record : records) {
            if (record.topic().equals(this.topicName)) {
                Object msg = record.value();
                messages.add(msg);
                if (!this.mode.equals((Object)ConsumerMode.AT_MOST_ONCE)) continue;
                toDelete.put(new TopicPartition(record.topic(), record.partition()), RecordsToDelete.beforeOffset((long)record.offset()));
                continue;
            }
            if (record.topic().equals("system-messages")) {
                this.processSystemValue(record.value());
                continue;
            }
            this.processUnknownTopicValue(record.topic(), record.value());
        }
        if (this.mode.equals((Object)ConsumerMode.AT_MOST_ONCE) && !toDelete.isEmpty()) {
            this.eraseRecords(toDelete);
        }
        if (DEBUG) {
            LOGGER.debug("DONE Polling Messages (" + messages.size() + " elements)");
        }
        return messages;
    }

    private void eraseRecords(Map<TopicPartition, RecordsToDelete> toDelete) {
        LOGGER.debug("Deleting processed records...");
        DeleteRecordsResult deleted = this.adminClient.deleteRecords(toDelete);
        for (Map.Entry recordToDelete : deleted.lowWatermarks().entrySet()) {
            try {
                ((DeletedRecords)((KafkaFuture)recordToDelete.getValue()).get()).lowWatermark();
            }
            catch (InterruptedException | ExecutionException e) {
                LOGGER.error("ERROR: Cannot erase record, trying to proceed anyway...", (Throwable)e);
            }
        }
        LOGGER.debug("DONE Deleting processed records");
    }

    private void processSystemValue(T value) {
        LOGGER.info("Received [Topic=System] : " + value);
    }

    private void processUnknownTopicValue(String topic, T value) {
        LOGGER.info("Received [Topic=" + topic + "] : " + value);
    }
}

