package cascalog.aggregator;

import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascalog.ParallelAgg;
import cascalog.Util;
import java.util.List;

/* loaded from: input_file:cascalog/aggregator/ClojureParallelAggregator.class */
/**
 * Cascading {@link Aggregator} that delegates the per-group reduction to a {@link ParallelAgg}.
 *
 * <p>For every argument tuple in a group the delegate's {@code init} is applied to the tuple's
 * values; the result is then folded into the running state (held in the aggregator call's
 * context) through the delegate's {@code combine}. When the group is finished the accumulated
 * state is converted to a tuple and written to the output collector.
 */
public class ClojureParallelAggregator extends BaseOperation<Object> implements Aggregator<Object> {
    /** Delegate providing the {@code prepare}, {@code init} and {@code combine} steps. */
    ParallelAgg agg;

    /**
     * Creates the aggregator.
     *
     * @param fields      field declaration handed to the {@link BaseOperation} constructor
     * @param parallelAgg delegate that performs the actual aggregation steps
     */
    public ClojureParallelAggregator(Fields fields, ParallelAgg parallelAgg) {
        super(fields);
        this.agg = parallelAgg;
    }

    /**
     * Forwards preparation to the delegate.
     *
     * @param flowProcess   current flow process, passed on to the delegate
     * @param operationCall operation call (not used here)
     */
    public void prepare(FlowProcess flowProcess, OperationCall<Object> operationCall) {
        agg.prepare(flowProcess);
    }

    /**
     * Begins a new group by clearing the running state; a {@code null} context marks
     * "nothing aggregated yet" for {@link #aggregate}.
     *
     * @param flowProcess    current flow process (not used here)
     * @param aggregatorCall call whose context is reset
     */
    public void start(FlowProcess flowProcess, AggregatorCall<Object> aggregatorCall) {
        aggregatorCall.setContext(null);
    }

    /**
     * Folds the current argument tuple into the running state stored in the call context.
     *
     * @param flowProcess    current flow process (not used here)
     * @param aggregatorCall call supplying the argument tuple and holding the running state
     * @throws RuntimeException wrapping any exception raised while initialising or combining
     */
    public void aggregate(FlowProcess flowProcess, AggregatorCall<Object> aggregatorCall) {
        try {
            // The delegate's init is applied to the incoming tuple before the context is read.
            List<Object> fresh = agg.init(Util.tupleToList(aggregatorCall.getArguments().getTuple()));

            @SuppressWarnings("unchecked") // the context is only ever set by start() and this method
            List<Object> accumulated = (List<Object>) aggregatorCall.getContext();

            // First tuple of the group: its initial value becomes the state as-is;
            // otherwise merge it into what has been accumulated so far.
            Object next = (accumulated == null) ? fresh : agg.combine(accumulated, fresh);
            aggregatorCall.setContext(next);
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Emits the accumulated state of the finished group as a single tuple.
     *
     * @param flowProcess    current flow process (not used here)
     * @param aggregatorCall call holding the accumulated state and the output collector
     * @throws RuntimeException wrapping any exception raised while converting or emitting
     */
    public void complete(FlowProcess flowProcess, AggregatorCall<Object> aggregatorCall) {
        try {
            aggregatorCall.getOutputCollector()
                    .add(Util.coerceToTuple(aggregatorCall.getContext()));
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }
}
