/*
 * Decompiled with CFR 0.152.
 */
package net.sf.okapi.lib.concurrent;

import java.util.List;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import net.sf.okapi.common.pipeline.IPipeline;
import net.sf.okapi.common.pipeline.IPipelineStep;
import net.sf.okapi.common.pipeline.IWorkQueueStep;
import net.sf.okapi.common.pipeline.Pipeline;
import net.sf.okapi.common.pipeline.PipelineReturnValue;
import net.sf.okapi.common.pipelinedriver.IBatchItemContext;
import net.sf.okapi.common.pipelinedriver.PipelineDriver;
import net.sf.okapi.common.pipelinedriver.PipelineDriverUtils;
import net.sf.okapi.lib.concurrent.CallablePipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkQueuePipelineDriver
extends PipelineDriver {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private CompletionService<IPipeline> completionService;
    private ThreadPoolExecutor executor;
    private Stack<CallablePipeline> pipelines;
    private int workQueueCount;

    public WorkQueuePipelineDriver() {
        this(Runtime.getRuntime().availableProcessors());
    }

    public WorkQueuePipelineDriver(int workQueueCount) {
        this.workQueueCount = workQueueCount;
        this.pipelines = new Stack();
        this.executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(workQueueCount);
        this.completionService = new ExecutorCompletionService<IPipeline>(this.executor);
    }

    @Override
    public void processBatch() {
        int threadCount = 0;
        List<IBatchItemContext> bis = this.getBatchItems();
        try {
            IPipeline cp;
            int i;
            threadCount = Math.min(this.workQueueCount, bis.size());
            if (this.pipelines.isEmpty()) {
                for (i = 0; i < threadCount; ++i) {
                    CallablePipeline cp2 = new CallablePipeline(this.clonePipeline(this.getPipeline()));
                    cp2.setId("Thread: " + Integer.toString(threadCount));
                    this.pipelines.push(cp2);
                }
            }
            for (i = 0; i < bis.size(); ++i) {
                IBatchItemContext item = bis.get(i);
                this.displayInput(item);
                cp = null;
                if (this.pipelines.isEmpty()) {
                    cp = this.recyleFinishedPipeline();
                    this.pipelines.push((CallablePipeline)cp);
                }
                cp = this.pipelines.pop();
                this.initializeClonedPipeline((CallablePipeline)cp, item);
                ((CallablePipeline)cp).process(item.getRawDocument(0));
                this.completionService.submit((Callable<IPipeline>)((Object)cp));
                this.logger.info(String.format("Pipeline thread running: %s", ((CallablePipeline)cp).getId()));
            }
            for (i = 0; i < threadCount; ++i) {
                Future<IPipeline> r = this.completionService.take();
                cp = r.get();
                cp.endBatch();
                if (!cp.getState().equals((Object)PipelineReturnValue.SUCCEDED)) {
                    this.logger.error(String.format("Pipeline failure: %s", cp.getId()));
                }
                this.pipelines.push((CallablePipeline)cp);
            }
        }
        catch (InstantiationException e) {
            throw new RuntimeException("Error instantiating copy of pipeline.", e);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException("Error instantiating copy of pipeline.", e);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Error. Pipeline thread interupted", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(String.format("Error pipeline threw an exception. Cannot retrieve pipeline future from workqueue: %s", e.getCause().toString()), e);
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        for (CallablePipeline p : this.pipelines) {
            p.destroy();
            Object var2_2 = null;
        }
        this.executor.shutdownNow();
        if (!this.executor.isShutdown()) {
            this.logger.error("WorkQueue Pipeline did not shutdown. Main thread still running.");
        }
    }

    private CallablePipeline recyleFinishedPipeline() throws InterruptedException, ExecutionException, InstantiationException, IllegalAccessException {
        Future<IPipeline> r = this.completionService.take();
        CallablePipeline cp = (CallablePipeline)r.get();
        cp.endBatch();
        if (!cp.getState().equals((Object)PipelineReturnValue.SUCCEDED)) {
            this.logger.error(String.format("Pipeline failure: %s", cp.getId()));
        }
        return cp;
    }

    private void initializeClonedPipeline(CallablePipeline cp, IBatchItemContext item) {
        for (IPipelineStep step : cp.getSteps()) {
            PipelineDriverUtils.assignRuntimeParameters(this, step, this.getBatchItems().get(0), null);
        }
        cp.startBatch();
        for (IPipelineStep step : cp.getSteps()) {
            PipelineDriverUtils.assignRuntimeParameters(this, step, item, null);
        }
    }

    private IPipeline clonePipeline(IPipeline pipeline) throws InstantiationException, IllegalAccessException {
        Pipeline p = new Pipeline();
        for (IPipelineStep s : pipeline.getSteps()) {
            IPipelineStep cs = (IPipelineStep)s.getClass().newInstance();
            if (s instanceof IWorkQueueStep) {
                IWorkQueueStep wqs = (IWorkQueueStep)s;
                ((IWorkQueueStep)cs).setMainStep(wqs.getMainStep());
                ((IWorkQueueStep)cs).setWorkQueueCount(wqs.getWorkQueueCount());
                ((IWorkQueueStep)cs).init();
            }
            cs.setLastOutputStep(s.isLastOutputStep());
            cs.setParameters(s.getParameters());
            p.addStep(cs);
        }
        return p;
    }
}

