/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.comm;

import es.bsc.comm.Connection;
import es.bsc.comm.MessageHandler;
import es.bsc.comm.Node;
import es.bsc.comm.event.Event;
import es.bsc.comm.exceptions.CommException;
import es.bsc.comm.stage.Reception;
import es.bsc.comm.stage.Submission;
import es.bsc.comm.stage.Transfer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Thread that consumes events of type {@code T} from an internal blocking queue and forwards the
 * resulting notifications to a {@link MessageHandler}.
 *
 * <p>The thread body ({@link #run()}) alternates between the subclass hook
 * {@link #specificActions()} and {@link #processEvent()} until a stop is requested, either through
 * {@link #shutdown()} or by interrupting a thread blocked in {@link #processEvent()}.
 *
 * @param <T> concrete event type handled by this manager
 */
public abstract class EventManager<T extends Event> extends Thread {
    private static final Logger LOGGER = LogManager.getLogger("es.bsc.comm.EventManager");

    /** Pending events; producers call {@link #addEvent(Event)}, {@link #processEvent()} consumes. */
    private final BlockingQueue<T> events;
    /** Set once a stop has been requested; volatile because it is written and read across threads. */
    private volatile boolean stopReceived;
    /** Handler that receives every notification forwarded by this manager. */
    private final MessageHandler mh;

    /**
     * Creates a manager thread named {@code "EventManager"} with an empty event queue.
     *
     * @param messageHandler handler that notifications are delegated to
     */
    public EventManager(MessageHandler messageHandler) {
        super("EventManager");
        LOGGER.info("Instantiating EventManager");
        this.mh = messageHandler;
        this.events = new LinkedBlockingQueue<T>();
        this.stopReceived = false;
    }

    /**
     * Implementation-specific initialization.
     *
     * @param var1 implementation-defined initialization argument (meaning not visible here;
     *             see the concrete subclass)
     * @throws CommException if the implementation fails to initialize
     */
    public abstract void init(String var1) throws CommException;

    /**
     * Implementation-specific server start-up.
     *
     * @param var1 node the server is started for
     * @throws CommException if the implementation fails to start the server
     */
    public abstract void startServer(Node var1) throws CommException;

    /**
     * Main loop: runs {@link #specificActions()} followed by {@link #processEvent()} until a stop
     * has been requested, then calls {@link #handleSpecificStop()} and shuts the message handler
     * down.
     */
    @Override
    public void run() {
        LOGGER.info("EventManager started");
        while (!this.stopReceived) {
            this.specificActions();
            this.processEvent();
        }
        LOGGER.info("EventManager begins shutdown process");
        this.handleSpecificStop();
        this.mh.shutdown();
        LOGGER.info("EventManager stopped");
    }

    /**
     * Blocks until an event is available and processes it.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored and
     * the manager is flagged to stop. Without that flag the main loop would spin: every further
     * {@code take()} fails immediately while the interrupt status is set, so no event (not even
     * the end event) could ever be consumed again.
     */
    protected final void processEvent() {
        Event event;
        try {
            event = this.events.take();
        } catch (InterruptedException e) {
            // Re-assert the flag on the thread that was actually interrupted, which is not
            // necessarily this manager thread if a subclass calls us from elsewhere.
            Thread.currentThread().interrupt();
            LOGGER.warn("EventManager interrupted while waiting for events. Stopping");
            this.stopReceived = true;
            return;
        }
        // Query the logger on every call so runtime changes to the log level are honored.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing " + event.getEventImplementationType() + " event");
            this.logProcessingEvent(event);
        }
        event.processEventOnConnection(this);
    }

    /** Subclass hook invoked at the start of every main-loop iteration, before taking an event. */
    protected abstract void specificActions();

    /**
     * Subclass hook invoked, only when debug logging is enabled, right before an event is
     * processed.
     *
     * @param var1 event about to be processed
     */
    protected abstract void logProcessingEvent(Event var1);

    /** Subclass hook invoked once the main loop has ended, before the message handler shutdown. */
    protected abstract void handleSpecificStop();

    /**
     * Enqueues an event for later processing by the manager thread. If the queue rejects the
     * event, an error is logged and the event is dropped.
     *
     * @param e event to enqueue
     */
    public final void addEvent(T e) {
        if (!this.events.offer(e)) {
            LOGGER.error("ERROR in Comm EventManager. Event " + e.getEventImplementationType() + " has not been added to the event queue! Message Lost ");
        }
    }

    /**
     * Requests the manager to stop: raises the stop flag and enqueues the event produced by
     * {@link #createEndEvent()} so that a thread blocked waiting for events wakes up.
     */
    public final void shutdown() {
        LOGGER.info("Shutting down EventManager");
        this.stopReceived = true;
        this.addEvent(this.createEndEvent());
    }

    /**
     * Builds the event that {@link #shutdown()} enqueues after raising the stop flag.
     *
     * @return the end event
     */
    protected abstract T createEndEvent();

    /**
     * Forwards an error to the message handler's {@code errorHandler}.
     *
     * @param con connection passed through to the handler
     * @param t transfer passed through to the handler
     * @param exception error passed through to the handler
     */
    public final void notifyError(Connection con, Transfer t, CommException exception) {
        this.mh.errorHandler(con, t, exception);
    }

    /**
     * Forwards a data-received notification to the message handler.
     *
     * @param c connection passed through to the handler
     * @param t reception passed through to the handler
     */
    public final void dataReceived(Connection c, Reception t) {
        this.mh.dataReceived(c, t);
    }

    /**
     * Forwards a command-received notification to the message handler.
     *
     * @param c connection passed through to the handler
     * @param t reception passed through to the handler
     */
    public final void commandReceived(Connection c, Reception t) {
        this.mh.commandReceived(c, t);
    }

    /**
     * Forwards a write-finished notification to the message handler.
     *
     * @param c connection passed through to the handler
     * @param t submission passed through to the handler
     */
    public final void writeFinished(Connection c, Submission t) {
        this.mh.writeFinished(c, t);
    }

    /**
     * Forwards a connection-finished notification to the message handler.
     *
     * @param c connection passed through to the handler
     */
    public final void connectionFinished(Connection c) {
        this.mh.connectionFinished(c);
    }

    /**
     * Implementation-specific connection start.
     *
     * @param var1 node to connect to
     * @return the connection created by the implementation
     */
    public abstract Connection startConnection(Node var1);

    /**
     * Implementation-specific shutdown of a single connection.
     *
     * @param var1 connection to shut down
     */
    public abstract void shutdown(Connection var1);
}

