More Operations with DataFrames

Get hands-on practice exploring various operations that can be performed on DataFrames.

We can also rename, drop, or change the data type of DataFrame columns. Let’s look at examples of each.

Changing column names

Our data has a rather awkward name for the column that represents movie rating: hitFlop. We can rename the column to the more appropriate name, “Rating,” using the withColumnRenamed method.

scala> val moviesNewColDF = movies.withColumnRenamed("hitFlop","Rating")
moviesNewColDF: org.apache.spark.sql.DataFrame = [imdbId: string, title: string ... 8 more fields]

scala> moviesNewColDF.printSchema
root
 |-- imdbId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- releaseYear: string (nullable = true)
 |-- releaseDate: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- writers: string (nullable = true)
 |-- actors: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- sequel: integer (nullable = true)
 |-- Rating: integer (nullable = true)

The original DataFrame movies isn’t changed; instead, a new DataFrame is returned with the renamed column.

Changing column types

In our original movies DataFrame, the column releaseDate is inferred as string type instead of date type if we don’t use the samplingRatio option. To fix this, we can create a new column from the releaseDate column and interpret it as a date type using the withColumn method.

scala> val newDF = movies.withColumn("launchDate", to_date($"releaseDate", "d MMM yyyy"))
newDF: org.apache.spark.sql.DataFrame = [imdbId: string, title: string ... 9 more fields]

scala> newDF.printSchema
root
 |-- imdbId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- releaseYear: string (nullable = true)
 |-- releaseDate: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- writers: string (nullable = true)
 |-- actors: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- sequel: integer (nullable = true)
 |-- hitFlop: integer (nullable = true)
 |-- launchDate: date (nullable = true)

Spark offers to-and-from conversion methods for date and timestamp types. The to_date method takes in the column we want to read from and the format of the date to parse, which is d MMM yyyy (for example, “20 Apr 2010”). The date patterns available for formatting and parsing are listed in Spark’s Datetime Patterns documentation. We keep the original releaseDate column for now so we can compare it against the converted values; once we’re satisfied with the conversion, we can remove it with newDF.drop("releaseDate").
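Since Spark 3.x, date patterns such as d MMM yyyy follow java.time.format.DateTimeFormatter semantics, so we can preview how the pattern behaves on sample values in plain Scala, outside Spark. The helper parse below is our own illustration, not a Spark API:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.util.Try

// Same pattern string we pass to to_date; English locale so "Apr" parses.
val fmt = DateTimeFormatter.ofPattern("d MMM yyyy", Locale.ENGLISH)

// Returns Some(date) on success, None on failure -- analogous to
// to_date producing null for values that don't match the pattern.
def parse(s: String): Option[LocalDate] =
  Try(LocalDate.parse(s, fmt)).toOption

parse("20 Apr 2010")  // Some(2010-04-20)
parse("28 Feb,2002")  // None: the comma doesn't match the pattern
parse("N/A")          // None: not a date at all
```

This mirrors what we’ll observe next: rows whose releaseDate doesn’t match the pattern end up with a null launchDate.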

There may have been failures for some rows when converting from string to date. We can check for those failures as follows:

scala> newDF.select("releaseDate","launchDate").where($"launchDate".isNull).show(5,false)
+-----------+----------+
|releaseDate|launchDate|
+-----------+----------+
|N/A        |null      |
|N/A        |null      |
|N/A        |null      |
|28 Feb,2002|null      |
|N/A        |null      |
+-----------+----------+
only showing top 5 rows

We can see the conversion failed for those rows that didn’t have a valid value for releaseDate or weren’t in the format we passed in. We can find the total number of failures as follows:

scala> newDF.select("releaseDate","launchDate")
            .where($"launchDate".isNull)
            .count()
res80: Long = 54

We can now use the year(), month(), and dayofmonth() methods on the launchDate column. For instance, we can rewrite the query from the previous lesson to list all the distinct release years in our data but have it use the launchDate column instead of the releaseYear column. The query and its output are shown below:

scala> newDF.select(year($"launchDate"))
            .distinct()
            .orderBy(year($"launchDate"))
            .show()
+----------------+
|year(launchDate)|
+----------------+
|            null|
|            2001|
|            2002|
|            2003|
|            2004|
|            2005|
|            2006|
|            2007|
|            2008|
|            2009|
|            2010|
|            2011|
|            2012|
|            2013|
|            2014|
+----------------+

Note that the result also includes null as an entry, since rows that were missing data or didn’t match the date format produced null from the to_date method.
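The behavior of year() on a nullable date column can be mirrored on plain java.time values, modeling Spark’s null with Option. The helper yearOf is our own sketch, not a Spark API:

```scala
import java.time.LocalDate

// Extract the year from an optional date; a missing date yields None,
// just as year() on a null launchDate yields null.
def yearOf(d: Option[LocalDate]): Option[Int] = d.map(_.getYear)

yearOf(Some(LocalDate.of(2010, 4, 20)))  // Some(2010)
yearOf(None)                             // None, like the null row above
```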

Aggregations

A lot of data analysis questions require aggregations to be performed on the data. For example, consider the query to calculate the number of movies released per year. We can do so using the groupBy method to group the rows by releaseYear and then ask for a count of rows in each group. The query is shown below:

scala> movies.select("releaseYear")
             .groupBy("releaseYear")
             .count()
             .orderBy("releaseYear")
             .show
+-----------+-----+
|releaseYear|count|
+-----------+-----+
|       2001|   62|
|       2002|   79|
|       2003|   95|
|       2004|   88|
|       2005|  106|
|       2006|   60|
|       2007|   66|
|       2008|   98|
|       2009|   91|
|       2010|  116|
|       2011|  112|
|       2012|   99|
|       2013|  102|
|       2014|  110|
+-----------+-----+

We also orderBy the results by releaseYear so that the output is more readable. Spark also offers aggregate methods such as max(), min(), avg(), and sum() for mathematical operations. Some examples that use these methods follow:
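The group-and-count logic above mirrors the groupBy on Scala’s own collections. A plain-Scala sketch of the same aggregation, over a few hypothetical releaseYear values:

```scala
// Hypothetical sample of releaseYear values, one per movie.
val years = List("2001", "2001", "2002", "2001", "2002")

// Group identical years together, then count the members of each group --
// the collections analogue of groupBy("releaseYear").count().
val countsByYear: Map[String, Int] =
  years.groupBy(identity).view.mapValues(_.size).toMap

// countsByYear("2001") == 3, countsByYear("2002") == 2
```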

Finding the maximum value in the hitFlop column:

scala> movies.select(max($"hitFlop"))
             .show
+------------+
|max(hitFlop)|
+------------+
|           9|
+------------+

Finding the minimum value in the hitFlop column:

scala> movies.select(min($"hitFlop"))
             .show
+------------+
|min(hitFlop)|
+------------+
|           1|
+------------+

Finding the sum of all the values in the hitFlop column:

scala> movies.select(sum($"hitFlop"))
             .show
+------------+
|sum(hitFlop)|
+------------+
|        2753|
+------------+

Finding the average of all the values in the hitFlop column:

scala> movies.select(avg($"hitFlop"))
             .show
+------------------+
|      avg(hitFlop)|
+------------------+
|2.1440809968847354|
+------------------+

Other methods for advanced analysis also exist, such as describe() for summary statistics, and the stat() functions corr(), cov(), sampleBy(), approxQuantile(), and freqItems().

A more interesting query would be to find the average rating for the movies released in each year. We’ll need to group the rows by releaseYear and then find the average rating for movies for each year.

scala> movies.select("releaseYear","hitFlop")
             .groupBy("releaseYear")
             .avg("hitFlop")
             .orderBy("releaseYear")
             .show
+-----------+------------------+
|releaseYear|      avg(hitFlop)|
+-----------+------------------+
|       2001| 2.306451612903226|
|       2002|1.9620253164556962|
|       2003|2.0105263157894737|
|       2004|1.9545454545454546|
|       2005| 2.009433962264151|
|       2006|2.9833333333333334|
|       2007| 2.621212121212121|
|       2008|  2.13265306122449|
|       2009| 1.835164835164835|
|       2010|1.8620689655172413|
|       2011|2.0535714285714284|
|       2012| 2.393939393939394|
|       2013| 2.343137254901961|
|       2014| 2.081818181818182|
+-----------+------------------+
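The per-year average can likewise be sketched on plain Scala collections, using hypothetical (releaseYear, hitFlop) pairs in place of DataFrame rows:

```scala
// Hypothetical (releaseYear, hitFlop) pairs.
val ratings = List(("2001", 2), ("2001", 3), ("2002", 1), ("2002", 3))

// Group by year, then average the ratings in each group -- the
// collections analogue of groupBy("releaseYear").avg("hitFlop").
val avgByYear: Map[String, Double] =
  ratings.groupBy(_._1).view
    .mapValues(rs => rs.map(_._2).sum.toDouble / rs.size)
    .toMap

// avgByYear("2001") == 2.5, avgByYear("2002") == 2.0
```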

Take vs collect

We’ll end the discussion on DataFrame APIs with two methods: take() and collect(). When we invoke collect on a DataFrame, we are returned all the rows that make up the DataFrame. This can be a time-consuming and memory-intensive operation, potentially resulting in out-of-memory (OOM) errors if the DataFrame is large enough. In such situations, if the intent is to peek at a few records, it is better to use the take(n) method, which returns the first n Row objects of the DataFrame.
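The distinction can be mirrored with Scala’s lazy collections: taking a prefix evaluates only what is demanded, while materializing everything can be unbounded in cost.

```scala
// A conceptually unbounded "dataset": 1, 2, 3, ...
val rows = LazyList.from(1)

// Cheap: evaluates only five elements, like take(5) on a DataFrame.
val firstFive = rows.take(5).toList  // List(1, 2, 3, 4, 5)

// rows.toList  // would never finish -- the "collect everything" trap
```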

All the queries and commands used in this lesson are reproduced in the widget below for easy copying and pasting into the terminal.
