Streaming Aggregation For SolrCloud

 

Introduction

This blog describes the new Streaming Aggregation API for SolrCloud. The initial implementation of this framework is available now as part of the Heliosearch project.

The Streaming API allows SolrCloud to perform operations that have typically been performed by aggregation engines, and it offers some very real advantages:

  • Responsiveness: The Heliosearch Streaming API pushes the record selection, sorting and stream partitioning (shuffling) down into the data store itself. So when the Tuples hit the network they are already selected, sorted, partitioned and headed directly to the correct reducer. Heliosearch’s distributed sorting engine begins streaming Tuples to the network within milliseconds of receiving the request.

    This design can lead to sub-second response times for many aggregation use cases.

  • Aggregation over Search Results: The Streaming API operates on search results so you have tremendous power to precisely select which records to aggregate.

  • Lighter Software Stack: SolrCloud is a robust and easy-to-deploy distributed search engine. People may find it easier to set up and scale than other distributed aggregation tools, which often involve multiple software stacks.

  • Scales Up and Scales Down: The Streaming API is designed to be effective and easy to use on a single server or on a large cluster.

  • Big Data POJOs (Plain Old Java Objects): The Streaming API is a simple client library modeled after the ubiquitous java.io streaming API. It’s easy to plug into any Java application and it’s very easy to add your own custom TupleStream decorators.

 

Tuples, TupleStreams and Sorted Streams

The Streaming API treats search results as a stream of Tuples. A Tuple is a thin wrapper around a Map of key/value pairs that represent a record. The base interface of the Streaming API is the TupleStream. The TupleStream API exposes a simple interface for opening, reading and closing a stream of Tuples.
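The open/read/close contract can be pictured with a small, self-contained sketch. The types below (SimpleTuple, SimpleTupleStream, ListStream) are hypothetical stand-ins written for this illustration; they are not the Heliosearch classes, and the real Tuple and TupleStream may differ in detail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the Heliosearch classes.
final class TupleStreamSketch {

  /** A thin wrapper around a Map of key/value pairs, plus an end-of-stream flag. */
  static final class SimpleTuple {
    final Map<String, Object> fields;
    final boolean EOF;

    SimpleTuple(Map<String, Object> fields, boolean eof) {
      this.fields = fields;
      this.EOF = eof;
    }

    String getString(String key) {
      return String.valueOf(fields.get(key));
    }
  }

  /** The open/read/close contract described above. */
  interface SimpleTupleStream {
    void open();
    SimpleTuple read();
    void close();
  }

  /** An in-memory stream that emits each record, then an EOF tuple. */
  static final class ListStream implements SimpleTupleStream {
    private final List<Map<String, Object>> records;
    private Iterator<Map<String, Object>> it;

    ListStream(List<Map<String, Object>> records) {
      this.records = records;
    }

    public void open() {
      it = records.iterator();
    }

    public SimpleTuple read() {
      if (it.hasNext()) {
        return new SimpleTuple(it.next(), false);
      }
      return new SimpleTuple(new HashMap<String, Object>(), true);
    }

    public void close() {
      it = null;
    }
  }

  /** Drains a stream: open, read until the EOF tuple, then close. */
  static List<String> readIds(SimpleTupleStream stream) {
    List<String> ids = new ArrayList<String>();
    try {
      stream.open();
      while (true) {
        SimpleTuple tuple = stream.read();
        if (tuple.EOF) {
          break;
        }
        ids.add(tuple.getString("id"));
      }
    } finally {
      stream.close();
    }
    return ids;
  }

  /** Convenience helper that builds a one-field record. */
  static Map<String, Object> record(String id) {
    Map<String, Object> m = new HashMap<String, Object>();
    m.put("id", id);
    return m;
  }
}
```

Because every stream exposes the same three methods, a decorator only needs to hold another stream and delegate to it.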

Another key concept is the sorted stream. Since Solr 4.10 / Heliosearch 0.08, Solr has had the capability to efficiently export entire sorted result sets. The Streaming API translates that into a sorted stream of Tuples. This allows the Streaming API to perform fast, memory-efficient set operations (union, intersect, complement, join, unique etc…) on sorted TupleStreams.
 

SolrStream

The first TupleStream implementation we’ll review is the SolrStream. The SolrStream is a TupleStream from a single Solr server. Below is sample code for instantiating and reading from a SolrStream:


String baseUrl = "http://localhost:8983/solr/collection1";
Map params = new HashMap();
params.put("q","Hello world");
params.put("fl", "id,price");
params.put("sort","price asc");
params.put("qt", "/export");

SolrStream stream = new SolrStream(baseUrl, params);
try {
  stream.open();
  Tuple tuple = null;
  while(true){
    tuple = stream.read();

    if(tuple.EOF) {
      break;
    }

    String id    = tuple.getString("id");
    Double price = tuple.getDouble("price");
  }
} finally {
  stream.close();
}

Let’s review this code.

The first line sets up the baseUrl of the Solr server that we are connecting to.

Then the query parameters are added to a map. Note the qt parameter which points to “/export” (New in Solr 4.10). This sets the query type to use the “/export” request handler. The “/export” request handler efficiently streams entire sorted result sets.

If you don’t specify the “/export” handler, the request will go to the default /select handler, which will work fine for smaller result sets.

After the parameters are set up, the SolrStream is constructed. Then the stream is opened, and each Tuple is read from the stream until the EOF Tuple is read. Finally the stream is closed.
 

Stream Decorators

Once you have a stream flowing you can perform operations on the stream by wrapping it with Decorator streams. This is known as the Decorator pattern and will be very familiar to anyone who has used the java.io streaming API.

There are typically two types of operations that Stream decorators perform:

  • Streaming Transformations. These types of operations transform one or more TupleStreams. Examples of these operations include streaming set operations such as: Union, Intersect, Complement, Unique, Group By, and Join.

    Streaming set operations can be efficient on very large data sets when the sets are sorted.

  • Streaming Aggregation. These types of operations gather metrics from an underlying tuple stream. These metrics include operations such as: Count, Sum, Average, Min, Max etc…

    In the initial release there is support for two aggregation models:

    In the first model, metrics are gathered on dynamic buckets (facets) in memory as the stream flows by. In this model the stream itself is not transformed by the metric gathering.

    In the second model, the underlying TupleStream is actually transformed or “rolled up” into aggregates as it flows. This technique relies on the sort order of the tuples and can be used to efficiently gather metrics on high cardinality fields.
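To make the second model more concrete, here is a minimal sketch of a sort-based rollup. It is a hypothetical illustration, not the RollupStream itself: rows are plain {key, value} string pairs, and the only metric is a sum. Because the input is sorted by key, only one running aggregate is held in memory at a time, which is why this approach suits high cardinality fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a sort-based rollup; not the RollupStream class.
final class RollupSketch {

  /**
   * Each row is {key, numericValue}. Rows must already be sorted by key.
   * Emits one "key=sum" entry each time the key changes.
   */
  static List<String> rollupSums(List<String[]> sortedRows) {
    List<String> out = new ArrayList<String>();
    String currentKey = null;
    double sum = 0;
    for (String[] row : sortedRows) {
      if (currentKey != null && !currentKey.equals(row[0])) {
        // The key changed, so the previous group is complete.
        out.add(currentKey + "=" + sum);
        sum = 0;
      }
      currentKey = row[0];
      sum += Double.parseDouble(row[1]);
    }
    if (currentKey != null) {
      // Flush the final group.
      out.add(currentKey + "=" + sum);
    }
    return out;
  }
}
```

The first model, by contrast, would keep one aggregate per bucket in a Map for the lifetime of the stream.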

 

Streaming Set Operations

The UniqueStream is a simple example of a decorator stream that takes advantage of the sort order to perform the unique operation. The UniqueStream removes duplicate Tuples from a TupleStream based on a Comparator.

The UniqueStream relies on SolrCloud’s sorting engine to sort the Tuples on the fields being compared.

Here is how it works:

String baseUrl = "http://localhost:8983/solr/collection1";
Map params = new HashMap();
params.put("q","Hello world");
params.put("fl", "id,price,fieldA");
params.put("sort","fieldA asc");
params.put("qt", "/export");

SolrStream stream = new SolrStream(baseUrl, params);
UniqueStream ustream = new UniqueStream(stream, new AscFieldComp("fieldA"));

try {
  ustream.open();
  Tuple tuple = null;
  while(true){
    tuple = ustream.read();

    if(tuple.EOF) {
      break;
    }

    String id    = tuple.getString("id");
    Double price = tuple.getDouble("price");
    String fieldA = tuple.getString("fieldA");
  }
} finally {
  ustream.close();
}

Notice three things about this code snippet:

1) fieldA has been added to the field list.
2) The sort is on fieldA.
3) The UniqueStream has been wrapped around the SolrStream. The UniqueStream is passed a Comparator that compares the value of fieldA.

In this example the stream coming from Solr will be sorted by fieldA. The UniqueStream can then easily de-duplicate the Tuples, based on the value of fieldA, as it iterates over the TupleStream.
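The reason sorting makes this cheap is that duplicates are always adjacent, so only the previous element has to be remembered. The sketch below shows that idea on a plain Iterator; it is an assumption-laden illustration (the class and method names are made up here), not the UniqueStream source.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of de-duplicating a sorted stream.
final class SortedUniqueSketch {

  /** Keeps the first element of each run of equal elements in a sorted input. */
  static <T> List<T> unique(Iterator<T> sorted, Comparator<? super T> comp) {
    List<T> out = new ArrayList<T>();
    T previous = null;
    boolean first = true;
    while (sorted.hasNext()) {
      T current = sorted.next();
      // Emit only when the value differs from the one just before it.
      if (first || comp.compare(previous, current) != 0) {
        out.add(current);
      }
      previous = current;
      first = false;
    }
    return out;
  }
}
```

Memory use is constant regardless of how many Tuples flow through, apart from the output list that this sketch collects for convenience.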

Another stream that is included is the FilterStream. The FilterStream only includes the Tuples in streamA that are also in streamB. This is an implementation of set intersection.

Here is the sample syntax:


/*
* Create streamA which queries collection1.
*/
String baseUrlA = "http://localhost:8983/solr/collection1";
Map paramsA = new HashMap();
paramsA.put("q","Hello world");
paramsA.put("fl", "id,price,fieldA");
paramsA.put("sort","fieldA asc");
paramsA.put("qt", "/export");
SolrStream streamA = new SolrStream(baseUrlA, paramsA);

/*
* Create streamB which queries collection2
*/
String baseUrlB = "http://localhost:8983/solr/collection2";
Map paramsB = new HashMap();
paramsB.put("q","foo");
paramsB.put("fl", "fieldA");
paramsB.put("sort","fieldA asc");
paramsB.put("qt", "/export");
SolrStream streamB = new SolrStream(baseUrlB, paramsB);

Comparator comp =  new AscFieldComp("fieldA");

UniqueStream ustream = new UniqueStream(streamB, comp);
FilterStream fstream = new FilterStream(streamA, ustream, comp);

try {
  fstream.open();
  Tuple tuple = null;
  while(true){
    tuple = fstream.read();

    if(tuple.EOF) {
      break;
    }

    String id    = tuple.getString("id");
    Double price = tuple.getDouble("price");
    String fieldA = tuple.getString("fieldA");
  }
} finally {
  fstream.close();
}

Here is a quick review of the above code snippet:

Two SolrStreams were created from two different collections. Both streams are sorted on fieldA.

Then streamB is wrapped in a UniqueStream, which will create a unique stream of Tuples based on fieldA. Then the FilterStream is used to filter streamA based on the uniqued Tuples in streamB.
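When both inputs are sorted on the same field, this kind of filtering can be done in a single pass with two cursors. The following sketch illustrates the technique on lists of strings; the names are hypothetical and it is not the FilterStream implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of filtering one sorted stream by another.
final class SortedFilterSketch {

  /**
   * Returns the elements of sortedA whose value also appears in sortedUniqueB.
   * Both inputs must be sorted ascending; B is expected to be de-duplicated.
   */
  static List<String> filter(List<String> sortedA, List<String> sortedUniqueB) {
    List<String> out = new ArrayList<>();
    int b = 0;
    for (String a : sortedA) {
      // Advance the B cursor past values smaller than the current A value.
      while (b < sortedUniqueB.size() && sortedUniqueB.get(b).compareTo(a) < 0) {
        b++;
      }
      if (b < sortedUniqueB.size() && sortedUniqueB.get(b).equals(a)) {
        out.add(a);
      }
    }
    return out;
  }
}
```

Neither cursor ever moves backwards, so the cost is linear in the combined size of the two streams.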
 

Streaming Aggregation

TupleStream Decorators can also be used to gather metrics on underlying streams.

There are two stream implementations in the initial release that provide different models for gathering metrics: the MetricStream and RollupStream. This blog will explain the MetricStream in some detail. The RollupStream, which is designed specifically for high cardinality faceting, will be covered in another blog.

The MetricStream wraps an underlying stream and gathers metrics for dynamic buckets (facets) in memory as the tuples flow through it. This technique works great when all the buckets and metrics can fit in memory. The RollupStream can be used when the buckets/metrics cannot fit in memory.

The MetricStream can be configured to collect any number of metrics for an arbitrary set of nested buckets. Both bucketing and metrics are pluggable, and it’s very easy to develop custom bucketing and metric logic.

Below is an example of how the MetricStream is applied:

/*
* Create streamA which queries collection1.
*/
String baseUrlA = "http://localhost:8983/solr/collection1";
Map paramsA = new HashMap();
paramsA.put("q","Hello world");
paramsA.put("fl", "id,price,fieldA");
paramsA.put("sort","fieldA asc");
paramsA.put("qt", "/export");
SolrStream streamA = new SolrStream(baseUrlA, paramsA);

/*
* Create streamB which queries collection2
*/
String baseUrlB = "http://localhost:8983/solr/collection2";
Map paramsB = new HashMap();
paramsB.put("q","foo");
paramsB.put("fl", "fieldA");
paramsB.put("sort","fieldA asc");
paramsB.put("qt", "/export");
SolrStream streamB = new SolrStream(baseUrlB, paramsB);

Comparator comp =  new AscFieldComp("fieldA");

UniqueStream ustream = new UniqueStream(streamB, comp);
FilterStream fstream = new FilterStream(streamA, ustream, comp);


Bucket[] buckets = {new Bucket("fieldA")};

Metric[] metrics = {new SumMetric("price", false),
                    new MeanMetric("price", false),
                    new CountMetric(),
                    new MinMetric("price", false),
                    new MaxMetric("price", false)};
   
MetricStream mstream = new MetricStream(fstream, buckets, metrics, "metric1", new DescBucketComp(0),5);

try {
  mstream.open();
  Tuple tuple = null;
  while(true){
    tuple = mstream.read();

    if(tuple.EOF) {
      break;
    }

    String id    = tuple.getString("id");
    Double price = tuple.getDouble("price");
    String fieldA = tuple.getString("fieldA");
  }
   
  BucketMetrics[] bucketMetrics = mstream.getBucketMetrics();

  for(BucketMetrics bm : bucketMetrics) {
     //Pull the bucket key and the gathered metrics out of each BucketMetrics.
     //The metric values come back in the order the Metric[] was defined above.
     String bucket = bm.getKey().toString();
     Metric[] bucketValues = bm.getMetrics();
     double sumPrice = bucketValues[0].getValue();
     double meanPrice = bucketValues[1].getValue();
  }

} finally {
  mstream.close();
}

In the example above the MetricStream is gathering metrics on a filtered tuple stream. This shows how streaming aggregation can be combined with streaming set operations.

Below are some more details about the MetricStream example.

The MetricStream is passed an array of Buckets. The array of buckets represent a dynamic hierarchical facet structure. The base Bucket implementation simply derives the bucket by pulling a field from the tuples as they stream through. The base Bucket implementation can be sub-classed to do things like range faceting or to programmatically scrub the facets.

The MetricStream is also passed an array of Metrics. The Metric implementations gather the metric data from the underlying stream. The metrics are computed for each of the dynamic hierarchical buckets. Custom Metric implementations can be created to gather custom metrics over the dynamic buckets.

The MetricStream is also passed a Comparator and topN parameter. The MetricStream will use these to return only the topN buckets. There are BucketComparators provided that allow you to sort Buckets based on any of the Metrics that are gathered.

After all the Tuples are read, the topN BucketMetrics are returned from the MetricStream. A BucketMetrics object holds all the metric data for a specific bucket.
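The in-memory model can be sketched without any Solr classes. The code below is a hypothetical illustration of the idea (one running set of statistics per bucket, followed by a topN selection); the Stats class, the {bucket, value} row format and the choice to rank by sum are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of in-memory bucketed metrics with a topN cut.
final class BucketMetricsSketch {

  /** Running metrics for one bucket. */
  static final class Stats {
    long count;
    double sum;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;

    void add(double value) {
      count++;
      sum += value;
      min = Math.min(min, value);
      max = Math.max(max, value);
    }

    double mean() {
      return count == 0 ? 0.0 : sum / count;
    }
  }

  /** Each row is {bucketValue, numericValue}; rows may arrive in any order. */
  static Map<String, Stats> gather(List<String[]> rows) {
    Map<String, Stats> buckets = new LinkedHashMap<>();
    for (String[] row : rows) {
      Stats stats = buckets.get(row[0]);
      if (stats == null) {
        stats = new Stats();
        buckets.put(row[0], stats);
      }
      stats.add(Double.parseDouble(row[1]));
    }
    return buckets;
  }

  /** Returns the keys of the n buckets with the largest sums, largest first. */
  static List<String> topNBySum(Map<String, Stats> buckets, int n) {
    List<Map.Entry<String, Stats>> entries = new ArrayList<>(buckets.entrySet());
    entries.sort((a, b) -> Double.compare(b.getValue().sum, a.getValue().sum));
    List<String> top = new ArrayList<>();
    for (int i = 0; i < Math.min(n, entries.size()); i++) {
      top.add(entries.get(i).getKey());
    }
    return top;
  }
}
```

Every distinct bucket stays in the Map until the stream ends, which is why this model only fits when the buckets and metrics fit in memory.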

 

Distributed Searching, Sorting and Streaming: CloudSolrStream

For large data sets, more storage and more searching, sorting and streaming power are needed. Enter CloudSolrStream, which provides distributed searching, sorting and streaming.

CloudSolrStream is a TupleStream from a single SolrCloud collection. CloudSolrStream is SolrCloud aware, so you only have to pass it the zkHost and the collection and it will handle all the rest. Here is the prior example using CloudSolrStream rather than SolrStream.

/*
* Create streamA which queries collection1.
*/
String zkHost = "localhost:9983";
Map paramsA = new HashMap();
paramsA.put("q","Hello world");
paramsA.put("fl", "id,price,fieldA");
paramsA.put("sort","fieldA asc");
paramsA.put("qt", "/export");
CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);

/*
* Create streamB which queries collection2
*/
Map paramsB = new HashMap();
paramsB.put("q","foo");
paramsB.put("fl", "fieldA");
paramsB.put("sort","fieldA asc");
paramsB.put("qt", "/export");
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection2", paramsB);

Comparator comp =  new AscFieldComp("fieldA");

UniqueStream ustream = new UniqueStream(streamB, comp);
FilterStream fstream = new FilterStream(streamA, ustream, comp);


try {
  fstream.open();
  Tuple tuple = null;
  while(true){
    tuple = fstream.read();

    if(tuple.EOF) {
      break;
    }

    String id    = tuple.getString("id");
    Double price = tuple.getDouble("price");
    String fieldA = tuple.getString("fieldA");
  }
  

} finally {
  fstream.close();
}

Notice how easy it is to switch from SolrStream to CloudSolrStream. That’s because all TupleStreams are interchangeable.

Under the covers, CloudSolrStream retrieves the cluster state from ZooKeeper and selects replicas from each shard to send the query to. Then it merges the sorted results from the shards based on the sort parameter. The result is a single sorted tuple stream that can be operated on exactly like a SolrStream.
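Merging several sorted shard streams into one sorted stream is a classic k-way merge. The sketch below shows one common way to do it, with a priority queue holding the current head of each shard. It is an illustration of the technique under simplifying assumptions (integers instead of Tuples, in-memory lists instead of network streams) and is not taken from the CloudSolrStream source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of merging sorted per-shard results.
final class ShardMergeSketch {

  /** Merges ascending-sorted lists into a single ascending-sorted list. */
  static List<Integer> merge(List<List<Integer>> sortedShards) {
    // Heap entries are {value, shardIndex, positionInShard}, ordered by value.
    PriorityQueue<int[]> heap =
        new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));

    // Seed the heap with the first element of every non-empty shard.
    for (int s = 0; s < sortedShards.size(); s++) {
      if (!sortedShards.get(s).isEmpty()) {
        heap.add(new int[] {sortedShards.get(s).get(0), s, 0});
      }
    }

    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      merged.add(head[0]);
      int shard = head[1];
      int next = head[2] + 1;
      // Refill from the shard that just supplied the smallest value.
      if (next < sortedShards.get(shard).size()) {
        heap.add(new int[] {sortedShards.get(shard).get(next), shard, next});
      }
    }
    return merged;
  }
}
```

Only one pending element per shard is held at a time, so the merge itself needs very little memory.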

 

A Bottleneck Rears Its Ugly Head!

Collecting a CloudSolrStream in one place will likely create a bottleneck. Let’s explore the nature of this bottleneck before we look at how to solve it.

There are two forces working together on the stream. There is a pushing force and a pulling force. When these forces are equal the stream is moving as fast as it can.

The pushing forces are the SolrCloud search nodes. These services are pushing tuples as fast as they can to an output stream.

The pulling force is the TupleStream, which is reading tuples as fast as it can from an input stream.

Let’s make a concrete example of this.

Let’s say a Solr node can push tuples out at a max rate of 400,000 docs per second, and that the TupleStream can read Tuples at 400,000 docs per second.

When we have a single Solr node pushing documents to a single TupleStream, we have equilibrium. In this case the documents will be processed at around 400,000 documents per second.

What happens when we move to CloudSolrStream? Let’s say we have a collection with 10 shards. The total pushing power of those ten servers is 10*400,000, or 4 million docs per second.

That’s great! We’ve increased our pushing power by a factor of 10!

Unfortunately our pulling power is still only 400,000, actually slightly less because we’ve added friction on the pulling end. The friction is the extra operations it takes to merge the 10 streams. So now we can only read at 390,000 docs per second.

Our total throughput has dropped!

Let’s dig deeper into this bottleneck though to understand it further.

The Solr cluster is capable of pushing out 4 million documents per second. But only 390,000 documents per second are read. How will this bottleneck affect the Solr cluster?

Each Solr server will write as fast as it can to the output stream until the output stream buffers fill up, and then it will block. This is the nature of blocking IO. So for roughly 90% of the time the Solr cluster will essentially be idle as it waits for the TupleStream to catch up.
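The 90% figure follows from the illustrative rates used in this example: the cluster can push 10 * 400,000 docs per second but the reader only pulls 390,000. A small sketch of that arithmetic is shown below; the class and method names are made up for this post, and the model assumes the cluster is busy only for the fraction of time needed to match the pull rate.

```java
// Hypothetical back-of-the-envelope model of the push/pull imbalance.
final class BottleneckSketch {

  /**
   * Fraction of time the pushing side sits blocked, assuming it can only
   * make progress as fast as the single reader pulls.
   */
  static double idleFraction(int shards, double pushPerShard, double pullRate) {
    double totalPush = shards * pushPerShard;
    return Math.max(0.0, 1.0 - pullRate / totalPush);
  }

  public static void main(String[] args) {
    // One node pushing to one reader: equilibrium, no idle time.
    System.out.println(idleFraction(1, 400_000, 400_000));
    // Ten shards pushing to one reader that merges at 390,000 docs per second.
    System.out.println(idleFraction(10, 400_000, 390_000));
  }
}
```

With the numbers from the text, the second call works out to 1 - 390,000 / 4,000,000, or about 0.90.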

We can use that idle time wisely.
 

Distributed Set Operations and Aggregation: ParallelStream

The ParallelStream is used to partition TupleStreams across a number of worker nodes. The partitioning is done based on partition keys. All Tuples with the same partition keys will be sent to the same worker. The partitioned TupleStreams are also sorted. This sorting and partitioning of Tuples is similar in nature to a map/reduce “shuffle”.

Heliosearch’s Streaming API pushes this shuffling step down into the search engine itself using extremely fast, hash partitioning query filters.

Heliosearch has now become a very powerful searching and shuffling distributed file system.

Below is sample code for working with the ParallelStream:

/*
* Define the number of distributed workers
*/

int numWorkers = 7;

/*
* Create streamA which queries collection1.
*/
String zkHost = "localhost:9983";
Map paramsA = new HashMap();
paramsA.put("q","Hello world");
paramsA.put("fl", "id,price,fieldA");
paramsA.put("sort","fieldA asc");
paramsA.put("qt", "/export");
paramsA.put("partitionKeys", "fieldA");
CloudSolrStream streamA = new CloudSolrStream(zkHost, "collection1", paramsA);

/*
* Create streamB which queries collection2
*/
Map paramsB = new HashMap();
paramsB.put("q","foo");
paramsB.put("fl", "fieldA");
paramsB.put("sort","fieldA asc");
paramsB.put("qt", "/export");
paramsB.put("partitionKeys","fieldA");
CloudSolrStream streamB = new CloudSolrStream(zkHost, "collection2", paramsB);

Comparator comp =  new AscFieldComp("fieldA");

UniqueStream ustream = new UniqueStream(streamB, comp);
FilterStream fstream = new FilterStream(streamA, ustream, comp);


ParallelStream pstream = new ParallelStream(zkHost, "collection3", fstream, numWorkers, comp);

try {
  pstream.open();
  Tuple tuple = null;
  while(true){
    tuple = pstream.read();

    if(tuple.EOF) {
      break;
    }

    String id    = tuple.getString("id");
    Double price = tuple.getDouble("price");
    String fieldA = tuple.getString("fieldA");
  }
} finally {
  pstream.close();
}

First let’s explore what the ParallelStream is actually doing under the covers.

  1. ParallelStream is passed (in the constructor) a zkHost, a collection to select the workers from and the number of workers. The ParallelStream then uses SolrCloud’s published state to choose the active worker nodes.

    This is a very flexible approach that allows for SolrCloud collections that only have workers and no data.

  2. The ParallelStream then contacts the workers, which have a new SolrCloud request handler called a StreamHandler.

    ParallelStream serializes the TupleStream (also passed in the constructor), and sends it to each worker’s StreamHandler. The StreamHandler deserializes the TupleStream and then prepares the hash partitioning filter.

  3. Notice that there is a new parameter named partitionKeys. This parameter tells the hash partitioning filter which fields to use to partition the stream. Each StreamHandler is also passed the number of workers (N) and is assigned a workerNumber 0-N. Using this information, a hash partitioning filter is added to the queries to ensure that all Tuples with the same partitionKeys values are sent to the same worker.

  4. After the partitioning is set up, the TupleStream is opened, read until completed and then closed on each worker. Tuples from each worker are written to the output stream, where they are read in by the ParallelStream, which performs the final merge.
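The essential property of the hash partitioning filter in steps 3 and 4 is that a given partition key value always maps to the same worker. The sketch below illustrates that property only. It is not the Heliosearch filter: the use of String.hashCode, the modulo step and the zero-based worker numbering are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of hash partitioning by a key value.
final class HashPartitionSketch {

  /** Maps a partition key value to a worker number in [0, numWorkers). */
  static int workerFor(String partitionKeyValue, int numWorkers) {
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(partitionKeyValue.hashCode(), numWorkers);
  }

  /** Returns only the key values that belong to the given worker. */
  static List<String> partitionFor(List<String> keyValues, int numWorkers, int workerNumber) {
    List<String> mine = new ArrayList<>();
    for (String value : keyValues) {
      if (workerFor(value, numWorkers) == workerNumber) {
        mine.add(value);
      }
    }
    return mine;
  }
}
```

Since the filter preserves the input order, each worker's partition is still sorted, which is what lets sorted set operations run independently on every worker.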

Now let’s review some important aspects of the example code:

  1. The partitionKey is defined as “fieldA”. This will ensure that all Tuples with the same value in fieldA will end up on the same distributed worker. This, combined with sorting on fieldA, allows the UniqueStream and FilterStream set operations to be distributed.
  2. In the example all the Tuples are read from the ParallelStream again producing a bottleneck. There are two approaches to handling this.

    • One approach is to wrap a RankStream around the TupleStream sent to the workers. The RankStream will return the top N tuples from each worker. This is typically how search engines work.

    • Or a SaveStream could be used, which writes the Tuples to local disk on the workers and only returns an EOF tuple. This is often how aggregation engines operate.

 

Scaling-up With SolrCloud Replicas

The Streaming API uses SolrCloud replicas in a different manner than typical SolrCloud search does.

A SolrCloud search has a single aggregator node that pulls the top N documents from each shard.

The Streaming API has more than one aggregator node (worker). Each worker node makes one request to each shard. So if there are 3 workers, each shard will be hit with three simultaneous requests.

Each worker will only request its partition of the search results. So the actual number of records that the search nodes need to sort and stream doesn’t go up as you add workers.

For example, with one worker each shard will take a single request and sort and return 900 docs. For argument’s sake let’s say this takes 50 milliseconds.

With 3 workers each shard will take 3 simultaneous requests that each sort and return 300 docs. The search nodes are threaded, so let’s say that running these in parallel only takes 30 milliseconds.

Great, we’ve increased our pushing power by running 3 smaller queries in parallel.

At a certain point, though, this will begin to tip in the other direction if we keep adding workers.

For example, if we have 9 workers, each shard now receives 9 simultaneous queries. Each request will require sorting and streaming 100 results. Let’s say that this takes 60 milliseconds. Now we have started to slow down from the overhead of running so many simultaneous requests.

How do we scale past this bottleneck?

By adding search replicas. Let’s look at this example again with 3 replicas per shard.

The 9 workers will automatically spread their requests across the replicas. So instead of 9 concurrent searches on a single search node (each requesting 1/9 of the result set), there will be 3 concurrent searches per replica (each requesting 1/9 of the result set).

So if the number of workers starts to overwhelm the search nodes, replicas can be added to distribute the search requests across a larger base of search servers.
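The numbers in this section follow from two simple divisions, sketched below. The helper names are hypothetical, and the model assumes the workers spread their requests evenly across a shard's replicas, which is the idealized case used in the example above.

```java
// Hypothetical arithmetic for the worker/replica example in this section.
final class ReplicaLoadSketch {

  /** Concurrent requests landing on each replica of a shard, assuming an even spread. */
  static int concurrentRequestsPerNode(int workers, int replicasPerShard) {
    // Integer ceiling division: the busiest replica gets the rounded-up share.
    return (workers + replicasPerShard - 1) / replicasPerShard;
  }

  /** Docs each request must sort and stream when a shard's results are split across workers. */
  static int docsPerRequest(int docsPerShard, int workers) {
    return docsPerShard / workers;
  }

  public static void main(String[] args) {
    // 9 workers against a single search node per shard.
    System.out.println(concurrentRequestsPerNode(9, 1));
    // 9 workers spread over 3 replicas per shard.
    System.out.println(concurrentRequestsPerNode(9, 3));
    // Each of the 9 requests covers 1/9 of a 900-doc shard result.
    System.out.println(docsPerRequest(900, 9));
  }
}
```

Adding replicas lowers the first number while leaving the second unchanged, which matches the scaling behaviour described above.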