/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.RestConfigUtils;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.entities.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.ConsumerSeekToOffsetRequest;
import io.confluent.kafkarest.entities.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.ConsumerSubscriptionResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.TopicPartitionOffsetMetadata;
import io.confluent.kafkarest.v2.AvroKafkaConsumerState;
import io.confluent.kafkarest.v2.BinaryKafkaConsumerState;
import io.confluent.kafkarest.v2.JsonKafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerReadTask;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerManager {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerManager.class);
    /** REST proxy configuration handed to the constructor. */
    private final KafkaRestConfig config;
    /** Clock obtained from {@code config.getTime()}; the expiration thread reads it to test consumers for expiry. */
    private final Time time;
    /** Value written into the {@code bootstrap.servers} property of every consumer created here. */
    private final String bootstrapServers;
    /**
     * Registry of consumers keyed by group/instance id. A key may map to {@code null} while
     * {@code createConsumer} is still building that instance. Every access in this class
     * happens while holding this manager's monitor.
     */
    private final Map<ConsumerInstanceId, KafkaConsumerState> consumers = new HashMap<ConsumerInstanceId, KafkaConsumerState>();
    /** Pool that runs read tasks, offset commits and the closing of expired consumers. */
    private final ExecutorService executor;
    /** Optional factory used instead of constructing a {@code KafkaConsumer} directly; {@code null} means "no override". */
    private KafkaConsumerFactory consumerFactory;
    /** Read tasks waiting out a delay before the scheduler thread resubmits them to the executor. */
    final DelayQueue<RunnableReadTask> delayedReadTasks = new DelayQueue();
    /** Background thread that removes expired consumers from {@code consumers}. */
    private final ExpirationThread expirationThread;
    /** Background thread that drains {@code delayedReadTasks} into the executor. */
    private ReadTaskSchedulerThread readTaskSchedulerThread;
    /** Id of the lazily created consumer used for offset lookups; {@code null} until it is first needed. */
    @GuardedBy(value="this")
    private ConsumerInstanceId adminConsumerInstanceId = null;

    /**
     * Creates a manager that builds its own Kafka consumers, and starts the expiration and
     * read-task scheduler threads.
     *
     * @param config REST proxy configuration; supplies the clock, the bootstrap brokers and
     *               the {@code consumer.threads} pool limit
     */
    public KafkaConsumerManager(KafkaRestConfig config) {
        this.config = config;
        this.time = config.getTime();
        this.bootstrapServers = RestConfigUtils.bootstrapBrokers(config);

        // A negative "consumer.threads" value lifts the upper bound on the pool size.
        int configuredThreads = config.getInt("consumer.threads");
        int maxThreadCount = configuredThreads < 0 ? Integer.MAX_VALUE : configuredThreads;

        RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
                log.debug("The runnable {} was rejected execution. The thread pool must be satured or shutiing down", (Object)r);
                if (!(r instanceof ReadFutureTask)) {
                    // Anything that is not a read task runs on the submitting thread,
                    // unless the pool has already been shut down.
                    if (!pool.isShutdown()) {
                        r.run();
                    }
                    return;
                }
                // Rejected read tasks are retried after a random 25-75 ms delay.
                RunnableReadTask rejectedRead = ((ReadFutureTask)r).readTask;
                rejectedRead.delayFor(ThreadLocalRandom.current().nextInt(25, 76));
            }
        };
        this.executor = new KafkaConsumerThreadPoolExecutor(
                0, maxThreadCount, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), rejectionHandler);

        this.consumerFactory = null;
        this.expirationThread = new ExpirationThread();
        this.readTaskSchedulerThread = new ReadTaskSchedulerThread();
        this.expirationThread.start();
        this.readTaskSchedulerThread.start();
    }

    /**
     * Package-private constructor that additionally installs a consumer factory.
     *
     * @param config          REST proxy configuration, passed to the public constructor
     * @param consumerFactory factory that {@code createConsumer} uses instead of
     *                        instantiating {@code KafkaConsumer} itself
     */
    KafkaConsumerManager(KafkaRestConfig config, KafkaConsumerFactory consumerFactory) {
        this(config);
        // Assigned after the delegated constructor has already started the background threads.
        this.consumerFactory = consumerFactory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates and registers a new consumer instance in the given group.
     *
     * <p>The instance name is the configured id if present, otherwise the configured name,
     * otherwise a generated {@code rest-consumer-[<server id>-]<uuid>}. The id is reserved in
     * the registry (mapped to {@code null}) before the Kafka client is built, so a concurrent
     * request for the same id fails fast. If anything fails after the reservation, the
     * reservation is removed again and any Kafka client that was already built is closed.
     *
     * @param group          consumer group the instance joins
     * @param instanceConfig requested name/id, format and consumer overrides
     * @return the name under which the instance was registered
     * @throws RuntimeException the exception produced by
     *         {@code Errors.consumerAlreadyExistsException()} when the id is already taken, or by
     *         {@code Errors.invalidConsumerConfigException(...)} when Kafka rejects the properties
     */
    public String createConsumer(String group, ConsumerInstanceConfig instanceConfig) {
        String name = instanceConfig.getName();
        if (instanceConfig.getId() != null) {
            name = instanceConfig.getId();
        }
        if (name == null) {
            name = "rest-consumer-";
            String serverId = this.config.getString("id");
            if (!serverId.isEmpty()) {
                name = name + serverId + "-";
            }
            name = name + UUID.randomUUID().toString();
        }
        ConsumerInstanceId cid = new ConsumerInstanceId(group, name);
        synchronized (this) {
            if (this.consumers.containsKey(cid)) {
                throw Errors.consumerAlreadyExistsException();
            }
            // Reserve the id; the real state replaces this placeholder once it is built.
            this.consumers.put(cid, null);
        }
        boolean succeeded = false;
        Consumer consumer = null;
        try {
            log.debug("Creating consumer {} in group {}", name, group);
            Properties props = this.config.getConsumerProperties();
            props.setProperty("bootstrap.servers", this.bootstrapServers);
            props.setProperty("max.poll.records", "30");
            props.setProperty("group.id", group);
            if (instanceConfig.getId() != null) {
                props.setProperty("consumer.id", instanceConfig.getId());
            }
            if (instanceConfig.getAutoCommitEnable() != null) {
                props.setProperty("enable.auto.commit", instanceConfig.getAutoCommitEnable());
            }
            if (instanceConfig.getAutoOffsetReset() != null) {
                props.setProperty("auto.offset.reset", instanceConfig.getAutoOffsetReset());
            }
            props.setProperty("request.timeout.ms", "30000");
            props.setProperty("schema.registry.url", this.config.getString("schema.registry.url"));
            switch (instanceConfig.getFormat()) {
                case AVRO: {
                    props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
                    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
                    break;
                }
                default: {
                    // Every non-Avro format reads raw bytes.
                    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                }
            }
            try {
                consumer = this.consumerFactory == null ? new KafkaConsumer(props) : this.consumerFactory.createConsumer(props);
            }
            catch (ConfigException e) {
                throw Errors.invalidConsumerConfigException((String)e.getMessage());
            }
            KafkaConsumerState state = this.createConsumerState(instanceConfig, cid, consumer);
            synchronized (this) {
                this.consumers.put(cid, state);
            }
            succeeded = true;
            return name;
        }
        finally {
            if (!succeeded) {
                synchronized (this) {
                    this.consumers.remove(cid);
                }
                if (consumer != null) {
                    // The client was built but never registered; nobody else can close it.
                    try {
                        consumer.close();
                    }
                    catch (RuntimeException closeFailure) {
                        // Do not let a failing close() hide the exception that got us here.
                        log.warn("Failed to close consumer {} in group {} after unsuccessful creation", new Object[]{name, group, closeFailure});
                    }
                }
            }
        }
    }

    /**
     * Wraps a Kafka client in the state object that matches the requested embedded format.
     *
     * @param instanceConfig configuration of the instance being created; selects the format
     * @param cid            id of the instance
     * @param consumer       underlying Kafka client
     * @return a binary, Avro or JSON consumer state
     * @throws RestServerErrorException with an internal-server-error status when the format is
     *         none of the three handled ones
     */
    private KafkaConsumerState createConsumerState(ConsumerInstanceConfig instanceConfig, ConsumerInstanceId cid, Consumer consumer) throws RestServerErrorException {
        KafkaRestConfig perConsumerConfig = KafkaRestConfig.newConsumerConfig(this.config, instanceConfig);
        KafkaConsumerState created;
        switch (instanceConfig.getFormat()) {
            case BINARY:
                created = new BinaryKafkaConsumerState(perConsumerConfig, cid, consumer);
                break;
            case AVRO:
                created = new AvroKafkaConsumerState(perConsumerConfig, cid, consumer);
                break;
            case JSON:
                created = new JsonKafkaConsumerState(perConsumerConfig, cid, consumer);
                break;
            default:
                String message = String.format("Invalid embedded format %s for new consumer.", instanceConfig.getFormat());
                throw new RestServerErrorException(message, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        }
        return created;
    }

    /**
     * Schedules an asynchronous read from a consumer instance.
     *
     * <p>Failures to locate the instance, or a mismatch between the instance and the expected
     * state type, are reported through {@code callback} rather than thrown.
     *
     * @param group             consumer group of the instance
     * @param instance          instance name
     * @param consumerStateType state class the caller expects the instance to be
     * @param timeout           forwarded to the read task
     * @param maxBytes          forwarded to the read task
     * @param callback          receives the outcome of the read
     */
    public <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(String group, String instance, Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType, long timeout, long maxBytes, ConsumerReadCallback<ClientKeyT, ClientValueT> callback) {
        KafkaConsumerState<?, ?, ?, ?> consumerState;
        try {
            consumerState = this.getConsumerInstance(group, instance);
        }
        catch (RestNotFoundException notFound) {
            callback.onCompletion(null, notFound);
            return;
        }

        if (consumerStateType.isInstance(consumerState)) {
            KafkaConsumerReadTask readTask = new KafkaConsumerReadTask(consumerState, timeout, maxBytes, callback);
            ReadTaskState taskState = new ReadTaskState(readTask, consumerState, callback);
            this.executor.submit(new RunnableReadTask(taskState));
        } else {
            callback.onCompletion(null, Errors.consumerFormatMismatch());
        }
    }

    /**
     * Commits offsets for a consumer instance on the executor.
     *
     * @param group               consumer group of the instance
     * @param instance            instance name
     * @param async               forwarded unchanged to the consumer state's commit call
     * @param offsetCommitRequest offsets to commit
     * @param callback            receives the committed offsets or the failure
     * @return the future of the submitted commit task, or {@code null} when the instance does
     *         not exist (the callback has already been given the not-found exception then)
     */
    public Future commitOffsets(String group, String instance, final String async, final ConsumerOffsetCommitRequest offsetCommitRequest, final CommitCallback callback) {
        final KafkaConsumerState<?, ?, ?, ?> consumerState;
        try {
            consumerState = this.getConsumerInstance(group, instance);
        }
        catch (RestNotFoundException notFound) {
            callback.onCompletion(null, notFound);
            return null;
        }

        // Kept as an anonymous class because toString() identifies the task in rejection logs.
        Runnable commitTask = new Runnable(){

            @Override
            public void run() {
                try {
                    List<TopicPartitionOffset> committed = consumerState.commitOffsets(async, offsetCommitRequest);
                    callback.onCompletion(committed, null);
                }
                catch (Exception failure) {
                    log.error("Failed to commit offsets for consumer " + consumerState.getId().toString(), (Throwable)failure);
                    callback.onCompletion(null, failure);
                }
                finally {
                    // A commit counts as activity, successful or not.
                    consumerState.updateExpiration();
                }
            }

            public String toString() {
                return String.format("OffsetCommit consumer id: %s; Async: %s;", consumerState.getId(), async);
            }
        };
        return this.executor.submit(commitTask);
    }

    /**
     * Looks up the committed offsets of a consumer instance.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     * @param request  partitions to query
     * @return the response produced by the consumer state, or a response with an empty offset
     *         list if the lookup yields {@code null}
     */
    public ConsumerCommittedResponse committed(String group, String instance, ConsumerCommittedRequest request) {
        log.debug("Committed offsets for consumer " + instance + " in group " + group);
        KafkaConsumerState<?, ?, ?, ?> consumerState = this.getConsumerInstance(group, instance);
        if (consumerState != null) {
            return consumerState.committed(request);
        }
        ConsumerCommittedResponse empty = new ConsumerCommittedResponse();
        empty.offsets = new ArrayList<TopicPartitionOffsetMetadata>();
        return empty;
    }

    /**
     * Returns the beginning offset of a partition, queried through the admin consumer.
     *
     * @param topic     topic name
     * @param partition partition number
     * @return the beginning offset reported by the admin consumer
     */
    public long getBeginningOffset(String topic, int partition) {
        log.debug("Beginning offset for topic {} and partition {}.", topic, partition);
        return this.getAdminConsumerInstance().getBeginningOffset(topic, partition);
    }

    /**
     * Returns the end offset of a partition, queried through the admin consumer.
     *
     * @param topic     topic name
     * @param partition partition number
     * @return the end offset reported by the admin consumer
     */
    public long getEndOffset(String topic, int partition) {
        log.debug("End offset for topic {} and partition {}.", topic, partition);
        return this.getAdminConsumerInstance().getEndOffset(topic, partition);
    }

    /**
     * Returns the offset of a partition for a timestamp, queried through the admin consumer.
     *
     * @param topic     topic name
     * @param partition partition number
     * @param timestamp point in time to resolve
     * @return whatever the admin consumer reports for that timestamp
     */
    public Optional<Long> getOffsetForTime(String topic, int partition, Instant timestamp) {
        Object[] logArgs = new Object[]{topic, partition, timestamp};
        log.debug("Offset for topic {} and partition {} at timestamp {}.", logArgs);
        return this.getAdminConsumerInstance().getOffsetForTime(topic, partition, timestamp);
    }

    /**
     * Generates a unique group name for the admin consumer:
     * {@code rest-consumer-group-[<server id>-]<uuid>}.
     */
    private String createAdminConsumerGroup() {
        String serverId = this.config.getString("id");
        String uniqueSuffix = UUID.randomUUID().toString();
        return serverId.isEmpty()
                ? String.format("rest-consumer-group-%s", uniqueSuffix)
                : String.format("rest-consumer-group-%s-%s", serverId, uniqueSuffix);
    }

    /**
     * Returns the admin consumer, creating it first if it has never been created or is no
     * longer present in the registry.
     */
    private synchronized KafkaConsumerState<?, ?, ?, ?> getAdminConsumerInstance() {
        boolean adminConsumerRegistered = this.adminConsumerInstanceId != null
                && this.consumers.containsKey(this.adminConsumerInstanceId);
        if (!adminConsumerRegistered) {
            this.adminConsumerInstanceId = this.createAdminConsumerInstance();
        }
        return this.getConsumerInstance(this.adminConsumerInstanceId);
    }

    /**
     * Creates a consumer with a default instance configuration in a freshly generated group.
     *
     * @return the id (group plus generated instance name) of the new consumer
     */
    private ConsumerInstanceId createAdminConsumerInstance() {
        String adminGroup = this.createAdminConsumerGroup();
        String adminInstance = this.createConsumer(adminGroup, new ConsumerInstanceConfig());
        return new ConsumerInstanceId(adminGroup, adminInstance);
    }

    /**
     * Removes a consumer instance from the registry and closes it.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     */
    public void deleteConsumer(String group, String instance) {
        log.debug("Destroying consumer " + instance + " in group " + group);
        // toRemove = true: the lookup also unregisters the instance.
        this.getConsumerInstance(group, instance, true).close();
    }

    /**
     * Applies a subscription to a consumer instance.
     *
     * @param group        consumer group of the instance
     * @param instance     instance name
     * @param subscription subscription to apply
     */
    public void subscribe(String group, String instance, ConsumerSubscriptionRecord subscription) {
        log.debug("Subscribing consumer " + instance + " in group " + group);
        KafkaConsumerState<?, ?, ?, ?> target = this.getConsumerInstance(group, instance);
        if (target == null) {
            return;
        }
        target.subscribe(subscription);
    }

    /**
     * Drops the current subscription of a consumer instance.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     */
    public void unsubscribe(String group, String instance) {
        log.debug("Unsubcribing consumer " + instance + " in group " + group);
        KafkaConsumerState<?, ?, ?, ?> target = this.getConsumerInstance(group, instance);
        if (target == null) {
            return;
        }
        target.unsubscribe();
    }

    /**
     * Reports the topics a consumer instance is subscribed to.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     * @return a response holding a copy of the subscribed topic names, or an empty list if the
     *         lookup yields {@code null}
     */
    public ConsumerSubscriptionResponse subscription(String group, String instance) {
        ConsumerSubscriptionResponse result = new ConsumerSubscriptionResponse();
        KafkaConsumerState<?, ?, ?, ?> consumerState = this.getConsumerInstance(group, instance);
        if (consumerState == null) {
            result.topics = new ArrayList<String>();
            return result;
        }
        result.topics = new ArrayList<String>(consumerState.subscription());
        return result;
    }

    /**
     * Moves a consumer instance to the beginning of the requested partitions.
     *
     * @param group         consumer group of the instance
     * @param instance      instance name
     * @param seekToRequest partitions to rewind
     */
    public void seekToBeginning(String group, String instance, ConsumerSeekToRequest seekToRequest) {
        log.debug("seeking to beginning " + instance + " in group " + group);
        KafkaConsumerState<?, ?, ?, ?> target = this.getConsumerInstance(group, instance);
        if (target == null) {
            return;
        }
        target.seekToBeginning(seekToRequest);
    }

    /**
     * Moves a consumer instance to the end of the requested partitions.
     *
     * @param group         consumer group of the instance
     * @param instance      instance name
     * @param seekToRequest partitions to fast-forward
     */
    public void seekToEnd(String group, String instance, ConsumerSeekToRequest seekToRequest) {
        log.debug("seeking to end " + instance + " in group " + group);
        KafkaConsumerState<?, ?, ?, ?> target = this.getConsumerInstance(group, instance);
        if (target == null) {
            return;
        }
        target.seekToEnd(seekToRequest);
    }

    /**
     * Moves a consumer instance to explicit offsets.
     *
     * @param group               consumer group of the instance
     * @param instance            instance name
     * @param seekToOffsetRequest target offsets
     */
    public void seekToOffset(String group, String instance, ConsumerSeekToOffsetRequest seekToOffsetRequest) {
        log.debug("seeking to offset " + instance + " in group " + group);
        KafkaConsumerState<?, ?, ?, ?> target = this.getConsumerInstance(group, instance);
        if (target == null) {
            return;
        }
        target.seekToOffset(seekToOffsetRequest);
    }

    /**
     * Manually assigns partitions to a consumer instance.
     *
     * @param group             consumer group of the instance
     * @param instance          instance name
     * @param assignmentRequest partitions to assign
     */
    public void assign(String group, String instance, ConsumerAssignmentRequest assignmentRequest) {
        // The previous message said "seeking to end", copied from seekToEnd().
        log.debug("Assigning partitions to consumer {} in group {}", instance, group);
        KafkaConsumerState<?, ?, ?, ?> state = this.getConsumerInstance(group, instance);
        if (state != null) {
            state.assign(assignmentRequest);
        }
    }

    /**
     * Reports the partitions currently assigned to a consumer instance.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     * @return a response listing each assigned partition as a REST entity; the list is empty
     *         if the lookup yields {@code null}
     */
    public ConsumerAssignmentResponse assignment(String group, String instance) {
        log.debug("getting assignment for  " + instance + " in group " + group);
        ConsumerAssignmentResponse result = new ConsumerAssignmentResponse();
        result.partitions = new Vector<io.confluent.kafkarest.entities.TopicPartition>();
        KafkaConsumerState<?, ?, ?, ?> consumerState = this.getConsumerInstance(group, instance);
        if (consumerState == null) {
            return result;
        }
        // Convert Kafka's TopicPartition into the REST entity of the same simple name.
        for (TopicPartition assigned : consumerState.assignment()) {
            result.partitions.add(new io.confluent.kafkarest.entities.TopicPartition(assigned.topic(), assigned.partition()));
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the executor and both background threads, then closes and forgets every
     * registered consumer.
     *
     * <p>The background threads are stopped before this manager's monitor is taken, because
     * the expiration thread needs that monitor to finish its current pass.
     */
    public void shutdown() {
        log.debug("Shutting down consumers");
        this.executor.shutdown();
        log.trace("Shutting down consumer expiration thread");
        this.expirationThread.shutdown();
        this.readTaskSchedulerThread.shutdown();
        synchronized (this) {
            for (KafkaConsumerState state : this.consumers.values()) {
                // createConsumer() registers a null placeholder while an instance is being
                // built; skip those instead of failing the whole shutdown with an NPE.
                if (state != null) {
                    state.close();
                }
            }
            this.consumers.clear();
        }
    }

    /**
     * Looks up a consumer instance, optionally removing it from the registry, and refreshes
     * its expiration.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     * @param toRemove when {@code true} the instance is also removed from the registry
     * @return the registered state, never {@code null}
     * @throws RuntimeException the exception produced by
     *         {@code Errors.consumerInstanceNotFoundException()} when nothing (or only a
     *         placeholder) is registered under the id
     */
    private synchronized KafkaConsumerState<?, ?, ?, ?> getConsumerInstance(String group, String instance, boolean toRemove) {
        ConsumerInstanceId id = new ConsumerInstanceId(group, instance);
        KafkaConsumerState found;
        if (toRemove) {
            found = this.consumers.remove(id);
        } else {
            found = this.consumers.get(id);
        }
        if (found == null) {
            throw Errors.consumerInstanceNotFoundException();
        }
        found.updateExpiration();
        return found;
    }

    /**
     * Looks up a consumer instance without removing it from the registry.
     *
     * @param group    consumer group of the instance
     * @param instance instance name
     * @return the result of the three-argument lookup with {@code toRemove = false}
     */
    KafkaConsumerState<?, ?, ?, ?> getConsumerInstance(String group, String instance) {
        return this.getConsumerInstance(group, instance, false);
    }

    /**
     * Looks up a consumer instance by id object; delegates to the group/instance overload.
     *
     * @param consumerInstanceId id whose group and instance name are used for the lookup
     */
    private KafkaConsumerState<?, ?, ?, ?> getConsumerInstance(ConsumerInstanceId consumerInstanceId) {
        return this.getConsumerInstance(consumerInstanceId.getGroup(), consumerInstanceId.getInstance());
    }

    /**
     * Daemon thread that, once per second, removes expired consumers from the registry and
     * hands their {@code close()} to the executor.
     */
    private class ExpirationThread
    extends Thread {
        /** Cleared by {@link #shutdown()} to end the loop. */
        final AtomicBoolean isRunning;
        /** Released when {@link #run()} exits, for whatever reason. */
        final CountDownLatch shutdownLatch;

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        /**
         * Scans the registry under the manager's monitor, then sleeps for one second, until
         * shut down or interrupted.
         */
        @Override
        public void run() {
            try {
                while (this.isRunning.get()) {
                    synchronized (KafkaConsumerManager.this) {
                        long now = KafkaConsumerManager.this.time.milliseconds();
                        Iterator<KafkaConsumerState> itr = KafkaConsumerManager.this.consumers.values().iterator();
                        while (itr.hasNext()) {
                            final KafkaConsumerState state = itr.next();
                            // null marks an instance that is still being created.
                            if (state == null || !state.expired(now)) continue;
                            log.debug("Removing the expired consumer {}", (Object)state.getId());
                            itr.remove();
                            // Close on the executor so the monitor is not held during close().
                            KafkaConsumerManager.this.executor.submit(new Runnable(){

                                @Override
                                public void run() {
                                    state.close();
                                }
                            });
                        }
                    }
                    Thread.sleep(1000L);
                }
            }
            catch (InterruptedException ignored) {
                // shutdown() interrupts this thread to cut the sleep short; just exit.
            }
            finally {
                // Always release the latch, even if the loop died with a runtime exception;
                // otherwise shutdown() would wait forever.
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Stops the loop and waits until {@link #run()} has exited.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting; the
         *         interrupt flag is restored and the interruption is kept as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                this.interrupt();
                this.shutdownLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted when shutting down expiration thread.", e);
            }
        }
    }

    /**
     * Daemon thread that takes read tasks from the delay queue once their delay has elapsed
     * and resubmits them to the executor.
     */
    private class ReadTaskSchedulerThread
    extends Thread {
        /** Cleared by {@link #shutdown()} to end the loop. */
        final AtomicBoolean isRunning;
        /** Released when {@link #run()} exits. */
        final CountDownLatch shutdownLatch;

        ReadTaskSchedulerThread() {
            super("Read Task Scheduler Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        /** Polls the delay queue in 500 ms slices until shut down or interrupted. */
        @Override
        public void run() {
            try {
                while (this.isRunning.get()) {
                    // Bounded wait so the isRunning flag is re-checked regularly.
                    RunnableReadTask readTask = (RunnableReadTask)KafkaConsumerManager.this.delayedReadTasks.poll(500L, TimeUnit.MILLISECONDS);
                    if (readTask == null) continue;
                    KafkaConsumerManager.this.executor.submit(readTask);
                }
            }
            catch (InterruptedException ignored) {
                // shutdown() interrupts this thread to cut the poll short; just exit.
            }
            finally {
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Stops the loop and waits until {@link #run()} has exited.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting; the
         *         interrupt flag is restored and the interruption is kept as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                this.interrupt();
                this.shutdownLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted when shutting down read task scheduler thread.", e);
            }
        }
    }

    /** Immutable bundle of everything a {@code RunnableReadTask} needs for one read request. */
    private static class ReadTaskState {
        /** The read operation itself. */
        final KafkaConsumerReadTask task;
        /** Consumer the read is performed against. */
        final KafkaConsumerState consumerState;
        /** Callback notified when executing the task fails. */
        final ConsumerReadCallback callback;

        public ReadTaskState(KafkaConsumerReadTask task, KafkaConsumerState state, ConsumerReadCallback callback) {
            this.task = task;
            this.consumerState = state;
            this.callback = callback;
        }
    }

    /**
     * Hook for supplying the Kafka client that {@code createConsumer} would otherwise
     * instantiate itself; installed through the package-private constructor.
     */
    public static interface KafkaConsumerFactory {
        /**
         * @param var1 consumer properties assembled by {@code createConsumer}
         * @return the Kafka client to wrap
         */
        public Consumer createConsumer(Properties var1);
    }

    /** Receives the outcome of an offset commit started by {@code commitOffsets}. */
    public static interface CommitCallback {
        /**
         * @param var1 committed offsets on success, {@code null} on failure
         * @param var2 the failure, or {@code null} on success
         */
        public void onCompletion(List<TopicPartitionOffset> var1, Exception var2);
    }

    class RunnableReadTask
    implements Runnable,
    Delayed {
        private final ReadTaskState taskState;
        private final KafkaRestConfig consumerConfig;
        private final long started;
        private final long requestExpiration;
        private final int backoffMs;
        private long waitExpirationMs;

        public RunnableReadTask(ReadTaskState taskState) {
            this.taskState = taskState;
            this.started = KafkaConsumerManager.this.config.getTime().milliseconds();
            this.consumerConfig = taskState.consumerState.getConfig();
            this.requestExpiration = this.started + (long)this.consumerConfig.getInt("consumer.request.timeout.ms").intValue();
            this.backoffMs = this.consumerConfig.getInt("consumer.iterator.backoff.ms");
            this.waitExpirationMs = 0L;
        }

        public void delayFor(long delayMs) {
            if (this.requestExpiration <= KafkaConsumerManager.this.config.getTime().milliseconds()) {
                this.taskState.task.finish();
                log.trace("Finished executing  consumer read task ({}) due to request expiry", (Object)this.taskState.task);
                return;
            }
            long delay = delayMs + KafkaConsumerManager.this.config.getTime().milliseconds();
            this.waitExpirationMs = Math.min(delay, this.requestExpiration);
            KafkaConsumerManager.this.delayedReadTasks.add(this);
        }

        public String toString() {
            return String.format("RunnableReadTask consumer id: %s; Read task: %s; Request expiration time: %d; Wait expiration: %d", this.taskState.consumerState.getId(), this.taskState.task, this.requestExpiration, this.waitExpirationMs);
        }

        @Override
        public void run() {
            try {
                log.trace("Executing consumer read task ({})", (Object)this.taskState.task);
                this.taskState.task.doPartialRead();
                this.taskState.consumerState.updateExpiration();
                if (!this.taskState.task.isDone()) {
                    this.delayFor(this.backoffMs);
                } else {
                    log.trace("Finished executing consumer read task ({})", (Object)this.taskState.task);
                }
            }
            catch (Exception e) {
                log.error("Failed to read records from consumer {} while executing read task ({}). {}", new Object[]{this.taskState.consumerState.getId().toString(), this.taskState.task, e});
                this.taskState.callback.onCompletion(null, e);
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long delayMs = this.waitExpirationMs - KafkaConsumerManager.this.config.getTime().milliseconds();
            return unit.convert(delayMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == null) {
                throw new NullPointerException("Delayed comparator cannot compare with null");
            }
            long otherObjDelay = o.getDelay(TimeUnit.MILLISECONDS);
            long delay = this.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(delay, otherObjDelay);
        }
    }

    /**
     * Thread pool that wraps read tasks in a {@code ReadFutureTask}, so the rejection handler
     * can get back to the original {@code RunnableReadTask}.
     */
    class KafkaConsumerThreadPoolExecutor
    extends ThreadPoolExecutor {
        private KafkaConsumerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            if (!(runnable instanceof RunnableReadTask)) {
                return super.newTaskFor(runnable, value);
            }
            return new ReadFutureTask<T>((RunnableReadTask)runnable, value);
        }
    }

    /**
     * Future wrapper that remembers the read task it runs; the executor's rejection handler
     * reads {@code readTask} to re-queue a rejected read.
     */
    private class ReadFutureTask<V>
    extends FutureTask<V> {
        /** The wrapped read task. */
        private final RunnableReadTask readTask;

        private ReadFutureTask(RunnableReadTask runnable, V result) {
            super(runnable, result);
            this.readTask = runnable;
        }
    }
}

