Fusion Working for You: A Custom RSS Crawler JavaScript Stage

One of the most powerful features of Fusion is the built-in JavaScript stage.  However, you shouldn't think of this stage as merely a JavaScript stage.  Fusion uses the Nashorn JavaScript engine, which puts all of the Java class libraries used in the application at your fingertips.  This means you can effectively script your own customized Java and/or JavaScript processing stages, and make the Indexing Pipeline work for you, the way you want it to work. 

So first off, we need to get our feet wet with Nashorn JavaScript.  Nashorn (German for "rhinoceros") was released around the end of 2012.  While much faster than its predecessor, Rhino, it also incorporated one long-desired feature: seamless interoperability between Java and JavaScript.  Though two entirely separate languages, these two have, as the saying goes, "had a date with destiny" for a long time. 

 

Hello World

To start with, let us take a look at the most fundamental function in a Fusion JavaScript Stage:

function(doc){
    logger.info("Doc ID: "+doc.getId());
    return doc;
}

At the heart of things, this is the most basic function. Note that you are passed a 'doc' argument, which will always be a PipelineDocument, and you return that document (or, alternatively, an array of documents, but that's a story we'll cover in a separate article).  The 'id' of this document will be the URL being crawled; and, thanks to the Tika Parser, the 'body' attribute will be the raw XML of our RSS feed. 

To that end, the first thing you'll want to do is open your Pipeline Editor and select the Apache Tika Parser.  Make sure the "Return parsed content as XML or HTML" checkbox is checked.  The Tika Parser in this case will really only be used to initially pull in the RSS XML from the feed you want to crawl.  The remainder of the processing will be handled in our custom JavaScript stage. 

Now let's add the stage to our Indexing Pipeline.  Click "Add Stage" and select the "JavaScript" stage from the menu. 

Our function will operate in two phases.  The first phase will pull the raw XML from the document and use Jsoup to parse it, producing a java.util.ArrayList of URLs to be crawled.  The second phase will take that ArrayList, loop through and crawl each URL using Jsoup, and then spin up a CloudSolrClient to save the results.   
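Before wiring in Jsoup and Solr, the shape of this two-phase flow can be sketched in plain JavaScript.  (The `parseFeed` and `saveDocs` names are illustrative; a regex and a stub stand in for the Jsoup parsing and CloudSolrClient calls developed later in this article.)

```javascript
// Phase 1: pull the <loc> URLs out of the raw feed XML.
// (A simple regex stands in here for Jsoup's XML parser.)
function parseFeed(xml) {
  var urls = [];
  var re = /<loc>([^<]+)<\/loc>/g;
  var m;
  while ((m = re.exec(xml)) !== null) {
    urls.push(m[1]);
  }
  return urls;
}

// Phase 2: turn each URL into a document ready to be saved.
// (A stub stands in for the Jsoup crawl and the Solr client.)
function saveDocs(urls) {
  return urls.map(function (url) {
    return { id: url, content_text: "" };
  });
}

var feed = "<urlset><url><loc>http://example.com/a</loc></url>" +
           "<url><loc>http://example.com/b</loc></url></urlset>";
var docs = saveDocs(parseFeed(feed));
// docs holds one entry per <loc> URL found in the feed
```

The point is simply the division of labor: one function produces the URL list, a second consumes it, and the stage itself only glues them together.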

So now that we've defined our processes, let's show the work:

The overall architecture of our function will be to create two nested functions within the main function that will handle the processing.  The super-structure will look like this:

 

    function(doc){
        var processedDocs = java.util.ArrayList;

        var parseXML = function(doc){
            // ...parse the RSS XML and build a list of PipelineDocuments...
            return docList;
        };
        processedDocs = parseXML(doc);

        var saveCrawledUrls = function(docList){
            // ...crawl each URL and save the results to Solr...
            return docList;
        };
        saveCrawledUrls(processedDocs);
        return doc;
    }

So what is happening here is that we're taking in the PipelineDocument, parsing the XML to pull out the URLs, and passing that list off to a separate method that crawls it.  One point of note: the "processedDocs" variable declared at the top of the function is a Java ArrayList.  This is a simple example of Nashorn's Java/JavaScript interoperability. 

 

Parsing the XML

 var jsoupXmlParser = function(doc){
     var Jsoup = org.jsoup.Jsoup;
     var jdoc = org.jsoup.nodes.Document;
     var ex   = java.lang.Exception;
     var Parser = org.jsoup.parser.Parser;
     var element = org.jsoup.nodes.Element;
     var xmlstr = java.lang.String;
     var docs = java.util.ArrayList;
     var outdocs = java.util.ArrayList;
     var pipelineDoc = com.lucidworks.apollo.common.pipeline.PipelineDocument;
     var docurl = java.lang.String;
     var elements = org.jsoup.select.Elements;
     var extractedText = java.lang.String;
     var ele = org.jsoup.nodes.Element;
       
     
     try{
         docs = new java.util.ArrayList();
         
         xmlstr = doc.getFirstFieldValue("body");
         jdoc = Jsoup.parse(xmlstr, "", Parser.xmlParser());
         for each(element in jdoc.select("loc")) {
             docurl = element.ownText();
             if(docurl !== null && docurl !== ""){
             logger.info("Parsed URL: "+element.ownText());
             pipelineDoc = new com.lucidworks.apollo.common.pipeline.PipelineDocument(element.ownText());
             docs.add(pipelineDoc);
             }
             
          }
          
           outdocs = new java.util.ArrayList();
          // now crawl each doc in the feed
           for each(pipelineDoc in docs){
               docurl = pipelineDoc.getId();
               jdoc = Jsoup.connect(docurl).get();
               if(jdoc !== null){
                   logger.info("FOUND a JSoup document for url "+docurl);
                   extractedText = new java.lang.String();
                   elements = jdoc.select("p");
                   logger.info("ITERATE OVER ELEMENTS");
                   // then parse elements and pull just the text
                   for each (ele in elements) {
                       if (ele !== null && ele.ownText() !== null) {
                           extractedText += ele.ownText();
                       }
                   }
                   pipelineDoc.addField('extracted_text', extractedText);
                   //pipelineDoc.addField("_raw_content_", jdoc.toString());
                   pipelineDoc.addMetadata("Content-Type", "text/html");
                   logger.info("Extracted: "+extractedText);
                   outdocs.add(pipelineDoc);
               } else {
                   logger.warn("Jsoup Document was NULL **** ");
               }
           }
     }catch(ex){
         logger.error(ex);
     }
     return outdocs;
 }

So in the above function, the first step is to parse the raw XML into a Jsoup Document.  From there, we iterate over the elements found in the document (jdoc.select("loc")).  Once we have a list of URLs, we pass that on to a bit of script that loops through the list and uses Jsoup to extract all the text from the 'p' elements (jdoc.select("p")).
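The null-guarded concatenation of `ownText()` values can be seen in isolation with plain JavaScript.  (The `elements` array below is a hand-built stand-in for Jsoup's `Elements` collection; in Jsoup, `ownText()` returns only the text directly inside an element, not that of nested children.)

```javascript
// Stand-ins for Jsoup elements: each object mimics ownText().
var elements = [
  { ownText: function () { return "First paragraph."; } },
  null, // a missing element should be skipped, not crash the loop
  { ownText: function () { return " Second paragraph."; } }
];

var extractedText = "";
for (var i = 0; i < elements.length; i++) {
  var ele = elements[i];
  // same guard as the pipeline stage: skip nulls, append the rest
  if (ele !== null && ele.ownText() !== null) {
    extractedText += ele.ownText();
  }
}
// extractedText: "First paragraph. Second paragraph."
```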

Once we've extracted the text, we spin up a new PipelineDocument and set whatever fields are relevant to our collection.  Here I've used "extracted_text," but really you can use whatever field names you find appropriate.  Note that I've commented out saving the raw content.  You want to avoid putting raw text into your collection unless you have a specific need for it.  It's best to extract the critical data/metadata and discard the raw text. 

 

Saving the Results to Solr

Moving forward, now that we have our list of crawled pipeline documents, we're going to want to save them to the Solr index.   This is done by spinning up a CloudSolrClient in our JavaScript stage, like so:

 

var solrCloudClient = function(doc){
       var client = org.apache.http.client.HttpClient;
       var cloudServer = org.apache.solr.client.solrj.impl.CloudSolrClient;
       var DefaultHttpClient = org.apache.http.impl.client.DefaultHttpClient;
       var ClientConnectionManager = org.apache.http.conn.ClientConnectionManager;
       var PoolingClientConnectionManager = org.apache.http.impl.conn.PoolingClientConnectionManager;
       var CloudSolrClient = org.apache.solr.client.solrj.impl.CloudSolrClient;
       var cm = org.apache.http.impl.conn.PoolingClientConnectionManager;
       var String = java.lang.String;
       var pdoc  = com.lucidworks.apollo.common.pipeline.PipelineDocument;
       
       var ZOOKEEPER_URL = new String("localhost:9983");
       var DEFAULT_COLLECTION = new String("cityofsacramento");
       var server = ZOOKEEPER_URL;
       var collection = DEFAULT_COLLECTION;
       var docList = java.util.ArrayList;
       var inputDoc = org.apache.solr.common.SolrInputDocument;
       var pingResp = org.apache.solr.client.solrj.response.SolrPingResponse;
       var res = org.apache.solr.client.solrj.response.UpdateResponse;
       var SolrInputDocument = org.apache.solr.common.SolrInputDocument;
       var UUID = java.util.UUID;
         
       
       try{
           // PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
            cm = new PoolingClientConnectionManager();
            client = new DefaultHttpClient(cm);
            cloudServer = new CloudSolrClient(server, client);
            cloudServer.setDefaultCollection(collection);
            logger.info("CLOUD SERVER INIT OK...");
             docList = new java.util.ArrayList();
             pingResp = cloudServer.ping();
             logger.info(pingResp);
            for each(pdoc in doc){
                inputDoc = new SolrInputDocument();
                inputDoc.addField("id", UUID.randomUUID().toString());
                inputDoc.addField("q_txt", pdoc.getFirstFieldValue("extracted_text"));
                docList.add(inputDoc);
            }
            
            logger.info(" DO SUBMIT OF "+docList.size()+" DOCUMENTS TO SOLR **** ");
            cloudServer.add(docList);
            res = cloudServer.commit();
            logger.info(res);
            
           
       }catch(ex){
           logger.error(ex);
       }
     
     return doc;
 }
    

Here you can see we make extensive use of Nashorn's Java/JavaScript interoperability. For all practical purposes, this is a Java class running in a JavaScript context.  Note the rather lengthy stack of declarations at the top of this method.  In any case, what we're doing here is spinning up a CloudSolrClient, iterating over our PipelineDocument ArrayList, turning the PipelineDocuments into SolrInputDocuments, and then committing them as a batch to Solr.  
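Stripped of the SolrJ plumbing, the core of that loop is a map from PipelineDocuments to Solr input documents.  Here is a minimal plain-JavaScript sketch (the object shapes are simplified stand-ins for the Fusion and SolrJ classes, and a counter replaces `UUID.randomUUID()`):

```javascript
// Simplified stand-in for a PipelineDocument.
function pipelineDoc(id, text) {
  var fields = { "extracted_text": text };
  return {
    getId: function () { return id; },
    getFirstFieldValue: function (name) { return fields[name]; }
  };
}

// Map each pipeline document to a Solr-style input document,
// generating a fresh id for each (a counter replaces UUID here).
var nextId = 0;
function toSolrInputDoc(pdoc) {
  nextId += 1;
  return {
    id: "doc-" + nextId,
    q_txt: pdoc.getFirstFieldValue("extracted_text")
  };
}

var docList = [
  pipelineDoc("http://example.com/a", "City council agenda"),
  pipelineDoc("http://example.com/b", "Park hours update")
].map(toSolrInputDoc);
```

Once you have a list in this shape, the only Solr-specific work left is handing it to `cloudServer.add(docList)` and committing, exactly as the full function above does.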

 

Putting It All Together

function(doc){
       var parsedDocs = java.util.ArrayList;
    
      
     var jsoupXmlParser = function(doc){
     var Jsoup = org.jsoup.Jsoup;
     var jdoc = org.jsoup.nodes.Document;
     var ex   = java.lang.Exception;
     var Parser = org.jsoup.parser.Parser;
     var element = org.jsoup.nodes.Element;
     var xmlstr = java.lang.String;
     var docs = java.util.ArrayList;
     var outdocs = java.util.ArrayList;
     var pipelineDoc = com.lucidworks.apollo.common.pipeline.PipelineDocument;
     var docurl = java.lang.String;
     var elements = org.jsoup.select.Elements;
     var extractedText = java.lang.String;
     var ele = org.jsoup.nodes.Element;

     
     try{
         docs = new java.util.ArrayList();
         
         xmlstr = doc.getFirstFieldValue("body");
         jdoc = Jsoup.parse(xmlstr, "", Parser.xmlParser());
         for each(element in jdoc.select("loc")) {
             docurl = element.ownText();
             if(docurl !== null && docurl !== ""){
             logger.info("Parsed URL: "+element.ownText());
             pipelineDoc = new com.lucidworks.apollo.common.pipeline.PipelineDocument(element.ownText());
             docs.add(pipelineDoc);
             }
             
          }
          
          outdocs = new java.util.ArrayList();
          // now crawl each doc in the feed
          for each(pipelineDoc in docs){
              docurl = pipelineDoc.getId();
              jdoc = Jsoup.connect(docurl).get();
              if(jdoc !== null){
                   logger.info("FOUND a JSoup document for url "+docurl);
                  extractedText = new java.lang.String();
                   elements = jdoc.select("p");
                        logger.info("ITERATE OVER ELEMENTS");
                        // then parse elements and pull just the text
                        for each (ele in elements) {
                            if (ele !== null) {
                                if (ele.ownText() !== null) {
                                    extractedText += ele.ownText();
                                }
                            }
                        }
                        pipelineDoc.addField('extracted_text', extractedText);
                        logger.info("Extracted: "+extractedText);
                        outdocs.add(pipelineDoc);
                  
              } else {
                  logger.warn("Jsoup Document was NULL **** ");
              }
          }
     }catch(ex){
         logger.error(ex);
     }
     return outdocs;
 };
 
   parsedDocs = jsoupXmlParser(doc);
   logger.info(" SUBMITTING "+parsedDocs.size()+" to solr index... ****** ");
   
 var solrCloudClient = function(doc){
       var client = org.apache.http.client.HttpClient;
       var cloudServer = org.apache.solr.client.solrj.impl.CloudSolrClient;
       var DefaultHttpClient = org.apache.http.impl.client.DefaultHttpClient;
       var ClientConnectionManager = org.apache.http.conn.ClientConnectionManager;
       var PoolingClientConnectionManager = org.apache.http.impl.conn.PoolingClientConnectionManager;
       var CloudSolrClient = org.apache.solr.client.solrj.impl.CloudSolrClient;
       var cm = org.apache.http.impl.conn.PoolingClientConnectionManager;
       var String = java.lang.String;
       var pdoc  = com.lucidworks.apollo.common.pipeline.PipelineDocument;
       
       var ZOOKEEPER_URL = new String("localhost:9983");
       var DEFAULT_COLLECTION = new String("cityofsacramento");
       var server = ZOOKEEPER_URL;
       var collection = DEFAULT_COLLECTION;
       var docList = java.util.ArrayList;
       var inputDoc = org.apache.solr.common.SolrInputDocument;
       var pingResp = org.apache.solr.client.solrj.response.SolrPingResponse;
       var res = org.apache.solr.client.solrj.response.UpdateResponse;
       var SolrInputDocument = org.apache.solr.common.SolrInputDocument;
       var UUID = java.util.UUID;
         
       
       try{
           // PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
            cm = new PoolingClientConnectionManager();
            client = new DefaultHttpClient(cm);
            cloudServer = new CloudSolrClient(server, client);
            cloudServer.setDefaultCollection(collection);
            logger.info("CLOUD SERVER INIT OK...");
             docList = new java.util.ArrayList();
             pingResp = cloudServer.ping();
             logger.info(pingResp);
            for each(pdoc in doc){
                inputDoc = new SolrInputDocument();
                inputDoc.addField("id", UUID.randomUUID().toString());
                inputDoc.addField("q_txt", pdoc.getFirstFieldValue("extracted_text"));
                docList.add(inputDoc);
            }
            
            logger.info(" DO SUBMIT OF "+docList.size()+" DOCUMENTS TO SOLR **** ");
            cloudServer.add(docList);
            res = cloudServer.commit();
            logger.info(res);
            
           
       }catch(ex){
           logger.error(ex);
       }
     
     return doc;
 };
 
 
    solrCloudClient(parsedDocs);
    logger.info("RSS CRAWL COMPLETE...");
    return doc;
}

And that's really all there is to it.  This implementation has been tested on Fusion 2.4.2. 
