/*
 * Decompiled with CFR 0.152.
 */
package com.github.pmerienne.trident.cf;

import backtype.storm.Config;
import backtype.storm.tuple.Fields;
import com.github.pmerienne.trident.cf.aggregator.KeepFirst;
import com.github.pmerienne.trident.cf.aggregator.PreferencesAggregator;
import com.github.pmerienne.trident.cf.function.TanimotoCoefficientSimilarity;
import com.github.pmerienne.trident.cf.function.TopNRecommendedItems;
import com.github.pmerienne.trident.cf.function.UserPairCreator;
import com.github.pmerienne.trident.cf.model.RecommendedItem;
import com.github.pmerienne.trident.cf.model.SimilarUser;
import com.github.pmerienne.trident.cf.model.UserPair;
import com.github.pmerienne.trident.cf.model.WeightedPreferences;
import com.github.pmerienne.trident.cf.state.memory.MemoryMapMultimapState;
import com.github.pmerienne.trident.cf.state.memory.MemorySetMultiMapState;
import com.github.pmerienne.trident.cf.state.memory.MemorySetState;
import com.github.pmerienne.trident.cf.state.memory.MemorySortedSetMultiMapState;
import com.github.pmerienne.trident.cf.state.query.PreferenceCountQuery;
import com.github.pmerienne.trident.cf.state.query.SimilarUsersQuery;
import com.github.pmerienne.trident.cf.state.query.UserPreferencesQuery;
import com.github.pmerienne.trident.cf.state.query.UserSimilarityQuery;
import com.github.pmerienne.trident.cf.state.query.UsersWithCoPreferenceCountQuery;
import com.github.pmerienne.trident.cf.state.query.UsersWithPreferenceQuery;
import com.github.pmerienne.trident.cf.state.redis.RedisMapMultimapState;
import com.github.pmerienne.trident.cf.state.redis.RedisSetMultiMapState;
import com.github.pmerienne.trident.cf.state.redis.RedisSetState;
import com.github.pmerienne.trident.cf.state.redis.RedisSortedSetMultiMapState;
import com.github.pmerienne.trident.cf.state.updater.AddToUserList;
import com.github.pmerienne.trident.cf.state.updater.CoPreferenceCountUpdater;
import com.github.pmerienne.trident.cf.state.updater.GetAndClearUpdatedUsers;
import com.github.pmerienne.trident.cf.state.updater.PreferredItemUpdater;
import com.github.pmerienne.trident.cf.state.updater.UserPreferenceUpdater;
import com.github.pmerienne.trident.cf.state.updater.UserSimilarityUpdater;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.Function;
import storm.trident.state.QueryFunction;
import storm.trident.state.StateFactory;
import storm.trident.state.StateUpdater;

public class TridentCollaborativeFiltering {
    public static final String USER_FIELD = "user";
    public static final String ITEM_FIELD = "item";
    public static final String USER2_FIELD = "user2";
    public static final String SIMILARITY_FIELD = "similarity";
    public static final String RECOMMENDED_ITEMS_FIELD = "recommendedItems";
    private static final String USER_PAIR_FIELD = "userPair";
    private static final String UNIQUE_USER_PAIR_FIELD = "uniqueUserPair";
    private static final String CO_PREFERENCE_COUNT = "coPreferenceCount";
    private static final String PREFERENCE_COUNT1_FIELD = "preferenceCount1";
    private static final String PREFERENCE_COUNT2_FIELD = "preferenceCount2";
    private static final String USER1_PREFERENCES = "user1Preferences";
    private static final String USER2_PREFERENCES = "user2Preferences";
    private static final String WEIGHTED_PREFERENCES_FIELD = "preferences";
    private StateFactory updatedUsersStateFactory;
    private StateFactory userPreferencesStateFactory;
    private StateFactory preferredItemsStateFactory;
    private StateFactory coPreferenceCountStateFactory;
    private StateFactory userSimilarityStateFactory;
    private TridentState userPreferencesState;
    private TridentState preferredItemsState;
    private TridentState coPreferenceCountState;
    private TridentState userSimilarityState;
    private int singleUserOperationsForPreferenceUpdateParallelism;
    private int userPairOperationsForPreferenceUpdateParallelism;
    private int singleUserOperationsForSimilarityUpdateParallelism;
    private int userPairOperationsForSimilarityUpdateParallelism;

    public TridentCollaborativeFiltering(TridentTopology topology, Options options) {
        this.updatedUsersStateFactory = options.updatedUsersStateFactory;
        this.userPreferencesStateFactory = options.userPreferencesStateFactory;
        this.preferredItemsStateFactory = options.preferredItemsStateFactory;
        this.coPreferenceCountStateFactory = options.coPreferenceCountStateFactory;
        this.userSimilarityStateFactory = options.userSimilarityStateFactory;
        this.singleUserOperationsForPreferenceUpdateParallelism = options.singleUserOperationsForPreferenceUpdateParallelism;
        this.userPairOperationsForPreferenceUpdateParallelism = options.userPairOperationsForPreferenceUpdateParallelism;
        this.singleUserOperationsForSimilarityUpdateParallelism = options.singleUserOperationsForSimilarityUpdateParallelism;
        this.userPairOperationsForSimilarityUpdateParallelism = options.userPairOperationsForSimilarityUpdateParallelism;
        this.initStaticStates(topology);
    }

    public TridentCollaborativeFiltering(TridentTopology topology) {
        this(topology, new Options());
    }

    public void registerKryoSerializers(Config config) {
        config.registerSerialization(RecommendedItem.class);
        config.registerSerialization(SimilarUser.class);
        config.registerSerialization(UserPair.class);
        config.registerSerialization(WeightedPreferences.class);
    }

    protected void initStaticStates(TridentTopology topology) {
        this.userPreferencesState = topology.newStaticState(this.userPreferencesStateFactory);
        this.preferredItemsState = topology.newStaticState(this.preferredItemsStateFactory);
        this.coPreferenceCountState = topology.newStaticState(this.coPreferenceCountStateFactory);
    }

    public void appendCollaborativeFilteringTopology(Stream preferenceStream, Stream similaritiesUpdateStream) {
        this.appendUpdateUserPreferencesTopology(preferenceStream);
        this.appendUpdateUserSimilaritiesTopology(similaritiesUpdateStream);
    }

    public void appendUpdateUserPreferencesTopology(Stream preferenceStream) {
        preferenceStream.partitionPersist(this.userPreferencesStateFactory, new Fields(new String[]{USER_FIELD, ITEM_FIELD}), (StateUpdater)new UserPreferenceUpdater(), new Fields(new String[]{USER_FIELD, ITEM_FIELD})).parallelismHint(this.singleUserOperationsForPreferenceUpdateParallelism).newValuesStream().partitionPersist(this.preferredItemsStateFactory, new Fields(new String[]{USER_FIELD, ITEM_FIELD}), (StateUpdater)new PreferredItemUpdater(), new Fields(new String[]{USER_FIELD, ITEM_FIELD})).parallelismHint(this.singleUserOperationsForPreferenceUpdateParallelism).newValuesStream().partitionPersist(this.updatedUsersStateFactory, new Fields(new String[]{USER_FIELD, ITEM_FIELD}), (StateUpdater)new AddToUserList(), new Fields(new String[]{USER_FIELD, ITEM_FIELD})).parallelismHint(this.singleUserOperationsForPreferenceUpdateParallelism).newValuesStream().stateQuery(this.preferredItemsState, new Fields(new String[]{ITEM_FIELD}), (QueryFunction)new UsersWithPreferenceQuery(), new Fields(new String[]{USER2_FIELD})).parallelismHint(this.singleUserOperationsForPreferenceUpdateParallelism).each(new Fields(new String[]{USER_FIELD, USER2_FIELD}), (Function)new UserPairCreator(), new Fields(new String[]{USER_PAIR_FIELD})).parallelismHint(this.userPairOperationsForPreferenceUpdateParallelism).groupBy(new Fields(new String[]{USER_PAIR_FIELD, ITEM_FIELD})).aggregate(new Fields(new String[]{USER_PAIR_FIELD}), new KeepFirst(), new Fields(new String[]{UNIQUE_USER_PAIR_FIELD})).parallelismHint(this.userPairOperationsForPreferenceUpdateParallelism).partitionPersist(this.coPreferenceCountStateFactory, new Fields(new String[]{UNIQUE_USER_PAIR_FIELD}), (StateUpdater)new CoPreferenceCountUpdater(), new Fields(new String[]{UNIQUE_USER_PAIR_FIELD, CO_PREFERENCE_COUNT})).parallelismHint(this.userPairOperationsForPreferenceUpdateParallelism);
    }

    public void appendUpdateUserSimilaritiesTopology(Stream updateSimilaritiesStream) {
        this.userSimilarityState = updateSimilaritiesStream.partitionPersist(this.updatedUsersStateFactory, (StateUpdater)new GetAndClearUpdatedUsers(), new Fields(new String[]{USER_FIELD})).parallelismHint(this.singleUserOperationsForSimilarityUpdateParallelism).newValuesStream().stateQuery(this.userPreferencesState, new Fields(new String[]{USER_FIELD}), (QueryFunction)new PreferenceCountQuery(), new Fields(new String[]{PREFERENCE_COUNT1_FIELD})).parallelismHint(this.singleUserOperationsForSimilarityUpdateParallelism).stateQuery(this.coPreferenceCountState, new Fields(new String[]{USER_FIELD}), (QueryFunction)new UsersWithCoPreferenceCountQuery(), new Fields(new String[]{USER2_FIELD, CO_PREFERENCE_COUNT})).parallelismHint(this.singleUserOperationsForSimilarityUpdateParallelism).stateQuery(this.userPreferencesState, new Fields(new String[]{USER2_FIELD}), (QueryFunction)new PreferenceCountQuery(), new Fields(new String[]{PREFERENCE_COUNT2_FIELD})).parallelismHint(this.userPairOperationsForSimilarityUpdateParallelism).each(new Fields(new String[]{PREFERENCE_COUNT1_FIELD, PREFERENCE_COUNT2_FIELD, CO_PREFERENCE_COUNT}), (Function)new TanimotoCoefficientSimilarity(), new Fields(new String[]{SIMILARITY_FIELD})).parallelismHint(this.userPairOperationsForSimilarityUpdateParallelism).partitionPersist(this.userSimilarityStateFactory, new Fields(new String[]{USER_FIELD, USER2_FIELD, SIMILARITY_FIELD}), (StateUpdater)new UserSimilarityUpdater()).parallelismHint(this.userPairOperationsForSimilarityUpdateParallelism);
    }

    public Stream createUserSimilarityStream(Stream queryStream) {
        return queryStream.stateQuery(this.userSimilarityState, new Fields(new String[]{USER_FIELD, USER2_FIELD}), (QueryFunction)new UserSimilarityQuery(), new Fields(new String[]{SIMILARITY_FIELD})).project(new Fields(new String[]{SIMILARITY_FIELD}));
    }

    public Stream createItemRecommendationStream(Stream queryStream, int nbItems, int neighborhoodSize) {
        return queryStream.stateQuery(this.userPreferencesState, new Fields(new String[]{USER_FIELD}), (QueryFunction)new UserPreferencesQuery(), new Fields(new String[]{USER1_PREFERENCES})).stateQuery(this.userSimilarityState, new Fields(new String[]{USER_FIELD}), (QueryFunction)new SimilarUsersQuery(neighborhoodSize), new Fields(new String[]{USER2_FIELD, SIMILARITY_FIELD})).stateQuery(this.userPreferencesState, new Fields(new String[]{USER2_FIELD}), (QueryFunction)new UserPreferencesQuery(), new Fields(new String[]{USER2_PREFERENCES})).parallelismHint(neighborhoodSize).aggregate(new Fields(new String[]{USER1_PREFERENCES, USER2_PREFERENCES, SIMILARITY_FIELD}), (CombinerAggregator)new PreferencesAggregator(), new Fields(new String[]{WEIGHTED_PREFERENCES_FIELD})).parallelismHint(neighborhoodSize / 2).each(new Fields(new String[]{WEIGHTED_PREFERENCES_FIELD}), (Function)new TopNRecommendedItems(nbItems), new Fields(new String[]{RECOMMENDED_ITEMS_FIELD})).project(new Fields(new String[]{RECOMMENDED_ITEMS_FIELD}));
    }

    public static class Options {
        public StateFactory updatedUsersStateFactory = new MemorySetState.Factory();
        public StateFactory userPreferencesStateFactory = new MemorySetMultiMapState.Factory();
        public StateFactory preferredItemsStateFactory = new MemorySetMultiMapState.Factory();
        public StateFactory coPreferenceCountStateFactory = new MemoryMapMultimapState.Factory();
        public StateFactory userSimilarityStateFactory = new MemorySortedSetMultiMapState.Factory();
        public int singleUserOperationsForPreferenceUpdateParallelism = 2;
        public int userPairOperationsForPreferenceUpdateParallelism = 10;
        public int singleUserOperationsForSimilarityUpdateParallelism = 2;
        public int userPairOperationsForSimilarityUpdateParallelism = 20;

        public static Options inMemory() {
            return new Options();
        }

        public static Options redis() {
            Options options = new Options();
            options.updatedUsersStateFactory = new RedisSetState.Factory("users");
            options.userPreferencesStateFactory = new RedisSetMultiMapState.Factory("userPreferences");
            options.preferredItemsStateFactory = new RedisSetMultiMapState.Factory("preferredItems");
            options.coPreferenceCountStateFactory = new RedisMapMultimapState.Factory(TridentCollaborativeFiltering.CO_PREFERENCE_COUNT);
            options.userSimilarityStateFactory = new RedisSortedSetMultiMapState.Factory("userSimilarity");
            return options;
        }
    }
}

