/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink.internal;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;

/**
 * Shared state behind one readable and one writable backchannel endpoint.
 *
 * <p>Messages passed to the writable endpoint are appended to a concurrent deque; the readable
 * endpoint removes them again from the head of that deque. Each endpoint is created on demand,
 * and at most one endpoint of each kind may exist at a time. Whenever an endpoint is closed and
 * no endpoint is left afterwards, the configured close action is run.
 *
 * <p>NOTE(review): the endpoint fields are volatile, but the "check for null, then write"
 * sequences below are not atomic. If both endpoints are closed concurrently, the close action
 * may run more than once. Presumably each endpoint is owned by a single thread and the close
 * action is idempotent; confirm against the callers.
 *
 * @param <T> type of the messages travelling through the backchannel
 */
@Internal
@ThreadSafe
public final class BackchannelImpl<T> {
    /** Callback run after a close that leaves neither endpoint in place. */
    private final Runnable closeAction;

    /** Messages that were sent but not yet polled; appended at the tail, consumed at the head. */
    private final Deque<T> messages = new ConcurrentLinkedDeque<>();

    /** Current readable endpoint, or {@code null} while none exists. */
    private volatile ReadableBackchannel<T> readableBackchannel;

    /** Current writable endpoint, or {@code null} while none exists. */
    private volatile WritableBackchannel<T> writableBackchannel;

    /**
     * Creates a backchannel without any endpoints.
     *
     * @param closeAction callback to run once a close leaves both endpoints absent
     */
    BackchannelImpl(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    /**
     * Creates and registers the readable endpoint.
     *
     * @return the registered readable endpoint
     * @throws IllegalStateException if a readable endpoint is already registered
     */
    ReadableBackchannel<T> createReadableBackchannel() {
        if (readableBackchannel == null) {
            readableBackchannel = new Readable();
            return readableBackchannel;
        }
        throw new IllegalStateException("Readable backchannel already exists.");
    }

    /**
     * Creates and registers the writable endpoint.
     *
     * @return the registered writable endpoint
     * @throws IllegalStateException if a writable endpoint is already registered
     */
    WritableBackchannel<T> createWritableBackchannel() {
        if (writableBackchannel == null) {
            writableBackchannel = new Writable();
            return writableBackchannel;
        }
        throw new IllegalStateException("Writable backchannel already exists.");
    }

    /**
     * Unregisters the readable endpoint and runs the close action if no endpoint is left.
     *
     * @throws IllegalStateException if no readable endpoint is registered
     */
    private void closeReadableChannel() {
        if (readableBackchannel == null) {
            throw new IllegalStateException("Readable backchannel does not exist.");
        }
        readableBackchannel = null;
        runCloseActionIfFullyClosed();
    }

    /**
     * Unregisters the writable endpoint and runs the close action if no endpoint is left.
     *
     * @throws IllegalStateException if no writable endpoint is registered
     */
    private void closeWritableChannel() {
        if (writableBackchannel == null) {
            throw new IllegalStateException("Writable backchannel does not exist.");
        }
        writableBackchannel = null;
        runCloseActionIfFullyClosed();
    }

    /** Returns {@code true} only while both the readable and the writable endpoint exist. */
    private boolean isEstablished() {
        return !(readableBackchannel == null || writableBackchannel == null);
    }

    /** Runs the close action when neither endpoint is registered any more. */
    private void runCloseActionIfFullyClosed() {
        final boolean endpointLeft = readableBackchannel != null || writableBackchannel != null;
        if (!endpointLeft) {
            closeAction.run();
        }
    }

    /** Readable endpoint; every operation is forwarded to the enclosing backchannel. */
    private class Readable implements ReadableBackchannel<T> {
        /** Removes and returns the message at the head of the queue, or {@code null} if empty. */
        @Override
        @Nullable
        public T poll() {
            return messages.pollFirst();
        }

        /** Reports whether both endpoints of the enclosing backchannel currently exist. */
        @Override
        public boolean isEstablished() {
            // Must stay qualified: an unqualified call would recurse into this very method.
            return BackchannelImpl.this.isEstablished();
        }

        /** Unregisters the readable endpoint of the enclosing backchannel. */
        @Override
        public void close() {
            closeReadableChannel();
        }
    }

    /** Writable endpoint; every operation is forwarded to the enclosing backchannel. */
    private class Writable implements WritableBackchannel<T> {
        /** Appends {@code message} to the tail of the queue. */
        @Override
        public void send(T message) {
            messages.addLast(message);
        }

        /** Reports whether both endpoints of the enclosing backchannel currently exist. */
        @Override
        public boolean isEstablished() {
            // Must stay qualified: an unqualified call would recurse into this very method.
            return BackchannelImpl.this.isEstablished();
        }

        /** Unregisters the writable endpoint of the enclosing backchannel. */
        @Override
        public void close() {
            closeWritableChannel();
        }
    }
}

