/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.container.request;

import com.linkedin.databus.container.netty.HttpRelay;
import com.linkedin.databus.core.data_model.LogicalSource;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.ChunkedWritableByteChannel;
import com.linkedin.databus2.core.container.monitoring.mbean.HttpStatisticsCollector;
import com.linkedin.databus2.core.container.request.DatabusRequest;
import com.linkedin.databus2.core.container.request.InvalidRequestParamValueException;
import com.linkedin.databus2.core.container.request.RegisterResponseEntry;
import com.linkedin.databus2.core.container.request.RegisterResponseMetadataEntry;
import com.linkedin.databus2.core.container.request.RequestProcessingException;
import com.linkedin.databus2.core.container.request.RequestProcessor;
import com.linkedin.databus2.schemas.SchemaId;
import com.linkedin.databus2.schemas.SchemaRegistryService;
import com.linkedin.databus2.schemas.VersionedSchema;
import com.linkedin.databus2.schemas.VersionedSchemaSet;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * Handles the relay's {@code register} command.
 *
 * <p>The response is a JSON document listing every known schema version for the
 * requested logical sources. For protocol version 4 the source schemas are wrapped in
 * an object that may also carry the metadata schemas.
 */
public class RegisterRequestProcessor
implements RequestProcessor {
    public static final String MODULE = RegisterRequestProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);
    public static final String COMMAND_NAME = "register";
    public static final String SOURCES_PARAM = "sources";

    private static final String PROTOCOL_VERSION_PARAM = "protocolVersion";
    private static final String PROTOCOL_VERSION_HEADER = "x-dbus-protocol-version";
    private static final String SOURCE_SCHEMAS_KEY = "sourceSchemas";
    private static final String METADATA_SCHEMAS_KEY = "metadataSchemas";
    private static final String METADATA_SCHEMA_SOURCE = "metadata-source";
    /** Version assumed when the request carries no {@code protocolVersion} parameter. */
    private static final int DEFAULT_PROTOCOL_VERSION = 3;
    private static final int MIN_PROTOCOL_VERSION = 2;
    private static final int MAX_PROTOCOL_VERSION = 4;
    /** Initial capacity (in chars) of the buffer the JSON response is rendered into. */
    private static final int RESPONSE_BUFFER_CHARS = 102400;
    /** The response is always encoded as UTF-8 so the bytes do not depend on the host's default charset. */
    private static final Charset RESPONSE_CHARSET = Charset.forName("UTF-8");
    /** Shared serializer; it is never reconfigured after construction. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final ExecutorService _executorService;
    private final HttpRelay _relay;

    /**
     * @param executorService the executor returned by {@link #getExecutorService()}
     * @param relay           the relay whose source registry, schema registry and statistics are used
     */
    public RegisterRequestProcessor(ExecutorService executorService, HttpRelay relay) {
        this._executorService = executorService;
        this._relay = relay;
    }

    /** Returns the executor service supplied at construction time. */
    public ExecutorService getExecutorService() {
        return this._executorService;
    }

    /**
     * Processes a {@code register} request and writes the JSON response to the request's
     * response channel.
     *
     * @param request the incoming request; the optional parameters {@code protocolVersion}
     *                and {@code sources} (comma-separated numeric source ids) are read from it
     * @return the same {@code request} instance
     * @throws IOException                if serializing or writing the response fails
     * @throws RequestProcessingException if a parameter is invalid (reported as
     *                                    {@link InvalidRequestParamValueException}) or a schema
     *                                    lookup fails
     */
    public DatabusRequest process(DatabusRequest request) throws IOException, RequestProcessingException {
        try {
            int protocolVersion = parseProtocolVersion(request);
            String sources = request.getParams().getProperty(SOURCES_PARAM);
            Iterable<? extends LogicalSource> logicalSources = resolveSources(sources);

            SchemaRegistryService schemaRegistry = this._relay.getSchemaRegistryService();
            ArrayList<RegisterResponseEntry> registeredSources = new ArrayList<RegisterResponseEntry>(20);
            for (LogicalSource lsource : logicalSources) {
                getSchemas(schemaRegistry, lsource.getName(), lsource.getId(), sources, registeredSources);
            }

            String json = renderResponse(protocolVersion, schemaRegistry, registeredSources);
            ChunkedWritableByteChannel responseContent = request.getResponseContent();
            // The response is answered with the same protocol version the client asked for.
            responseContent.addMetadata(PROTOCOL_VERSION_HEADER, Integer.valueOf(protocolVersion));
            responseContent.write(ByteBuffer.wrap(json.getBytes(RESPONSE_CHARSET)));

            recordRegisterCall(request, registeredSources);
            return request;
        }
        catch (InvalidRequestParamValueException e) {
            // Single place where bad-parameter requests are counted, so each is counted once.
            countInvalidRegisterCall();
            throw e;
        }
    }

    /**
     * Reads and validates the {@code protocolVersion} request parameter.
     *
     * @return the parsed version, or the default when the parameter is absent
     * @throws InvalidRequestParamValueException if the value is not an integer or is out of range
     */
    private int parseProtocolVersion(DatabusRequest request) throws InvalidRequestParamValueException {
        String versionStr = request.getParams().getProperty(PROTOCOL_VERSION_PARAM);
        if (versionStr == null) {
            return DEFAULT_PROTOCOL_VERSION;
        }
        int version;
        try {
            version = Integer.parseInt(versionStr);
        }
        catch (NumberFormatException e) {
            LOG.error("Could not parse /register request protocol version: " + versionStr);
            throw new InvalidRequestParamValueException(COMMAND_NAME, PROTOCOL_VERSION_PARAM, versionStr);
        }
        if (version < MIN_PROTOCOL_VERSION || version > MAX_PROTOCOL_VERSION) {
            LOG.error("Out-of-range /register request protocol version: " + versionStr);
            throw new InvalidRequestParamValueException(COMMAND_NAME, PROTOCOL_VERSION_PARAM, versionStr);
        }
        return version;
    }

    /**
     * Maps the {@code sources} parameter to logical sources.
     *
     * @param sources comma-separated numeric source ids, or {@code null} to select every
     *                source in the relay's registry
     * @return the selected sources; only iteration is needed by the caller
     * @throws InvalidRequestParamValueException if an id is not an integer or is unknown to the registry
     */
    private Iterable<? extends LogicalSource> resolveSources(String sources) throws InvalidRequestParamValueException {
        if (null == sources) {
            return this._relay.getSourcesIdNameRegistry().getAllSources();
        }
        String[] sourceIds = sources.split(",");
        ArrayList<LogicalSource> resolved = new ArrayList<LogicalSource>(sourceIds.length);
        for (String sourceId : sourceIds) {
            int srcId;
            try {
                srcId = Integer.parseInt(sourceId.trim());
            }
            catch (NumberFormatException nfe) {
                LOG.error("Could not parse source id: " + sourceId);
                throw new InvalidRequestParamValueException(COMMAND_NAME, SOURCES_PARAM, sourceId);
            }
            LogicalSource lsource = this._relay.getSourcesIdNameRegistry().getSource(Integer.valueOf(srcId));
            if (null == lsource) {
                LOG.error("No source name for source id: " + srcId);
                throw new InvalidRequestParamValueException(COMMAND_NAME, SOURCES_PARAM, sourceId);
            }
            resolved.add(lsource);
        }
        return resolved;
    }

    /**
     * Serializes the response body as JSON: a bare array of source schemas for versions
     * below 4, or an object with {@code sourceSchemas} (and {@code metadataSchemas} when
     * any exist) for version 4.
     */
    private String renderResponse(int protocolVersion, SchemaRegistryService schemaRegistry, ArrayList<RegisterResponseEntry> registeredSources) throws IOException, RequestProcessingException {
        StringWriter out = new StringWriter(RESPONSE_BUFFER_CHARS);
        if (protocolVersion == 4) {
            LOG.debug("Got version 4 /register request; fetching metadata schema.");
            ArrayList<RegisterResponseMetadataEntry> registeredMetadata = new ArrayList<RegisterResponseMetadataEntry>(2);
            getMetadataSchemas(schemaRegistry, registeredMetadata);
            Map<String, Object> responseMap = new HashMap<String, Object>(4);
            responseMap.put(SOURCE_SCHEMAS_KEY, registeredSources);
            if (registeredMetadata.isEmpty()) {
                LOG.debug("No metadata schema available; sending v4 /register response without.");
            } else {
                LOG.debug("Sending v4 /register response with metadata schema.");
                responseMap.put(METADATA_SCHEMAS_KEY, registeredMetadata);
            }
            MAPPER.writeValue(out, responseMap);
        } else {
            MAPPER.writeValue(out, registeredSources);
        }
        return out.toString();
    }

    /**
     * Records a successful register call on the collector stored in the request parameters
     * under the relay collector's name, falling back to the relay collector itself.
     */
    private void recordRegisterCall(DatabusRequest request, ArrayList<RegisterResponseEntry> registeredSources) {
        HttpStatisticsCollector relayStatsCollector = this._relay.getHttpStatisticsCollector();
        if (null == relayStatsCollector) {
            return;
        }
        HttpStatisticsCollector connStatsCollector = (HttpStatisticsCollector) request.getParams().get(relayStatsCollector.getName());
        if (null != connStatsCollector) {
            connStatsCollector.registerRegisterCall(registeredSources);
        } else {
            relayStatsCollector.registerRegisterCall(registeredSources);
        }
    }

    /** Counts one invalid register call on the relay's statistics collector, if there is one. */
    private void countInvalidRegisterCall() {
        HttpStatisticsCollector relayStatsCollector = this._relay.getHttpStatisticsCollector();
        if (relayStatsCollector != null) {
            relayStatsCollector.registerInvalidRegisterCall();
        }
    }

    /**
     * Appends one response entry per schema version registered under {@code name}.
     *
     * <p>When the registry returns nothing for the source, the call is counted as invalid
     * and an error is logged, but processing continues with the remaining sources.
     *
     * @param sources the raw {@code sources} request parameter; used only in the log message
     * @throws RequestProcessingException if the registry lookup throws {@link DatabusException}
     */
    private void getSchemas(SchemaRegistryService schemaRegistry, String name, Integer sourceId, String sources, ArrayList<RegisterResponseEntry> registeredSources) throws RequestProcessingException {
        Map<?, ?> versionedSchemas;
        try {
            versionedSchemas = schemaRegistry.fetchAllSchemaVersionsBySourceName(name);
        }
        catch (DatabusException ie) {
            countInvalidRegisterCall();
            throw new RequestProcessingException(ie);
        }
        if (null == versionedSchemas || versionedSchemas.isEmpty()) {
            countInvalidRegisterCall();
            LOG.error("Problem fetching schema for sourceId " + sourceId + "; sources string = " + sources);
            return;
        }
        for (Map.Entry<?, ?> e : versionedSchemas.entrySet()) {
            // Keys are cast to Short (schema version) and values to String (schema text).
            short version = ((Short) e.getKey()).shortValue();
            String schema = (String) e.getValue();
            registeredSources.add(new RegisterResponseEntry(sourceId.longValue(), version, schema));
        }
    }

    /**
     * Appends one response entry per metadata schema version found in the registry under
     * the {@code metadata-source} name. Leaves the list untouched when none are available.
     *
     * @throws RequestProcessingException if the registry lookup throws {@link DatabusException}
     *                                    or a schema has no original schema string
     */
    private void getMetadataSchemas(SchemaRegistryService schemaRegistry, ArrayList<RegisterResponseMetadataEntry> registeredMetadata) throws RequestProcessingException {
        Map<?, ?> versionedSchemas = null;
        try {
            VersionedSchemaSet schemaSet = schemaRegistry.fetchAllMetadataSchemaVersions();
            if (schemaSet != null) {
                versionedSchemas = schemaSet.getAllVersionsWithSchemaId(METADATA_SCHEMA_SOURCE);
            }
        }
        catch (DatabusException ie) {
            countInvalidRegisterCall();
            throw new RequestProcessingException(ie);
        }
        if (versionedSchemas == null || versionedSchemas.isEmpty()) {
            return;
        }
        for (Map.Entry<?, ?> e : versionedSchemas.entrySet()) {
            SchemaId id = (SchemaId) e.getKey();
            VersionedSchema entry = (VersionedSchema) e.getValue();
            String schemaStr = entry.getOrigSchemaStr();
            if (schemaStr == null) {
                throw new RequestProcessingException("Null schema string for metadata version " + entry.getVersion());
            }
            registeredMetadata.add(new RegisterResponseMetadataEntry((short) entry.getVersion(), schemaStr, id.getByteArray()));
        }
    }
}

