/*
 * Decompiled with CFR 0.152.
 */
package es.bsc.distrostreamlib.api.pscos;

import es.bsc.distrostreamlib.api.impl.DistroStreamImpl;
import es.bsc.distrostreamlib.client.DistroStreamClient;
import es.bsc.distrostreamlib.exceptions.BackendException;
import es.bsc.distrostreamlib.exceptions.RegistrationException;
import es.bsc.distrostreamlib.requests.PollRequest;
import es.bsc.distrostreamlib.requests.PublishRequest;
import es.bsc.distrostreamlib.types.ConsumerMode;
import es.bsc.distrostreamlib.types.StreamType;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import storage.StorageException;
import storage.StorageItf;
import storage.StubItf;

public class PscoDistroStream<T>
extends DistroStreamImpl<T> {
    private static final Logger LOGGER = LogManager.getLogger((String)"es.bsc.distroStreamLib.stream.PscoDistroStream");
    private static final boolean DEBUG = LOGGER.isDebugEnabled();

    public PscoDistroStream() {
    }

    public PscoDistroStream(ConsumerMode mode) throws RegistrationException {
        super(null, StreamType.PSCO, mode, new LinkedList<String>());
    }

    public PscoDistroStream(String alias, ConsumerMode mode) throws RegistrationException {
        super(alias, StreamType.PSCO, mode, new LinkedList<String>());
    }

    @Override
    public final void publish(T object) throws BackendException {
        LOGGER.info("Publishing new PSCO object...");
        this.publishPsco(object);
        LOGGER.info("Published new PSCO object");
    }

    @Override
    public final void publish(List<T> objects) throws BackendException {
        LOGGER.info("Publishing new List of PSCOs...");
        for (T obj : objects) {
            this.publishPsco(obj);
        }
        LOGGER.info("Published new List of PSCOs");
    }

    private void publishPsco(T object) throws BackendException {
        LOGGER.debug("Persisting user PSCO...");
        StubItf stub = (StubItf)object;
        if (stub.getID() == null) {
            String alias = UUID.randomUUID().toString();
            stub.makePersistent(alias);
        }
        String pscoId = stub.getID();
        LOGGER.debug("Registering PSCO publish...");
        PublishRequest req = new PublishRequest(this.id, pscoId);
        DistroStreamClient.request(req);
        req.waitProcessed();
        int error = req.getErrorCode();
        if (error != 0) {
            StringBuilder sb = new StringBuilder();
            sb.append("ERROR: Cannot publish stream").append("\n");
            sb.append("  - Error Code: ").append(error).append("\n");
            sb.append("  - Nested Error Message: ").append(req.getErrorMessage()).append("\n");
            throw new BackendException(sb.toString());
        }
        String response = req.getResponseMessage();
        if (DEBUG) {
            LOGGER.debug("Publish stream answer: " + response);
        }
    }

    @Override
    public final List<T> poll() throws BackendException {
        return this.pollPscos();
    }

    @Override
    public final List<T> poll(long timeout) throws BackendException {
        LOGGER.warn("WARN: Ignoring timeout for PscoDistroStream");
        return this.pollPscos();
    }

    private List<T> pollPscos() throws BackendException {
        LOGGER.info("Polling new stream items...");
        PollRequest req = new PollRequest(this.id);
        DistroStreamClient.request(req);
        req.waitProcessed();
        int error = req.getErrorCode();
        if (error != 0) {
            StringBuilder sb = new StringBuilder();
            sb.append("ERROR: Cannot poll stream").append("\n");
            sb.append("  - Error Code: ").append(error).append("\n");
            sb.append("  - Nested Error Message: ").append(req.getErrorMessage()).append("\n");
            throw new BackendException(sb.toString());
        }
        String response = req.getResponseMessage();
        if (DEBUG) {
            LOGGER.debug("Retrieved stream items: " + response);
        }
        ArrayList<Object> newPscos = new ArrayList<Object>();
        if (response != null && !response.isEmpty()) {
            for (String pscoId : response.split(" ")) {
                try {
                    Object object = StorageItf.getByID(pscoId);
                    newPscos.add(object);
                }
                catch (StorageException se) {
                    throw new BackendException("ERROR: Cannot getById PSCO with id = " + pscoId, se);
                }
            }
        }
        return newPscos;
    }

    @Override
    public void readExternal(ObjectInput oi) throws IOException, ClassNotFoundException {
        super.readExternal(oi);
    }

    @Override
    public void writeExternal(ObjectOutput oo) throws IOException {
        super.writeExternal(oo);
    }
}

