package com.twitter.elephantbird.cascading2.scheme;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tuple.Fields;
import com.twitter.elephantbird.util.Codecs;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/elephantbird/cascading2/scheme/LzoB64LineScheme.class */
/**
 * Line-oriented scheme whose single field, {@value #LINE_FIELD_NAME}, holds a message that is
 * stored on disk as one Base64-encoded text line per record.
 *
 * <p>Subclasses supply the message format through {@link #decodeMessage(byte[])} and
 * {@link #encodeMessage(Object)}; this class handles the Base64 layer and the interaction with
 * the Hadoop {@link RecordReader} / {@link OutputCollector}.
 */
public abstract class LzoB64LineScheme extends LzoTextLine {
    /** Name of the only field this scheme sources and sinks. */
    public static final String LINE_FIELD_NAME = "message";
    /** Charset used to convert between the Base64 text of a line and its bytes. */
    private static final String ENCODING = "UTF-8";
    private static final Logger LOG = LoggerFactory.getLogger(LzoB64LineScheme.class);
    /** Codec instance; transient, so it is created lazily by {@link #getBase64()}. */
    private transient Base64 base64;

    /** Creates a scheme declaring the single {@value #LINE_FIELD_NAME} field. */
    public LzoB64LineScheme() {
        super(new Fields(LINE_FIELD_NAME));
    }

    /**
     * Converts the Base64-decoded bytes of one line into a message object.
     *
     * @param bArr bytes obtained by Base64-decoding one input line
     * @return the decoded message, or {@code null} if the bytes cannot be decoded; a
     *     {@code null} result makes {@code source} skip the record
     */
    protected abstract Object decodeMessage(byte[] bArr);

    /**
     * Converts an outgoing message object into the bytes that will be Base64-encoded.
     *
     * @param obj the first value of the outgoing tuple
     * @return the serialized form of {@code obj}
     */
    protected abstract byte[] encodeMessage(Object obj);

    /** Returns the codec, creating it on first use (the field is null after construction). */
    private Base64 getBase64() {
        if (this.base64 == null) {
            this.base64 = Codecs.createStandardBase64();
        }
        return this.base64;
    }

    /**
     * Attempts to turn one input line into a message.
     *
     * @param line the raw Base64 text read from the input
     * @return the decoded message, or {@code null} if the line could not be decoded (the
     *     failure is logged)
     */
    private Object decodeLine(Text line) {
        try {
            byte[] rawBytes = getBase64().decode(line.toString().getBytes(ENCODING));
            Object message = decodeMessage(rawBytes);
            if (message == null) {
                LOG.info("Couldn't decode " + line + " " + Arrays.toString(rawBytes));
            }
            return message;
        } catch (UnsupportedEncodingException e) {
            LOG.info(e.toString());
        } catch (ArrayIndexOutOfBoundsException e2) {
            // Raised by the Base64 decoding step on malformed input; treat as a bad record.
            LOG.info("Could not decode " + line);
        }
        return null;
    }

    /**
     * Writes the first value of the outgoing tuple as a Base64-encoded text line.
     *
     * @param hadoopFlowProcess the current flow process (unused)
     * @param sinkCall supplies the outgoing tuple and the output collector
     * @throws IOException if the collector fails to accept the record
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // OutputCollector is used as a raw type here.
    public void sink(HadoopFlowProcess hadoopFlowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        Object message = sinkCall.getOutgoingEntry().getTuple().getObject(0);
        byte[] base64Bytes = getBase64().encode(encodeMessage(message));
        OutputCollector collector = sinkCall.getOutput();
        collector.collect(null, new Text(new String(base64Bytes, ENCODING)));
    }

    /**
     * Reads records until one decodes successfully and stores it in position 0 of the incoming
     * tuple.
     *
     * <p>Records that fail Base64 or message decoding are logged and skipped. Returning
     * {@code false} for such a record would be interpreted by the caller as end of input and
     * would discard every remaining record, so {@code false} is returned only once the reader
     * itself reports that it has no more records.
     *
     * @param hadoopFlowProcess the current flow process (unused)
     * @param sourceCall supplies the record reader, the key/value holders created by
     *     {@code sourcePrepare}, and the incoming tuple
     * @return {@code true} if a message was placed in the incoming tuple, {@code false} when the
     *     input is exhausted
     * @throws IOException if the record reader fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // RecordReader is used as a raw type here.
    public boolean source(HadoopFlowProcess hadoopFlowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        Object[] context = sourceCall.getContext();
        RecordReader reader = sourceCall.getInput();
        while (reader.next(context[0], context[1])) {
            Object message = decodeLine((Text) context[1]);
            if (message != null) {
                sourceCall.getIncomingEntry().getTuple().set(0, message);
                return true;
            }
        }
        return false;
    }

    /**
     * Allocates the reusable key/value holders and stores them as the source context
     * (index 0 is the key, index 1 the value).
     *
     * @param hadoopFlowProcess the current flow process (unused)
     * @param sourceCall supplies the record reader and receives the context
     */
    @SuppressWarnings("rawtypes") // RecordReader is used as a raw type here.
    public void sourcePrepare(HadoopFlowProcess hadoopFlowProcess, SourceCall<Object[], RecordReader> sourceCall) {
        RecordReader reader = sourceCall.getInput();
        Object[] context = new Object[2];
        sourceCall.setContext(context);
        context[0] = reader.createKey();
        context[1] = reader.createValue();
    }

    // The three overloads below take the erased parameter types and forward to the typed
    // methods above. They are kept so the set of methods declared by this class is unchanged.

    public /* bridge */ /* synthetic */ void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException {
        sink((HadoopFlowProcess) flowProcess, (SinkCall<Object[], OutputCollector>) sinkCall);
    }

    public /* bridge */ /* synthetic */ boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException {
        return source((HadoopFlowProcess) flowProcess, (SourceCall<Object[], RecordReader>) sourceCall);
    }

    public /* bridge */ /* synthetic */ void sourcePrepare(FlowProcess flowProcess, SourceCall sourceCall) {
        sourcePrepare((HadoopFlowProcess) flowProcess, (SourceCall<Object[], RecordReader>) sourceCall);
    }
}
