/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.datamovement;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.BatchFailureListener;
import com.marklogic.client.datamovement.Batcher;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.FilteredForestConfiguration;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.QueryBatch;
import com.marklogic.client.datamovement.QueryBatchException;
import com.marklogic.client.datamovement.QueryBatchListener;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.QueryFailureListener;
import com.marklogic.client.datamovement.WriteBatch;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.WriteFailureListener;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostAvailabilityListener
implements QueryFailureListener,
WriteFailureListener {
    private static Logger logger = LoggerFactory.getLogger(HostAvailabilityListener.class);
    private DataMovementManager moveMgr;
    private Duration suspendTimeForHostUnavailable = Duration.ofMinutes(10L);
    private int minHosts = 1;
    private ScheduledFuture<?> future;
    Set<QueryBatchListener> retryListenersSet = new HashSet<QueryBatchListener>();
    List<Class<?>> hostUnavailableExceptions = new ArrayList();

    public HostAvailabilityListener(DataMovementManager moveMgr) {
        if (moveMgr == null) {
            throw new IllegalArgumentException("moveMgr must not be null");
        }
        this.moveMgr = moveMgr;
        if (moveMgr.getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
            this.hostUnavailableExceptions.add(SocketException.class);
            this.hostUnavailableExceptions.add(SSLException.class);
            this.hostUnavailableExceptions.add(UnknownHostException.class);
        }
    }

    public HostAvailabilityListener withSuspendTimeForHostUnavailable(Duration duration) {
        if (duration == null) {
            throw new IllegalArgumentException("duration must not be null");
        }
        this.suspendTimeForHostUnavailable = duration;
        return this;
    }

    public HostAvailabilityListener withMinHosts(int numHosts) {
        if (this.moveMgr.getConnectionType() == DatabaseClient.ConnectionType.GATEWAY) {
            if (numHosts != 1) {
                throw new IllegalArgumentException("numHosts must be 1 when using only the primary host for the connection");
            }
        } else {
            if (numHosts <= 0) {
                throw new IllegalArgumentException("numHosts must be > 0");
            }
            int numConfigHosts = this.moveMgr.readForestConfig().getPreferredHosts().length;
            if (numHosts > numConfigHosts) {
                throw new IllegalArgumentException("numHosts must be less than or equal to the number of hosts in the cluster");
            }
        }
        this.minHosts = numHosts;
        return this;
    }

    public HostAvailabilityListener withHostUnavailableExceptions(Class<Throwable> ... exceptionTypes) {
        this.hostUnavailableExceptions = new ArrayList();
        for (Class<Throwable> exception : exceptionTypes) {
            this.hostUnavailableExceptions.add(exception);
        }
        return this;
    }

    public Throwable[] getHostUnavailableExceptions() {
        return this.hostUnavailableExceptions.toArray(new Throwable[this.hostUnavailableExceptions.size()]);
    }

    public Duration getSuspendTimeForHostUnavailable() {
        return this.suspendTimeForHostUnavailable;
    }

    public int getMinHosts() {
        return this.minHosts;
    }

    @Override
    public void processFailure(WriteBatch batch, Throwable throwable) {
        if (batch.getClient() == null) {
            throw new IllegalStateException("null DatabaseClient");
        }
        boolean isHostUnavailableException = this.processException(batch.getBatcher(), throwable, batch.getClient().getHost());
        if (isHostUnavailableException) {
            try {
                logger.warn("Retrying failed batch: {}, results so far: {}, uris: {}", new Object[]{batch.getJobBatchNumber(), batch.getJobWritesSoFar(), Stream.of(batch.getItems()).map(event -> event.getTargetUri()).collect(Collectors.toList())});
                batch.getBatcher().retryWithFailureListeners(batch);
            }
            catch (RuntimeException e) {
                logger.error("Exception during retry", (Throwable)e);
                this.processFailure(batch, (Throwable)e);
            }
        }
    }

    @Override
    public void processFailure(QueryBatchException queryBatch) {
        if (queryBatch.getClient() == null) {
            throw new IllegalStateException("null DatabaseClient");
        }
        boolean isHostUnavailableException = this.processException(queryBatch.getBatcher(), queryBatch, queryBatch.getClient().getHost());
        if (isHostUnavailableException) {
            try {
                logger.warn("Retrying failed batch: {}, results so far: {}, forest: {}, forestBatch: {}, forest results so far: {}", new Object[]{queryBatch.getJobBatchNumber(), queryBatch.getJobResultsSoFar(), queryBatch.getForest().getForestName(), queryBatch.getForestBatchNumber(), queryBatch.getForestResultsSoFar()});
                queryBatch.getBatcher().retryWithFailureListeners(queryBatch);
            }
            catch (RuntimeException e) {
                logger.error("Exception during retry", (Throwable)e);
                this.processFailure(new QueryBatchException(queryBatch, (Throwable)e));
            }
        }
    }

    private synchronized boolean processException(Batcher batcher, Throwable throwable, String host) {
        return this.moveMgr.getConnectionType() == DatabaseClient.ConnectionType.GATEWAY ? this.processGatewayException(batcher, throwable, host) : this.processForestHostException(batcher, throwable, host);
    }

    private boolean processGatewayException(Batcher batcher, Throwable throwable, String host) {
        return false;
    }

    private boolean processForestHostException(Batcher batcher, Throwable throwable, String host) {
        boolean isHostUnavailableException;
        boolean shouldWeRetry = isHostUnavailableException = this.isHostUnavailableException(throwable, new HashSet<Throwable>());
        if (isHostUnavailableException) {
            String[] preferredHosts;
            ForestConfiguration existingForestConfig = batcher.getForestConfig();
            HashSet<String> preferredHostsList = new HashSet<String>(Arrays.asList(existingForestConfig.getPreferredHosts()));
            if (existingForestConfig instanceof FilteredForestConfiguration) {
                FilteredForestConfiguration existingFilteredForestConfiguration = (FilteredForestConfiguration)existingForestConfig;
                for (Forest forest : existingFilteredForestConfiguration.listForests()) {
                    if (forest.getPreferredHostType() != Forest.HostType.REQUEST_HOST || forest.getHost().toLowerCase().equals(forest.getRequestHost().toLowerCase()) || !preferredHostsList.contains(forest.getHost())) continue;
                    preferredHostsList.remove(forest.getHost());
                }
            }
            if (!Arrays.asList(preferredHosts = preferredHostsList.toArray(new String[preferredHostsList.size()])).contains(host)) {
                return shouldWeRetry;
            }
            if (preferredHosts.length > this.minHosts) {
                logger.error("ERROR: host unavailable \"" + host + "\", black-listing it for " + this.suspendTimeForHostUnavailable.toString(), throwable);
                FilteredForestConfiguration filteredForestConfig = new FilteredForestConfiguration(existingForestConfig);
                if (batcher instanceof WriteBatcher) {
                    filteredForestConfig = filteredForestConfig.withBlackList(host);
                } else if (batcher instanceof QueryBatcher) {
                    List availableHosts = Stream.of(preferredHosts).filter(availableHost -> !availableHost.equals(host)).collect(Collectors.toList());
                    int randomPos = new Random().nextInt(availableHosts.size());
                    String randomAvailableHost = (String)availableHosts.get(randomPos);
                    filteredForestConfig = filteredForestConfig.withRenamedHost(host, randomAvailableHost);
                }
                batcher.withForestConfig(filteredForestConfig);
                this.scheduleForestResynch(batcher, host);
            } else {
                shouldWeRetry = false;
                logger.error("Encountered [" + throwable + "] on host \"" + host + "\" but black-listing it would drop job below minHosts (" + this.minHosts + "), so stopping job \"" + batcher.getJobName() + "\"", throwable);
                this.moveMgr.stopJob(batcher);
            }
        }
        return shouldWeRetry;
    }

    private void scheduleForestResynch(Batcher batcher, String host) {
        if (this.future != null) {
            this.future.cancel(false);
        }
        this.future = Executors.newScheduledThreadPool(1).schedule(() -> {
            if (batcher.isStopped()) {
                logger.debug("Job \"{}\" is stopped, so cancelling re-sync with the server forest config", (Object)batcher.getJobName());
            } else {
                ForestConfiguration updatedForestConfig = this.moveMgr.readForestConfig();
                logger.info("it's been {} since host {} failed, opening communication to all server hosts [{}]", new Object[]{this.suspendTimeForHostUnavailable.toString(), host, Arrays.asList(updatedForestConfig.getPreferredHosts())});
                batcher.withForestConfig(updatedForestConfig);
            }
        }, this.suspendTimeForHostUnavailable.toMillis(), TimeUnit.MILLISECONDS);
    }

    protected boolean isHostUnavailableException(Throwable throwable, Set<Throwable> path) {
        for (Class<?> type : this.hostUnavailableExceptions) {
            if (!type.isInstance(throwable)) continue;
            return true;
        }
        if (throwable.getCause() != null && !path.contains(throwable.getCause())) {
            path.add(throwable.getCause());
            boolean isCauseHostUnavailableException = this.isHostUnavailableException(throwable.getCause(), path);
            if (isCauseHostUnavailableException) {
                return true;
            }
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public BatchFailureListener<QueryBatch> initializeRetryListener(QueryBatchListener queryBatchListener) {
        if (!this.retryListenersSet.contains(queryBatchListener)) {
            HostAvailabilityListener hostAvailabilityListener = this;
            synchronized (hostAvailabilityListener) {
                if (!this.retryListenersSet.contains(queryBatchListener)) {
                    RetryListener retryListener = new RetryListener(queryBatchListener);
                    this.retryListenersSet.add(queryBatchListener);
                    return retryListener;
                }
            }
        }
        return null;
    }

    public static HostAvailabilityListener getInstance(Batcher batcher) {
        if (batcher instanceof WriteBatcher) {
            return HostAvailabilityListener.getInstance((WriteBatcher)batcher);
        }
        if (batcher instanceof QueryBatcher) {
            return HostAvailabilityListener.getInstance((QueryBatcher)batcher);
        }
        throw new IllegalStateException("The Batcher should be either a QueryBatcher instance or a WriteBatcher instance");
    }

    private static HostAvailabilityListener getInstance(WriteBatcher batcher) {
        WriteFailureListener[] writeFailureListeners;
        for (WriteFailureListener writeFailureListener : writeFailureListeners = batcher.getBatchFailureListeners()) {
            if (!(writeFailureListener instanceof HostAvailabilityListener)) continue;
            return (HostAvailabilityListener)writeFailureListener;
        }
        return null;
    }

    private static HostAvailabilityListener getInstance(QueryBatcher batcher) {
        QueryFailureListener[] queryFailureListeners;
        for (QueryFailureListener queryFailureListener : queryFailureListeners = batcher.getQueryFailureListeners()) {
            if (!(queryFailureListener instanceof HostAvailabilityListener)) continue;
            return (HostAvailabilityListener)queryFailureListener;
        }
        return null;
    }

    class RetryListener
    implements BatchFailureListener<QueryBatch> {
        QueryBatchListener queryBatchListener;

        public RetryListener(QueryBatchListener queryBatchListener) {
            this.queryBatchListener = queryBatchListener;
        }

        @Override
        public void processFailure(QueryBatch batch, Throwable throwable) {
            if (batch.getClient() == null) {
                throw new IllegalStateException("null DatabaseClient");
            }
            boolean isHostUnavailableException = HostAvailabilityListener.this.processException(batch.getBatcher(), throwable, batch.getClient().getHost());
            if (isHostUnavailableException) {
                try {
                    logger.warn("Retrying failed listener batch: {}, results so far: {}, uris: {}", new Object[]{batch.getJobBatchNumber(), batch.getJobResultsSoFar(), Arrays.toString(batch.getItems())});
                    batch.getBatcher().retryListener(batch, this.queryBatchListener);
                }
                catch (RuntimeException e) {
                    logger.error("Exception during listener retry", (Throwable)e);
                    this.processFailure(batch, (Throwable)e);
                }
            }
        }
    }
}

