/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.mapreduce;

import com.marklogic.http.HttpChannel;
import com.marklogic.mapreduce.ContentWriter;
import com.marklogic.mapreduce.DocumentURI;
import com.marklogic.mapreduce.LinkedMapWritable;
import com.marklogic.mapreduce.MarkLogicOutputFormat;
import com.marklogic.mapreduce.utilities.AssignmentManager;
import com.marklogic.mapreduce.utilities.AssignmentPolicy;
import com.marklogic.mapreduce.utilities.ForestHost;
import com.marklogic.mapreduce.utilities.ForestInfo;
import com.marklogic.mapreduce.utilities.InternalUtilities;
import com.marklogic.mapreduce.utilities.RestrictedHostsUtil;
import com.marklogic.mapreduce.utilities.TextArrayWritable;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.ContentCapability;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.RequestOptions;
import com.marklogic.xcc.ResultItem;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.XccConfigException;
import com.marklogic.xcc.types.ItemType;
import com.marklogic.xcc.types.XSBoolean;
import com.marklogic.xcc.types.XSInteger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class ContentOutputFormat<VALUEOUT>
extends MarkLogicOutputFormat<DocumentURI, VALUEOUT> {
    public static final Log LOG = LogFactory.getLog(ContentOutputFormat.class);
    public static final String ID_PREFIX = "#";
    static final String FOREST_HOST_MAP_QUERY = "import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\nhadoop:get-forest-host-map()";
    public static final String FOREST_HOST_QUERY = "import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\ndeclare variable $policy as xs:string external;\ndeclare variable $partition-name as xs:string external;\nhadoop:get-forest-host($policy,$partition-name)";
    public static final String FOREST_REPLICA_HOST_QUERY = "import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\ndeclare variable $policy as xs:string external;\ndeclare variable $partition-name as xs:string external;\nhadoop:get-forest-replica-hosts($policy,$partition-name)";
    public static final String FOREST_REPLICA_HOST_QUERY_WITH_SEGMENT = "import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\ndeclare variable $policy as xs:string external;\ndeclare variable $partition-name as xs:string external;\nhadoop:get-forest-replica-hosts-with-segment($policy,$partition-name)";
    public static final String INIT_QUERY = "import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\nxdmp:host-name(xdmp:host()), \nlet $versionf :=   fn:function-lookup(xs:QName('xdmp:effective-version'),0)\nreturn if (exists($versionf)) then $versionf() else 0, \nlet $repf :=   fn:function-lookup(xs:QName('hadoop:get-forest-replica-hosts'),2)\nreturn exists($repf),let $segRepf := fn:function-lookup(xs:QName('hadoop:get-forest-replica-hosts-with-segment'),2)\nreturn exists($segRepf),let $f :=   fn:function-lookup(xs:QName('hadoop:get-assignment-policy'),0)\nreturn if (exists($f)) then $f() else ()";
    public static final String HEADER_QUERY = "fn:exists(xdmp:get-request-header('x-forwarded-for'))";
    public static final String XDBC_HEADER_QUERY = "let $xdbcHeaderf := fn:function-lookup(xs:QName('xdmp:get-xdbc-request-header'),1)\nreturn if (exists($xdbcHeaderf)) then fn:exists($xdbcHeaderf('x-forwarded-for')) else false()";
    protected AssignmentManager am = AssignmentManager.getInstance();
    protected boolean fastLoad;
    protected boolean allowFastLoad = true;
    protected AssignmentPolicy.Kind policy;
    protected boolean legacy = false;
    protected boolean failover = false;
    protected boolean supportSegment = false;
    protected String initHostName;

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void checkOutputSpecs(Configuration conf, ContentSource cs) throws IOException {
        Session session = null;
        ResultSequence result = null;
        try {
            session = cs.newSession();
            RequestOptions options = new RequestOptions();
            options.setDefaultXQueryVersion("1.0-ml");
            session.setDefaultRequestOptions(options);
            String outputDir = conf.get("mapreduce.marklogic.output.content.directory");
            if (outputDir != null) {
                AdhocQuery query;
                String queryText;
                String string = outputDir = outputDir.endsWith("/") ? outputDir : outputDir + "/";
                if (conf.getBoolean("mapreduce.marklogic.output.content.cleandir", false)) {
                    queryText = "xdmp:directory-delete(\"{dir}\")".replace("{dir}", outputDir);
                    query = session.newAdhocQuery(queryText);
                    result = session.submitRequest((Request)query);
                } else {
                    queryText = "exists(xdmp:directory(\"{dir}\", \"infinity\"))".replace("{dir}", outputDir);
                    query = session.newAdhocQuery(queryText);
                    result = session.submitRequest((Request)query);
                    if (!result.hasNext()) throw new IllegalStateException("Failed to query directory content.");
                    ResultItem item = result.next();
                    if (((XSBoolean)item.getItem()).asBoolean().booleanValue()) {
                        throw new IllegalStateException("Directory " + outputDir + " already exists");
                    }
                }
            }
            String restrictHostsString = conf.getTrimmed("mapreduce.marklogic.output.restricthosts");
            boolean restrictHosts = false;
            if (restrictHostsString != null && !restrictHostsString.isEmpty()) {
                restrictHosts = Boolean.parseBoolean(restrictHostsString);
            }
            boolean getForwardHeader = restrictHostsString == null;
            restrictHosts = this.initialize(session, restrictHosts, getForwardHeader);
            if (this.fastLoad) {
                LOG.info((Object)"Running in fast load mode");
                DefaultStringifier.store((Configuration)conf, (Object)this.queryForestInfo(cs), (String)"mapreduce.marklogic.output.hostforests");
                AdhocQuery query = session.newAdhocQuery("import module namespace hadoop = \"http://marklogic.com/xdmp/hadoop\" at \"/MarkLogic/hadoop.xqy\";\nhadoop:get-directory-creation()");
                result = session.submitRequest((Request)query);
                if (!result.hasNext()) throw new IllegalStateException("Failed to query directory creation mode.");
                ResultItem item = result.next();
                String dirMode = item.asString();
                if (!dirMode.equals("manual")) {
                    throw new IllegalStateException("Manual directory creation mode is required. The current creation mode is " + dirMode + ".");
                }
            } else {
                TextArrayWritable hostArray = null;
                if (restrictHosts) {
                    String[] outputHosts = conf.getStrings("mapreduce.marklogic.output.host");
                    hostArray = new TextArrayWritable(outputHosts);
                } else {
                    String outputHost = cs.getConnectionProvider().getHostName();
                    hostArray = "local".equals(conf.get("mapreduce.marklogic.mode")) ? this.queryHosts(cs, this.initHostName, outputHost) : this.queryHosts(cs);
                }
                DefaultStringifier.store((Configuration)conf, (Object)((Object)hostArray), (String)"mapreduce.marklogic.output.hostforests");
            }
            String[] perms = conf.getStrings("mapreduce.marklogic.output.content.permission");
            if (perms == null || perms.length <= 0) return;
            if (perms.length % 2 != 0) {
                throw new IllegalStateException("Permissions are expected to be in <role, capability> pairs.");
            }
            int i = 0;
            while (i + 1 < perms.length) {
                String roleName;
                if ((roleName = perms[i++]) == null || roleName.isEmpty()) {
                    throw new IllegalStateException("Illegal role name: " + roleName);
                }
                String perm = perms[i].trim();
                if (!(perm.equalsIgnoreCase(ContentCapability.READ.toString()) || perm.equalsIgnoreCase(ContentCapability.EXECUTE.toString()) || perm.equalsIgnoreCase(ContentCapability.INSERT.toString()) || perm.equalsIgnoreCase(ContentCapability.UPDATE.toString()) || perm.equalsIgnoreCase(ContentCapability.NODE_UPDATE.toString()))) {
                    throw new IllegalStateException("Illegal capability: " + perm);
                }
                ++i;
            }
            return;
        }
        catch (RequestException ex) {
            throw new IOException(ex);
        }
        finally {
            if (session != null) {
                session.close();
            }
            if (result != null) {
                result.close();
            }
        }
    }

    protected Map<String, ContentSource> getSourceMap(boolean fastLoad, TaskAttemptContext context) throws IOException {
        Configuration conf = context.getConfiguration();
        LinkedHashMap<String, ContentSource> sourceMap = new LinkedHashMap<String, ContentSource>();
        if (fastLoad) {
            LinkedMapWritable forestStatusMap = this.getForestStatusMap(conf);
            String[] outputHosts = conf.getStrings("mapreduce.marklogic.output.host");
            boolean restrictHosts = conf.getBoolean("mapreduce.marklogic.output.restricthosts", false);
            RestrictedHostsUtil rhUtil = null;
            if (restrictHosts) {
                rhUtil = new RestrictedHostsUtil(outputHosts);
                for (Writable forestId : forestStatusMap.keySet()) {
                    String forestHost = ((ForestInfo)forestStatusMap.get(forestId)).getHostName();
                    rhUtil.addForestHost(forestHost);
                }
            }
            for (Writable forestId : forestStatusMap.keySet()) {
                ForestInfo fs = (ForestInfo)forestStatusMap.get(forestId);
                List<ForestHost> forestHostList = fs.getReplicas();
                for (int i = 0; i < forestHostList.size(); ++i) {
                    String targetHost;
                    ForestHost fh = forestHostList.get(i);
                    String forestIdStr = fh.getForest();
                    String forestHost = fh.getHostName();
                    String string = targetHost = restrictHosts ? rhUtil.getNextHost(forestHost) : forestHost;
                    if (!fs.getUpdatable()) continue;
                    try {
                        ContentSource cs = (ContentSource)sourceMap.get(targetHost);
                        if (cs == null) {
                            cs = InternalUtilities.getOutputContentSource(conf, targetHost);
                            sourceMap.put(targetHost, cs);
                        }
                        if (!restrictHosts) continue;
                        sourceMap.put(forestHost, cs);
                        continue;
                    }
                    catch (XccConfigException e) {
                        throw new IOException(e);
                    }
                }
            }
        } else {
            TextArrayWritable hosts = this.getHosts(conf);
            for (Writable host : hosts.get()) {
                String hostStr = host.toString();
                try {
                    ContentSource cs = InternalUtilities.getOutputContentSource(conf, hostStr);
                    sourceMap.put(hostStr, cs);
                }
                catch (XccConfigException e) {
                    throw new IOException(e);
                }
            }
        }
        return sourceMap;
    }

    public RecordWriter<DocumentURI, VALUEOUT> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        this.fastLoad = Boolean.valueOf(conf.get("mapreduce.marklogic.output.content.fastload"));
        Map<String, ContentSource> sourceMap = this.getSourceMap(this.fastLoad, context);
        return new ContentWriter(conf, sourceMap, this.fastLoad, this.am);
    }

    protected LinkedMapWritable getForestStatusMap(Configuration conf) throws IOException {
        String forestHost = conf.get("mapreduce.marklogic.output.hostforests");
        if (forestHost != null) {
            LinkedMapWritable fhmap = (LinkedMapWritable)DefaultStringifier.load((Configuration)conf, (String)"mapreduce.marklogic.output.hostforests", LinkedMapWritable.class);
            String s = conf.get("mapreduce.marklogic.output.assignmentpolicy");
            String mode = conf.get("mapreduce.marklogic.mode", "distributed");
            if ("distributed".equals(mode)) {
                AssignmentPolicy.Kind policy = AssignmentPolicy.Kind.forName(s);
                this.am.initialize(policy, fhmap, conf.getInt("mapreduce.marklogic.output.batchsize", 10));
            }
            return fhmap;
        }
        throw new IOException("Forest host map not found");
    }

    protected boolean initialize(Session session, boolean restrictHosts, boolean getForwardHeader) throws IOException, RequestException {
        String queryText = INIT_QUERY;
        if (getForwardHeader) {
            StringBuilder buf = new StringBuilder();
            buf.append(HEADER_QUERY).append(";\n");
            buf.append(XDBC_HEADER_QUERY).append(";\n");
            buf.append(queryText);
            queryText = buf.toString();
        }
        AdhocQuery query = session.newAdhocQuery(queryText);
        RequestOptions options = new RequestOptions();
        options.setDefaultXQueryVersion("1.0-ml");
        query.setOptions(options);
        ResultSequence result = null;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("init query: \n" + query.getQuery()));
        }
        result = session.submitRequest((Request)query);
        ResultItem item = result.next();
        if (getForwardHeader) {
            boolean httpForwardHeaderExists = item.asString().equals("true");
            item = result.next();
            boolean xdbcForwardHeaderExists = item.asString().equals("true");
            item = result.next();
            if (httpForwardHeaderExists || xdbcForwardHeaderExists) {
                restrictHosts = true;
                this.conf.setBoolean("mapreduce.marklogic.output.restricthosts", true);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)"HTTP compliant mode enabled since x-forwarded-for exists");
                }
            } else {
                String inputRestrictHost = this.conf.getTrimmed("mapreduce.marklogic.input.restricthosts");
                if (inputRestrictHost == null || inputRestrictHost.equalsIgnoreCase("false")) {
                    HttpChannel.setUseHTTP((boolean)false);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)"HTTP compliant mode disabled since x-forwarded-for doesn't exist");
                    }
                }
            }
        }
        this.initHostName = item.asString();
        item = result.next();
        this.am.setEffectiveVersion(((XSInteger)item.getItem()).asLong());
        item = result.next();
        this.failover = !restrictHosts && item.asString().equals("true");
        item = result.next();
        this.supportSegment = item.asString().equals("true");
        if (result.hasNext()) {
            item = result.next();
            String policyStr = item.asString();
            this.conf.set("mapreduce.marklogic.output.assignmentpolicy", policyStr);
            this.policy = AssignmentPolicy.Kind.forName(policyStr);
            item = result.next();
            this.allowFastLoad = Boolean.parseBoolean(item.asString());
            if ((this.policy == AssignmentPolicy.Kind.STATISTICAL || this.policy == AssignmentPolicy.Kind.RANGE || this.policy == AssignmentPolicy.Kind.QUERY) && !this.allowFastLoad && this.conf.getBoolean("mapreduce.marklogic.output.content.fastload", false)) {
                throw new IOException("Fastload can't be used: rebalancer is on and forests are imbalanced in a database with statistics-based assignment policy");
            }
        } else {
            this.policy = AssignmentPolicy.Kind.LEGACY;
            this.legacy = true;
        }
        if (this.conf.get("mapreduce.marklogic.output.content.fastload") == null) {
            this.fastLoad = this.conf.get("mapreduce.marklogic.output.content.directory") != null ? (this.conf.get("mapreduce.marklogic.output.partition") == null && (this.policy == AssignmentPolicy.Kind.RANGE || this.policy == AssignmentPolicy.Kind.QUERY) ? false : (this.policy == AssignmentPolicy.Kind.RANGE || this.policy == AssignmentPolicy.Kind.QUERY || this.policy == AssignmentPolicy.Kind.STATISTICAL ? this.allowFastLoad : true)) : false;
        } else {
            this.fastLoad = this.conf.getBoolean("mapreduce.marklogic.output.content.fastload", false);
            if (this.fastLoad && this.conf.get("mapreduce.marklogic.output.partition") == null && (this.policy == AssignmentPolicy.Kind.RANGE || this.policy == AssignmentPolicy.Kind.QUERY)) {
                throw new IllegalArgumentException("output_partition is required for fastload mode.");
            }
        }
        this.conf.setBoolean("mapreduce.marklogic.output.content.fastload", this.fastLoad);
        return restrictHosts;
    }

    protected LinkedMapWritable queryForestInfo(ContentSource cs) throws IOException {
        Session session = null;
        ResultSequence result = null;
        try {
            session = cs.newSession();
            AdhocQuery query = null;
            if (this.legacy) {
                LOG.debug((Object)"Legacy assignment is assumed for older MarkLogic Server.");
                query = session.newAdhocQuery(FOREST_HOST_MAP_QUERY);
            } else {
                query = this.failover ? (this.supportSegment ? session.newAdhocQuery(FOREST_REPLICA_HOST_QUERY_WITH_SEGMENT) : session.newAdhocQuery(FOREST_REPLICA_HOST_QUERY)) : session.newAdhocQuery(FOREST_HOST_QUERY);
                if (this.policy == AssignmentPolicy.Kind.RANGE || this.policy == AssignmentPolicy.Kind.QUERY) {
                    String pName = this.conf.get("mapreduce.marklogic.output.partition");
                    query.setNewStringVariable("partition-name", pName);
                } else {
                    query.setNewStringVariable("partition-name", "");
                }
                query.setNewStringVariable("policy", this.policy.toString().toLowerCase());
            }
            RequestOptions options = new RequestOptions();
            options.setDefaultXQueryVersion("1.0-ml");
            query.setOptions(options);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)query.getQuery());
            }
            result = session.submitRequest((Request)query);
            LinkedMapWritable forestStatusMap = new LinkedMapWritable();
            Text forest = null;
            ArrayList<ForestHost> replicas = new ArrayList<ForestHost>();
            String outputHost = cs.getConnectionProvider().getHostName();
            boolean local = "local".equals(this.conf.get("mapreduce.marklogic.mode"));
            while (result.hasNext()) {
                ResultItem item = result.next();
                if (forest == null) {
                    forest = new Text(item.asString());
                    continue;
                }
                String hostName = item.asString();
                if (local && hostName != null && hostName.equals(this.initHostName)) {
                    hostName = outputHost;
                }
                boolean updatable = true;
                long dc = -1L;
                if (!this.legacy) {
                    if (this.policy == AssignmentPolicy.Kind.BUCKET || this.policy == AssignmentPolicy.Kind.SEGMENT && this.supportSegment) {
                        item = result.next();
                        updatable = Boolean.parseBoolean(item.asString());
                    } else if (this.policy == AssignmentPolicy.Kind.RANGE || this.policy == AssignmentPolicy.Kind.STATISTICAL || this.policy == AssignmentPolicy.Kind.QUERY) {
                        item = result.next();
                        dc = Long.parseLong(item.asString());
                    }
                }
                if (this.failover) {
                    String curForest = "";
                    String curHost = "";
                    int count = 0;
                    while (result.hasNext() && (ItemType.XS_INTEGER != (item = result.next()).getItemType() || ((XSInteger)item.getItem()).asPrimitiveInt() != 0)) {
                        int index = count % 2;
                        if (index == 0) {
                            curForest = item.asString();
                        } else if (index == 1) {
                            curHost = item.asString();
                            ForestHost info = new ForestHost(curForest, curHost);
                            replicas.add(info);
                        }
                        ++count;
                    }
                } else {
                    ForestHost info = new ForestHost(forest.toString(), hostName);
                    replicas.add(info);
                }
                forestStatusMap.put((Writable)forest, new ForestInfo(hostName, dc, updatable, replicas));
                forest = null;
                replicas.clear();
            }
            if (forestStatusMap.size() == 0) {
                throw new IOException("Target database has no forests attached: check forests in database");
            }
            this.am.initialize(this.policy, forestStatusMap, this.conf.getInt("mapreduce.marklogic.output.batchsize", 10));
            LinkedMapWritable linkedMapWritable = forestStatusMap;
            return linkedMapWritable;
        }
        catch (RequestException e) {
            LOG.error((Object)e.getMessage(), (Throwable)e);
            throw new IOException(e);
        }
        finally {
            if (result != null) {
                result.close();
            }
            if (session != null) {
                session.close();
            }
        }
    }
}

