Decorating a high velocity event stream with contextual information – on PNDA

[Image: Spark Streaming logo]

How to contextualise streaming events with PNDA

Often, to do meaningful analytics on data you need to decorate an input event stream with additional information looked up from another data store.

In this post I will discuss a design pattern for doing such a task on PNDA. This will provide an efficient implementation of a context lookup for large data rates that scales horizontally with both the size of the context dataset and the input data rate.

For a ballpark figure, you could expect good results from this technique for GBs of context data and tens of thousands of input events per second on a small PNDA cluster (see ‘Example Performance’ below) and 100s of GBs of context data and millions of input events per second on a large cluster.

The application code and all test resources referred to in this post are available here.

What not to do

Given experience with relational databases or key-value stores that provide fast, efficient lookup of records for a given key, the first thought when faced with this problem is to perform an indexed lookup for each input event. This works well for low input data rates, and copes with relatively large context datasets, because a single indexed lookup into a table of billions of records can execute quickly, say in 1 to 100 ms.

If we think this through, however: at an input rate of 10K events per second (which is a very low data rate for big data), 10,000 lookups at 1 ms each = 10 seconds. So for every 1 second of data we need to do 10 seconds of lookups, and that is using the low estimate of 1 ms per lookup.

So now you are thinking, “this fancy big data system is horizontally scalable, right? Let’s parallelise”; but now we have just created a DDoS attack on our own data store.
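To make this concrete, here is a minimal sketch of the per-event lookup anti-pattern, assuming a hypothetical lookup_context() function that queries an indexed external store; it is illustrative only and not part of the example application:

import json

# Anti-pattern: one indexed lookup into an external store per input event.
# lookup_context() is a hypothetical call that returns the context record for a key.
def decorate_event_naively(event_json, lookup_context):
    event = json.loads(event_json)
    context = lookup_context(event['context_id'])  # ~1-100 ms round trip, paid per event
    event.update(context)
    return event

# At 10K events/sec and ~1 ms per lookup, every 1 second of input
# needs ~10 seconds of lookup time, so the stream falls further and further behind.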

Streaming Join

Instead of performing a lookup per input message, let’s follow the Databricks example of how to do a streaming join with Spark Streaming.

The visualised DAG of our process is:

[Image: join-dag.png – the visualised DAG of the streaming join]

Our input events are ~100 bytes and look like this:

{
  "id":"1234567910111213",
  "context_id":"12345678",
  "gen_ts":"1501590983000",
  "afield":"avalue"
}

Our context dataset records are ~200 bytes and look like this:

{
  "id": 12345678,
  "cust_id":"cust_1233",
  "device_type":"genco box 2000",
  "dev_id":"f5eeb4a9-cac9-4705-82f9-f169a040bdc7",
  "dev_ip_a":"2001:0db8:85a3:0000:0000:8a2e:1234:5678",
  "connection_ref":"link12"
}

And the outcome we want is a context record joined to each input event on event.context_id == context.id.

The snippets below are taken from the example application I have written to test this out, available here.
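Before the snippets themselves, here is a minimal sketch of how the streaming context and Kafka input DStream might be wired up; the topic and broker names are made up, and the decoding of the PNDA Avro envelope that yields the ‘rawdata’ field used below is omitted:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='streaming-context-join')
ssc = StreamingContext(sc, 60)  # batch interval in seconds

# Hypothetical topic and broker list; the real application reads these from its properties.
messages = KafkaUtils.createDirectStream(
    ssc, ['example.events.topic'], {'metadata.broker.list': 'broker-host:9092'})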

First we have to load the context dataset as an RDD, extract the ‘id’ field as the key for the join, partition it so there is one partition per executor core, and then cache it:

context_dataset = sc.textFile(properties['component.context_dataset_path']) \
    .map(lambda row: (str(json.loads(row)['id']), row)) \
    .partitionBy(context_dataset_partitions) \
    .cache()

This means that each task slot in the cluster receives one partition of the context dataset to process. To reduce the time taken to process the context dataset, we have to increase the number of executor cores available and set the number of partitions in the RDD to match. In this way, horizontal parallelism in the cluster can be used to scale to larger context datasets.
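As a rough illustration (hypothetical variable names, not taken from the example application), the partition count can be derived from the executor configuration so that each executor core processes one partition of the context dataset:

# One context partition per executor core, so the whole context dataset
# can be processed in a single wave of parallel tasks per batch.
num_executors = 6     # --num-executors
executor_cores = 7    # --executor-cores
context_dataset_partitions = num_executors * executor_cores  # 42 cores here; the test below uses 40 partitions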

Next we have to select which field to perform the join on and extract it as the key of the input DStream:

messages = messages.map(lambda event: (json.loads(event[1]['rawdata'])['context_id'], event[1]['rawdata']))

Then we call Spark’s join function to perform the join:

messages_with_context = messages.transform(
    lambda rdd: context_dataset.join(rdd, join_output_partitions)
                               .map(lambda record: (record[0], json.loads(record[1][1])['gen_ts'])))

After the join, the final operation is to create an output dataset with the number of partitions that we want, with the rows containing information from both the input event and the context dataset in the format we need.
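As a sketch of that final step (field names taken from the sample records above, but the exact output layout is up to the application), the map after the join could merge the two JSON documents instead of just extracting gen_ts:

import json

def merge_records(record):
    # record is (key, (context_row, event_row)) as produced by context_dataset.join(rdd)
    key, (context_row, event_row) = record
    output = json.loads(event_row)               # the original input event
    output['context'] = json.loads(context_row)  # attach the full context record
    return (key, json.dumps(output))

messages_with_context = messages.transform(
    lambda rdd: context_dataset.join(rdd, join_output_partitions).map(merge_records))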

The way a map-reduce join works is to iterate over both datasets, putting records with the same key into the same bucket; when both passes finish, each per-key bucket contains the records from both datasets for that key. Since the join runs in O(records in context dataset + records in input batch), its running time is not affected much by the input data rate for any reasonably large context dataset. This means the join will scale to very high input data rates before the number of records per input batch contributes much to the total, given the large number of records already in the context dataset.
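A toy single-machine illustration of that bucketing principle (this has nothing to do with Spark’s actual implementation, it just shows why the cost is linear in the two dataset sizes):

from collections import defaultdict

def bucket_join(context_records, event_records):
    # One pass over each dataset: O(len(context_records) + len(event_records)).
    buckets = defaultdict(lambda: ([], []))
    for key, ctx in context_records:
        buckets[key][0].append(ctx)
    for key, evt in event_records:
        buckets[key][1].append(evt)
    # Each per-key bucket now holds the matching records from both datasets.
    for key, (ctxs, evts) in buckets.items():
        for ctx in ctxs:
            for evt in evts:
                yield key, (ctx, evt)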

Example Performance

I created a test scenario with 16M context records at a total dataset size of 3.1GB. I created a pico flavoured PNDA (the smallest viable cluster, intended for development and education purposes only) and gave it 3 datanodes with 16 cores and 25GB of memory each.

I configured the PNDA application as follows (full properties):

Spark parameters: --conf spark.yarn.executor.memoryOverhead=5000 --executor-memory 7G --num-executors 6 --executor-cores 7
Batch size: 60s
Context partitions: 40
Join output partitions: 40

With this configuration the time taken to do the join against the 3GB context dataset was 35 – 40 seconds, and this remained flat for input rates from 1K events/sec up to 20K events/sec. At that point the python-kafka client I was using could not generate a higher input load, so I stopped. It would be interesting to revisit this with better Kafka client code to see how far the rate can be increased before the processing time starts to go up, but the results obtained do show that the processing time is not affected by the input rate, matching what we expected to see.

At 20K input events per second, with a 60 second batch processed by a ~40 second join, this is 20,000 × 60 / 40 = 30,000 event decorations per second, or roughly 33 µs per lookup.

If we needed to scale this up further:

  • the input data rate can be scaled by creating more Kafka partitions
  • the join performance can be scaled by using more partitions in the context dataset RDD

with sufficient executor cores provided to cover both of these activities.

Summary

The Spark Streaming join operation can be used to perform a scalable join of high velocity event streams to large context datasets. On a pico PNDA cluster I have shown lookup times of roughly 33 µs per record at a data rate of 20K input events per second.

*Update 3/8/2017*

I wrote a Kafka client using the Java API (pushed to GitHub) which enabled much faster send rates, and found that 20K – 30K events per second was the maximum rate before the processing time started to exceed the batch window of 60 seconds. So the initial results were already a good indication of the maximum performance of a small PNDA cluster applied to this task.

2 thoughts on “Decorating a high velocity event stream with contextual information – on PNDA”

  1. Nice work with the streaming joins, James.
    I demoed PAD (Path Anomaly Detection) over PNDA (link: https://youtu.be/Pj0f68499XU), listed under pnda.io/usecases!

    The Python Kafka dumper doesn’t work well. I used a Java class along with the Kafka exporter instead and, if memory serves me well, I was getting 0.05 – 0.1M events/sec.

    PM me (deven.walia@gmail.com).
    Wish to collaborate on some innovation.

    TIA. Deven.

