/*
 * Decompiled with CFR 0.152.
 */
package stormy.pythian.component.learner.tridentml;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.github.pmerienne.trident.ml.classification.Classifier;
import com.github.pmerienne.trident.ml.classification.ClassifierUpdater;
import com.github.pmerienne.trident.ml.classification.ClassifyQuery;
import java.util.List;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.state.QueryFunction;
import storm.trident.state.StateFactory;
import storm.trident.state.StateUpdater;
import storm.trident.tuple.TridentTuple;
import stormy.pythian.component.common.AddFeature;
import stormy.pythian.model.annotation.ExpectedFeature;
import stormy.pythian.model.annotation.InputStream;
import stormy.pythian.model.annotation.Mapper;
import stormy.pythian.model.annotation.MappingType;
import stormy.pythian.model.annotation.OutputStream;
import stormy.pythian.model.annotation.Property;
import stormy.pythian.model.annotation.State;
import stormy.pythian.model.component.Component;
import stormy.pythian.model.instance.InputUserSelectionFeaturesMapper;
import stormy.pythian.model.instance.Instance;
import stormy.pythian.model.instance.OutputFixedFeaturesMapper;

/**
 * Base component that plugs a trident-ml {@link Classifier} into two Trident streams.
 *
 * <ul>
 *   <li>the {@code update} stream is converted to trident-ml instances and handed to a
 *       {@link ClassifierUpdater} through {@code partitionPersist};</li>
 *   <li>the {@code query} stream is converted the same way, run through a
 *       {@link ClassifyQuery} against the resulting state, and the result is attached to the
 *       instance as the {@value #PREDICTION_FEATURE} feature on the {@code prediction}
 *       stream.</li>
 * </ul>
 *
 * <p>The annotated fields are never assigned in this class; presumably the pythian framework
 * injects them before {@link #initClassifierStreams(Classifier)} is called (TODO confirm).
 *
 * @param <L> type of the label handled by the wrapped classifier
 */
public abstract class TridentMLClassifier<L> implements Component {

    private static final long serialVersionUID = 1L;

    /** Name of the feature that carries the classifier output on the prediction stream. */
    public static final String PREDICTION_FEATURE = "Prediction";

    // Tuple field names used by the surrounding pythian topology. NOTE(review): these look
    // like inlined copies of constants declared elsewhere in pythian - confirm before changing.
    private static final String PYTHIAN_INSTANCE_FIELD = "INSTANCE_FIELD";
    private static final String PYTHIAN_NEW_INSTANCE_FIELD = "NEW_INSTANCE_FIELD";

    // Intermediate tuple fields that only exist inside the streams built by this class.
    private static final String TRIDENT_ML_INSTANCE_FIELD = "TRIDENT_ML_INSTANCE_FIELD";
    private static final String TRIDENT_ML_PREDICTION_FIELD = "TRIDENT_ML_PREDICTION_FIELD";

    @InputStream(name = "update", type = MappingType.USER_SELECTION)
    private Stream update;

    @Mapper(stream = "update")
    private InputUserSelectionFeaturesMapper updateInputMapper;

    @InputStream(name = "query", type = MappingType.USER_SELECTION)
    private Stream query;

    @Mapper(stream = "query")
    private InputUserSelectionFeaturesMapper queryInputMapper;

    @OutputStream(name = "prediction", from = "query", newFeatures = {@ExpectedFeature(name = PREDICTION_FEATURE)})
    private Stream prediction;

    @Mapper(stream = "prediction")
    private OutputFixedFeaturesMapper predictionOutputMapper;

    @State(name = "Classifier's state")
    private StateFactory stateFactory;

    @Property(name = "Classifier name", mandatory = true)
    private String classifierName;

    /**
     * Builds the update and query topologies around the given classifier and stores the
     * resulting stream in the {@code prediction} field.
     *
     * @param classifier classifier passed to the {@link ClassifierUpdater} that backs the state
     */
    public void initClassifierStreams(Classifier<L> classifier) {
        // Update side: pythian instance -> trident-ml instance -> classifier state.
        TridentState classifierState = this.update
                .each(new Fields(PYTHIAN_INSTANCE_FIELD),
                        new TridentMLInstanceCreator<L>(this.updateInputMapper),
                        new Fields(TRIDENT_ML_INSTANCE_FIELD))
                .partitionPersist(this.stateFactory,
                        new Fields(TRIDENT_ML_INSTANCE_FIELD),
                        new ClassifierUpdater<L>(this.classifierName, classifier));

        // Query side: pythian instance -> trident-ml instance -> prediction -> new feature.
        this.prediction = this.query
                .each(new Fields(PYTHIAN_INSTANCE_FIELD),
                        new TridentMLInstanceCreator<L>(this.queryInputMapper),
                        new Fields(TRIDENT_ML_INSTANCE_FIELD))
                .stateQuery(classifierState,
                        new Fields(TRIDENT_ML_INSTANCE_FIELD),
                        new ClassifyQuery<L>(this.classifierName),
                        new Fields(TRIDENT_ML_PREDICTION_FIELD))
                .each(new Fields(PYTHIAN_INSTANCE_FIELD, TRIDENT_ML_PREDICTION_FIELD),
                        new AddFeature(this.predictionOutputMapper, TRIDENT_ML_PREDICTION_FIELD, PREDICTION_FEATURE),
                        new Fields(PYTHIAN_NEW_INSTANCE_FIELD));
    }

    /**
     * Trident function that turns the pythian {@link Instance} read from a tuple into a
     * trident-ml instance and emits it as a single-value tuple.
     *
     * @param <L> label type of the emitted trident-ml instance
     */
    private static class TridentMLInstanceCreator<L> extends BaseFunction {

        private static final long serialVersionUID = 1L;

        private final InputUserSelectionFeaturesMapper inputMapper;

        /**
         * @param inputMapper mapper used to pick the selected features of each instance
         */
        public TridentMLInstanceCreator(InputUserSelectionFeaturesMapper inputMapper) {
            this.inputMapper = inputMapper;
        }

        /**
         * Reads the label and the selected features of the incoming instance, converts the
         * features to {@code double} values and emits the resulting trident-ml instance.
         *
         * @param tuple     tuple the pythian instance is read from
         * @param collector collector receiving exactly one value per call
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Instance pythianInstance = Instance.from(tuple);

            // The label type is erased at runtime, so this cast cannot fail here.
            @SuppressWarnings("unchecked")
            L label = (L) pythianInstance.getLabel();

            Object[] selectedFeatures = pythianInstance.getSelectedFeatures(this.inputMapper);
            double[] features = new double[selectedFeatures.length];
            for (int i = 0; i < selectedFeatures.length; ++i) {
                Object feature = selectedFeatures[i];
                // Anything that is not a Number (including null) is mapped to 0.0.
                features[i] = feature instanceof Number ? ((Number) feature).doubleValue() : 0.0;
            }

            collector.emit(new Values(
                    new com.github.pmerienne.trident.ml.core.Instance<L>(label, features)));
        }
    }
}

