/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
import org.apache.hadoop.hbase.coprocessor.BaseRowProcessorEndpoint;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
public class TestRegionProcessRowsWithLocks {
    private static final Log LOG = LogFactory.getLog(TestRegionProcessRowsWithLocks.class);
    private static final TableName TABLE = TableName.valueOf((String)"testtable");
    private static final byte[] ROW = Bytes.toBytes((String)"testrow");
    private static final byte[] FAM = Bytes.toBytes((String)"friendlist");
    private static final byte[] A = Bytes.toBytes((String)"a");
    private static final byte[] B = Bytes.toBytes((String)"b");
    private static final byte[] C = Bytes.toBytes((String)"c");
    private static final byte[] D = Bytes.toBytes((String)"d");
    private static final byte[] E = Bytes.toBytes((String)"e");
    private static final byte[] F = Bytes.toBytes((String)"f");
    private static final byte[] G = Bytes.toBytes((String)"g");
    private static final byte[] COUNTER = Bytes.toBytes((String)"counter");
    private static HBaseTestingUtility util = new HBaseTestingUtility();
    private static volatile int expectedCounter = 0;
    private static volatile Table table = null;
    private static final AtomicBoolean throwsException = new AtomicBoolean(false);

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Configuration conf = util.getConfiguration();
        conf.setStrings("hbase.coprocessor.region.classes", new String[]{RowProcessorEndpoint.class.getName()});
        conf.setInt("hbase.client.retries.number", 2);
        conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
        util.startMiniCluster();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        util.shutdownMiniCluster();
    }

    public void prepareTestData() throws Exception {
        try {
            util.getHBaseAdmin().disableTable(TABLE);
            util.getHBaseAdmin().deleteTable(TABLE);
        }
        catch (Exception exception) {
            // empty catch block
        }
        table = util.createTable(TABLE, FAM);
        Put put = new Put(ROW);
        put.add(FAM, A, Bytes.add((byte[])B, (byte[])C));
        put.add(FAM, B, Bytes.add((byte[])D, (byte[])E, (byte[])F));
        put.add(FAM, C, G);
        table.put(put);
        expectedCounter = 0;
    }

    @Test
    public void testProcessNormal() throws Throwable {
        this.prepareTestData();
        List<HRegion> regions = util.getHBaseCluster().getRegions(TABLE);
        HRegion region = regions.get(0);
        long startMemstoreSize = region.getMemstoreSize();
        long startFlushableSize = region.getStore(FAM).getFlushableSize();
        int finalCounter = this.incrementCounter(table);
        Assert.assertEquals((long)expectedCounter, (long)finalCounter);
        Get get = new Get(ROW);
        Result result = table.get(get);
        LOG.debug((Object)("row keyvalues:" + TestRegionProcessRowsWithLocks.stringifyKvs(result.listCells())));
        int getR = Bytes.toInt((byte[])CellUtil.cloneValue((Cell)result.getColumnLatestCell(FAM, COUNTER)));
        Assert.assertEquals((long)expectedCounter, (long)getR);
        long endMemstoreSize = region.getMemstoreSize();
        long endFlushableSize = region.getStore(FAM).getFlushableSize();
        Assert.assertEquals((String)"Should equal.", (long)(endMemstoreSize - startMemstoreSize), (long)(endFlushableSize - startFlushableSize));
    }

    @Test
    public void testProcessExceptionAndRollBack() throws Throwable {
        this.prepareTestData();
        List<HRegion> regions = util.getHBaseCluster().getRegions(TABLE);
        HRegion region = regions.get(0);
        long startMemstoreSize = region.getMemstoreSize();
        long startFlushableSize = region.getStore(FAM).getFlushableSize();
        WAL wal = region.getWAL();
        wal.registerWALActionsListener((WALActionsListener)new WALActionsListener.Base(){

            public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) throws IOException {
                if (throwsException.get()) {
                    throwsException.set(false);
                    throw new IOException("throw test IOException");
                }
            }
        });
        try {
            this.incrementCounter(table);
            Assert.fail((String)"Should throw IOException.");
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        long endMemstoreSize = region.getMemstoreSize();
        long endFlushableSize = region.getStore(FAM).getFlushableSize();
        LOG.info((Object)("MemstoreSize deta=" + (endMemstoreSize - startMemstoreSize) + ",FlushableSize deta=" + (endFlushableSize - startFlushableSize)));
        Assert.assertEquals((String)"Should equal.", (long)(endMemstoreSize - startMemstoreSize), (long)(endFlushableSize - startFlushableSize));
    }

    private int incrementCounter(Table table) throws Throwable {
        CoprocessorRpcChannel channel = table.coprocessorService(ROW);
        RowProcessorEndpoint.IncrementCounterProcessor processor = new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
        RowProcessorProtos.RowProcessorService.BlockingInterface service = RowProcessorProtos.RowProcessorService.newBlockingStub((BlockingRpcChannel)channel);
        RowProcessorProtos.ProcessRequest request = RowProcessorClient.getRowProcessorPB((RowProcessor)processor);
        RowProcessorProtos.ProcessResponse protoResult = service.process(null, request);
        IncrementCounterProcessorTestProtos.IncCounterProcessorResponse response = IncrementCounterProcessorTestProtos.IncCounterProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
        Integer result = response.getResponse();
        return result;
    }

    static String stringifyKvs(Collection<Cell> kvs) {
        StringBuilder out = new StringBuilder();
        out.append("[");
        if (kvs != null) {
            for (Cell kv : kvs) {
                byte[] col = CellUtil.cloneQualifier((Cell)kv);
                byte[] val = CellUtil.cloneValue((Cell)kv);
                if (Bytes.equals((byte[])col, (byte[])COUNTER)) {
                    out.append(Bytes.toStringBinary((byte[])col) + ":" + Bytes.toInt((byte[])val) + " ");
                    continue;
                }
                out.append(Bytes.toStringBinary((byte[])col) + ":" + Bytes.toStringBinary((byte[])val) + " ");
            }
        }
        out.append("]");
        return out.toString();
    }

    public static class RowProcessorEndpoint<S extends Message, T extends Message>
    extends BaseRowProcessorEndpoint<S, T>
    implements CoprocessorService {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
            try (RegionScanner scanner = null;){
                scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
                scanner = region.getScanner(scan);
                result.clear();
                scanner.next(result);
            }
        }

        public static class IncrementCounterProcessor
        extends BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest, IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
            int counter = 0;
            byte[] row = new byte[0];

            public IncrementCounterProcessor() {
            }

            public IncrementCounterProcessor(byte[] row) {
                this.row = row;
            }

            public Collection<byte[]> getRowsToLock() {
                return Collections.singleton(this.row);
            }

            public IncrementCounterProcessorTestProtos.IncCounterProcessorResponse getResult() {
                IncrementCounterProcessorTestProtos.IncCounterProcessorResponse.Builder i = IncrementCounterProcessorTestProtos.IncCounterProcessorResponse.newBuilder();
                i.setResponse(this.counter);
                return i.build();
            }

            public boolean readOnly() {
                return false;
            }

            public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit) throws IOException {
                ArrayList<Cell> kvs = new ArrayList<Cell>();
                Scan scan = new Scan(this.row, this.row);
                scan.addColumn(FAM, COUNTER);
                RowProcessorEndpoint.doScan(region, scan, kvs);
                LOG.info((Object)("kvs.size()=" + kvs.size()));
                this.counter = kvs.size() == 0 ? 0 : Bytes.toInt((byte[])CellUtil.cloneValue((Cell)((Cell)kvs.iterator().next())));
                LOG.info((Object)("counter=" + this.counter));
                Assert.assertEquals((long)expectedCounter, (long)this.counter);
                ++this.counter;
                expectedCounter += 1;
                Put p = new Put(this.row);
                KeyValue kv = new KeyValue(this.row, FAM, COUNTER, now, Bytes.toBytes((int)this.counter));
                p.add((Cell)kv);
                mutations.add((Mutation)p);
                walEdit.add((Cell)kv);
                KeyValue metaKv = new KeyValue(this.row, WALEdit.METAFAMILY, Bytes.toBytes((String)"I just increment counter"), Bytes.toBytes((int)this.counter));
                walEdit.add((Cell)metaKv);
                throwsException.set(true);
            }

            public IncrementCounterProcessorTestProtos.IncCounterProcessorRequest getRequestData() throws IOException {
                IncrementCounterProcessorTestProtos.IncCounterProcessorRequest.Builder builder = IncrementCounterProcessorTestProtos.IncCounterProcessorRequest.newBuilder();
                builder.setCounter(this.counter);
                builder.setRow(ByteStringer.wrap((byte[])this.row));
                return builder.build();
            }

            public void initialize(IncrementCounterProcessorTestProtos.IncCounterProcessorRequest msg) {
                this.row = msg.getRow().toByteArray();
                this.counter = msg.getCounter();
            }
        }
    }
}

