/*
 * Decompiled with CFR 0.152.
 */
package stormy.pythian.component.statistic.aggregation;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.MapGet;
import storm.trident.state.QueryFunction;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;
import stormy.pythian.component.common.ExtractFeatures;
import stormy.pythian.component.statistic.aggregation.StatisticAggregator;
import stormy.pythian.model.annotation.ExpectedFeature;
import stormy.pythian.model.annotation.InputStream;
import stormy.pythian.model.annotation.Mapper;
import stormy.pythian.model.annotation.MappingType;
import stormy.pythian.model.annotation.Property;
import stormy.pythian.model.annotation.State;
import stormy.pythian.model.component.Component;
import stormy.pythian.model.instance.InputFixedFeaturesMapper;

public abstract class AbstractTimeWindowFeatureStatistic<T>
implements Component {
    @InputStream(name="in", type=MappingType.FIXED_FEATURES, expectedFeatures={@ExpectedFeature(name="Group by", type=Object.class), @ExpectedFeature(name="Date", type=Long.class), @ExpectedFeature(name="Computed feature", type=Number.class)})
    private Stream in;
    @Mapper(stream="in")
    private InputFixedFeaturesMapper inputMapper;
    @State(name="Statistics' state")
    private StateFactory stateFactory;
    @Property(name="Window length (in ms)")
    private Long windowLengthMs;
    @Property(name="Slot precision (in ms)")
    private Long slotLengthMs;
    private static final long serialVersionUID = -5312700259983804231L;

    public Stream initOutputStream(StatisticAggregator.AggregableStatistic<T> aggregableStatistic) {
        TridentState statistics = this.in.each(new Fields(new String[]{"INSTANCE_FIELD"}), (Function)new ExtractFeatures(this.inputMapper, "Computed feature", "Group by", "Date"), new Fields(new String[]{"COMPUTED_FEATURE_FIELD", "GROUP_BY_FIELD", "DATE_FIELD"})).each(new Fields(new String[]{"DATE_FIELD"}), (Function)new DateToSlotIndex(this.slotLengthMs), new Fields(new String[]{"SLOT_FIELD"})).groupBy(new Fields(new String[]{"GROUP_BY_FIELD", "SLOT_FIELD"})).persistentAggregate(this.stateFactory, new Fields(new String[]{"INSTANCE_FIELD"}), new StatisticAggregator<T>(this.inputMapper, aggregableStatistic), new Fields(new String[]{"STATISTIC_FIELD"}));
        return this.in.each(new Fields(new String[]{"INSTANCE_FIELD"}), (Function)new ExtractFeatures(this.inputMapper, "Date", "Group by"), new Fields(new String[]{"DATE_FIELD", "GROUP_BY_FIELD"})).each(new Fields(new String[]{"DATE_FIELD"}), (Function)new EmitEventSlots(this.windowLengthMs, this.slotLengthMs), new Fields(new String[]{"SLOT_FIELD"})).stateQuery(statistics, new Fields(new String[]{"GROUP_BY_FIELD", "SLOT_FIELD"}), (QueryFunction)new MapGet(), new Fields(new String[]{"SLOT_STATISTIC_FIELD"})).groupBy(new Fields(new String[]{"INSTANCE_FIELD"})).aggregate(new Fields(new String[]{"INSTANCE_FIELD", "SLOT_STATISTIC_FIELD"}), new GlobalStatisticAggregator<T>(aggregableStatistic), new Fields(new String[]{"STATISTIC_FIELD"})).each(new Fields(new String[]{"INSTANCE_FIELD", "STATISTIC_FIELD"}), new StatisticAggregator.AddStatisticFeatures<T>(aggregableStatistic), new Fields(new String[]{"NEW_INSTANCE_FIELD"}));
    }

    private static class GlobalStatisticAggregator<T>
    implements CombinerAggregator<T> {
        private static final long serialVersionUID = -6358197247831377615L;
        private final StatisticAggregator.AggregableStatistic<T> aggregableStatistic;

        public GlobalStatisticAggregator(StatisticAggregator.AggregableStatistic<T> aggregableStatistic) {
            this.aggregableStatistic = aggregableStatistic;
        }

        public T init(TridentTuple tuple) {
            try {
                Object stat = tuple.getValueByField("SLOT_STATISTIC_FIELD");
                return (T)(stat == null ? this.zero() : stat);
            }
            catch (NullPointerException ex) {
                return this.zero();
            }
        }

        public T combine(T val1, T val2) {
            val1 = val1 != null ? val1 : this.zero();
            val2 = val2 != null ? val2 : this.zero();
            return this.aggregableStatistic.combine(val1, val2);
        }

        public T zero() {
            return this.aggregableStatistic.zero();
        }
    }

    private static class DateToSlotIndex
    extends BaseFunction {
        private static final long serialVersionUID = -2823417821288444544L;
        private final long slotPrecisionMs;

        public DateToSlotIndex(long slotPrecisionMs) {
            this.slotPrecisionMs = slotPrecisionMs;
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            long date = (Long)tuple.getValueByField("DATE_FIELD");
            long slotIndex = date - date % this.slotPrecisionMs;
            collector.emit((List)new Values(new Object[]{slotIndex}));
        }
    }

    private static class EmitEventSlots
    extends BaseFunction {
        private static final long serialVersionUID = -6347463039949939944L;
        private final long slotLengthMs;
        private final int nbSlots;

        public EmitEventSlots(long windowLengthMs, long slotLengthMs) {
            this.slotLengthMs = slotLengthMs;
            this.nbSlots = Long.valueOf(windowLengthMs / slotLengthMs).intValue();
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            long date = (Long)tuple.getValueByField("DATE_FIELD");
            long baseSlotIndex = date - date % this.slotLengthMs;
            for (int i = 0; i < this.nbSlots; ++i) {
                long slotIndex = baseSlotIndex - (long)i * this.slotLengthMs;
                collector.emit((List)new Values(new Object[]{slotIndex}));
            }
        }
    }
}

