Rows

Get hands-on practice performing various operations on the rows of a DataFrame.

A row in Spark is an ordered collection of fields that can be accessed by index, starting at 0. A row is represented by the generic Row object, and the fields making up a row can be of the same or different types.

Creating a row

Spark allows us to instantiate rows in all the supported languages. For example, we can create a row and access its individual columns in Scala as follows:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val row = Row("Upcoming New Movie", 2021, "Comedy")
row: org.apache.spark.sql.Row = [Upcoming New Movie,2021,Comedy]

scala> row(0)
res53: Any = Upcoming New Movie

scala> row(1)
res54: Any = 2021

scala> row(2)
res55: Any = Comedy
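Notice that indexing into a Row returns each field with static type Any. When we know a field's type, Spark's Row API also offers typed getters; a quick sketch using the row created above:

row.getString(0)     // "Upcoming New Movie" as a String
row.getInt(1)        // 2021 as an Int
row.getAs[String](2) // generic typed getter; "Comedy" as a String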

We can also create DataFrames from collections of rows for quick exploration of data. In the example below, a Seq of tuples is converted to a DataFrame using the toDF() method:

scala> val rows = Seq(("Tom Cruise Movie", 2021, "Comedy"), ("Rajinikanth Movie", 2021, "Action"))
rows: Seq[(String, Int, String)] = List((Tom Cruise Movie,2021,Comedy), (Rajinikanth Movie,2021,Action))

scala> val newMovies = rows.toDF("Movie Name", "Release Year", "Genre")
newMovies: org.apache.spark.sql.DataFrame = [Movie Name: string, Release Year: int ... 1 more field]

scala> newMovies.show()
+-----------------+------------+------+
|       Movie Name|Release Year| Genre|
+-----------------+------------+------+
| Tom Cruise Movie|        2021|Comedy|
|Rajinikanth Movie|        2021|Action|
+-----------------+------------+------+

Though we can create rows and generate DataFrames from them, we’ll generally read data from files for any serious data analysis problem.
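For instance, the movies DataFrame queried in the remainder of this lesson is read from a CSV file of Bollywood movie details, as reproduced in the widget at the end of the lesson:

import org.apache.spark.sql.functions.col // col() is used in the queries below

val movies = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/BollywoodMovieDetail.csv")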

Projections and filters

In relational parlance, a projection returns a subset of columns, while a filter (also called a selection) returns only the rows that satisfy a given condition. In Spark, a projection is created using the select() method, while a filter can be specified using the where() or filter() methods. Let’s say we want to find all the movies that had a rating of 9 or higher. Since the hitFlop column holds an integer rating, we can write the query as follows:

movies.select("title")
      .where(col("hitFlop") > 8)
      .show()

Or we could simply find a count of such movies:

movies.select("title")
      .where(col("hitFlop") > 8)
      .count()

The output from the above commands is shown below:

scala> movies.select("title")
             .where(col("hitFlop") > 8).show()
+--------------------+
|               title|
+--------------------+
|Gadar: Ek Prem Katha|
|            3 Idiots|
|            Dhoom: 3|
|                  PK|
+--------------------+


scala> movies.select("title")
             .where(col("hitFlop") > 8)
             .count()
res21: Long = 4

We can answer more complicated questions about our data using filters. For example, we can determine the number of movies that were released after the year 2010 and belong to the romance genre. First, we count all the movies whose genre contains Romance, and then we narrow that result to those released after 2010:

scala> movies.select("title")
             .filter($"genre".contains("Romance"))
             .count()
res34: Long = 372

scala> movies.select("title")
             .filter($"genre".contains("Romance"))
             .where($"releaseYear" > 2010).count()
res35: Long = 116
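The same result can also be computed in a single step by combining both conditions with the && column operator:

movies.select("title")
      .filter($"genre".contains("Romance") && $"releaseYear" > 2010)
      .count()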

As another example, say we want to know which release years the dataset covers. We can answer that question by selecting the releaseYear column and invoking the distinct() method as follows:

scala> movies.select("releaseYear")
             .distinct()
             .sort($"releaseYear".desc)
             .show()
+-----------+
|releaseYear|
+-----------+
|       2014|
|       2013|
|       2012|
|       2011|
|       2010|
|       2009|
|       2008|
|       2007|
|       2006|
|       2005|
|       2004|
|       2003|
|       2002|
|       2001|
+-----------+

Notice that we also sort the output in descending order by calling desc on the releaseYear column.

All the queries and commands used in this lesson are reproduced in the widget below for easy copying and pasting into the terminal.

// Creating a row
import org.apache.spark.sql.Row
val row = Row("Upcoming New Movie", 2021, "Comedy")
row(0)
row(1)
row(2)
val rows = (Seq(("Tom Cruise Movie", 2021, "Comedy"),
  ("Rajinikanth Movie", 2021, "Action")))
val newMovies = rows.toDF("Movie Name", "Release Year", "Genre")
newMovies.show()
// Projections and filters
import org.apache.spark.sql.functions.col
val movies = (spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/BollywoodMovieDetail.csv"))
(movies.select("title")
.where(col("hitFlop") > 8)
.show())
(movies.select("title")
.where(col("hitFlop") > 8)
.count())
(movies.select("title")
.where(col("hitFlop") > 8)
.show())
(movies.select("title")
.where(col("hitFlop") > 8)
.count())
(movies.select("title")
.filter($"genre".contains("Romance"))
.count())
(movies.select("title")
.filter($"genre".contains("Romance"))
.where($"releaseYear" > 2010)
.count())
(movies.select("releaseYear")
.distinct()
.sort($"releaseYear".desc)
.show())

Spark also allows us to perform more complicated operations with DataFrames, including aggregate operations such as computing averages across column values.
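As a quick preview, here is a minimal sketch that averages the hitFlop rating across all movies using Spark's built-in avg function:

import org.apache.spark.sql.functions.avg
movies.select(avg($"hitFlop")).show()

We'll explore these and other aggregate operations in the next lesson.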