/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.log4j.Logger;

/**
 * An {@link InputStream} over a single file whose behaviour on reaching end-of-file is
 * configurable through {@link EOFSyncBehavior}.
 *
 * <ul>
 *   <li>{@link EOFSyncBehavior#NO_SYNC}: on EOF the read is retried once and the result of
 *       that retry is returned.</li>
 *   <li>{@link EOFSyncBehavior#SYNC_ONCE}: on EOF the file is re-opened once at the last
 *       known offset and the stream then switches to {@code NO_SYNC}.</li>
 *   <li>{@link EOFSyncBehavior#SYNC_TILL_NEW_DATA}: on EOF the stream keeps re-syncing
 *       (waiting up to the configured interval between attempts) until data is returned,
 *       the stream is closed, or {@link #appendDone()} changes the behaviour.</li>
 * </ul>
 *
 * <p>Data containing a NUL byte ({@link #NULL_CHAR}) is not handed to the caller directly;
 * it triggers a re-sync and a re-read instead. NOTE(review): presumably this guards against
 * reading a region the concurrent writer has not filled in yet - confirm with the writer side.
 *
 * <p>The read methods, {@link #close()}, {@link #appendDone()} and {@link #getCurrPosition()}
 * are synchronized on this instance.
 */
public class ConcurrentAppendableSingleFileInputStream
extends InputStream {
    public static final String MODULE = ConcurrentAppendableSingleFileInputStream.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);
    /** Value returned by the underlying stream when no more data is available. */
    public static final int EOF = -1;
    /** NUL byte value; data containing it triggers a re-sync instead of being returned as-is. */
    public static final int NULL_CHAR = 0;
    private final File _file;
    /** Maximum time (ms) to wait between re-sync attempts; passed straight to {@link Object#wait(long)}. */
    private final long _resyncIntervalMs;
    /** Initial byte offset requested at construction time. */
    private final long _byteOffset;
    private FileInputStream _inputStream = null;
    private volatile EOFSyncBehavior _eofSync = null;
    private volatile boolean _closed = false;
    /** Last-modified timestamp of the file as of the most recent (re-)open. */
    private long _lastModifiedTimestamp;
    /** Channel position after the most recent read or re-sync. */
    private long _currOffset = 0L;
    /** Channel position before the most recent read; a re-sync seeks back to this offset. */
    private long _prevOffset = 0L;
    private volatile long _numReadCalls = 0L;
    private volatile long _numReadCallsWithData = 0L;

    /**
     * Creates a stream with {@link EOFSyncBehavior#NO_SYNC} behaviour.
     *
     * @param fileName   path of the file to read
     * @param byteOffset offset to start reading from; only applied when greater than zero
     * @return the new stream
     * @throws IOException if the file cannot be opened or positioned
     */
    public static ConcurrentAppendableSingleFileInputStream createStaticFileInputStream(String fileName, long byteOffset) throws IOException {
        return new ConcurrentAppendableSingleFileInputStream(fileName, EOFSyncBehavior.NO_SYNC, byteOffset, 0L);
    }

    /**
     * Creates a stream with {@link EOFSyncBehavior#SYNC_TILL_NEW_DATA} behaviour.
     *
     * @param fileName     path of the file to read
     * @param byteOffset   offset to start reading from; only applied when greater than zero
     * @param syncInterval time in milliseconds to wait between re-sync attempts; it is passed
     *                     to {@link Object#wait(long)}, so {@code 0} waits until notified
     * @return the new stream
     * @throws IOException if the file cannot be opened or positioned
     */
    public static ConcurrentAppendableSingleFileInputStream createAppendingFileInputStream(String fileName, long byteOffset, long syncInterval) throws IOException {
        return new ConcurrentAppendableSingleFileInputStream(fileName, EOFSyncBehavior.SYNC_TILL_NEW_DATA, byteOffset, syncInterval);
    }

    private ConcurrentAppendableSingleFileInputStream(String fileName, EOFSyncBehavior eofSyncBehavior, long byteOffset, long resyncIntervalMs) throws IOException {
        this._byteOffset = byteOffset;
        this._file = new File(fileName);
        this._eofSync = eofSyncBehavior;
        this._resyncIntervalMs = resyncIntervalMs;
        this._lastModifiedTimestamp = this._file.lastModified();
        // Forced sync opens the underlying stream for the first time.
        this.syncStreamOnce(true);
        if (this._byteOffset > 0L) {
            this._inputStream.getChannel().position(this._byteOffset);
        }
        this._currOffset = this._inputStream.getChannel().position();
        this._prevOffset = this._currOffset;
    }

    /**
     * Reads a single byte, applying the configured EOF behaviour when the underlying stream
     * returns {@link #EOF} and re-syncing when it returns {@link #NULL_CHAR}.
     *
     * @return the byte read, or {@link #EOF} if no data could be obtained
     * @throws IOException if the underlying stream fails
     */
    @Override
    public synchronized int read() throws IOException {
        int retVal = this._inputStream.read();
        this.recordOffsets();
        ++this._numReadCalls;
        boolean isDebugEnabled = LOG.isDebugEnabled();
        if (retVal != EOF && retVal != NULL_CHAR) {
            ++this._numReadCallsWithData;
            if (isDebugEnabled) {
                LOG.debug("Byte returned (non-EOF) is :" + retVal + ", State is :" + this.toString());
            }
            return retVal;
        }
        boolean done = false;
        while ((retVal == EOF || retVal == NULL_CHAR) && !this._closed && !done) {
            if (retVal == NULL_CHAR) {
                // syncOnceAndSleep() only acts in SYNC_TILL_NEW_DATA mode; in the other modes
                // this simply reads the next byte.
                this.syncOnceAndSleep();
                retVal = this._inputStream.read();
                this.recordOffsets();
                continue;
            }
            switch (this._eofSync) {
                case NO_SYNC: {
                    // Retry exactly once and return whatever comes back.
                    retVal = this._inputStream.read();
                    this.recordOffsets();
                    if (isDebugEnabled) {
                        LOG.debug("Byte returned (NO_SYNC) is :" + retVal + ", State is :" + this.toString());
                    }
                    done = true;
                    break;
                }
                case SYNC_ONCE: {
                    // Re-open once, then fall back to NO_SYNC on the next loop iteration.
                    this.syncStreamOnce(true);
                    this._eofSync = EOFSyncBehavior.NO_SYNC;
                    break;
                }
                case SYNC_TILL_NEW_DATA: {
                    this.syncOnceAndSleep();
                    // close() may have run while we were waiting; do not touch the closed stream.
                    if (this._closed) {
                        break;
                    }
                    retVal = this._inputStream.read();
                    this.recordOffsets();
                    break;
                }
            }
        }
        if (retVal != EOF) {
            ++this._numReadCallsWithData;
        }
        if (isDebugEnabled) {
            LOG.debug("Byte returned is :" + retVal + ", State is :" + this.toString());
        }
        return retVal;
    }

    /**
     * Returns {@code true} if any of the {@code len} bytes of {@code b} starting at
     * {@code offset} is zero. A non-positive {@code len} (e.g. an EOF return value) yields
     * {@code false}.
     */
    private boolean isNullByteSeen(byte[] b, int offset, int len) {
        int limit = offset + len;
        for (int i = offset; i < limit; ++i) {
            if (b[i] == NULL_CHAR) {
                LOG.warn("Found NULL character from data read from underlying stream. Offset :" + offset + ", Length :" + len + " Re-Syncing");
                return true;
            }
        }
        return false;
    }

    /**
     * Reads up to {@code b.length} bytes into {@code b}. Equivalent to
     * {@code read(b, 0, b.length)}.
     *
     * @param b destination buffer
     * @return number of bytes read, or {@link #EOF} if no data could be obtained
     * @throws IOException if the underlying stream fails
     */
    @Override
    public synchronized int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}, applying the
     * configured EOF behaviour when the underlying stream returns {@link #EOF} and re-syncing
     * when the bytes just read contain a NUL byte.
     *
     * @param b   destination buffer
     * @param off start offset in {@code b}
     * @param len maximum number of bytes to read
     * @return number of bytes read, or {@link #EOF} if no data could be obtained
     * @throws IOException if the underlying stream fails
     */
    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        int retVal = this._inputStream.read(b, off, len);
        this.recordOffsets();
        ++this._numReadCalls;
        boolean isDebugEnabled = LOG.isDebugEnabled();
        boolean nullBytesSeen = retVal != EOF && this.isNullByteSeen(b, off, retVal);
        if (retVal != EOF && !nullBytesSeen) {
            ++this._numReadCallsWithData;
            if (isDebugEnabled) {
                LOG.debug("Num Bytes returned (non-EOF) is :" + retVal + ", State is :" + this.toString());
            }
            return retVal;
        }
        boolean done = false;
        while ((retVal == EOF || nullBytesSeen) && !this._closed && !done) {
            if (nullBytesSeen) {
                // syncOnceAndSleep() only acts in SYNC_TILL_NEW_DATA mode (seeking back to the
                // offset before the last read); in the other modes this reads the next chunk.
                this.syncOnceAndSleep();
                retVal = this._inputStream.read(b, off, len);
                this.recordOffsets();
            } else {
                switch (this._eofSync) {
                    case NO_SYNC: {
                        // Retry exactly once and return whatever comes back.
                        retVal = this._inputStream.read(b, off, len);
                        this.recordOffsets();
                        if (isDebugEnabled) {
                            LOG.debug("Num bytes returned (NO_SYNC) is :" + retVal + ", State is :" + this.toString());
                        }
                        done = true;
                        // Must not fall through: a forced re-sync here would seek back to the
                        // offset before this read and hand the same bytes out again later.
                        break;
                    }
                    case SYNC_ONCE: {
                        // Re-open once, then fall back to NO_SYNC on the next loop iteration.
                        this.syncStreamOnce(true);
                        this._eofSync = EOFSyncBehavior.NO_SYNC;
                        break;
                    }
                    case SYNC_TILL_NEW_DATA: {
                        this.syncOnceAndSleep();
                        // close() may have run while we were waiting; do not touch the closed stream.
                        if (this._closed) {
                            break;
                        }
                        retVal = this._inputStream.read(b, off, len);
                        this.recordOffsets();
                        break;
                    }
                }
            }
            nullBytesSeen = this.isNullByteSeen(b, off, retVal);
        }
        if (retVal != EOF) {
            ++this._numReadCallsWithData;
        }
        if (isDebugEnabled) {
            LOG.debug("Num Bytes returned is :" + retVal + ", State is :" + this.toString());
        }
        return retVal;
    }

    /**
     * Shifts the current offset into {@code _prevOffset} and refreshes {@code _currOffset}
     * from the channel. Called after every read on the underlying stream so that a later
     * re-sync can seek back to where that read started.
     */
    private void recordOffsets() throws IOException {
        this._prevOffset = this._currOffset;
        this._currOffset = this._inputStream.getChannel().position();
    }

    /**
     * In {@link EOFSyncBehavior#SYNC_TILL_NEW_DATA} mode, re-syncs the stream and, if the file
     * was not re-opened, waits up to {@code _resyncIntervalMs} for a notification from
     * {@link #close()} or {@link #appendDone()}. Does nothing in the other modes.
     */
    private synchronized void syncOnceAndSleep() throws IOException {
        if (this._eofSync != EOFSyncBehavior.SYNC_TILL_NEW_DATA) {
            return;
        }
        boolean refreshed = this.syncStreamOnce(false);
        if (refreshed) {
            return;
        }
        try {
            this.wait(this._resyncIntervalMs);
        }
        catch (InterruptedException ignored) {
            // Existing behaviour: the interrupt is swallowed and the caller's loop re-checks
            // _closed / _eofSync. NOTE(review): restoring the interrupt flag here would make
            // the next FileChannel call fail with ClosedByInterruptException; decide
            // explicitly whether that is wanted before changing this.
        }
    }

    /**
     * Seeks the underlying stream back to {@code _prevOffset}, re-opening the file first when
     * {@code force} is set or the file's last-modified timestamp has advanced.
     *
     * @param force re-open the file regardless of its timestamp
     * @return {@code true} if the file was re-opened
     * @throws IOException if the file cannot be opened or positioned
     */
    private synchronized boolean syncStreamOnce(boolean force) throws IOException {
        long newLastModifiedTs = this._file.lastModified();
        boolean doSync = force || this._lastModifiedTimestamp < newLastModifiedTs;
        if (doSync) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Syncing from file :" + this._file);
            }
            this.closeStream();
            this._inputStream = new FileInputStream(this._file);
            this._inputStream.getChannel().position(this._prevOffset);
            this._lastModifiedTimestamp = newLastModifiedTs;
        } else {
            this._inputStream.getChannel().position(this._prevOffset);
        }
        this._currOffset = this._inputStream.getChannel().position();
        return doSync;
    }

    /**
     * Marks the stream closed, closes the underlying file stream and wakes up any reader
     * waiting for new data.
     *
     * @throws IOException if closing the underlying stream fails
     */
    @Override
    public synchronized void close() throws IOException {
        LOG.info("Closing ConcurrentAppendableSingleFileInputStream for file :" + this._file);
        this._closed = true;
        try {
            this.closeStream();
        }
        finally {
            // Always wake waiters, even if closing the file failed; with a resync interval of
            // 0 they would otherwise wait indefinitely.
            this.notifyAll();
        }
    }

    /**
     * Closes the underlying file stream, if one is open, without marking this stream closed.
     *
     * @throws IOException if closing the underlying stream fails
     */
    public void closeStream() throws IOException {
        if (null != this._inputStream) {
            this._inputStream.close();
        }
    }

    /** Returns the EOF behaviour currently in effect. */
    public EOFSyncBehavior getEOFSyncBehavior() {
        return this._eofSync;
    }

    /**
     * Switches the EOF behaviour to {@link EOFSyncBehavior#SYNC_ONCE} and wakes up any reader
     * waiting for new data.
     */
    public synchronized void appendDone() {
        LOG.info("Marking file appending done for inputStream :" + this.toString());
        this._eofSync = EOFSyncBehavior.SYNC_ONCE;
        this.notifyAll();
    }

    @Override
    public String toString() {
        return "ConcurrentAppendableSingleFileInputStream [_file=" + this._file + ", _resyncIntervalMs=" + this._resyncIntervalMs + ", _inputStream=" + this._inputStream + ", _eofSync=" + this._eofSync + ", _closed=" + this._closed + ", _lastModifiedTimestamp=" + this._lastModifiedTimestamp + ", _currOffset=" + this._currOffset + ", _numReadCalls=" + this._numReadCalls + ", _numReadCallsWithData=" + this._numReadCallsWithData + "]";
    }

    /** Returns the number of calls made to the {@code read} methods. */
    public long getNumReadCalls() {
        return this._numReadCalls;
    }

    /**
     * Returns the number of {@code read} calls that returned something other than
     * {@link #EOF}. The misspelled name is kept for existing callers; see
     * {@link #getNumReadCallsWithData()}.
     */
    public long geNumReadCallsWithData() {
        return this._numReadCallsWithData;
    }

    /** Returns the number of {@code read} calls that returned something other than {@link #EOF}. */
    public long getNumReadCallsWithData() {
        return this._numReadCallsWithData;
    }

    /** Returns the file this stream reads from. */
    public File getFile() {
        return this._file;
    }

    /**
     * Returns the current position of the underlying file channel, or {@code -1} if it cannot
     * be determined (the failure is logged).
     */
    public synchronized long getCurrPosition() {
        long offset = -1L;
        try {
            offset = this._inputStream.getChannel().position();
        }
        catch (Exception ex) {
            LOG.error("Got exception when getting the current position. State :" + this.toString(), ex);
        }
        return offset;
    }

    /** What the stream does when the underlying file stream reports end-of-file. */
    public static enum EOFSyncBehavior {
        /** Retry the read once and return its result. */
        NO_SYNC,
        /** Re-open the file once at the last known offset, then behave as {@link #NO_SYNC}. */
        SYNC_ONCE,
        /** Keep re-syncing and waiting until data is returned or the stream is closed. */
        SYNC_TILL_NEW_DATA;

    }
}

