/*
 * Decompiled with CFR 0.152.
 */
package net.sf.okapi.common.pipeline.threaded;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.sf.okapi.common.Event;
import net.sf.okapi.common.EventType;
import net.sf.okapi.common.pipeline.IPipeline;
import net.sf.okapi.common.pipeline.IPipelineStep;
import net.sf.okapi.common.pipeline.PipelineReturnValue;
import net.sf.okapi.common.pipeline.threaded.BaseThreadedPipelineStepAdaptor;
import net.sf.okapi.common.pipeline.threaded.ConsumerPipelineStepAdaptor;
import net.sf.okapi.common.pipeline.threaded.ProducerConsumerPipelineStepAdaptor;
import net.sf.okapi.common.pipeline.threaded.ProducerPipelineStepAdaptor;
import net.sf.okapi.common.pipeline.threaded.RuntimeInterruptedException;
import net.sf.okapi.common.resource.RawDocument;

/**
 * An {@link IPipeline} implementation that wraps every registered step in a
 * threaded adaptor and chains the adaptors together with bounded blocking
 * queues.
 *
 * <p>Events handed to {@link #process(Event)}, {@link #startBatch()} and
 * {@link #endBatch()} are placed on a shared input queue; the adaptor created
 * for the first step is given that queue via {@code setInputQueue}. Each
 * adaptor is submitted to the {@link ExecutorService} supplied at
 * construction time.
 *
 * <p>NOTE(review): the adaptors are only built inside {@link #startBatch()}.
 * Calling {@code startBatch()} a second time on the same instance re-runs the
 * wiring with a non-empty adaptor list, so no new producer is attached to the
 * input queue. Confirm whether re-using one instance for several batches is
 * intended.
 */
public class ThreadedPipeline
implements IPipeline {
    /** Capacity used for every queue when the no-arg constructor is used. */
    private static final int DEFAULT_BLOCKING_QUEUE_SIZE = 50;
    private final ExecutorService executor;
    private final int blockingQueueSize;
    /** Number of adaptors created so far by {@link #prepareThreadedSteps()}. */
    private int totalThreads;
    private volatile PipelineReturnValue state;
    private LinkedList<BaseThreadedPipelineStepAdaptor> threadedSteps;
    private final LinkedList<IPipelineStep> steps = new LinkedList<IPipelineStep>();
    private ProducerPipelineStepAdaptor firstThreadedStep;
    /** Queue produced by the most recently wired adaptor; consumed by the next one. */
    private BlockingQueue<Event> previousQueue;
    /** Entry queue of the pipeline; set to {@code null} by {@link #destroy()}. */
    private BlockingQueue<Event> inputQueue;
    private String id;

    /**
     * Creates a pipeline backed by a cached thread pool and queues of
     * {@link #DEFAULT_BLOCKING_QUEUE_SIZE} elements.
     */
    public ThreadedPipeline() {
        this(Executors.newCachedThreadPool(), DEFAULT_BLOCKING_QUEUE_SIZE);
    }

    /**
     * Creates a pipeline that runs its step adaptors on the given executor.
     *
     * @param executor          executor the step adaptors are submitted to
     * @param blockingQueueSize capacity of the input queue and of every
     *                          queue created between steps
     */
    public ThreadedPipeline(ExecutorService executor, int blockingQueueSize) {
        this.executor = executor;
        this.blockingQueueSize = blockingQueueSize;
        this.initialize();
    }

    /** Resets counters and state and allocates the (fair) input queue. */
    private void initialize() {
        this.totalThreads = 0;
        this.state = PipelineReturnValue.PAUSED;
        this.threadedSteps = new LinkedList<BaseThreadedPipelineStepAdaptor>();
        this.inputQueue = new ArrayBlockingQueue<Event>(this.blockingQueueSize, true);
    }

    /**
     * Wraps each registered step in an adaptor, connects consecutive
     * adaptors with a fresh queue, and submits every adaptor to the executor.
     *
     * <p>The step handled while no adaptor exists yet becomes a producer fed
     * from the input queue; the step in the last position becomes a pure
     * consumer; every other step both consumes and produces.
     *
     * @throws IllegalStateException if a consuming step is reached while no
     *                               upstream queue is available
     */
    private void prepareThreadedSteps() {
        ArrayBlockingQueue<Event> queue = null;
        // Track position instead of comparing against getLast() by identity,
        // so a step instance registered more than once is classified by where
        // it sits in the list rather than by which object it is.
        int remaining = this.steps.size();
        for (IPipelineStep step : this.steps) {
            --remaining;
            boolean lastStep = remaining == 0;
            if (this.threadedSteps.isEmpty()) {
                queue = new ArrayBlockingQueue<Event>(this.blockingQueueSize, true);
                ProducerPipelineStepAdaptor producerStep = new ProducerPipelineStepAdaptor(step);
                producerStep.setProducerQueue(queue);
                producerStep.setInputQueue(this.inputQueue);
                this.executor.submit(producerStep);
                this.threadedSteps.add(producerStep);
                this.firstThreadedStep = producerStep;
            } else if (lastStep) {
                this.requirePreviousQueue();
                ConsumerPipelineStepAdaptor consumerStep = new ConsumerPipelineStepAdaptor(step);
                consumerStep.setConsumerQueue(this.previousQueue);
                this.executor.submit(consumerStep);
                this.threadedSteps.add(consumerStep);
            } else {
                this.requirePreviousQueue();
                ProducerConsumerPipelineStepAdaptor producerConsumerStep = new ProducerConsumerPipelineStepAdaptor(step);
                queue = new ArrayBlockingQueue<Event>(this.blockingQueueSize, true);
                producerConsumerStep.setProducerQueue(queue);
                producerConsumerStep.setConsumerQueue(this.previousQueue);
                this.executor.submit(producerConsumerStep);
                this.threadedSteps.add(producerConsumerStep);
            }
            ++this.totalThreads;
            this.previousQueue = queue;
        }
    }

    /** Fails fast when a consuming adaptor would be wired without an upstream queue. */
    private void requirePreviousQueue() {
        if (this.previousQueue == null) {
            throw new IllegalStateException("Previous queue should not be null");
        }
    }

    /**
     * Puts an event on the input queue, blocking while the queue is full.
     *
     * @param event event to enqueue
     * @throws RuntimeInterruptedException if the calling thread is interrupted
     *         while waiting; the thread's interrupt flag is restored first
     */
    private void enqueue(Event event) {
        try {
            this.inputQueue.put(event);
        }
        catch (InterruptedException e) {
            // put() clears the interrupt flag when it throws; set it again so
            // code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeInterruptedException(e);
        }
    }

    /**
     * Registers a step at the end of the pipeline. The step is only wrapped
     * and scheduled when {@link #startBatch()} is called.
     *
     * @param step step to append
     */
    @Override
    public void addStep(IPipelineStep step) {
        this.steps.add(step);
    }

    /**
     * Cancels every adaptor created so far, discards events still waiting on
     * the input queue and marks the pipeline as cancelled.
     */
    @Override
    public void cancel() {
        for (BaseThreadedPipelineStepAdaptor step : this.threadedSteps) {
            step.cancel();
        }
        // destroy() nulls the queue; tolerate cancel() being called afterwards.
        if (this.inputQueue != null) {
            this.inputQueue.clear();
        }
        this.state = PipelineReturnValue.CANCELLED;
    }

    /**
     * @return the current pipeline state
     */
    @Override
    public PipelineReturnValue getState() {
        return this.state;
    }

    /**
     * Destroys every adaptor, releases the input queue, removes all steps and
     * shuts the executor down with {@link ExecutorService#shutdownNow()}.
     * Safe to call more than once.
     */
    @Override
    public void destroy() {
        for (BaseThreadedPipelineStepAdaptor step : this.threadedSteps) {
            step.destroy();
        }
        if (this.inputQueue != null) {
            this.inputQueue.clear();
            this.inputQueue = null;
        }
        this.clearSteps();
        this.executor.shutdownNow();
        this.state = PipelineReturnValue.DESTROYED;
    }

    /**
     * Wraps the document in a {@link EventType#RAW_DOCUMENT} event and
     * delegates to {@link #process(Event)}.
     *
     * @param input document to process
     */
    @Override
    public void process(RawDocument input) {
        this.process(new Event(EventType.RAW_DOCUMENT, input));
    }

    /**
     * Marks the pipeline as running and places the event on the input queue,
     * blocking while the queue is full.
     *
     * @param event event to hand to the pipeline
     * @throws RuntimeInterruptedException if interrupted while waiting for
     *         queue space
     */
    @Override
    public void process(Event event) {
        this.state = PipelineReturnValue.RUNNING;
        this.enqueue(event);
    }

    /**
     * @return the live list of registered steps (not a copy)
     */
    @Override
    public List<IPipelineStep> getSteps() {
        return this.steps;
    }

    /**
     * Builds and schedules the step adaptors, then enqueues a
     * {@link EventType#START_BATCH} event.
     *
     * @throws RuntimeInterruptedException if interrupted while waiting for
     *         queue space
     */
    @Override
    public void startBatch() {
        this.prepareThreadedSteps();
        this.enqueue(new Event(EventType.START_BATCH));
    }

    /**
     * Enqueues a {@link EventType#END_BATCH} event.
     *
     * @throws RuntimeInterruptedException if interrupted while waiting for
     *         queue space
     */
    @Override
    public void endBatch() {
        this.enqueue(new Event(EventType.END_BATCH));
    }

    /** Removes all registered steps and all adaptors created for them. */
    @Override
    public void clearSteps() {
        this.steps.clear();
        this.threadedSteps.clear();
    }

    /**
     * @param id identifier to associate with this pipeline
     */
    @Override
    public void setId(String id) {
        this.id = id;
    }

    /**
     * @return the identifier previously set with {@link #setId(String)}, or
     *         {@code null} if none was set
     */
    @Override
    public String getId() {
        return this.id;
    }
}

