/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.contentpump;

import com.marklogic.contentpump.ContentWithFileNameWritable;
import com.marklogic.contentpump.ImportRecordReader;
import com.marklogic.contentpump.RDFWritable;
import com.marklogic.contentpump.utilities.FileIterator;
import com.marklogic.contentpump.utilities.IdGenerator;
import com.marklogic.contentpump.utilities.PermissionUtil;
import com.marklogic.mapreduce.LinkedMapWritable;
import com.marklogic.mapreduce.utilities.InternalUtilities;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.ContentCapability;
import com.marklogic.xcc.ContentPermission;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.RequestOptions;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.XccConfigException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.DatasetFactory;
import org.apache.jena.rdf.model.Literal;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.rdf.model.Statement;
import org.apache.jena.rdf.model.StmtIterator;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParserBuilder;
import org.apache.jena.riot.lang.RiotParsers;
import org.apache.jena.riot.system.AsyncParser;
import org.apache.jena.riot.system.ErrorHandler;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.system.RiotLib;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.system.StreamRDFLib;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.Quad;

public class RDFReader<VALUEIN>
extends ImportRecordReader<VALUEIN> {
    /** Logger shared by all instances of this reader. */
    public static final Log LOG = LogFactory.getLog(RDFReader.class);
    /** Name of a hash algorithm; not referenced in the visible part of this class. */
    public static final String HASHALGORITHM = "SHA-256";
    /** Graph name assigned to streamed quads that carry no graph node. */
    public static final String DEFAULT_GRAPH = "http://marklogic.com/semantics#default-graph";
    /** Jena's default-graph marker; writeValue(String) treats a collection equal to it as "no collection". */
    public static final String JENA_DEFAULT_GRAPH = "urn:x-arq:DefaultGraphNode";
    /** Patterns matching {@code &}, {@code <} and {@code >}, in that order; used by escapeXml. */
    protected static Pattern[] patterns = new Pattern[]{Pattern.compile("&"), Pattern.compile("<"), Pattern.compile(">")};
    /** Upper bound on triples per emitted document; overridden by "rdf_triples_per_document" in initialize(). */
    protected int MAXTRIPLESPERDOCUMENT = 100;
    /** Number of graph statements insertGraphDoc() accumulates before it submits the batch. */
    protected int MAXGRAPHSPERREQUEST = 100;
    /** Graph statements appended to graphQry since the last batch submission by insertGraphDoc(). */
    protected int countPerBatch = 0;
    /** Inputs smaller than this get an in-memory dataset in initParser(); overridden by "rdf_streaming_memory_threshold". */
    protected long INMEMORYTHRESHOLD = 1024000L;
    // NOTE(review): presumably the interval used by notifyUser() for progress logging -- confirm.
    protected long INGESTIONNOTIFYSTEP = 10000L;
    /** In-memory dataset; null when the input is parsed through the streaming parser instead. */
    protected Dataset dataset = null;
    /** Statements of the model currently being drained (in-memory mode only). */
    protected StmtIterator statementIter = null;
    /** Names of the dataset's named graphs still to be visited (in-memory mode only). */
    protected Iterator<String> graphNameIter = null;
    /** Name of the graph whose statements are currently being emitted (in-memory quad mode). */
    protected String collection = null;
    /** Streaming parser created by loadModel() when no in-memory dataset is used. */
    protected RunnableParser jenaStreamingParser = null;
    /** Source of Triple/Quad objects in streaming mode; assigned outside the visible part of this class. */
    protected Iterator rdfIter;
    /** Stream closed by close(); assigned outside the visible part of this class. */
    protected Stream rdfInputStream;
    /** RDF syntax chosen from the file extension in initParser(). */
    protected Lang lang;
    /** Streaming quad mode: graph name -> serialized {@code <sem:triple>} strings waiting to be emitted. */
    protected Hashtable<String, Vector> collectionHash = new Hashtable();
    /** Number of entries currently tracked in collectionHash. */
    protected int collectionCount = 0;
    // NOTE(review): presumably the cap on buffered graphs applied in
    // nextStreamingQuadKeyValueWithCollections() -- confirm.
    private static final int MAX_COLLECTIONS = 100;
    /** True when graph names found in quads must not be used as the output collection (see initialize()). */
    protected boolean ignoreCollectionQuad = false;
    /** True when output collections were configured. */
    protected boolean hasOutputCol = false;
    /** Value of "mapreduce.marklogic.output.rdf.graph", or null. */
    protected String outputGraph;
    /** Value of "mapreduce.marklogic.output.rdf.overridegraph", or null. */
    protected String outputOverrideGraph;
    /** Accumulates the XML text of the document being built; created lazily by write(). */
    protected StringBuilder buffer;
    /** Cleared once the reader has run out of input; getProgress() then reports 1.0. */
    protected boolean hasNext = true;
    /** Generates the numeric part of document keys (see setKey()). */
    protected IdGenerator idGen;
    /** Random source for inputFn and randomValue. */
    protected Random random = new Random();
    /** Per-reader random salt mixed into blank-node identifiers. */
    protected long randomValue = this.random.nextLong();
    /** Construction time in milliseconds; also mixed into blank-node identifiers and inputFn. */
    protected long milliSecs;
    /** Multiplier applied at every step of hash64(). */
    private long HASH64_STEP = 15485863L;
    /** Input name as passed to initParser(), with a trailing ".gz" removed. */
    protected String origFn;
    /** Random hexadecimal token used as the prefix handed to the IdGenerator. */
    protected String inputFn;
    /** Suffix handed to the IdGenerator; not assigned in the visible part of this class. */
    protected long splitStart;
    /** start/pos/end drive getProgress(); initParser() resets them to 0/0/1. */
    protected long start;
    protected long pos;
    protected long end;
    /** When true, nextStreamingKeyValue() stops at end of input instead of advancing to another file. */
    protected boolean compressed;
    /** Triple count reported by close(); updated outside the visible part of this class. */
    protected long ingestedTriples = 0L;
    /** Created empty by the constructor; populated outside the visible part of this class. */
    protected HashSet<String> newGraphs;
    /** Graph document URI -> permissions read from the server by initExistingMapPerms(). */
    protected HashMap<String, ContentPermission[]> existingMapPerms;
    protected Iterator<String> graphItr;
    /** Server version string; its major component decides graphSupported. */
    protected String version;
    /** Maps role ids to role names; may be null. */
    protected LinkedMapWritable roleMap;
    /** Permissions applied to graph documents created by insertGraphDoc(); may be null. */
    protected ContentPermission[] defaultPerms;
    /** Pending XQuery statements that create graph documents; sent by submitGraphQuery(). */
    protected StringBuilder graphQry;
    /** True when roleMap is non-null and non-empty. */
    protected boolean roleMapExists;
    /** True when the server major version is at least 8. */
    protected boolean graphSupported;
    /** Lock held while creating the in-memory Jena dataset in initParser(). */
    private static final Object jenaLock = new Object();

    /**
     * Creates a reader bound to a server version and an optional role map.
     *
     * @param version server version string; checked for null later, in initialize()
     * @param roleMap role id to role name mapping; may be null or empty
     */
    public RDFReader(String version, LinkedMapWritable roleMap) {
        this.version = version;
        this.roleMap = roleMap;
        // A role map only counts as present when it actually holds entries.
        this.roleMapExists = roleMap != null && roleMap.size() > 0;
        // Creation time is later mixed into generated identifiers.
        this.milliSecs = Calendar.getInstance().getTimeInMillis();
        this.compressed = false;
        this.existingMapPerms = new HashMap<>();
        this.newGraphs = new HashSet<>();
        this.graphQry = new StringBuilder();
    }

    /**
     * Releases the current input and flushes any pending graph-creation query.
     *
     * Logs the number of ingested triples, closes the RDF input stream if one
     * is open, drops the in-memory dataset and, when graph statements are
     * pending, submits them to the server.
     *
     * @throws IOException if submitting the pending graph query fails
     */
    @Override
    public void close() throws IOException {
        LOG.info((Object)("Ingested " + this.ingestedTriples + " triples from " + this.origFn));
        if (this.rdfInputStream != null) {
            this.rdfInputStream.close();
        }
        this.dataset = null;
        if (this.graphQry.length() == 0) {
            return;
        }
        this.submitGraphQuery();
        // close() is also invoked between input files (see nextStreamingKeyValue()),
        // so the submitted statements must be discarded; otherwise they would be
        // sent to the server again with the next batch.
        this.graphQry.setLength(0);
        this.countPerBatch = 0;
    }

    /**
     * Sends the accumulated graph-creation statements in {@code graphQry} to
     * the first configured output host as a single ad hoc XQuery request.
     *
     * The buffer itself is not modified here; callers decide when to clear it.
     *
     * @throws IOException if the content source cannot be configured or the
     *                     request fails; the XCC exception is kept as the cause
     */
    protected void submitGraphQuery() throws IOException {
        Session session = null;
        try {
            ContentSource cs = InternalUtilities.getOutputContentSource(this.conf, this.conf.getStrings("mapreduce.marklogic.output.host")[0]);
            session = cs.newSession();
            RequestOptions options = new RequestOptions();
            options.setDefaultXQueryVersion("1.0-ml");
            session.setDefaultRequestOptions(options);
            AdhocQuery query = session.newAdhocQuery(this.graphQry.toString());
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)this.graphQry.toString());
            }
            query.setOptions(options);
            session.submitRequest((Request)query);
        }
        catch (RequestException e) {
            throw new IOException(e);
        }
        catch (XccConfigException e) {
            throw new IOException(e);
        }
        finally {
            // Always release the session, including when the request failed.
            if (session != null) {
                session.close();
            }
        }
    }

    /**
     * Reports how far the reader has advanced through its input.
     *
     * @return 1.0 once the input is exhausted or {@code pos} has passed
     *         {@code end}; otherwise the fraction (pos - start) / (end - start)
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (!this.hasNext || this.pos > this.end) {
            return 1.0f;
        }
        float consumed = (float)(this.pos - this.start);
        float total = (float)(this.end - this.start);
        return consumed / total;
    }

    /**
     * Prepares the reader for a split: reads the job configuration, opens the
     * first input file and resolves the permissions used for graph documents.
     *
     * Configuration keys read here:
     * "rdf_streaming_memory_threshold", "rdf_triples_per_document",
     * "mapreduce.marklogic.output.filenameascollection" (warning only),
     * "mapreduce.marklogic.output.content.collection",
     * "mapreduce.marklogic.output.rdf.graph",
     * "mapreduce.marklogic.output.rdf.overridegraph",
     * "mapreduce.marklogic.output.content.encoding" and
     * "mapreduce.marklogic.output.content.permission".
     *
     * @param inSplit the split to read; when it points at a directory the
     *                first file produced by a FileIterator is used instead
     * @param context task context supplying the configuration
     * @throws IOException if the server version is missing or cannot be
     *                     parsed, or if the input cannot be opened
     * @throws InterruptedException propagated from opening the input
     */
    @Override
    public void initialize(InputSplit inSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        if (this.version == null) {
            throw new IOException("Server Version is null");
        }
        // Accept versions without a minor component (e.g. "9") and report
        // malformed versions as IOException instead of a runtime exception.
        int dot = this.version.indexOf('.');
        String majorVersion = dot >= 0 ? this.version.substring(0, dot) : this.version;
        try {
            this.graphSupported = Integer.parseInt(majorVersion) >= 8;
        }
        catch (NumberFormatException e) {
            throw new IOException("Unrecognized Server Version: " + this.version, e);
        }
        this.conf = context.getConfiguration();
        String rdfopt = this.conf.get("rdf_streaming_memory_threshold");
        if (rdfopt != null) {
            this.INMEMORYTHRESHOLD = Long.parseLong(rdfopt);
        }
        rdfopt = this.conf.get("rdf_triples_per_document");
        if (rdfopt != null) {
            this.MAXTRIPLESPERDOCUMENT = Integer.parseInt(rdfopt);
        }
        if (this.conf.get("mapreduce.marklogic.output.filenameascollection") != null) {
            LOG.warn((Object)"The -filename_as_collection has no effect with input_type RDF, use -output_collections instead.");
        }
        String[] collections = this.conf.getStrings("mapreduce.marklogic.output.content.collection");
        this.outputGraph = this.conf.get("mapreduce.marklogic.output.rdf.graph");
        this.outputOverrideGraph = this.conf.get("mapreduce.marklogic.output.rdf.overridegraph");
        // Graph names in the data are ignored when an override graph is set, or
        // when collections are given without an explicit output graph.
        this.ignoreCollectionQuad = this.outputGraph == null && collections != null || this.outputOverrideGraph != null;
        this.hasOutputCol = collections != null;
        @SuppressWarnings("unchecked")
        VALUEIN localValue = (VALUEIN)ReflectionUtils.newInstance(RDFWritable.class, (Configuration)this.conf);
        this.value = localValue;
        this.encoding = this.conf.get("mapreduce.marklogic.output.content.encoding", "UTF-8");
        this.setFile(((FileSplit)inSplit).getPath());
        this.fs = this.file.getFileSystem(context.getConfiguration());
        FileStatus status = this.fs.getFileStatus(this.file);
        if (status.isDirectory()) {
            this.iterator = new FileIterator((FileSplit)inSplit, context);
            inSplit = (InputSplit)this.iterator.next();
        }
        try {
            this.initStream(inSplit);
        }
        catch (IOException e) {
            LOG.error((Object)("Invalid input: " + this.file.getName() + ": " + e.getMessage()));
            throw e;
        }
        String[] perms = this.conf.getStrings("mapreduce.marklogic.output.content.permission");
        if (perms != null) {
            // The array is sized at half the number of configured strings.
            this.defaultPerms = PermissionUtil.getPermissions(perms).toArray(new ContentPermission[perms.length >> 1]);
        } else {
            List<ContentPermission> tmp = PermissionUtil.getDefaultPermissions(this.conf, this.roleMap);
            if (tmp != null) {
                this.defaultPerms = tmp.toArray(new ContentPermission[tmp.size()]);
            }
        }
        if (this.roleMapExists) {
            this.initExistingMapPerms();
        }
    }

    /**
     * Opens the file behind the given split, configures the parser for it and
     * starts parsing. Does nothing when the file cannot be opened.
     *
     * @param inSplit split whose file is to be read
     */
    protected void initStream(InputSplit inSplit) throws IOException, InterruptedException {
        FSDataInputStream stream = this.openFile(inSplit, false);
        if (stream != null) {
            long length = inSplit.getLength();
            String location = this.file.toUri().toASCIIString();
            this.initParser(location, length);
            this.parse(this.file.getName(), stream);
        }
    }

    /**
     * Resets per-file parser state and selects the RDF syntax from the file
     * extension.
     *
     * A trailing ".gz" is removed from the name before the extension is
     * examined. Unknown or missing extensions fall back to RDF/XML. When
     * {@code size} is below {@code INMEMORYTHRESHOLD} an in-memory dataset is
     * created; otherwise {@code dataset} stays null.
     *
     * @param fsname input name (the caller passes the file URI)
     * @param size   input length as reported by the split
     */
    protected void initParser(String fsname, long size) throws IOException {
        this.start = 0L;
        this.pos = 0L;
        this.end = 1L;
        this.jenaStreamingParser = null;
        this.dataset = null;
        this.statementIter = null;
        this.graphNameIter = null;

        String ext = null;
        int dot = fsname.lastIndexOf(".");
        if (dot >= 0) {
            ext = fsname.substring(dot);
            if (".gz".equals(ext)) {
                // Strip the compression suffix and look at the extension beneath it.
                fsname = fsname.substring(0, dot);
                dot = fsname.lastIndexOf(".");
                ext = dot >= 0 ? fsname.substring(dot) : null;
            }
        }
        this.origFn = fsname;

        // The random draws keep the original order: outer operand first.
        long outer = this.scramble(this.random.nextLong());
        long inner = this.fuse(this.scramble(this.milliSecs), this.random.nextLong());
        this.inputFn = Long.toHexString(this.fuse(outer, inner));
        this.idGen = new IdGenerator(this.inputFn + "-" + this.splitStart);

        this.lang = null;
        if (".rdf".equals(ext)) {
            this.lang = Lang.RDFXML;
        } else if (".ttl".equals(ext)) {
            this.lang = Lang.TURTLE;
        } else if (".json".equals(ext)) {
            this.lang = Lang.RDFJSON;
        } else if (".n3".equals(ext)) {
            this.lang = Lang.N3;
        } else if (".nt".equals(ext)) {
            this.lang = Lang.NTRIPLES;
        } else if (".nq".equals(ext)) {
            this.lang = Lang.NQUADS;
        } else if (".trig".equals(ext)) {
            this.lang = Lang.TRIG;
        } else {
            this.lang = Lang.RDFXML;
        }

        synchronized (jenaLock) {
            if (size < this.INMEMORYTHRESHOLD) {
                this.dataset = DatasetFactory.createTxnMem();
            }
        }
    }

    /**
     * Parses the given input, logging instead of propagating any failure so
     * that one bad file does not abort the whole task.
     *
     * @param fsname name of the file being parsed
     * @param in     open stream positioned at the start of the RDF data
     */
    protected void parse(String fsname, FSDataInputStream in) throws IOException {
        try {
            this.loadModel(fsname, (InputStream)in);
        }
        catch (Exception e) {
            // Still best-effort, but keep the cause in the log so the failure
            // can be diagnosed.
            LOG.error((Object)("Failed to parse(please check intactness and encoding): " + this.origFn), e);
        }
    }

    /**
     * Loads the input either into the in-memory dataset or through the
     * streaming parser, depending on whether {@code dataset} was created.
     *
     * In the in-memory case parse failures are logged and swallowed, the
     * stream is closed, and the graph-name and default-model iterators are
     * initialised. In both cases {@code pos} and {@code end} are reset.
     *
     * @param fsname file name, used as base and in error messages
     * @param in     stream to read the RDF data from
     */
    protected void loadModel(String fsname, InputStream in) throws IOException {
        if (this.dataset != null) {
            DatasetGraph target = this.dataset.asDatasetGraph();
            StreamRDF sink = StreamRDFLib.dataset(target);
            ParserErrorHandler errors = new ParserErrorHandler(fsname);
            try {
                RDFParserBuilder builder = RDFParserBuilder.create().source(in).lang(this.lang).errorHandler((ErrorHandler)errors).base(fsname);
                AsyncParser.of(builder).asyncParseSources(sink);
            }
            catch (Throwable failure) {
                LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document:" + fsname + " " + failure.getMessage()));
            }
            in.close();
            this.graphNameIter = this.dataset.listNames();
            this.statementIter = this.dataset.getDefaultModel().listStatements();
        } else {
            RunnableParser parser = new RunnableParser(this.origFn, fsname, in, this.lang);
            this.jenaStreamingParser = parser;
            parser.run();
        }
        this.pos = 0L;
        this.end = 1L;
    }

    /**
     * Appends text to the document buffer, creating the buffer on first use.
     *
     * @param str text to append
     */
    protected void write(String str) {
        StringBuilder target = this.buffer;
        if (target == null) {
            target = new StringBuilder();
            this.buffer = target;
        }
        target.append(str);
    }

    /**
     * Mixes {@code x} by combining a left shift of {@code y} bits with a
     * signed right shift of {@code 64 - y} bits.
     *
     * The right shift is arithmetic (sign-extending), so this is not a pure
     * bit rotation for negative inputs.
     */
    private long rotl(long x, long y) {
        int left = (int)y;
        int right = (int)(64L - y);
        long high = x << left;
        long low = x >> right;
        return high ^ low;
    }

    /**
     * Combines two values: {@code a} mixed by 8 bits via rotl, XORed with {@code b}.
     */
    private long fuse(long a, long b) {
        long mixed = this.rotl(a, 8L);
        return mixed ^ b;
    }

    /**
     * XORs {@code x} with its 20-bit and 40-bit rotl mixes.
     */
    private long scramble(long x) {
        long by20 = this.rotl(x, 20L);
        long by40 = this.rotl(x, 40L);
        return x ^ by20 ^ by40;
    }

    /**
     * Folds the characters of {@code str} into {@code value}: for each
     * character its numeric value (as given by Character.getNumericValue) is
     * added and the sum multiplied by HASH64_STEP.
     *
     * @param value seed
     * @param str   text to fold in
     * @return the resulting hash
     */
    private long hash64(long value, String str) {
        long acc = value;
        for (char c : str.toCharArray()) {
            acc = (acc + (long)Character.getNumericValue(c)) * this.HASH64_STEP;
        }
        return acc;
    }

    /**
     * Renders a node as text for use inside a sem:* element.
     *
     * @param rsrc node to render
     * @return for blank nodes a "http://marklogic.com/semantics/blank/" IRI
     *         derived from the label and this reader's salt; otherwise the
     *         XML-escaped string form of the node
     */
    protected String resource(Node rsrc) {
        if (!rsrc.isBlank()) {
            return RDFReader.escapeXml(rsrc.toString());
        }
        long salt = this.fuse(this.scramble(this.milliSecs), this.randomValue);
        long id = this.hash64(salt, rsrc.getBlankNodeLabel());
        return "http://marklogic.com/semantics/blank/" + Long.toHexString(id);
    }

    /**
     * Wraps the rendered node in a {@code <sem:TAG>} element.
     *
     * @param rsrc node to render
     * @param tag  local element name, e.g. "subject"
     */
    protected String resource(Node rsrc, String tag) {
        String body = this.resource(rsrc);
        StringBuilder xml = new StringBuilder();
        xml.append("<sem:").append(tag).append(">");
        xml.append(body);
        xml.append("</sem:").append(tag).append(">");
        return xml.toString();
    }

    /**
     * Renders a Jena resource as text for use inside a sem:* element.
     *
     * @param rsrc resource to render
     * @return for anonymous resources a "http://marklogic.com/semantics/blank/"
     *         IRI derived from the id label and this reader's salt; otherwise
     *         the XML-escaped string form of the resource
     */
    private String resource(Resource rsrc) {
        if (!rsrc.isAnon()) {
            return RDFReader.escapeXml(rsrc.toString());
        }
        long salt = this.fuse(this.scramble(this.milliSecs), this.randomValue);
        long id = this.hash64(salt, rsrc.getId().getLabelString());
        return "http://marklogic.com/semantics/blank/" + Long.toHexString(id);
    }

    /**
     * Wraps the rendered resource in a {@code <sem:TAG>} element.
     *
     * @param rsrc resource to render
     * @param tag  local element name, e.g. "predicate"
     */
    protected String resource(Resource rsrc, String tag) {
        String body = this.resource(rsrc);
        StringBuilder xml = new StringBuilder();
        xml.append("<sem:").append(tag).append(">");
        xml.append(body);
        xml.append("</sem:").append(tag).append(">");
        return xml.toString();
    }

    /** Renders a node as a {@code <sem:subject>} element. */
    protected String subject(Node subj) {
        final String tag = "subject";
        return this.resource(subj, tag);
    }

    /** Renders a Jena resource as a {@code <sem:subject>} element. */
    protected String subject(Resource subj) {
        final String tag = "subject";
        return this.resource(subj, tag);
    }

    /** Renders a node as a {@code <sem:predicate>} element. */
    protected String predicate(Node subj) {
        final String tag = "predicate";
        return this.resource(subj, tag);
    }

    /** Renders a Jena resource as a {@code <sem:predicate>} element. */
    protected String predicate(Resource subj) {
        final String tag = "predicate";
        return this.resource(subj, tag);
    }

    /**
     * Renders a node as a {@code <sem:object>} element.
     *
     * Literals carry either an {@code xml:lang} attribute (when a language
     * tag is present) or a {@code datatype} attribute (defaulting to
     * xsd:string). Blank nodes become generated
     * "http://marklogic.com/semantics/blank/" IRIs; everything else is the
     * XML-escaped string form of the node.
     *
     * @param node object node of a triple or quad
     * @return the serialized element
     */
    protected String object(Node node) {
        if (node.isLiteral()) {
            String text = node.getLiteralLexicalForm();
            String type = node.getLiteralDatatypeURI();
            String lang = node.getLiteralLanguage();
            String attribute;
            if (lang == null || "".equals(lang)) {
                if (type == null) {
                    type = "http://www.w3.org/2001/XMLSchema#string";
                }
                // Attribute values are single-quoted, so an apostrophe in the
                // IRI must be escaped in addition to what escapeXml handles.
                attribute = " datatype='" + RDFReader.escapeXml(type).replace("'", "&apos;") + "'";
            } else {
                attribute = " xml:lang='" + RDFReader.escapeXml(lang).replace("'", "&apos;") + "'";
            }
            return "<sem:object" + attribute + ">" + RDFReader.escapeXml(text) + "</sem:object>";
        }
        if (node.isBlank()) {
            long salt = this.fuse(this.scramble(this.milliSecs), this.randomValue);
            return "<sem:object>http://marklogic.com/semantics/blank/" + Long.toHexString(this.hash64(salt, node.getBlankNodeLabel())) + "</sem:object>";
        }
        return "<sem:object>" + RDFReader.escapeXml(node.toString()) + "</sem:object>";
    }

    /**
     * Renders a Jena model node as a {@code <sem:object>} element.
     *
     * Literals carry either an {@code xml:lang} attribute (when a language
     * tag is present) or a {@code datatype} attribute (defaulting to
     * xsd:string). Anonymous nodes become generated
     * "http://marklogic.com/semantics/blank/" IRIs; everything else is the
     * XML-escaped string form of the node.
     *
     * @param node object of a statement
     * @return the serialized element
     */
    private String object(RDFNode node) {
        if (node.isLiteral()) {
            Literal lit = node.asLiteral();
            String text = lit.getString();
            String lang = lit.getLanguage();
            String type = lit.getDatatypeURI();
            String attribute;
            if (lang == null || "".equals(lang)) {
                if (type == null) {
                    type = "http://www.w3.org/2001/XMLSchema#string";
                }
                // Attribute values are single-quoted, so an apostrophe in the
                // IRI must be escaped in addition to what escapeXml handles.
                attribute = " datatype='" + RDFReader.escapeXml(type).replace("'", "&apos;") + "'";
            } else {
                attribute = " xml:lang='" + RDFReader.escapeXml(lang).replace("'", "&apos;") + "'";
            }
            return "<sem:object" + attribute + ">" + RDFReader.escapeXml(text) + "</sem:object>";
        }
        if (node.isAnon()) {
            long salt = this.fuse(this.scramble(this.milliSecs), this.randomValue);
            return "<sem:object>http://marklogic.com/semantics/blank/" + Long.toHexString(this.hash64(salt, node.toString())) + "</sem:object>";
        }
        return "<sem:object>" + RDFReader.escapeXml(node.toString()) + "</sem:object>";
    }

    /**
     * Escapes {@code &}, {@code <} and {@code >} for use in XML text.
     * The ampersand is replaced first so the entities introduced afterwards
     * are not escaped a second time.
     *
     * @param _in text to escape; may be null
     * @return the escaped text, or the empty string when {@code _in} is null
     */
    protected static String escapeXml(String _in) {
        if (_in == null) {
            return "";
        }
        String escaped = patterns[0].matcher(_in).replaceAll("&amp;");
        escaped = patterns[1].matcher(escaped).replaceAll("&lt;");
        escaped = patterns[2].matcher(escaped).replaceAll("&gt;");
        return escaped;
    }

    /**
     * Sets the key of the next document to the next generated id followed by ".xml".
     */
    protected void setKey() {
        String uri = this.idGen.incrementAndGet() + ".xml";
        this.setKey(uri, 0, 0, true);
    }

    /**
     * Produces the next document, using the in-memory path when a statement
     * iterator exists and the streaming path otherwise.
     *
     * @return true when a key/value pair was produced; false when the input
     *         is exhausted or the streaming parser reported a failure
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.jenaStreamingParser != null && this.jenaStreamingParser.failed()) {
            return false;
        }
        if (this.statementIter != null) {
            return this.nextInMemoryKeyValue();
        }
        return this.nextStreamingKeyValue();
    }

    /**
     * Loads the permissions of all existing graph documents from the server
     * into {@code existingMapPerms}.
     *
     * The query returns, per graph document, its URI followed by
     * (role-id, capability) pairs and a terminating "0". Role ids are
     * translated to role names through {@code roleMap}.
     *
     * @throws IOException if the request fails, the result sequence is
     *                     truncated, or a role id has no entry in the role map
     */
    public void initExistingMapPerms() throws IOException {
        Session session = null;
        ResultSequence result = null;
        try {
            ContentSource cs = InternalUtilities.getOutputContentSource(this.conf, this.conf.getStrings("mapreduce.marklogic.output.host")[0]);
            session = cs.newSession();
            RequestOptions options = new RequestOptions();
            options.setDefaultXQueryVersion("1.0-ml");
            session.setDefaultRequestOptions(options);
            StringBuilder sb = new StringBuilder();
            sb.append("xquery version \"1.0-ml\";\n");
            sb.append("for $doc in fn:collection(\"http://marklogic.com/semantics#graphs\")");
            sb.append("return (fn:base-uri($doc),for $p in $doc/*:graph/*:permissions/*:permission return ($p/*:role-id/text(),$p/*:capability/text()),\"0\")");
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)sb.toString());
            }
            AdhocQuery query = session.newAdhocQuery(sb.toString());
            query.setOptions(options);
            result = session.submitRequest((Request)query);
            while (result.hasNext()) {
                String uri = result.next().asString();
                if (!result.hasNext()) {
                    throw new IOException("Invalid role map");
                }
                String tmp = result.next().asString();
                ArrayList<ContentPermission> perms = new ArrayList<ContentPermission>();
                // Items arrive as role-id/capability pairs until the "0" terminator.
                while (!tmp.equals("0")) {
                    Text roleid = new Text(tmp);
                    if (!result.hasNext()) {
                        throw new IOException("Invalid role map");
                    }
                    Object mappedRole = this.roleMap.get(roleid);
                    if (mappedRole == null) {
                        // Fail with a clear message instead of a NullPointerException.
                        throw new IOException("Invalid role map: no role name for role id " + tmp);
                    }
                    String roleName = mappedRole.toString();
                    String cap = result.next().asString();
                    ContentCapability capability = PermissionUtil.getCapbility(cap);
                    perms.add(new ContentPermission(capability, roleName));
                    if (!result.hasNext()) {
                        throw new IOException("Invalid role map");
                    }
                    tmp = result.next().asString();
                }
                this.existingMapPerms.put(uri, perms.toArray(new ContentPermission[perms.size()]));
            }
        }
        catch (XccConfigException e) {
            throw new IOException(e);
        }
        catch (RequestException e) {
            throw new IOException(e);
        }
        finally {
            if (result != null) {
                result.close();
            }
            if (session != null) {
                session.close();
            }
        }
    }

    /**
     * Queues an XQuery statement that creates the graph document for
     * {@code graph} if it does not exist yet, using {@code defaultPerms} or,
     * when none are set, the server's default permissions.
     *
     * When {@code MAXGRAPHSPERREQUEST} statements are already queued, the
     * pending batch is submitted and cleared first.
     *
     * @param graph graph IRI
     * @return an empty permission array
     * @throws IOException if submitting a full batch fails
     */
    public ContentPermission[] insertGraphDoc(String graph) throws IOException {
        if (this.countPerBatch >= this.MAXGRAPHSPERREQUEST) {
            this.countPerBatch = 0;
            this.submitGraphQuery();
            this.graphQry.setLength(0);
        }
        StringBuilder sb = this.graphQry;
        ContentPermission[] permissions = this.defaultPerms;
        // The IRI is placed inside a double-quoted XQuery string literal, so
        // double quotes must be escaped in addition to &, < and >.
        String escapedGraph = RDFReader.escapeXml(graph).replace("\"", "&quot;");
        sb.append("if(fn:empty(fn:doc(\"").append(escapedGraph).append("\"))) then sem:create-graph-document(sem:iri(\"").append(escapedGraph).append("\"),(");
        if (permissions != null && permissions.length > 0) {
            for (int i = 0; i < permissions.length; ++i) {
                ContentPermission cp = permissions[i];
                if (i > 0) {
                    sb.append(",");
                }
                sb.append("xdmp:permission(\"");
                sb.append(cp.getRole());
                sb.append("\",\"");
                sb.append(cp.getCapability());
                sb.append("\")");
            }
            sb.append(")");
        } else {
            sb.append("xdmp:default-permissions())");
        }
        sb.append(") else ();\n");
        ++this.countPerBatch;
        return new ContentPermission[0];
    }

    /**
     * Produces the next document from the in-memory dataset, using the quad
     * path for N-Quads and TriG input and the triple path otherwise.
     */
    public boolean nextInMemoryKeyValue() throws IOException, InterruptedException {
        boolean quadSyntax = this.lang == Lang.NQUADS || this.lang == Lang.TRIG;
        return quadSyntax ? this.nextInMemoryQuadKeyValue() : this.nextInMemoryTripleKeyValue();
    }

    /**
     * Emits the next {@code <sem:triples>} document from the in-memory
     * default model, holding at most {@code MAXTRIPLESPERDOCUMENT} triples and
     * a {@code <sem:origin>} element naming the input.
     *
     * @return true when a document was produced; false when there is no
     *         statement iterator, no statement is left, or checking for more
     *         statements raised an exception (which is logged)
     */
    public boolean nextInMemoryTripleKeyValue() throws IOException, InterruptedException {
        if (this.statementIter == null) {
            return false;
        }
        try {
            if (!this.statementIter.hasNext()) {
                this.hasNext = false;
                return false;
            }
        }
        catch (Exception ex) {
            LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document: " + this.origFn + " " + ex.getMessage()));
            return false;
        }
        this.setKey();
        this.write("<sem:triples xmlns:sem='http://marklogic.com/semantics'>\n");
        // The input name may contain '&' or '<'; escape it so the document stays well-formed.
        this.write("<sem:origin>" + RDFReader.escapeXml(this.origFn) + "</sem:origin>\n");
        for (int max = this.MAXTRIPLESPERDOCUMENT; max > 0 && this.statementIter.hasNext(); --max) {
            Statement stmt = this.statementIter.nextStatement();
            this.write("<sem:triple>");
            this.write(this.subject(stmt.getSubject()));
            this.write(this.predicate((Resource)stmt.getPredicate()));
            this.write(this.object(stmt.getObject()));
            this.write("</sem:triple>\n");
            this.notifyUser();
        }
        this.write("</sem:triples>\n");
        if (!this.statementIter.hasNext()) {
            // Everything consumed: progress reports completion.
            this.pos = 1L;
        }
        this.writeValue();
        return true;
    }

    /**
     * Produces the next document from in-memory quad data, either ignoring or
     * honouring graph names depending on {@code ignoreCollectionQuad}.
     */
    public boolean nextInMemoryQuadKeyValue() throws IOException, InterruptedException {
        return this.ignoreCollectionQuad
            ? this.nextInMemoryQuadKeyValueIgnoreCollections()
            : this.nextInMemoryQuadKeyValueWithCollections();
    }

    /**
     * Emits the next {@code <sem:triples>} document from the in-memory
     * dataset, keeping each document within a single graph and passing that
     * graph name to writeValue as the collection.
     *
     * When the current model is exhausted the reader advances through the
     * remaining named graphs until one with statements is found.
     *
     * @return true when a document was produced; false when nothing is left
     *         or advancing raised an exception (which is logged)
     */
    public boolean nextInMemoryQuadKeyValueWithCollections() throws IOException, InterruptedException {
        if (this.statementIter == null) {
            return false;
        }
        try {
            while (!this.statementIter.hasNext()) {
                if (!this.graphNameIter.hasNext()) {
                    this.hasNext = false;
                    this.collection = null;
                    return false;
                }
                this.collection = this.graphNameIter.next();
                this.statementIter = this.dataset.getNamedModel(this.collection).listStatements();
            }
        }
        catch (Exception failure) {
            LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document: " + this.origFn + " " + failure.getMessage()));
            return false;
        }
        this.setKey();
        this.write("<sem:triples xmlns:sem='http://marklogic.com/semantics'>");
        int remaining = this.MAXTRIPLESPERDOCUMENT;
        while (remaining > 0 && this.statementIter.hasNext()) {
            Statement current = this.statementIter.nextStatement();
            this.write("<sem:triple>");
            this.write(this.subject(current.getSubject()));
            this.write(this.predicate((Resource)current.getPredicate()));
            this.write(this.object(current.getObject()));
            this.write("</sem:triple>");
            this.notifyUser();
            remaining--;
        }
        this.write("</sem:triples>\n");
        if (!this.statementIter.hasNext()) {
            this.pos = 1L;
        }
        this.writeValue(this.collection);
        return true;
    }

    /**
     * Emits the next {@code <sem:triples>} document from the in-memory
     * dataset without regard to graph boundaries: a single document may mix
     * triples from several graphs, and no collection is passed to writeValue.
     *
     * @return true when a document was produced; false when nothing is left
     *         or advancing raised an exception (which is logged)
     */
    public boolean nextInMemoryQuadKeyValueIgnoreCollections() throws IOException, InterruptedException {
        if (this.statementIter == null) {
            return false;
        }
        try {
            while (!this.statementIter.hasNext()) {
                if (!this.graphNameIter.hasNext()) {
                    this.hasNext = false;
                    return false;
                }
                this.collection = this.graphNameIter.next();
                this.statementIter = this.dataset.getNamedModel(this.collection).listStatements();
            }
        }
        catch (Exception failure) {
            LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document: " + this.origFn + " " + failure.getMessage()));
            return false;
        }
        this.setKey();
        this.write("<sem:triples xmlns:sem='http://marklogic.com/semantics'>");
        for (int remaining = this.MAXTRIPLESPERDOCUMENT; remaining > 0 && this.statementIter.hasNext(); --remaining) {
            Statement current = this.statementIter.nextStatement();
            this.write("<sem:triple>");
            this.write(this.subject(current.getSubject()));
            this.write(this.predicate((Resource)current.getPredicate()));
            this.write(this.object(current.getObject()));
            this.write("</sem:triple>");
            this.notifyUser();
            // When the current graph runs dry, move on to the next named graph
            // that has statements, or stop once no graphs remain.
            boolean needNextGraph = !this.statementIter.hasNext();
            while (needNextGraph) {
                if (this.graphNameIter.hasNext()) {
                    this.collection = this.graphNameIter.next();
                    this.statementIter = this.dataset.getNamedModel(this.collection).listStatements();
                    needNextGraph = !this.statementIter.hasNext();
                } else {
                    needNextGraph = false;
                }
            }
        }
        this.write("</sem:triples>\n");
        if (!this.statementIter.hasNext()) {
            this.pos = 1L;
        }
        this.writeValue();
        return true;
    }

    /**
     * Produces the next document in streaming mode.
     *
     * When the current stream is drained and no buffered graphs remain, the
     * reader either stops (compressed input, or no further file) or closes
     * the current input and opens the next file from the file iterator.
     *
     * @return true when a document was produced, false otherwise
     */
    public boolean nextStreamingKeyValue() throws IOException, InterruptedException {
        if (this.rdfIter == null) {
            return false;
        }
        boolean drained = !this.rdfIter.hasNext() && this.collectionHash.isEmpty();
        if (drained) {
            boolean anotherFile = !this.compressed && this.iterator != null && this.iterator.hasNext();
            if (!anotherFile) {
                this.hasNext = false;
                return false;
            }
            this.close();
            this.initStream((InputSplit)this.iterator.next());
        }
        boolean quadSyntax = this.lang == Lang.NQUADS || this.lang == Lang.TRIG;
        return quadSyntax ? this.nextStramingQuadKeyValue() : this.nextStreamingTripleKeyValue();
    }

    /**
     * Emits the next {@code <sem:triples>} document from the triple stream,
     * holding at most {@code MAXTRIPLESPERDOCUMENT} triples.
     *
     * @return true when a document was produced; false when there is no
     *         stream or reading from it raised an exception (which is logged)
     */
    protected boolean nextStreamingTripleKeyValue() throws IOException, InterruptedException {
        if (this.rdfIter == null) {
            return false;
        }
        this.setKey();
        this.write("<sem:triples xmlns:sem='http://marklogic.com/semantics'>");
        try {
            int remaining = this.MAXTRIPLESPERDOCUMENT;
            while (remaining > 0 && this.rdfIter.hasNext()) {
                Triple current = (Triple)this.rdfIter.next();
                this.write("<sem:triple>");
                this.write(this.subject(current.getSubject()));
                this.write(this.predicate(current.getPredicate()));
                this.write(this.object(current.getObject()));
                this.write("</sem:triple>");
                this.notifyUser();
                remaining--;
            }
            this.write("</sem:triples>\n");
        }
        catch (Exception failure) {
            LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document: " + this.origFn + " " + failure.getMessage()));
            return false;
        }
        if (!this.rdfIter.hasNext()) {
            this.pos = 1L;
        }
        this.writeValue();
        return true;
    }

    /**
     * Produces the next document from streamed quad data, either ignoring or
     * honouring graph names depending on {@code ignoreCollectionQuad}.
     */
    public boolean nextStramingQuadKeyValue() throws IOException, InterruptedException {
        return this.ignoreCollectionQuad
            ? this.nextStreamingQuadKeyValueIgnoreCollections()
            : this.nextStreamingQuadKeyValueWithCollections();
    }

    /**
     * Emits the next {@code <sem:triples>} document from the quad stream,
     * discarding each quad's graph component. At most
     * {@code MAXTRIPLESPERDOCUMENT} triples are written per document.
     *
     * @return true when a document was produced; false when there is no
     *         stream or reading from it raised an exception (which is logged)
     */
    protected boolean nextStreamingQuadKeyValueIgnoreCollections() throws IOException, InterruptedException {
        if (this.rdfIter == null) {
            return false;
        }
        this.setKey();
        this.write("<sem:triples xmlns:sem='http://marklogic.com/semantics'>");
        try {
            int remaining = this.MAXTRIPLESPERDOCUMENT;
            while (remaining > 0 && this.rdfIter.hasNext()) {
                Quad current = (Quad)this.rdfIter.next();
                this.write("<sem:triple>");
                this.write(this.subject(current.getSubject()));
                this.write(this.predicate(current.getPredicate()));
                this.write(this.object(current.getObject()));
                this.write("</sem:triple>");
                this.notifyUser();
                remaining--;
            }
            this.write("</sem:triples>\n");
        }
        catch (Exception failure) {
            LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document: " + this.origFn + " " + failure.getMessage()));
            return false;
        }
        if (!this.rdfIter.hasNext()) {
            this.pos = 1L;
        }
        this.writeValue();
        return true;
    }

    /**
     * Emits the next {@code <sem:triples>} document from the quad stream,
     * grouping triples by graph so that each document belongs to one graph.
     *
     * Quads are read and buffered per graph in {@code collectionHash} until
     * either one graph reaches {@code MAXTRIPLESPERDOCUMENT} triples (that
     * graph is emitted) or more than {@code MAX_COLLECTIONS} graphs are
     * buffered (the graph returned by largestCollection() is emitted). Once
     * the stream is exhausted the remaining buffered graphs are emitted one
     * per call.
     *
     * @return true when a document was produced; false when nothing is left
     *         or reading the stream raised an exception (which is logged)
     */
    public boolean nextStreamingQuadKeyValueWithCollections() throws IOException, InterruptedException {
        if (this.rdfIter == null) {
            return false;
        }
        if (!this.rdfIter.hasNext() && this.collectionHash.isEmpty()) {
            this.hasNext = false;
            return false;
        }
        String collection = null;
        boolean overflow = false;
        try {
            while (!overflow && this.rdfIter.hasNext()) {
                Quad quad = (Quad)this.rdfIter.next();
                Node graph = quad.getGraph();
                collection = graph == null ? DEFAULT_GRAPH : this.resource(graph);
                String triple = this.subject(quad.getSubject()) + this.predicate(quad.getPredicate()) + this.object(quad.getObject());
                if (!this.collectionHash.containsKey(collection)) {
                    this.collectionHash.put(collection, new Vector<String>());
                    ++this.collectionCount;
                }
                @SuppressWarnings("unchecked")
                Vector<String> buffered = this.collectionHash.get(collection);
                buffered.add("<sem:triple>" + triple + "</sem:triple>");
                if (buffered.size() == this.MAXTRIPLESPERDOCUMENT) {
                    // This graph has a full document's worth of triples.
                    overflow = true;
                } else if (this.collectionCount > MAX_COLLECTIONS) {
                    // Too many graphs buffered: flush the largest to free memory.
                    collection = this.largestCollection();
                    overflow = true;
                }
            }
        }
        catch (Exception ex) {
            LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document:" + this.origFn + " " + ex.getMessage()));
            return false;
        }
        if (!overflow) {
            // Stream drained: emit any one of the graphs still buffered.
            Iterator<String> names = this.collectionHash.keySet().iterator();
            if (names.hasNext()) {
                collection = names.next();
            }
        }
        if (collection == null) {
            // Nothing buffered; Hashtable does not accept a null key.
            this.hasNext = false;
            return false;
        }
        @SuppressWarnings("unchecked")
        Vector<String> triples = this.collectionHash.get(collection);
        this.setKey();
        this.write("<sem:triples xmlns:sem='http://marklogic.com/semantics'>");
        for (String t : triples) {
            this.write(t);
            this.notifyUser();
        }
        this.write("</sem:triples>\n");
        this.collectionHash.remove(collection);
        --this.collectionCount;
        if (!this.rdfIter.hasNext()) {
            this.pos = 1L;
        }
        this.writeValue(collection);
        return true;
    }

    /**
     * Flushes the buffered content into the current value without naming a
     * collection; delegates to {@link #writeValue(String)} with {@code null}.
     *
     * @throws IOException propagated from {@link #writeValue(String)}
     */
    public void writeValue() throws IOException {
        final String noCollection = null;
        this.writeValue(noCollection);
    }

    /**
     * Copies the buffered content into the current value object and then
     * empties the buffer.
     *
     * <p>For an {@code RDFWritable} value the target graph is also resolved
     * and applied, together with its permissions:
     * <ul>
     *   <li>a collection equal to {@code JENA_DEFAULT_GRAPH} is treated as
     *       "no collection";</li>
     *   <li>when {@code hasOutputCol} is set, {@code outputOverrideGraph}
     *       wins, then the incoming collection falling back to
     *       {@code outputGraph}, and otherwise the first configured output
     *       collection;</li>
     *   <li>when it is not set and no collection was supplied,
     *       {@code outputGraph}, then {@code outputOverrideGraph}, then
     *       {@code DEFAULT_GRAPH} is used.</li>
     * </ul>
     *
     * @param collection graph the content came from, or {@code null}
     * @throws IOException declared for the graph-document insertion performed
     *         the first time a graph is seen
     */
    public void writeValue(String collection) throws IOException {
        final String content = this.buffer.toString();
        if (this.value instanceof Text) {
            ((Text)this.value).set(content);
        } else if (!(this.value instanceof RDFWritable)) {
            ((Text)((ContentWithFileNameWritable)this.value).getValue()).set(content);
        } else {
            RDFWritable rdfValue = (RDFWritable)this.value;
            rdfValue.set(content);

            String graph = collection;
            if (graph != null && graph.equals(JENA_DEFAULT_GRAPH)) {
                graph = null;
            }

            if (this.hasOutputCol) {
                if (this.outputOverrideGraph != null) {
                    graph = this.outputOverrideGraph;
                } else if (this.outputGraph == null) {
                    String[] outCols = this.conf.getStrings("mapreduce.marklogic.output.content.collection");
                    graph = outCols[0];
                } else if (graph == null) {
                    graph = this.outputGraph;
                }
            } else if (graph == null) {
                graph = this.outputGraph != null ? this.outputGraph : this.outputOverrideGraph;
                if (graph == null) {
                    graph = DEFAULT_GRAPH;
                }
            }

            // Graph-specific permissions take precedence over the defaults.
            boolean hasMappedPerms = this.roleMapExists && this.existingMapPerms.containsKey(graph);
            if (hasMappedPerms) {
                rdfValue.setPermissions(this.existingMapPerms.get(graph));
            } else {
                rdfValue.setPermissions(this.defaultPerms);
            }

            // Insert the graph document only the first time a graph is seen.
            if (this.graphSupported && !this.newGraphs.contains(graph)) {
                this.newGraphs.add(graph);
                this.insertGraphDoc(graph);
            }
            rdfValue.setGraph(graph);
        }
        this.buffer.setLength(0);
    }

    /**
     * Finds the buffered collection that currently holds the most triples.
     *
     * @return the key of the largest entry in {@code collectionHash}; when
     *         several are equally large the first one iterated wins, and the
     *         empty string is returned when nothing is buffered
     */
    protected String largestCollection() {
        String biggest = "";
        int biggestSize = -1;
        Iterator<String> names = this.collectionHash.keySet().iterator();
        while (names.hasNext()) {
            String name = names.next();
            int size = this.collectionHash.get(name).size();
            if (size > biggestSize) {
                biggestSize = size;
                biggest = name;
            }
        }
        return biggest;
    }

    /**
     * Counts one more ingested triple and logs a progress line at INFO level
     * every {@code INGESTIONNOTIFYSTEP} triples.
     */
    protected void notifyUser() {
        this.ingestedTriples++;
        boolean atReportingStep = this.ingestedTriples % this.INGESTIONNOTIFYSTEP == 0L;
        if (!atReportingStep) {
            return;
        }
        LOG.info((Object)("Ingested " + this.ingestedTriples + " triples from " + this.origFn));
    }

    protected class RunnableParser {
        final String fsname;
        final InputStream in;
        final String origFn;
        final Lang lang;
        private boolean failed = false;

        public RunnableParser(String origFn, String fsname, InputStream in, Lang lang) {
            this.fsname = fsname;
            this.in = in;
            this.origFn = origFn;
            this.lang = lang;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("O:" + origFn + " : " + fsname));
            }
        }

        public boolean failed() {
            return this.failed;
        }

        public void run() {
            try {
                ParserErrorHandler handler = new ParserErrorHandler(this.fsname);
                ParserProfile prof = RiotLib.profile((Lang)this.lang, (String)this.fsname, (ErrorHandler)handler);
                if (this.lang == Lang.TRIG) {
                    RDFReader.this.rdfInputStream = AsyncParser.of((RDFParserBuilder)RDFParserBuilder.create().source(this.in).lang(this.lang).errorHandler((ErrorHandler)handler).base(this.fsname)).streamQuads();
                    RDFReader.this.rdfIter = RDFReader.this.rdfInputStream.iterator();
                } else if (this.lang == Lang.NTRIPLES) {
                    RDFReader.this.rdfIter = RiotParsers.createIteratorNTriples((InputStream)this.in, (ParserProfile)prof);
                } else if (this.lang == Lang.NQUADS) {
                    RDFReader.this.rdfIter = RiotParsers.createIteratorNQuads((InputStream)this.in, (ParserProfile)prof);
                } else {
                    RDFReader.this.rdfInputStream = AsyncParser.of((RDFParserBuilder)RDFParserBuilder.create().source(this.in).lang(this.lang).errorHandler((ErrorHandler)handler).base(this.fsname)).streamTriples();
                    RDFReader.this.rdfIter = RDFReader.this.rdfInputStream.iterator();
                }
            }
            catch (Exception ex) {
                this.failed = true;
                LOG.error((Object)("Parse error in RDF document(please check intactness and encoding); skipping this document: " + this.origFn + " " + ex.getMessage()));
            }
        }
    }

    protected class ParserErrorHandler
    implements ErrorHandler {
        String inputfn = "";

        public ParserErrorHandler(String inputfn) {
            this.inputfn = inputfn;
        }

        private String formatMessage(String message, long line, long col) {
            String msg = this.inputfn + ":";
            if (line >= 0L) {
                msg = msg + line;
            }
            if (line >= 0L && col >= 0L) {
                msg = msg + ":" + col;
            }
            msg = msg + " " + message;
            return msg;
        }

        public void warning(String message, long line, long col) {
            if (message.contains("Bad IRI:") || message.contains("Illegal character in IRI") || message.contains("Not advised IRI")) {
                LOG.debug((Object)this.formatMessage(message, line, col));
            } else {
                LOG.warn((Object)this.formatMessage(message, line, col));
            }
        }

        public void error(String message, long line, long col) {
            if (message.contains("Bad character in IRI") || message.contains("Problem setting StAX property")) {
                LOG.debug((Object)this.formatMessage(message, line, col));
            } else {
                LOG.error((Object)this.formatMessage(message, line, col));
            }
        }

        public void fatal(String message, long line, long col) {
            LOG.fatal((Object)this.formatMessage(message, line, col));
        }
    }
}

