/*
 * Decompiled with CFR 0.152.
 */
package stormy.pythian.component.preprocessor;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.google.common.util.concurrent.AtomicDouble;
import java.util.List;
import storm.trident.Stream;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import stormy.pythian.model.annotation.InputStream;
import stormy.pythian.model.annotation.Mapper;
import stormy.pythian.model.annotation.MappingType;
import stormy.pythian.model.annotation.OutputStream;
import stormy.pythian.model.component.Component;
import stormy.pythian.model.instance.FeatureFunction;
import stormy.pythian.model.instance.FeatureProcedure;
import stormy.pythian.model.instance.InputUserSelectionFeaturesMapper;
import stormy.pythian.model.instance.Instance;

public class Normalizer
implements Component {
    private static final long serialVersionUID = 3259186264532961799L;
    @InputStream(name="in", type=MappingType.USER_SELECTION)
    private Stream in;
    @OutputStream(name="out")
    private Stream out;
    @Mapper(stream="in")
    private InputUserSelectionFeaturesMapper mapper;

    public void init() {
        this.out = this.in.each(new Fields(new String[]{"INSTANCE_FIELD"}), (Function)new NormalizerFunction(this.mapper), new Fields(new String[]{"NEW_INSTANCE_FIELD"}));
    }

    private static class NormalizerFunction
    extends BaseFunction {
        private static final long serialVersionUID = -5433556581807138157L;
        private final InputUserSelectionFeaturesMapper mapper;

        public NormalizerFunction(InputUserSelectionFeaturesMapper mapper) {
            this.mapper = mapper;
        }

        public void execute(TridentTuple tuple, TridentCollector collector) {
            Instance original = Instance.from((TridentTuple)tuple);
            final AtomicDouble atomicMagnitude = new AtomicDouble(0.0);
            original.process(this.mapper, (FeatureProcedure)new FeatureProcedure<Double>(){

                public void process(Double feature) {
                    atomicMagnitude.getAndAdd(feature * feature);
                }
            });
            final double magnitude = Math.sqrt(atomicMagnitude.get());
            Instance newInstance = original.transform(this.mapper, (FeatureFunction)new FeatureFunction<Double>(){

                public Double transform(Double feature) {
                    return feature / magnitude;
                }
            });
            collector.emit((List)new Values(new Object[]{newInstance}));
        }
    }
}

