/*
 * Decompiled with CFR 0.152.
 */
package storm.cookbook;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.github.quintona.KafkaState;
import com.github.quintona.KafkaStateUpdater;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import pattern.ClassifierFunction;
import storm.kafka.KafkaConfig;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Filter;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Debug;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.state.StateUpdater;
import storm.trident.tuple.TridentTuple;

public class OrderManagementTopology {
    public static List<String> getFieldNames(int properyCount) {
        ArrayList<String> names = new ArrayList<String>(properyCount);
        int i = 1;
        while (i <= properyCount) {
            names.add("Value" + Integer.toString(i));
            ++i;
        }
        return names;
    }

    public static TridentTopology makeTopology(int properyCount) throws IOException {
        TridentTopology topology = new TridentTopology();
        TridentKafkaConfig spoutConfig = new TridentKafkaConfig((KafkaConfig.BrokerHosts)KafkaConfig.StaticHosts.fromHostString(Arrays.asList("localhost"), (int)2), "orders");
        List<String> valueNames = OrderManagementTopology.getFieldNames(properyCount);
        ArrayList<String> allFields = new ArrayList<String>(1);
        allFields.addAll(valueNames);
        allFields.add("order-id");
        topology.newStream("kafka", (IPartitionedTridentSpout)new TransactionalTridentKafkaSpout(spoutConfig)).each(new Fields(new String[]{"bytes"}), (Function)new CoerceInFunction(), new Fields(allFields)).each(new Fields(valueNames), (Function)new ClassifierFunction("/usr/local/random_forest.xml"), new Fields(new String[]{"prediction"})).each(new Fields(new String[]{"prediction"}), (Filter)new Debug("Prediction")).each(new Fields(new String[]{"prediction"}), (Function)new EnrichFunction(), new Fields(new String[]{"dispatch-to"})).each(new Fields(new String[]{"order-id", "dispatch-to"}), (Function)new CoerceOutFunction(), new Fields(new String[]{"message"})).partitionPersist(KafkaState.transactional((String)"order-output", (KafkaState.Options)new KafkaState.Options()), new Fields(new String[]{"message"}), (StateUpdater)new KafkaStateUpdater("message"), new Fields(new String[]{"message"}));
        return topology;
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology((String)args[0], (Map)conf, (StormTopology)OrderManagementTopology.makeTopology(10).build());
        } else {
            conf.setMaxTaskParallelism(3);
            conf.put((Object)Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("127.0.0.1"));
            conf.put((Object)Config.STORM_ZOOKEEPER_PORT, (Object)2181);
            conf.put((Object)Config.STORM_ZOOKEEPER_ROOT, (Object)"/storm");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("transactional-topology", (Map)conf, OrderManagementTopology.makeTopology(10).build());
        }
    }

    public static class CoerceInFunction
    extends BaseFunction {
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String text = new String(tuple.getBinary(0));
            JSONArray array = (JSONArray)JSONValue.parse((String)text);
            ArrayList<Object> values = new ArrayList<Object>(array.size());
            String id = (String)array.get(array.size() - 1);
            array.remove(array.size() - 1);
            for (Object obj : array) {
                values.add(Double.parseDouble((String)obj));
            }
            values.add(id);
            if (array.size() > 0) {
                collector.emit((List)new Values(values.toArray()));
            }
        }
    }

    public static class CoerceOutFunction
    extends BaseFunction {
        public void execute(TridentTuple tuple, TridentCollector collector) {
            JSONObject obj = new JSONObject();
            obj.put((Object)"order-id", (Object)tuple.getStringByField("order-id"));
            obj.put((Object)"dispatch-to", (Object)tuple.getStringByField("dispatch-to"));
            collector.emit((List)new Values(new Object[]{obj.toJSONString()}));
        }
    }

    public static class EnrichFunction
    extends BaseFunction {
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String prediction = tuple.getStringByField("prediction");
            if ("0".equals(prediction)) {
                collector.emit((List)new Values(new Object[]{"Hub1"}));
            }
            if ("1".equals(prediction)) {
                collector.emit((List)new Values(new Object[]{"Hub2"}));
            }
        }
    }
}

