DataFrames

Get an introduction to the DataFrame data structure and its API.


When developers use structured APIs such as DataFrames instead of RDDs, they gain expressiveness, simplicity, composability, and uniformity in addition to improved performance. The structured APIs (DataFrames and Datasets) make it easier to write computations that follow common data analysis patterns. Consider the following example, written using RDDs, that computes the average of three analysts' ratings for the movie Gone with the Wind.

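    sc.parallelize(Seq(("Gone with the Wind", 6), ("Gone with the Wind", 8),
      ("Gone with the Wind", 8)))
      // Pair each rating with a count of 1: (movie, (rating, 1)).
      .map(v => (v._1, (v._2, 1)))
      // Sum the ratings and the counts per movie.
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      // Divide the rating total by the count to get the average.
      .map(x => (x._1, x._2._1 / (x._2._2 * 1.0)))
      .collect()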

The code appears cryptic. Spark can't determine the intent of the logic we put inside the lambda functions, so it can't apply any optimizations to it. In contrast, look at the same computation expressed using DataFrames below:

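    import org.apache.spark.sql.functions.avg

    // Build a two-column DataFrame from the three analyst ratings.
    val dataDF = spark.createDataFrame(Seq(("Gone with the Wind", 6),
      ("Gone with the Wind", 8), ("Gone with the Wind", 8))).toDF("movie", "rating")
    // Declare the aggregation; Spark decides how to execute it.
    val avgDF = dataDF.groupBy("movie").agg(avg("rating"))
    avgDF.show()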

Comparing the two listings, the snippet that uses DataFrames is far more readable and expressive. The DataFrame snippet, in contrast to the RDD snippet, tells Spark what we want to do rather than how to do it. With DataFrames, we simply instruct Spark to compute the average. With RDDs, we have to write out the logic that computes the average ourselves. Another benefit of the higher-level APIs is the consistency and uniformity of code across supported languages. If we were to rewrite the DataFrame snippet in Python, it would closely resemble what we wrote in Scala, but this may not be true of the RDD version.

DataFrames

A DataFrame is the most common Structured API and represents a table with rows and columns. Each column has a type defined in a schema. You can think of the DataFrame as a spreadsheet that is too big to fit on a single machine, so it has parts of it spread across a cluster of machines. Even if the spreadsheet can fit onto a single machine, the desired computations take too long, so the data has to be chunked and processed on multiple machines in parallel.

Another way to describe DataFrames is to think of them as distributed, table-like collections with well-defined rows and columns. Each column holds the same type of data across all rows. In a sense, DataFrames (and Datasets too, which we'll cover later) are lazily evaluated plans that describe operations to perform on data distributed across the machines in a cluster.
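To make the idea of a lazily evaluated plan concrete, here is a minimal sketch. It assumes a local SparkSession; the app name, the extra sample row, and the variable names are made up for illustration. The filter call only extends the plan, and no work is done until an action such as count is called.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for experimentation; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("lazy-plan-sketch").getOrCreate()
import spark.implicits._

// Illustrative sample data, not taken from a real dataset.
val ratingsDF = Seq(("Gone with the Wind", 6), ("Gone with the Wind", 8), ("Casablanca", 9))
  .toDF("movie", "rating")

// Transformation: only describes the computation; nothing is executed yet.
val highRatedDF = ratingsDF.filter(col("rating") >= 8)

// Print the plan Spark has built so far.
highRatedDF.explain()

// Action: triggers execution of the plan and returns the number of matching rows.
val highRatedCount = highRatedDF.count()
```

Calling explain before any action is a convenient way to inspect what Spark intends to do without running the job.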

A DataFrame is broken up into smaller parts called partitions. A partition is a collection of rows from the parent DataFrame that reside on a particular physical machine on the cluster. A DataFrame’s partitions represent how data is physically distributed across the cluster of machines. The number of partitions also dictates the parallelism that can be achieved in a Spark job. With a single partition, only a single executor can process the data, even if several hundred are available. Similarly, if there are many partitions but only a single executor available, there would be no parallelism.
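As a rough illustration of how partitions relate to a DataFrame, the sketch below inspects the partition count and asks Spark for a different one. It assumes a local session with four worker threads; the app name, row count, and target of eight partitions are arbitrary choices for the example.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session with 4 threads; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[4]").appName("partitions-sketch").getOrCreate()

// A single-column DataFrame holding the ids 0 to 999.
val numbersDF = spark.range(0, 1000).toDF("id")

// The number of partitions currently backing the DataFrame (depends on the environment).
val initialPartitions = numbersDF.rdd.getNumPartitions

// repartition returns a new DataFrame spread over the requested number of partitions.
val repartitionedDF = numbersDF.repartition(8)
val newPartitions = repartitionedDF.rdd.getNumPartitions

println(s"partitions before: $initialPartitions, after: $newPartitions")
```

More partitions only help when there are enough executor cores to work on them at the same time, which is the trade-off the paragraph above describes.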

When working with DataFrames, we do not, for the most part, manipulate partitions manually or individually. Instead, we specify higher-level data transformations, and the Spark framework applies them to all the partitions across the cluster.
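The following sketch shows one such higher-level transformation being applied across partitions. It assumes a local session, and the column names and the choice of four partitions are hypothetical. The built-in spark_partition_id function is used only to reveal which partition each row was processed in.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Assumption: a local session; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("transform-sketch").getOrCreate()

// 100 ids spread over 4 partitions (both numbers are arbitrary).
val idsDF = spark.range(0, 100).toDF("id").repartition(4)

// One declarative transformation; Spark applies it to the rows of every partition.
val doubledDF = idsDF
  .withColumn("doubled", col("id") * 2)
  .withColumn("partition", spark_partition_id())

// Count rows per partition to see which partitions took part.
val rowsPerPartition = doubledDF.groupBy("partition").count()
rowsPerPartition.show()

val totalRows = doubledDF.count()
```

Note that the code never addresses a specific partition; it only states what each row should look like afterward.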

Schema

A schema defines the column names and types of a DataFrame. A schema can be defined manually or read in from the source. Spark also supports schema inference, in which it reads rows from the source and parses their values to map them to Spark types. We can examine the schema of a DataFrame object, whether inferred or explicit, using the schema method.
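The two ways of obtaining a schema can be sketched as follows. This is a minimal example under some assumptions: it uses a local session and in-memory rows rather than a file source, so the "inferred" case here derives types from the Scala tuple types instead of parsing rows, and the name movieSchema is made up for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("schema-sketch").getOrCreate()
import spark.implicits._

// Inferred: Spark derives the column types from the tuple's element types.
val inferredDF = Seq(("Gone with the Wind", 6), ("Gone with the Wind", 8)).toDF("movie", "rating")
inferredDF.printSchema()

// Manual: spell out names, types, and nullability explicitly.
val movieSchema = StructType(Seq(
  StructField("movie", StringType, nullable = false),
  StructField("rating", IntegerType, nullable = false)
))
val rows = Seq(Row("Gone with the Wind", 6), Row("Gone with the Wind", 8))
val explicitDF = spark.createDataFrame(spark.sparkContext.parallelize(rows), movieSchema)

// The schema method returns the StructType attached to the DataFrame.
val explicitSchema = explicitDF.schema
println(explicitSchema.treeString)
```

Defining the schema by hand avoids any guessing about types, which can matter when the source data is large or inconsistent.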

Spark types

Internally, Spark maintains its own type information through the planning and processing of work, using its Catalyst engine together with the Tungsten execution engine. The Spark types map to corresponding types in the supported languages, such as Java or Python. Spark converts an expression written in one of the supported languages into an equivalent internal representation of the same type. The engine applies several optimizations and is continually improved to make execution faster.

In Scala, we can define a DataFrame's column to be of type String, Byte, Long, Map, and so on. The basic types available in Scala are listed below:

Data Type    | Value Assigned in Scala | API to Instantiate
ByteType     | Byte                    | DataTypes.ByteType
ShortType    | Short                   | DataTypes.ShortType
IntegerType  | Int                     | DataTypes.IntegerType
LongType     | Long                    | DataTypes.LongType
FloatType    | Float                   | DataTypes.FloatType
DoubleType   | Double                  | DataTypes.DoubleType
StringType   | String                  | DataTypes.StringType
BooleanType  | Boolean                 | DataTypes.BooleanType
DecimalType  | java.math.BigDecimal    | DecimalType
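As a small sketch of how the basic types above might be used, the snippet below builds a schema from them. The column names and the precision and scale passed to DecimalType are arbitrary values chosen for illustration.

```scala
import org.apache.spark.sql.types.{DataTypes, DecimalType, StructField, StructType}

// Basic types are available as fields on the DataTypes class.
val nameType = DataTypes.StringType
val ratingType = DataTypes.IntegerType

// DecimalType is created with a precision and a scale (10 and 2 are arbitrary here).
val priceType = DecimalType(10, 2)

// Hypothetical schema combining the three types.
val basicSchema = StructType(Seq(
  StructField("movie", nameType),
  StructField("rating", ratingType),
  StructField("ticketPrice", priceType)
))
println(basicSchema.treeString)
```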

All the basic types except DecimalType are instantiated through the DataTypes class. Spark supports similar types for Python. Data analysis often requires more than the basic types, so Spark also offers structured and complex types such as maps, arrays, structs, struct fields, dates, and timestamps, in both Scala and Python. These are shown below:

Data Type      | Value Assigned in Scala                                | API to Instantiate
BinaryType     | Array[Byte]                                            | DataTypes.BinaryType
TimestampType  | java.sql.Timestamp                                     | DataTypes.TimestampType
DateType       | java.sql.Date                                          | DataTypes.DateType
ArrayType      | scala.collection.Seq                                   | DataTypes.createArrayType(ElementType)
MapType        | scala.collection.Map                                   | DataTypes.createMapType(keyType, valueType)
StructType     | org.apache.spark.sql.Row                               | StructType(ArrayType[fieldTypes])
StructField    | A value type corresponding to the type of this field   | StructField(name, dataType, [nullable])
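The complex types in the table can be combined into a schema along the following lines. This is a sketch only: the field names (title, released, genres, analystScores) are hypothetical, and in practice StructType is constructed from an array or sequence of StructField objects.

```scala
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// An array of strings and a map from string keys to integer values.
val genresType = DataTypes.createArrayType(DataTypes.StringType)
val scoresType = DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType)

// A struct is a list of named, typed fields; nullable defaults to true.
val movieSchema = StructType(Array(
  StructField("title", DataTypes.StringType, nullable = false),
  StructField("released", DataTypes.DateType),
  StructField("genres", genresType),
  StructField("analystScores", scoresType)
))
println(movieSchema.treeString)
```

A schema like this could then be passed to a reader or to createDataFrame in place of inference.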

