/*
 * Decompiled with CFR 0.152.
 */
package streams.components;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import streams.components.exceptions.AnnounceException;
import streams.components.exceptions.InvalidCredentialsException;
import streams.components.exceptions.SubscribeException;
import streams.components.utils.RegistrationId;

/**
 * Registry of Kafka consumers and producers addressed through {@link RegistrationId} handles.
 *
 * <p>Each call to {@link #subscribe(List)} or {@link #announce()} builds a new Kafka client from a
 * properties file, stores it under the ids carried by a freshly created {@link RegistrationId},
 * and hands that id back to the caller. Every later operation validates the handle against the
 * per-subscriber id lists kept here before touching the underlying client.
 *
 * <p>The bookkeeping maps are plain {@link HashMap}s with no synchronization in this class.
 */
public class COMPSsStream {
    private static final String PROPERTIES_FILE_PATH_CONSUMER = "/home/cramonco/svn/compss/framework/trunk/tests/sources/basic/77-streams/src/main/resources/consumer.props";
    private static final String PROPERTIES_FILE_PATH_PRODUCER = "/home/cramonco/svn/compss/framework/trunk/tests/sources/basic/77-streams/src/main/resources/producer.props";

    // Consumer-side bookkeeping.
    private final Map<String, List<String>> subscriberIds2consumerIds = new HashMap<String, List<String>>();
    private final Map<String, String> consumerIds2subscriberIds = new HashMap<String, String>();
    private final Map<String, KafkaConsumer<String, String>> consumerIds2consumers = new HashMap<String, KafkaConsumer<String, String>>();
    private final Map<String, List<String>> consumerIds2topics = new HashMap<String, List<String>>();

    // Producer-side bookkeeping.
    private final Map<String, List<String>> subscriberIds2producerIds = new HashMap<String, List<String>>();
    private final Map<String, String> producerIds2subscriberIds = new HashMap<String, String>();
    private final Map<String, KafkaProducer<String, String>> producerIds2producers = new HashMap<String, KafkaProducer<String, String>>();

    /**
     * Creates a Kafka consumer subscribed to {@code topics} and registers it.
     *
     * <p>If the consumer properties file does not define {@code group.id}, a value of the form
     * {@code group-<n>} with a random {@code n} in {@code [0, 100000)} is used.
     *
     * @param topics topics the new consumer subscribes to
     * @return the registration handle identifying the new consumer
     * @throws SubscribeException if the consumer properties file cannot be read
     */
    public RegistrationId subscribe(List<String> topics) throws SubscribeException {
        RegistrationId registrationId = new RegistrationId();
        Properties properties;
        try {
            properties = loadProperties(PROPERTIES_FILE_PATH_CONSUMER);
        } catch (IOException ioe) {
            throw new SubscribeException("ERROR: Cannot open properties file for subscription", ioe);
        }
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(topics);
        } catch (RuntimeException re) {
            // The consumer is not registered yet, so nobody else could ever close it.
            consumer.close();
            throw re;
        }

        String consumerId = registrationId.getConsumerId();
        String subscriberId = registrationId.getSubscriberId();
        this.consumerIds2consumers.put(consumerId, consumer);
        this.consumerIds2topics.put(consumerId, topics);
        addToIndex(this.subscriberIds2consumerIds, subscriberId, consumerId);
        this.consumerIds2subscriberIds.put(consumerId, subscriberId);
        return registrationId;
    }

    /**
     * Closes the consumer identified by {@code registrationId} and forgets it.
     *
     * @param registrationId handle previously returned by {@link #subscribe(List)}
     * @throws InvalidCredentialsException if the handle does not match a registered consumer
     */
    public void unsubscribe(RegistrationId registrationId) throws InvalidCredentialsException {
        if (!this.isConsumerValid(registrationId)) {
            throw new InvalidCredentialsException();
        }
        String consumerId = registrationId.getConsumerId();
        // Drop all bookkeeping before closing so a failing close() cannot leave a
        // half-registered consumer behind.
        KafkaConsumer<String, String> consumer = this.consumerIds2consumers.remove(consumerId);
        this.consumerIds2topics.remove(consumerId);
        this.consumerIds2subscriberIds.remove(consumerId);
        removeFromIndex(this.subscriberIds2consumerIds, registrationId.getSubscriberId(), consumerId);
        consumer.close();
    }

    /**
     * Replaces the topic subscription of a registered consumer.
     *
     * @param registrationId handle previously returned by {@link #subscribe(List)}
     * @param newTopics topics the consumer should be subscribed to from now on
     * @throws InvalidCredentialsException if the handle does not match a registered consumer
     */
    public void updateSubscriptionTopics(RegistrationId registrationId, List<String> newTopics) throws InvalidCredentialsException {
        if (!this.isConsumerValid(registrationId)) {
            throw new InvalidCredentialsException();
        }
        KafkaConsumer<String, String> consumer = this.consumerIds2consumers.get(registrationId.getConsumerId());
        consumer.unsubscribe();
        consumer.subscribe(newTopics);
        this.consumerIds2topics.put(registrationId.getConsumerId(), newTopics);
    }

    /**
     * Polls the registered consumer for records.
     *
     * @param registrationId handle previously returned by {@link #subscribe(List)}
     * @param timeout value passed straight to {@code KafkaConsumer.poll}
     * @return the records returned by the consumer's poll
     * @throws InvalidCredentialsException if the handle does not match a registered consumer
     */
    public ConsumerRecords<String, String> poll(RegistrationId registrationId, int timeout) throws InvalidCredentialsException {
        if (!this.isConsumerValid(registrationId)) {
            throw new InvalidCredentialsException();
        }
        KafkaConsumer<String, String> consumer = this.consumerIds2consumers.get(registrationId.getConsumerId());
        return consumer.poll(timeout);
    }

    /**
     * Creates a Kafka producer from the producer properties file and registers it.
     *
     * @return the registration handle identifying the new producer
     * @throws AnnounceException if the producer properties file cannot be read
     */
    public RegistrationId announce() throws AnnounceException {
        RegistrationId registrationId = new RegistrationId();
        Properties properties;
        try {
            properties = loadProperties(PROPERTIES_FILE_PATH_PRODUCER);
        } catch (IOException ioe) {
            throw new AnnounceException("ERROR: Cannot open properties file for announcement", ioe);
        }

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        String producerId = registrationId.getProducerId();
        String subscriberId = registrationId.getSubscriberId();
        this.producerIds2producers.put(producerId, producer);
        addToIndex(this.subscriberIds2producerIds, subscriberId, producerId);
        this.producerIds2subscriberIds.put(producerId, subscriberId);
        return registrationId;
    }

    /**
     * Closes the producer identified by {@code registrationId} and forgets it.
     *
     * @param registrationId handle previously returned by {@link #announce()}
     * @throws InvalidCredentialsException if the handle does not match a registered producer
     */
    public void close(RegistrationId registrationId) throws InvalidCredentialsException {
        if (!this.isProducerValid(registrationId)) {
            throw new InvalidCredentialsException();
        }
        String producerId = registrationId.getProducerId();
        // Drop all bookkeeping before closing so a failing close() cannot leave a
        // half-registered producer behind.
        KafkaProducer<String, String> producer = this.producerIds2producers.remove(producerId);
        this.producerIds2subscriberIds.remove(producerId);
        // The producer id (not the consumer id) must leave the subscriber's list; otherwise the
        // handle keeps validating after close and later calls dereference a null producer.
        removeFromIndex(this.subscriberIds2producerIds, registrationId.getSubscriberId(), producerId);
        producer.close();
    }

    /**
     * Sends {@code message} to {@code topic} through the registered producer.
     *
     * @param registrationId handle previously returned by {@link #announce()}
     * @param topic destination topic
     * @param message record value; the record is created without a key
     * @throws InvalidCredentialsException if the handle does not match a registered producer
     */
    public void publish(RegistrationId registrationId, String topic, String message) throws InvalidCredentialsException {
        if (!this.isProducerValid(registrationId)) {
            throw new InvalidCredentialsException();
        }
        KafkaProducer<String, String> producer = this.producerIds2producers.get(registrationId.getProducerId());
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        producer.send(record);
    }

    /** Returns whether the handle's consumer id is listed under its subscriber id. */
    private boolean isConsumerValid(RegistrationId registrationId) {
        List<String> consumers = this.subscriberIds2consumerIds.get(registrationId.getSubscriberId());
        if (consumers != null) {
            return consumers.contains(registrationId.getConsumerId());
        }
        return false;
    }

    /** Returns whether the handle's producer id is listed under its subscriber id. */
    private boolean isProducerValid(RegistrationId registrationId) {
        List<String> producers = this.subscriberIds2producerIds.get(registrationId.getSubscriberId());
        if (producers != null) {
            return producers.contains(registrationId.getProducerId());
        }
        return false;
    }

    /** Reads the properties file at {@code path}. */
    private static Properties loadProperties(String path) throws IOException {
        Properties properties = new Properties();
        try (FileInputStream fis = new FileInputStream(new File(path))) {
            properties.load(fis);
        }
        return properties;
    }

    /** Appends {@code clientId} to the list stored under {@code subscriberId}, creating the list if absent. */
    private static void addToIndex(Map<String, List<String>> index, String subscriberId, String clientId) {
        List<String> ids = index.get(subscriberId);
        if (ids == null) {
            ids = new LinkedList<String>();
            index.put(subscriberId, ids);
        }
        ids.add(clientId);
    }

    /** Removes {@code clientId} from the list under {@code subscriberId}, dropping the entry once the list is empty. */
    private static void removeFromIndex(Map<String, List<String>> index, String subscriberId, String clientId) {
        List<String> ids = index.get(subscriberId);
        if (ids == null) {
            return;
        }
        ids.remove(clientId);
        if (ids.isEmpty()) {
            index.remove(subscriberId);
        }
    }
}

