/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.server;

import com.linkedin.databus.bootstrap.api.BootstrapEventCallback;
import com.linkedin.databus.bootstrap.api.BootstrapEventProcessResult;
import com.linkedin.databus.bootstrap.api.BootstrapProcessingException;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.DbusEventInternalReadable;
import com.linkedin.databus.core.DbusEventV1Factory;
import com.linkedin.databus.core.Encoding;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.core.filter.DbusFilter;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.log4j.Logger;

public class BootstrapEventWriter
implements BootstrapEventCallback {
    public static final String MODULE = BootstrapEventWriter.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    public final DbusEventFactory _eventFactory = new DbusEventV1Factory();
    public boolean _debug;
    private WritableByteChannel _writeChannel;
    private long _clientFreeBufferSize;
    private long _bytesSent;
    private long _numRowsWritten;
    private int _sizeOfPendingEvent = -1;
    private DbusEventInternalReadable _event = null;
    private DbusFilter _filter;
    private Encoding _encoding;

    public BootstrapEventWriter(WritableByteChannel writeChannel, long clientFreeBufferSize, DbusFilter filter, Encoding enc) {
        this._writeChannel = writeChannel;
        this._encoding = enc;
        this._clientFreeBufferSize = clientFreeBufferSize;
        this._bytesSent = 0L;
        this._numRowsWritten = 0L;
        this._filter = filter;
        this._debug = LOG.isDebugEnabled();
    }

    public BootstrapEventProcessResult onEvent(ResultSet rs, DbusEventsStatisticsCollector statsCollector) throws BootstrapProcessingException {
        long rid = -1L;
        boolean exceededBufferLimit = false;
        boolean dropped = true;
        try {
            ByteBuffer tmpBuffer;
            if (null == this._event) {
                tmpBuffer = ByteBuffer.wrap(rs.getBytes(4));
                if (this._debug) {
                    LOG.debug((Object)("BUFFER SIZE:" + tmpBuffer.limit()));
                }
                this._event = this._eventFactory.createReadOnlyDbusEventFromBuffer(tmpBuffer, tmpBuffer.position());
            } else {
                tmpBuffer = ByteBuffer.wrap(rs.getBytes(4));
                if (this._debug) {
                    LOG.debug((Object)("Resized BUFFER SIZE:" + tmpBuffer.limit()));
                }
                this._event = this._event.reset(tmpBuffer, 0);
            }
            if (this._debug) {
                LOG.debug((Object)("Event fetched: " + this._event.size() + " for source:" + this._event.srcId()));
            }
            if (!this._event.isValid()) {
                LOG.error((Object)("got an error event :" + this._event.toString()));
                return BootstrapEventProcessResult.getFailedEventProcessingResult((long)this._numRowsWritten);
            }
            rid = rs.getLong(1);
            if (this._debug) {
                LOG.debug((Object)("sending: " + this._event.getDbusEventKey() + " " + this._event.sequence()));
                LOG.debug((Object)("event size:" + this._event.size()));
            }
            if (null == this._filter || this._filter.allow((DbusEvent)this._event)) {
                if (this._debug && null != this._filter) {
                    LOG.debug((Object)("Event :" + this._event.getDbusEventKey() + " passed filter check !!"));
                }
                if (this._bytesSent + (long)this._event.size() < this._clientFreeBufferSize) {
                    int sentBytes = this._event.writeTo(this._writeChannel, this._encoding);
                    if (0 >= sentBytes) {
                        return BootstrapEventProcessResult.getFailedEventProcessingResult((long)this._numRowsWritten);
                    }
                    this._bytesSent += (long)sentBytes;
                    ++this._numRowsWritten;
                    dropped = false;
                    if (this._debug) {
                        LOG.debug((Object)("SENT " + this._bytesSent));
                    }
                    if (null != statsCollector) {
                        statsCollector.registerDataEvent(this._event);
                        if (this._debug) {
                            LOG.debug((Object)("Stats NumEvents :" + statsCollector.getTotalStats().getNumDataEvents()));
                        }
                    }
                } else {
                    exceededBufferLimit = true;
                    this._sizeOfPendingEvent = this._event.size();
                    LOG.info((Object)("Terminating batch with max. size of " + this._clientFreeBufferSize + "; Bytes sent in the current batch is " + this._bytesSent + "; Rows processed in the batch is " + this._numRowsWritten + (this._numRowsWritten <= 0L ? ", Pending Event Size is : " + this._sizeOfPendingEvent : "")));
                }
            } else if (null != statsCollector) {
                statsCollector.registerDataEventFiltered(this._event);
                if (this._debug) {
                    LOG.debug((Object)("Stats NumFilteredEvents :" + statsCollector.getTotalStats().getNumDataEventsFiltered()));
                }
                if (this._debug) {
                    LOG.debug((Object)("Event :" + this._event.getDbusEventKey() + " failed filter check !!"));
                }
            }
        }
        catch (SQLException e) {
            LOG.error((Object)("SQLException encountered while sending to client row " + rid));
            throw new BootstrapProcessingException((Throwable)e);
        }
        return new BootstrapEventProcessResult(this._numRowsWritten, exceededBufferLimit, dropped);
    }

    public void onCheckpointEvent(Checkpoint currentCheckpoint, DbusEventsStatisticsCollector curStatsCollector) {
        this._debug = LOG.isDebugEnabled();
        currentCheckpoint.bootstrapCheckPoint();
        DbusEventInternalReadable checkpointEvent = this._eventFactory.createCheckpointEvent(currentCheckpoint);
        checkpointEvent.writeTo(this._writeChannel, this._encoding);
    }

    public long getNumRowsWritten() {
        return this._numRowsWritten;
    }

    public int getSizeOfPendingEvent() {
        return this._sizeOfPendingEvent;
    }
}

