/*
 * Decompiled with CFR 0.152.
 */
package com.github.pmerienne.trident.ml.stats;

import backtype.storm.tuple.Values;
import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.stats.StreamStatistics;
import com.github.pmerienne.trident.ml.util.KeysUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.map.MapState;
import storm.trident.tuple.TridentTuple;

public class StreamStatisticsUpdater
extends BaseStateUpdater<MapState<StreamStatistics>> {
    private static final long serialVersionUID = 1740717206768121351L;
    private String streamName;
    private StreamStatistics initialStatitics;

    public StreamStatisticsUpdater() {
    }

    public StreamStatisticsUpdater(String streamName, StreamStatistics initialStatitics) {
        this.streamName = streamName;
        this.initialStatitics = initialStatitics;
    }

    public void updateState(MapState<StreamStatistics> state, List<TridentTuple> tuples, TridentCollector collector) {
        StreamStatistics streamStatistics = this.getStreamStatistics(state);
        List<Instance<?>> instances = this.extractInstances(tuples);
        this.updateStatistics(streamStatistics, instances);
        state.multiPut(KeysUtil.toKeys(this.streamName), Arrays.asList(streamStatistics));
        for (Instance<?> instance : instances) {
            collector.emit((List)new Values(new Object[]{instance, streamStatistics}));
        }
    }

    protected List<Instance<?>> extractInstances(List<TridentTuple> tuples) {
        ArrayList instances = new ArrayList();
        for (TridentTuple tuple : tuples) {
            Instance instance = (Instance)tuple.get(0);
            instances.add(instance);
        }
        return instances;
    }

    protected void updateStatistics(StreamStatistics streamStatistics, List<Instance<?>> instances) {
        for (Instance<?> instance : instances) {
            streamStatistics.update(instance.features);
        }
    }

    protected StreamStatistics getStreamStatistics(MapState<StreamStatistics> state) {
        List streamStatisticss = state.multiGet(KeysUtil.toKeys(this.streamName));
        StreamStatistics streamStatistics = null;
        if (streamStatisticss != null && !streamStatisticss.isEmpty()) {
            streamStatistics = (StreamStatistics)streamStatisticss.get(0);
        }
        if (streamStatistics == null) {
            streamStatistics = this.initialStatitics;
        }
        return streamStatistics;
    }
}

