Spark Streaming

This topic covers Spark Streaming concepts, architecture, and implementation in detail, for enabling stream-based real-time processing in the Big Data ecosystem.

Need for Streaming Architecture

Sensors, IoT devices, social networks, and online transactions are all generating data that needs to be monitored constantly and acted upon quickly.

Making a purchase online means that all the associated data (e.g. date, time, items, price) need to be stored and made ready for organizations to analyze and make prompt decisions based on the customer’s behavior.

Detecting fraudulent bank transactions requires testing transactions against pre-trained fraud models as the transactions occur (i.e. as data streams) to quickly stop fraud in its tracks.

Spark Streaming - What is it?

A study of over 1,400 Spark users conducted by Databricks, the company founded by the creators of Spark, shows that, compared to 2014, 56 percent more Spark users globally ran Spark Streaming applications in 2015.

48 percent of survey respondents noted Spark Streaming as their most-used Spark component. Spark Streaming’s ever-growing user base consists of household names like Uber, Netflix, and Pinterest.

Real time analytics using Spark streaming

Analyze data streams in real time rather than in huge daily batches. E.g. analyze streams of weblog data to react to user behavior with offers and recommendations.

Fraud detection, and real-time analytics on sensor data from IoT devices.

Spark Streaming helps transform, summarize, and analyze the data, and apply machine learning to make predictions in real time.

Sources – files, Flume, Kafka, Kinesis, social media, and other APIs.

Spark Streaming architecture

Instead of processing the streaming data one record at a time, Spark Streaming discretizes the streaming data into tiny, sub-second micro-batches.

Spark Streaming’s Receivers accept data in parallel and buffer it in the memory of Spark’s worker nodes. Then the latency-optimized Spark engine runs short tasks (tens of milliseconds) to process the batches and output the results to other systems.

Note: Unlike the traditional continuous-operator model, where the computation is statically allocated to a node, Spark tasks are assigned dynamically to the workers based on the locality of the data and the available resources. This enables both better load balancing and faster fault recovery.

• In addition, each batch of data is a Resilient Distributed Dataset (RDD), which is the basic abstraction of a fault-tolerant dataset in Spark. This allows the streaming data to be processed using any Spark code or library.

Benefits of Discretized Stream Processing

Dynamic Load Balancing

Fast Failure and Straggler Recovery

Unification of Batch, Streaming and Interactive Analytics

Advanced Analytics like Machine Learning and Interactive SQL

Performance Tuning

Dynamic Load Balancing

Dividing the data into small micro-batches allows for fine-grained allocation of computations to resources.

For example, consider a simple workload where the input data stream needs to be partitioned by a key and processed.

In the traditional record-at-a-time approach taken by most other systems, if one of the partitions is more computationally intensive than the others, the node statically assigned to process that partition will become a bottleneck and slow down the pipeline.

In Spark Streaming, the job’s tasks will be naturally load balanced across the workers — some workers will process a few longer tasks, others will process more of the shorter tasks.

Fast Failure and Recovery

In case of node failures, traditional systems have to restart the failed continuous operator on another node and replay some part of the data stream to recompute the lost information. Note: only one node handles the recomputation, and the pipeline cannot proceed until the new node has caught up after the replay. In Spark Streaming, by contrast, the computation is already divided into small deterministic tasks, so the lost work can be relaunched in parallel on the other nodes in the cluster, making recovery much faster.

Unification of Batch, Streaming and Interactive Analytics

The key programming abstraction in Spark Streaming is a DStream, or discretized stream.

Each batch of streaming data is represented by an RDD, which is Spark’s concept for a distributed dataset.

Therefore a DStream is just a series of RDDs. This common representation allows batch and streaming workloads to interoperate seamlessly. Users can apply arbitrary Spark functions on each batch of streaming data: for example, it’s easy to join a DStream with a precomputed static dataset (as an RDD).

// Create data set from Hadoop file
val dataset = sparkContext.hadoopFile("file")
// Join each batch in stream with the dataset
kafkaDStream.transform { batchRDD =>
  batchRDD.join(dataset).filter(…)
}

Advanced Analytics - Machine Learning and Interactive SQL

Spark Streaming’s interoperability extends to rich libraries like MLlib (machine learning), SQL, DataFrames, and GraphX. Let’s explore one such use case: Streaming + SQL and DataFrames.

RDDs generated by DStreams can be converted to DataFrames (the programmatic interface to Spark SQL) and queried with SQL. For example, using Spark SQL’s JDBC server, you can expose the state of the stream to any external application that talks SQL.

Performance

In practice, Spark Streaming’s ability to batch data and leverage the Spark engine leads to throughput comparable to or higher than that of other streaming systems.

In terms of latency, Spark Streaming can achieve latencies as low as a few hundred milliseconds. Developers sometimes ask whether the micro-batching inherently adds too much latency.

In practice, batching latency is only a small component of end-to-end pipeline latency.

For example, many applications compute results over a sliding window, and even in continuous operator systems, this window is only updated periodically (e.g. a 20 second window that slides every 2 seconds).

Many pipelines collect records from multiple sources and wait for a short period to process delayed or out-of-order data. Finally, any automatic triggering algorithm tends to wait for some time period to fire a trigger. Therefore, compared to the end-to-end latency, batching rarely adds significant overheads.

The throughput gains from DStreams often mean that you need fewer machines to handle the same workload.

Discretized Streams (DStreams)

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source, or the processed data stream generated by transforming the input stream.

Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset.

Each RDD in a DStream contains data from a certain interval.

Any operation applied on a DStream translates to operations on the underlying RDDs.

Input DStreams are DStreams representing the stream of input data received from streaming sources.

Every input DStream (except file stream) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing.

Spark Streaming provides two categories of built-in streaming sources.

Basic sources: Sources directly available in the StreamingContext API. For example: file systems, socket connections, and Akka actors.

Advanced sources: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the linking section.

To receive multiple streams of data in parallel in a streaming application, one can create multiple input DStreams. This will create multiple receivers which will simultaneously receive multiple data streams.

Note: A receiver runs within a Spark worker/executor as a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application needs to be allocated enough cores (or threads, if running locally) to process the received data, as well as to run the receiver(s).
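As a minimal sketch (the ports and application name below are placeholders), an application with two socket receivers running locally needs at least three threads: two to run the receivers and at least one to process the received data.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[3] = 2 threads for the receivers + at least 1 thread for processing
val conf = new SparkConf().setMaster("local[3]").setAppName("MultiReceiverSketch")
val ssc = new StreamingContext(conf, Seconds(5))

// Two input DStreams, each backed by its own receiver
val stream1 = ssc.socketTextStream("localhost", 9001)
val stream2 = ssc.socketTextStream("localhost", 9002)

// Union the two streams so they can be processed together
val combined = stream1.union(stream2)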

Basic Sources – Spark Streaming

In Spark Streaming, ssc.socketTextStream(…) creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files and Akka actors as input sources.

• File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as: streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note that the files must have the same data format.

The files must be created in the dataDirectory by atomically moving or renaming them into the data directory. Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

For simple text files, there is streamingContext.textFileStream(dataDirectory). File streams do not require running a receiver, hence they do not require allocating cores.
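As a quick sketch (the monitored directory path is a placeholder, and ssc is a StreamingContext as created elsewhere in this topic):

// Process new text files atomically moved into the monitored directory
val fileLines = ssc.textFileStream("/data/incoming")
// Word count over each batch of newly arrived files
fileLines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()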

Some of the advanced sources are as follows:

Kafka: Spark Streaming 1.6.0 is compatible with Kafka 0.8.2.1 (a brief Kafka sketch follows this list of sources).

Flume: Spark Streaming 1.6.0 is compatible with Flume 1.6.0.

Kinesis: Spark Streaming 1.6.0 is compatible with Kinesis Client Library 1.2.1.

Twitter: Spark Streaming’s TwitterUtils uses Twitter4j to get the public stream of tweets using Twitter’s Streaming API. Authentication information can be provided by any of the methods supported by the Twitter4J library. You can either get the public stream, or get a filtered stream based on keywords.
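As an illustration of the Kafka source mentioned above (using the Spark 1.6 / Kafka 0.8 direct approach, which requires the spark-streaming-kafka dependency on the classpath; the broker address and topic name below are placeholders), a sketch might look like:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder broker list and topic set
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("events")

// Direct stream: no receiver, offsets are tracked by Spark Streaming itself
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// Each record is a (key, value) pair; keep only the message values
val messages = kafkaStream.map(_._2)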

Receiver Reliability

There are two kinds of data sources based on their reliability.

Sources like Kafka and Flume allow the transferred data to be acknowledged.

If the system receiving data from these reliable sources acknowledges the received data correctly, it can be ensured that no data will be lost due to any kind of failure. This leads to two kinds of receiver:

Reliable Receiver – Correctly sends an acknowledgment to a reliable source when the data has been received and stored in Spark with replication.

Unreliable Receiver – Doesn’t send acknowledgement to a source. This can be used for sources that do not support acknowledgement, or even for reliable sources when one does not want or need to go into the complexity of acknowledgement.

A StreamingContext is created from the Spark context to enable streaming:
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(5))
Processing occurs on DStreams; a micro-batch window is set up, each micro-batch is an RDD, and global variables can be used to track data across DStreams.
// Create a DStream that will connect to hostname:port, like localhost:9000
val lines = ssc.socketTextStream("localhost", 9000)
Transformations are done on the DStream and derived DStreams (i.e. on their RDDs):
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
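To actually start the computation, the counts can be printed and the context started; this is a straightforward completion of the example above, reusing the same ssc and wordCounts values:

// Print the first ten elements of each RDD generated in this DStream
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate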

Transformations on DStreams

map(func) - return a new DStream by passing each element of the source DStream through a function func.

flatMap(func) - each input item can be mapped to 0 or more output items

filter(func) - return a new DStream by selecting only the records of the source DStream on which func returns true (see the example after this list)

repartition(numPartitions) - changes the level of parallelism in this DStream by creating more or fewer partitions

count() - counting the number of elements in each RDD of the source DStream.

reduce(func)- Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func

reduceByKey(func, [numTasks]) - return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function
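For instance, filter and count can be combined (reusing the lines DStream from the earlier socket example) to track error lines per batch; this is just an illustrative sketch:

// Keep only the lines that contain "ERROR"
val errorLines = lines.filter(_.contains("ERROR"))
// Count the matching lines in each batch (a DStream of single-element RDDs)
val errorCounts = errorLines.count()
errorCounts.print()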

Windowing functions

Long-lived state is sometimes needed on a DStream, e.g. to aggregate session data from web activity. Windowing functions are available for computing across multiple micro-batches of a DStream.

The window slides as time goes on, covering the batches that fall within the window interval.

E.g. know what happened in the last 60 seconds (window) every 30 seconds (slide). The fold/aggregation is limited to the window and the type of the input DStream (a sketch follows the interval definitions below).

Batch Interval – How often data is captured.

Slide Interval – How often windowed transformation is computed.

Window Interval – How far back in time the window goes.
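Following the 60-second window / 30-second slide example above, and reusing the pairs DStream from the word-count code, a windowed word count can be sketched as:

// Word counts over the last 60 seconds of data, computed every 30 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(30))
windowedWordCounts.print()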

updateStateByKey

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

Define the state – the state can be an arbitrary data type.

Define the state update function – specify with a function how to update the state using the previous state and the new values from the input stream.

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    # add the new values with the previous running count to get the new count
    return sum(newValues, runningCount)

runningCounts = pairs.updateStateByKey(updateFunction)
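Note that stateful operations such as updateStateByKey require checkpointing to be enabled on the StreamingContext; a sketch in the Scala API used elsewhere in this topic, with a placeholder directory:

// Enable checkpointing so the running state can be recovered after a failure
ssc.checkpoint("/tmp/streaming-checkpoints")   // placeholder path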

Output Operations on DStreams

Output operations include print(), saveAsTextFiles(prefix, [suffix]), and foreachRDD(func).

Example – foreachRDD():

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}

Data Serialization

Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel MEMORY_AND_DISK_SER_2.

Persisted RDDs generated by streaming operations: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory because it will be processed multiple times. Persisted RDDs generated by streaming computations use StorageLevel MEMORY_ONLY_SER (i.e. serialized) by default to minimize GC overheads.
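Both defaults can be tuned. For example, a receiver can be given a different storage level when the second in-memory replica is not needed; a minimal sketch (host and port are placeholders, ssc as created earlier):

import org.apache.spark.storage.StorageLevel

// Store received data serialized and spilled to disk, but without the second replica
val lines2 = ssc.socketTextStream("localhost", 9000, StorageLevel.MEMORY_AND_DISK_SER)

Enabling Kryo serialization (by setting spark.serializer to org.apache.spark.serializer.KryoSerializer) can further reduce the CPU and memory overheads of serialized data.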

Transform

The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this.

spamInfoRDD = sc.pickleFile(…)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(…))

Dataframe and SQL

This is done by creating a lazily instantiated singleton instance of SparkSession.

Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")
  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")
  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}