Getting Started with Streaming Expressions!

This article is dedicated to provide a basic foundation to use Streaming expressions which is introduced in Solr 6.x. We provide an introduction, discuss use-cases and challenges which comes along with it.

Introduction:

Streaming Expressions provide a simple yet powerful stream processing language for SolrCloud. We can combine various functions to perform robust parallel tasks. Please refer the official Apache Solr guide to explore all the available functionalities and understand the terminologies used below: Streaming Expressions

Streaming requests and response: SE provide /stream handler which takes stream expression as an input and emit tuples as output, in JSON format. The fields values which are exported from streaming should be DocValues, basically whatever we put in "fl” in the stream requests.
Streaming sources: functions to fetch or index content from/to Solr indexes
Streaming decorators: functions to process on the content fetched by Streaming sources.

There are use-cases where we need to fetch entire/big-size results of single/multiple indexes to do complex computations. Streaming expressions achieve this by using /export handler which emit results in form of tuples (rows). Data returned from indexes, is read from disk. If the amount is large, it may result in swapping in Operating System which can cause system delay or even failure. Streaming expressions can emit up-to multiple K rows per node per second which helps achieving high parallel computation tasks and expressions generated from Solr can be joined with streams originated from outside/third party applications.

Use-cases:

To discuss what we can do with SE, we have three collections as below:

vehicle:
{"id":"v-03","v_id":"v-03","model_name":"m-03"}
{"id":"v-01","v_id":"v-01","model_name":"m-01"}
{"id":"v-02","v_id":"v-02","model_name":"m-02"}
{"id":"v-04","v_id":"v-04","model_name":"m-04"}

defects:
{"id":"d-03","v_id":"v-01","defect_id":"d-03"}
{"id":"d-04","v_id":"v-01","defect_id":"d-04"}
{"id":"d-06","v_id":"v-03","defect_id":"d-06"}
{"id":"d-07","v_id":"v-04","defect_id":"d-07"}
{"id":"d-10","v_id":"v-10","defect_id":"d-10"}
{"id":"d-01","v_id":"v-02","defect_id":"d-01"}
{"id":"d-02","v_id":"v-01","defect_id":"d-02"}
{"id":"d-05","v_id":"v-03","defect_id":"d-05"}

"vehicle" collection depicts vehicle-id and its model name while "defects" depicts vehicle id and the defect-id associated with it.

gettingstarted:
{"id":"1","model_name":"M","model_no":"5","master_s":"master string","worker_s":"string"}
{"id":"2","model_name":"N","model_no":"6","master_s":"amrit","worker_s":"sarkar"}
{"id":"3","model_name":"O","model_no":"4","master_s":"united states of america","worker_s":"states"}

A sample collection with arbitrary values.

Sample queries to start with:

Searching the entire index of "gettingstarted" collection exporting fields "id" and "model_name"

search(gettingstarted,zkHost="localhost:9983",qt="/export",q="*:*", fl="id,model_name",sort="id asc")

Rolling up entire index of "gettingstarted" collection and getting count of unique "model_name" values

rollup(search(gettingstarted,zkHost="localhost:9983",qt="/export",q="*:*", fl="id,model_name",sort="id asc"),over=model_name,count(*))

As evident from screenshot above, 'search' is a stream source (blue) while 'rollup' is a decorator (pink)

Complex queries: querying along different collections performing multiple operations

Getting total number of defects count associated with vehicle's model name:

rollup(hashJoin(search(vehicle, q=*:*, fl="v_id,model_name",qt="/export", sort="v_id asc"),
hashed=search(defects, q=*:*, fl="v_id,defect_id",qt="/export", sort="v_id asc"),
on="v_id"),over="model_name",count(*))

Search both collections "defects" and "vehicle", join both result-sets on "v_id" and rolling up over "model_name" will give us the desired results.


Renaming 'count(*)' to a more soothing field name for the previous use-case discussed:

select(rollup(hashJoin(search(vehicle, q=*:*, fl="v_id,model_name",qt="/export", sort="v_id asc"), hashed=search(defects, q=*:*, fl="v_id,defect_id",qt="/export", sort="v_id asc"), on="v_id"),over="model_name",count(*)),count(*) as defect_count_s,model_name as model_name_s)

Indexing the result-set of previous use-case discussed to a new collection 'destinationCollection':

-------- pre-execute the following commands --------
--- create a new collection 'destinationCollection' ---

solr/bin/solr delete -c destinationCollection
solr/bin/solr create -c destinationCollection -shards 2 -replicationFactor 2
curl http://localhost:8983/solr/destinationCollection/config -H 'Content-type:application/json' -d '{
"set-property": {
"updateHandler.autoCommit.maxTime":1000,
"updateHandler.autoCommit.openSearcher":true
}}'

----------------------------------------------------------

update(destinationCollection,batchSize=5,select(rollup(hashJoin(search(vehicle, q=*:*, fl="v_id,model_name",qt="/export", sort="v_id asc"), hashed=search(defects, q=*:*, fl="v_id,defect_id",qt="/export", sort="v_id asc"),
on="v_id"),over="model_name",count(*)),count(*) as defect_count_s,model_name as model_name_s))

'update' function act as datasource (violet) to feed the streams in batches (5 in this case) to collection 'destinationCollection' for the result-set produced.

Parallelising the Expressions:

Before we move to a sample example, go through the official cwiki page: Parallel Stream Decorator to understand it fully.

parallel(defects,search(defects, q=*:*, fl="id,defect_id,v_id", sort="id desc", partitionKeys="id"), workers="4", zkHost="localhost:9983", sort="v_id desc")

Quoting from official cwiki page:

"The parallel function requires that the partitionKeys parameter be provided to the underlying searches. The partitionKeys parameter will partition the search results (tuples) across the worker nodes. Tuples with the same values in the partitionKeys field will be shuffled to the same worker nodes.
The parallel function maintains the sort order of the tuples returned by the worker nodes, so the sort criteria of the parallel function must match up with the sort order of the tuples returned by the workers."

It seems like a straightforward requirement which can be fulfilled but we will discuss the challenges after introducing workerCollection.

Significance of Worker-Collections:

All the datasources, decorators, metric operations are very useful but what personally caught attention is worker collection in parallel stream-decorators.

Created "workerCollection" collection with 10 shards, 2 replicas each, total 20 nodes.

parallel(workerCollection,search(defects, q=*:*, fl="id,defect_id,v_id", sort="id desc", partitionKeys="id"),workers="20",zkHost="localhost:9983",sort="v_id desc")

Fascinating! We have indexed data on one collection while we are executing the streaming expressions on other collection which neither has any data nor its configuration matters a bit.

Now suppose we have a solr cluster and multiple machines available where we can setup solr nodes; we get 4 machines for 4 solr nodes with X gb heap memory allocated to them and create a collection around those solr nodes; we get 4 more machines of 2X gb heap memory allocated and we use it as the front-faced client collection aka worker collection. We index in one collection while we perform the heavy-duty operations with querying in "worker collection".

We, in Lucidworks, are not sure when the Master-Slave infrastructure will be introduced in Solr Cloud, but streaming expressions has made it possible today, virtually!

Challenges which comes along with Parallel stream decorator:

We were performing the following stream when we deviated to explain the above two:

update(destinationCollection,batchSize=5,select(rollup(hashJoin(search(vehicle, q=*:*, fl="v_id,model_name",qt="/export", sort="v_id asc"), hashed=search(defects, q=*:*, fl="v_id,defect_id",qt="/export", sort="v_id asc"),
on="v_id"),over="model_name",count(*)),count(*) as defect_count_s,model_name as model_name_s))

Let's introduce parallel and worker collection to execute these particular query:

parallel(workerCollection,update(destinationCollection,batchSize=5,select(rollup(hashJoin(search(vehicle, q=*:*, fl="id,v_id,model_name",qt="/export", sort="v_id asc",partitionKeys="id"), hashed=search(defects, q=*:*, fl="id,v_id,defect_id",qt="/export", sort="v_id asc",partitionKeys="id"),on="v_id"),over="model_name",count(*)),count(*) as defect_count_s,model_name as model_name_s)),workers="10",zkHost="localhost:9983",sort="v_id desc")

Looks good, right? But the fact is we have messed up!

parallel(workerCollection,update(destinationCollection,batchSize=5,select(rollup(hashJoin(search(vehicle, q=*:*, fl="id,v_id,model_name",qt="/export", sort="v_id asc",partitionKeys="id"), hashed=search(defects, q=*:*, fl="id,v_id,defect_id",qt="/export", sort="v_id asc",partitionKeys="id"),on="v_id"),over="model_name",count(*)),count(*) as defect_count_s,model_name as model_name_s)),workers="10",zkHost="localhost:9983",sort="v_id desc")

Both the search operations are partitioned on field "id". If we pick the first parallel stream which is getting executed on worker-1 and analyse that:

Partitioning means result-set of same hashed-value of particular key value will be computed in the stream of the same worker. Stream on worker-1 will have result-set of 'defects' and 'vehicle' whose hashed-value of 'id' has computed to be same. So what are the odds when we join both the result-sets, not on 'id' field but on 'v_id' field and get the desired results? Other way to put it, do the result-set calculated for collection 'vehicle' will have all the tuples needed to do a successful join with result-set of collection 'defects' and not in other worker nodes? Answer is a Big NO! It is highly unlikely we will get the exact match/associated results for both result-set when we have partitioned on one key and joining on other just because it is not necessary we will get all the 'v_id's on one worker node to do the join. Solution would be to specify the partition key to be same on which we are are joining 'v_id', but then what about we are rolling up on i.e. 'model_name'? Do the stream on each worker node will be divided such that the all the tuples with same 'model_name' be placed in the stream of same worker node? We cannot specify two partition keys, so what could be the robust solution?

Well the solution is simply, parallel decorator cannot be applied with every other stream decorator in every case. 

The use-cases which involves lots of complex operations revolving around multiple operations, think once-twice-thrice before parallelising the entire streaming expression like above.

Correct updated stream:

update(destinationCollection,batchSize=5,select(rollup(hashJoin(parallel(workerCollection,search(vehicle, q=*:*, fl="id,v_id,model_name",qt="/export", sort="v_id asc",partitionKeys="id"),workers="5",zkHost="localhost:9983",sort="v_id asc"), hashed=parallel(workerCollection,search(defects, q=*:*, fl="id,v_id,defect_id",qt="/export", sort="v_id asc",partitionKeys="id"),workers="5",zkHost="localhost:9983",sort="v_id asc"), on="v_id"),over="model_name",count(*)),count(*) as defect_count_s,model_name as model_name_s))

We leave the reader to conclude why this stream expression will be sophisticated-enough to get the job done. If you think of a better one, kindly provide the same on the comments' section.

Use-case for 'daemon' stream decorator to be listed shortly.

NOTE: The index-backup of solr collections discussed are uploaded in the article.

Have more questions? Submit a request

0 Comments

Please sign in to leave a comment.
Powered by Zendesk