/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core;

import com.linkedin.databus.core.ConcurrentAppendableSingleFileInputStream;
import com.linkedin.databus.core.TrailFileNotifier;
import com.linkedin.databus.core.util.NamedThreadFactory;
import com.linkedin.databus.core.util.RateMonitor;
import com.linkedin.databus.monitoring.mbean.GGParserStatistics;
import com.linkedin.databus2.core.DatabusException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

public class ConcurrentAppendableCompositeFileInputStream
extends InputStream
implements TrailFileNotifier.TrailFileListener {
    public static final String MODULE = ConcurrentAppendableCompositeFileInputStream.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final File _dir;
    private SequenceInputStream _seqStream = null;
    private final TrailFileNotifier _trailFileLocator;
    private final TrailFileNotifier.TrailFileManager _trailFileManager;
    private File _latestFile;
    private final long _initialFileOffset;
    private volatile long _numReadCalls = 0L;
    private volatile long _numReadCallsWithData = 0L;
    private InputStreamEnumerator _streamEnumerator = null;
    private boolean _initDone = false;
    private final RateMonitor _rateMonitor = new RateMonitor("ConcurrentAppendableCompositeFileInputStream");
    private final boolean _staticStream;
    private boolean _closed = false;
    private boolean _firstTrailFile = true;
    private ScheduledThreadPoolExecutor _executor;
    private final long STATS_COLLECTION_PERIOD = 5000L;
    private GGParserStatistics _ggParserStats = null;

    public void setGGParserStats(GGParserStatistics st) {
        this._ggParserStats = st;
    }

    public ConcurrentAppendableCompositeFileInputStream(String dir, TrailFileNotifier.TrailFileManager filter, boolean staticStream) throws IOException {
        this(dir, null, -1L, filter, staticStream);
    }

    public String toString() {
        return "ConcurrentAppendableCompositeFileInputStream{_dir=" + this._dir + ", _seqStream=" + this._seqStream + ", _trailFileLocator=" + (Object)((Object)this._trailFileLocator) + ", _trailFileManager=" + this._trailFileManager + ", _latestFile=" + this._latestFile + ", _initialFileOffset=" + this._initialFileOffset + ", _numReadCalls=" + this._numReadCalls + ", _numReadCallsWithData=" + this._numReadCallsWithData + ", _streamEnumerator=" + this._streamEnumerator + ", _initDone=" + this._initDone + ", _rateMonitor=" + this._rateMonitor + ", _staticStream=" + this._staticStream + ", _closed=" + this._closed + ", _firstTrailFile=" + this._firstTrailFile + '}';
    }

    public ConcurrentAppendableCompositeFileInputStream(String dir, String startFile, long offset, TrailFileNotifier.TrailFileManager filter, boolean staticStream) throws IOException {
        this.validateDir(dir);
        this._dir = new File(dir);
        this._trailFileManager = filter;
        this._latestFile = startFile == null ? null : new File(this._dir.getAbsolutePath() + "/" + startFile);
        this._initialFileOffset = offset;
        this._staticStream = staticStream;
        this._trailFileLocator = new TrailFileNotifier(this._dir, this._trailFileManager, this._latestFile, 10L, this);
        this._rateMonitor.start();
        this._rateMonitor.suspend();
        this._executor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("StatsTime for " + dir, true));
        this._executor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                ConcurrentAppendableCompositeFileInputStream.this.updateParserStats();
            }
        }, 5000L, 5000L, TimeUnit.MILLISECONDS);
    }

    private void validateDir(String dir) throws IOException {
        File d = new File(dir);
        if (!d.isDirectory()) {
            throw new IOException("Path (" + dir + ") does not exist or is not a directory !!");
        }
    }

    public void initializeStream() throws IOException {
        this._streamEnumerator = new InputStreamEnumerator(this._staticStream, this._trailFileManager, this._ggParserStats);
        boolean success = this._trailFileLocator.fetchOneTime();
        if (!success) {
            return;
        }
        this._seqStream = new SequenceInputStream(this._streamEnumerator);
        if (!this._staticStream) {
            this._trailFileLocator.start();
        }
        this._initDone = true;
    }

    public RateMonitor getRateMonitor() {
        return this._rateMonitor;
    }

    private int readData(byte[] b, int off, int len, ReadCall readCall) throws IOException {
        if (!this._initDone) {
            this.initializeStream();
        }
        int ret = -1;
        int numBytesRead = 0;
        this._rateMonitor.resume();
        if (null != this._seqStream) {
            switch (readCall) {
                case READ_BULK: {
                    numBytesRead = ret = this._seqStream.read(b);
                    break;
                }
                case READ_BULK_WITH_OFFSETS: {
                    numBytesRead = ret = this._seqStream.read(b, off, len);
                    break;
                }
                case READ_ONE_BYTE: {
                    ret = this._seqStream.read();
                    numBytesRead = 1;
                }
            }
        }
        if (ret != -1) {
            ++this._numReadCallsWithData;
            this._rateMonitor.ticks((long)numBytesRead);
            if (this._ggParserStats != null) {
                this._ggParserStats.addBytesParsed(this._rateMonitor.getNumTicks());
            }
        }
        this._rateMonitor.suspend();
        ++this._numReadCalls;
        return ret;
    }

    @Override
    public int read() throws IOException {
        return this.readData(null, -1, -1, ReadCall.READ_ONE_BYTE);
    }

    @Override
    public int read(byte[] b) throws IOException {
        return this.readData(b, -1, -1, ReadCall.READ_BULK);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return this.readData(b, off, len, ReadCall.READ_BULK_WITH_OFFSETS);
    }

    @Override
    public void close() throws IOException {
        if (this._executor != null) {
            this._executor.shutdownNow();
        }
        if (null != this._trailFileLocator && this._trailFileLocator.isAlive()) {
            this._trailFileLocator.shutdown();
        }
        this.closeStream();
    }

    private void closeStream() throws IOException {
        if (this._closed) {
            return;
        }
        LOG.info((Object)"Closing Stream !!");
        if (this._streamEnumerator != null) {
            this._streamEnumerator.close();
        }
        if (null != this._seqStream) {
            this._seqStream.close();
        }
        LOG.info((Object)"Stream closed !!");
        this._closed = true;
    }

    @Override
    public void onNewTrailFile(File file) throws DatabusException {
        long offset = 0L;
        if (this._firstTrailFile) {
            offset = this._initialFileOffset;
            if (null != this._latestFile && !file.equals(this._latestFile)) {
                throw new DatabusException("Expected to get " + this._latestFile + " but got " + file);
            }
            this._firstTrailFile = false;
        } else if (!this._trailFileManager.isNextFileInSequence(this._latestFile, file)) {
            throw new DatabusException("Expected to get next file to " + this._latestFile + " but got " + file);
        }
        this._latestFile = file;
        this._streamEnumerator.enqueueNewTrailFile(new FilePosition(file, offset));
    }

    @Override
    public void onError(Throwable ex) {
        LOG.error((Object)"Closing the inputStream because of error", ex);
        try {
            this.closeStream();
        }
        catch (IOException e) {
            LOG.error((Object)"Unable to close the inputStream", (Throwable)e);
        }
    }

    protected TrailFileNotifier getTrailFileNotifier() {
        return this._trailFileLocator;
    }

    public long getNumReadCalls() {
        return this._numReadCalls;
    }

    public long getNumReadCallsWithData() {
        return this._numReadCallsWithData;
    }

    public File getCurrentFile() {
        ConcurrentAppendableSingleFileInputStream s = this._streamEnumerator.getCurrStream();
        if (null == s) {
            return null;
        }
        return s.getFile();
    }

    public long getCurrentPosition() {
        ConcurrentAppendableSingleFileInputStream s = this._streamEnumerator.getCurrStream();
        if (null == s) {
            return -1L;
        }
        return s.getCurrPosition();
    }

    public boolean isClosed() {
        return this._closed;
    }

    private void updateParserStats() {
        if (this._streamEnumerator == null || this._ggParserStats == null) {
            return;
        }
        InputStreamEnumerator.ParserLag pLag = this._streamEnumerator.calculateLag();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("running updateParserStats. lag=" + pLag));
        }
        this._ggParserStats.setBytesLag(pLag.getBytesLag());
        this._ggParserStats.setFilesLag(pLag.getFilesLag());
        this._ggParserStats.setModTimeLag(pLag.getTSBegin(), pLag.getTSEnd());
    }

    public static class InputStreamEnumerator
    implements Enumeration<InputStream> {
        private ConcurrentAppendableSingleFileInputStream _currStream = null;
        private final Queue<FilePosition> _newTrailFiles = new ConcurrentLinkedQueue<FilePosition>();
        private final boolean _isStaticStream;
        private final Lock _lock = new ReentrantLock();
        private final Condition _notEmpty = this._lock.newCondition();
        private boolean _firstStream = true;
        private volatile boolean _closed = false;
        private final InputStream _beginStream = new EOFStream();
        private final InputStream _endStream = new EOFStream();
        private final Map<String, Long> _inputFileSizes = Collections.synchronizedMap(new HashMap());
        private final Map<String, Long> _inputFileModTimes = Collections.synchronizedMap(new HashMap());
        private final TrailFileNotifier.TrailFileManager _trailFileFilter;
        private final GGParserStatistics _ggParserStats;
        private File _mostRecentParsedFile;
        private File _mostRecentFileAdded = null;
        private long _mostRecentFileAddedSize = 0L;

        public InputStreamEnumerator(boolean staticStream, TrailFileNotifier.TrailFileManager trailFileFilter, GGParserStatistics ggParserStats) throws IOException {
            this._isStaticStream = staticStream;
            this._trailFileFilter = trailFileFilter;
            this._ggParserStats = ggParserStats;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ParserLag calculateLag() {
            File trailFile = null;
            int totalBytes = 0;
            int totalFiles = 0;
            long modTSFirst = 0L;
            long modTSLast = 0L;
            Long size = null;
            this._lock.lock();
            try {
                Long modTime;
                if (this._currStream != null) {
                    File fc = this._currStream.getFile();
                    size = this._inputFileSizes.get(fc.getAbsolutePath());
                    if (size == null || this._newTrailFiles.size() == 0) {
                        size = fc.length();
                        this._inputFileSizes.put(fc.getAbsolutePath(), size);
                    }
                    totalBytes = (int)((long)totalBytes + (size - this._currStream.getCurrPosition()));
                    modTime = this._inputFileModTimes.get(fc.getAbsolutePath());
                    if (modTime == null || this._newTrailFiles.size() == 0) {
                        modTime = fc.lastModified();
                        this._inputFileModTimes.put(fc.getAbsolutePath(), modTime);
                    }
                    modTSFirst = modTime;
                    modTSLast = modTime;
                }
                for (FilePosition fp : this._newTrailFiles) {
                    trailFile = fp.getFile();
                    size = this._inputFileSizes.get(trailFile.getAbsolutePath());
                    if (size == null) {
                        size = trailFile.length();
                        this._inputFileSizes.put(trailFile.getAbsolutePath(), size);
                    }
                    totalBytes = (int)((long)totalBytes + size);
                    ++totalFiles;
                    if (modTSFirst != 0L) continue;
                    modTime = this._inputFileModTimes.get(trailFile.getAbsolutePath());
                    if (modTime == null) {
                        modTime = trailFile.lastModified();
                        this._inputFileModTimes.put(trailFile.getAbsolutePath(), modTime);
                    }
                    modTSFirst = modTime;
                }
                if (trailFile != null) {
                    totalBytes = (int)((long)totalBytes - size);
                    size = trailFile.length();
                    this._inputFileSizes.remove(trailFile.getAbsolutePath());
                    totalBytes = (int)((long)totalBytes + size);
                    modTime = trailFile.lastModified();
                    this._inputFileModTimes.remove(trailFile.getAbsolutePath());
                    modTSLast = modTime;
                }
            }
            finally {
                this._lock.unlock();
            }
            return new ParserLag(totalBytes, modTSFirst, modTSLast, totalFiles);
        }

        public String toString() {
            StringBuffer toString = new StringBuffer("InputStreamEnumerator{_currStream=" + this._currStream + ", numberOfNewTrailFiles=" + this._newTrailFiles.size() + ", _isStaticStream=" + this._isStaticStream + ", _lock=" + this._lock + ", _notEmpty=" + this._notEmpty + ", _firstStream=" + this._firstStream + ", _closed=" + this._closed + ", _beginStream=" + this._beginStream + ", _endStream=" + this._endStream);
            if (LOG.isDebugEnabled()) {
                toString.append(", _newTrailFiles =  " + this._newTrailFiles);
            }
            return toString.append('}').toString();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public boolean hasMoreElements() {
            this._lock.lock();
            try {
                if (this._closed) {
                    boolean bl = false;
                    return bl;
                }
                if (!this._isStaticStream) {
                    boolean bl = true;
                    return bl;
                }
                boolean bl = !this._newTrailFiles.isEmpty();
                return bl;
            }
            finally {
                this._lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public InputStream nextElement() {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Moving to next InputStream !!");
            }
            InputStream stream = null;
            this._lock.lock();
            try {
                if (this._firstStream) {
                    stream = this._beginStream;
                    this._firstStream = false;
                    InputStream inputStream = stream;
                    return inputStream;
                }
                if (this._closed) {
                    InputStream inputStream = this._endStream;
                    return inputStream;
                }
                while (this._newTrailFiles.isEmpty() && !this._closed) {
                    this._notEmpty.await();
                }
                if (this._closed) {
                    InputStream inputStream = null;
                    return inputStream;
                }
                if (this._currStream != null) {
                    LOG.info((Object)("about to switch to the next file from " + this._currStream.getFile()));
                    this._inputFileSizes.remove(this._currStream.getFile().getAbsolutePath());
                    this._inputFileModTimes.remove(this._currStream.getFile().getAbsolutePath());
                }
                FilePosition newFilePos = this._newTrailFiles.poll();
                try {
                    this._currStream = this._isStaticStream || !this._newTrailFiles.isEmpty() ? ConcurrentAppendableSingleFileInputStream.createStaticFileInputStream(newFilePos.getFile().getAbsolutePath(), newFilePos.getOffset()) : ConcurrentAppendableSingleFileInputStream.createAppendingFileInputStream(newFilePos.getFile().getAbsolutePath(), newFilePos.getOffset(), 100L);
                    stream = this._currStream;
                    this.parserFileStarted(newFilePos.getFile());
                }
                catch (Exception e) {
                    LOG.error((Object)("Unable to construct stream for trail filePos (" + newFilePos + ")"), (Throwable)e);
                    throw new RuntimeException("Unable to construct stream for trail filePos (" + newFilePos + ")", e);
                }
            }
            catch (InterruptedException ie) {
                LOG.error((Object)"Got interrupted while waiting for new trail files !!", (Throwable)ie);
            }
            finally {
                this._lock.unlock();
            }
            return stream;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void enqueueNewTrailFile(FilePosition filePos) {
            this._lock.lock();
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Enqueueing new trail file Position: " + filePos));
                }
                if (this._closed) {
                    LOG.error((Object)("Trying to insert new filePos (" + filePos + ") when in closed state. Skipping !!"));
                    return;
                }
                this.parserAddNewFile(filePos.getFile());
                if (this._newTrailFiles.isEmpty() && null != this._currStream) {
                    this._currStream.appendDone();
                }
                this._newTrailFiles.offer(filePos);
                this._notEmpty.signal();
            }
            finally {
                this._lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            this._lock.lock();
            try {
                this._closed = true;
                this._notEmpty.signal();
            }
            finally {
                this._lock.unlock();
            }
        }

        public ConcurrentAppendableSingleFileInputStream getCurrStream() {
            return this._currStream;
        }

        public void parserAddNewFile(File f) {
            if (this._mostRecentFileAdded != null && this._trailFileFilter.compareFileName(f, this._mostRecentFileAdded) <= 0) {
                LOG.warn((Object)("adding file that has been added before=" + f + ". ignoring."));
                return;
            }
            if (this._ggParserStats != null) {
                this._ggParserStats.addNewFile(1);
            }
        }

        public void parserFileStarted(File f) {
            if (this._mostRecentParsedFile != null && this._trailFileFilter.compareFileName(f, this._mostRecentParsedFile) <= 0) {
                LOG.warn((Object)("skipping update for the file(" + f + ") because it has been recorded already. most recent file=" + this._mostRecentParsedFile));
                return;
            }
            if (this._ggParserStats != null) {
                this._ggParserStats.addParsedFile(1);
            }
            this._mostRecentParsedFile = f;
        }

        public static class ParserLag {
            private final long _bytesLag;
            private final long _tsBegin;
            private final long _tsEnd;
            private final int _filesLag;

            public ParserLag(long byteLag, long tsBegin, long tsEnd, int fileLag) {
                this._bytesLag = byteLag;
                this._tsBegin = tsBegin;
                this._tsEnd = tsEnd;
                this._filesLag = fileLag;
            }

            public long getBytesLag() {
                return this._bytesLag;
            }

            public long getTSBegin() {
                return this._tsBegin;
            }

            public long getTSEnd() {
                return this._tsEnd;
            }

            public int getFilesLag() {
                return this._filesLag;
            }

            public String toString() {
                return "LAG:bytes=" + this._bytesLag + ";files=" + this._filesLag + "; tsBegin=" + this._tsBegin + ";tsEnd=" + this._tsEnd;
            }
        }
    }

    private static class FilePosition {
        private final File file;
        private final long offset;

        public String toString() {
            return "FilePosition [file=" + this.file + ", offset=" + this.offset + "]";
        }

        public FilePosition(File file, long offset) {
            this.file = file;
            this.offset = offset;
        }

        public File getFile() {
            return this.file;
        }

        public long getOffset() {
            return this.offset;
        }
    }

    public static class EOFStream
    extends InputStream {
        @Override
        public int read() throws IOException {
            return -1;
        }
    }

    public static enum ReadCall {
        READ_ONE_BYTE,
        READ_BULK,
        READ_BULK_WITH_OFFSETS;

    }
}

