
Joins, Unions, and Window Functions

Explore how to execute joins and unions between DataFrames or tables in Spark SQL. Understand different join types and practice manipulating string data for effective joins. Discover window functions to perform ranking and analytic operations within grouped data. This lesson helps you master advanced Spark SQL techniques for querying and analyzing structured big data.


Spark offers a rich set of built-in functions covering aggregation, collection, datetime, math, string, sorting, and windowing functionality. In this lesson, we'll see examples of joins, unions, and windowing.

Join

Joins can be performed between DataFrames or tables in Spark. By default, Spark executes an inner join, but it also supports cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti joins. Let's work through an example of executing an inner join on our Bollywood movies data set. We'll read the following two files and then join their respective DataFrames.

  1. BollywoodMovieDetail.csv (the file we have already been reading in the previous lessons). Its columns are:

     imdbId, title, releaseYear, releaseDate, genre, writers, actors, directors, sequel, hitFlop

  2. BollywoodActorRanking.csv. Its columns are:

     actorId, actorName, movieCount, ratingSum, normalizedMovieRank, googleHits, normalizedGoogleRank, normalizedRating

We'll want to join the two tables on the actor name columns. However, recall that the actors column in BollywoodMovieDetail.csv is a pipe-delimited string of names, so we'll first need to massage the data in this column to make it amenable to a join. We can then join it with the actorName column of BollywoodActorRanking.csv.
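To see what this massaging involves, here's a plain-Scala sketch (no Spark required) of the split, trim, and upper-case steps applied to one pipe-delimited actors value; the sample string is made up for illustration:

```scala
object SplitActorsDemo {
  def main(args: Array[String]): Unit = {
    // A made-up value in the same pipe-delimited shape as the actors column
    val actors = " Aamir Khan | Gracy Singh |Rachel Shelley"

    // Mirror the SQL split(actors, "[|]") followed by trim and upper on each token
    val normalized = actors.split("\\|").map(_.trim.toUpperCase)

    normalized.foreach(println)
    // AAMIR KHAN
    // GRACY SINGH
    // RACHEL SHELLEY
  }
}
```

The same three operations appear below as the Spark SQL functions split, trim, and upper.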

Additionally, we’ll switch between the DataFrame API and Spark SQL to demonstrate the flexibility of Spark. Read the listing below to see how the join is performed.


## Read the first file as a DataFrame
scala> val movies = spark.read.format("csv")
                         .option("header", "true")
                         .option("samplingRatio", 0.001)
                         .option("inferSchema", "true")
                         .load("/data/BollywoodMovieDetail.csv")


## Read the second file as a DataFrame
scala> val actors = spark.read.format("csv")
                         .option("header", "true")
                         .option("samplingRatio", 0.001)
                         .option("inferSchema", "true")
                         .load("/data/BollywoodActorRanking.csv")

## Register the movies DataFrame as a temporary view so we can reference it from Spark SQL
scala> movies.createOrReplaceTempView("movies")

## We create a table tempTbl1 that converts the pipe-delimited string of actor names into an array of string tokens
scala> spark.sql("""CREATE TABLE tempTbl1 AS SELECT title, releaseYear, hitFlop, split(actors,"[|]") AS actors FROM movies""")

## Next we create another table tempTbl2 that explodes the actors array in tempTbl1 so that we get a column with a single actor name per row. We'll later join this column with the actorName column in the other DataFrame. Also, notice that we trim the actor name of any leading or trailing spaces and convert it to upper case.
scala> spark.sql("""CREATE TABLE tempTbl2 AS SELECT title, releaseYear, hitFlop, upper(trim(actor)) AS actor FROM (SELECT title, releaseYear, hitFlop, explode(actors) AS actor FROM tempTbl1)""")

## Now we create a new DataFrame by running the trim and upper functions on the actorName column of the second DataFrame.
scala> val actorsDF = actors.withColumn("actorName",trim(upper($"actorName")))

## Create a DataFrame from tempTbl2
scala> val moviesDF = spark.sql("""SELECT * FROM tempTbl2""")

## Finally, we join on the columns containing the actor names and display 5 rows.
scala> moviesDF.join(actorsDF, $"actor" === $"actorName").show(5,false)

The result of executing the above queries is captured in the widget below:

Shell
scala> moviesDF.join(actorsDF, $"actor" === $"actorName").show(5,false)
+---------------------------------+-----------+-------+----------------------+-------+----------------------+----------+---------+-------------------+----------+--------------------+----------------+
|title |releaseYear|hitFlop|actor |actorId|actorName |movieCount|ratingSum|normalizedMovieRank|googleHits|normalizedGoogleRank|normalizedRating|
+---------------------------------+-----------+-------+----------------------+-------+----------------------+----------+---------+-------------------+----------+--------------------+----------------+
|Albela |2001 |2 |GOVINDA |413 |GOVINDA |17 |680 |2.9565200805664062 |1430000 |4.685649871826172 |4.78557014465332|
|Albela |2001 |2 |AISHWARYA RAI BACHCHAN|394 |AISHWARYA RAI BACHCHAN|16 |780 |3.8125 |1930000 |5.9755401611328125 |6.00856018066406|
|Albela |2001 |2 |JACKIE SHROFF |510 |JACKIE SHROFF |25 |740 |1.9391299486160278 |349000 |1.8969099521636963 |2.61633992195129|
|Albela |2001 |2 |NAMRATA SHIRODKAR |583 |NAMRATA SHIRODKAR |6 |140 |1.3260899782180786 |95400 |1.2426799535751343 |1.89409005641937|
|Lagaan: Once Upon a Time in India|2001 |6 |AAMIR KHAN |373 |AAMIR KHAN |11 |1170 |9.448619842529297 |2460000 |7.342830181121826 |10.0 |
+---------------------------------+-----------+-------+----------------------+-------+----------------------+----------+---------+-------------------+----------+--------------------+----------------+
only showing top 5 rows
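Conceptually, the inner join keeps only the row combinations whose normalized actor names match on both sides. A plain-Scala analogue over toy case classes makes the semantics visible (the data here is invented, not read from the CSVs):

```scala
object InnerJoinDemo {
  case class MovieRow(title: String, actor: String)
  case class ActorRow(actorName: String, movieCount: Int)

  def main(args: Array[String]): Unit = {
    val movies = Seq(MovieRow("Albela", "GOVINDA"), MovieRow("Albela", "SOME UNRANKED ACTOR"))
    val actors = Seq(ActorRow("GOVINDA", 17))

    // Inner join on actor name, like moviesDF.join(actorsDF, $"actor" === $"actorName"):
    // rows without a match on the other side are dropped
    val joined = for {
      m <- movies
      a <- actors
      if m.actor == a.actorName
    } yield (m.title, m.actor, a.movieCount)

    joined.foreach(println) // (Albela,GOVINDA,17)
  }
}
```

The unmatched "SOME UNRANKED ACTOR" row disappears from the output, which is exactly why inner-join results can have fewer rows than either input.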

Union

The union operation allows us to combine two DataFrames with the same schema. As a contrived example, consider finding all the movies except those released in the year 2010. We can create one DataFrame of movies released after 2010 and a second DataFrame of movies released before 2010, and then union the two to get our answer. (In the listing below we limit each DataFrame to two rows to keep the output short.) The listing appears below:

## Create a DataFrame of all the movies that were released after 2010
scala> val df1 = movies.select("title").where( $"releaseYear" > 2010).limit(2)
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [title: string]


## Create a DataFrame of all the movies that were released before 2010
scala> val df2 = movies.select("title").where( $"releaseYear" < 2010).limit(2)
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [title: string]

## Union the two DataFrames
scala> df1.union(df2).show(false)
+---------------------------------+
|title                            |
+---------------------------------+
|Action Jackson                   |
|Bhopal: A Prayer for Rain        |
|Albela                           |
|Lagaan: Once Upon a Time in India|
+---------------------------------+
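One behavior worth noting: DataFrame.union does not remove duplicate rows; it behaves like SQL UNION ALL, and you would chain .distinct() to deduplicate. A plain-Scala sketch of the same semantics using sequence concatenation (toy titles):

```scala
object UnionDemo {
  def main(args: Array[String]): Unit = {
    val df1 = Seq("Action Jackson", "Bhopal: A Prayer for Rain")
    val df2 = Seq("Albela", "Action Jackson") // deliberately overlaps with df1

    // Concatenation keeps duplicates, just as DataFrame.union does
    val unioned = df1 ++ df2
    println(unioned.size)          // 4: the duplicate title is kept

    // The analogue of df1.union(df2).distinct()
    println(unioned.distinct.size) // 3: after deduplication
  }
}
```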

Windowing

Windowing functions allow us to answer questions that may span multiple individual queries or are hard to express without the windowing functions. There are three kinds of windowing functions, namely:

  1. Ranking Functions
  2. Analytic Functions
  3. Aggregate Functions

As an example, we'll work with the dense_rank() function, which is a ranking function. Suppose we want to write a query that outputs all the movies with the best or second-best rating for the year they were released in. Recall that the hitFlop column represents each movie's rating. In this case, we want to rank within a window of movies released in a given year, i.e., we aren't ranking across the entire data set but within a subset of it. Windowing functions operate on a group of rows, referred to as a window, and calculate a return value for each row based on that group. Consider the following query and its output:

scala> spark.sql("SELECT title, hitFlop, releaseYear, dense_rank() OVER (PARTITION BY releaseYear ORDER BY hitFlop DESC) AS rank FROM movies").show(3)
+-------+-------+-----------+----+
|  title|hitFlop|releaseYear|rank|
+-------+-------+-----------+----+
|Baghban|      6|       2003|   1|
|  Bhoot|      6|       2003|   1|
| Andaaz|      6|       2003|   1|
+-------+-------+-----------+----+
only showing top 3 rows

The three movies are all ranked number 1 within the bucket of movies released in 2003 because they tie with a hitFlop rating of 6. In contrast, if we execute the same query for the year 2013, the result is as follows:

scala> spark.sql("SELECT title, hitFlop, releaseYear, dense_rank() OVER (PARTITION BY releaseYear ORDER BY hitFlop DESC) AS rank FROM movies WHERE releaseYear=2013").show(3)
+---------------+-------+-----------+----+
|          title|hitFlop|releaseYear|rank|
+---------------+-------+-----------+----+
|       Dhoom: 3|      9|       2013|   1|
|       Krrish 3|      8|       2013|   2|
|Chennai Express|      8|       2013|   2|
+---------------+-------+-----------+----+
only showing top 3 rows

Here the movie with a hitFlop rating of 8 is ranked number 2, which makes sense since the ranking is restricted to the movies released in 2013. In the above two queries, we mark dense_rank() as a window function with the OVER clause. The clause following OVER is called the window specification; it describes to Spark what constitutes each window. Within the window specification, PARTITION BY is known as the partition specification and controls which rows land in the same partition as a given row. The ORDER BY specification controls how rows within a partition are ordered, determining the position of a given row in its partition. The third element of a window specification, not used in our queries, is the frame specification; it is used with other window functions.
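To make this behavior concrete outside Spark, here's a plain-Scala sketch that partitions toy (title, year, rating) rows by year and dense-ranks them by rating; the titles and ratings are illustrative:

```scala
object DenseRankDemo {
  def main(args: Array[String]): Unit = {
    // Toy rows: (title, releaseYear, hitFlop)
    val rows = Seq(
      ("Baghban", 2003, 6), ("Bhoot", 2003, 6), ("Andaaz", 2003, 6),
      ("Some Other Film", 2003, 5)
    )

    // PARTITION BY releaseYear: group the rows per year
    val ranked = rows.groupBy { case (_, year, _) => year }.values.flatMap { partition =>
      // ORDER BY hitFlop DESC with dense_rank: distinct ratings get consecutive
      // ranks starting at 1, and tied rows share the same rank
      val rankOf = partition.map(_._3).distinct.sorted(Ordering.Int.reverse).zipWithIndex.toMap
      partition.map { case (title, year, rating) => (title, year, rating, rankOf(rating) + 1) }
    }

    ranked.foreach(println) // the three tied rows all get rank 1; the rating-5 row gets rank 2
  }
}
```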

Going back to our original question, we can now find all the movies that ranked best or second best for their year of release as follows:

scala> spark.sql("SELECT * FROM (SELECT title, hitFlop, releaseYear, dense_rank() OVER (PARTITION BY releaseYear ORDER BY hitFlop DESC) AS rank FROM movies) tmp WHERE rank <= 2 ORDER BY releaseYear").show(5)
+--------------------+-------+-----------+----+
|               title|hitFlop|releaseYear|rank|
+--------------------+-------+-----------+----+
|Gadar: Ek Prem Katha|      9|       2001|   1|
|Kabhi Khushi Kabh...|      8|       2001|   2|
|Mere Yaar Ki Shaa...|      5|       2002|   2|
|              Devdas|      6|       2002|   1|
|Ek Chhotisi Love ...|      6|       2002|   1|
+--------------------+-------+-----------+----+
only showing top 5 rows
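A closely related ranking function is rank(), which, unlike dense_rank(), leaves gaps after ties. A plain-Scala comparison over a list of ratings already sorted in descending order (the values are illustrative):

```scala
object RankVsDenseRank {
  def main(args: Array[String]): Unit = {
    // Ratings already in ORDER BY hitFlop DESC order; the two 8s are a tie
    val ratings = Seq(9, 8, 8, 7)

    // rank(): tied rows share a rank, and the next distinct value skips ahead
    val rank = ratings.map(r => ratings.indexOf(r) + 1)

    // dense_rank(): tied rows share a rank with no gap afterward
    val denseRankOf = ratings.distinct.zipWithIndex.toMap
    val denseRank = ratings.map(r => denseRankOf(r) + 1)

    println(rank)      // List(1, 2, 2, 4)
    println(denseRank) // List(1, 2, 2, 3)
  }
}
```

Had we used rank() in the query above, a year with a three-way tie at the top would report no rank-2 movies at all, so dense_rank() is the right choice for "best or second best."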

All the queries and commands used in this lesson are reproduced in the widget below for easy copy/pasting into the terminal.

Shell
# Join example
val movies = (spark.read.format("csv")
.option("header", "true")
.option("samplingRatio", 0.001)
.option("inferSchema", "true")
.load("/data/BollywoodMovieDetail.csv"))
val actors = (spark.read.format("csv")
.option("header", "true")
.option("samplingRatio", 0.001)
.option("inferSchema", "true")
.load("/data/BollywoodActorRanking.csv"))
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW movies USING csv OPTIONS (path "/data/BollywoodMovieDetail.csv", header "true", inferSchema "true", mode "FAILFAST")""")
spark.sql("""CREATE TABLE tempTbl1 AS SELECT title, releaseYear, hitFlop, split(actors,"[|]") AS actors FROM movies""")
spark.sql("""CREATE TABLE tempTbl2 AS SELECT title, releaseYear, hitFlop, upper(trim(actor)) AS actor FROM (SELECT title, releaseYear, hitFlop, explode(actors) AS actor FROM tempTbl1)""")
val actorsDF = actors.withColumn("actorName",trim(upper($"actorName")))
val moviesDF = spark.sql("""SELECT * FROM tempTbl2""")
moviesDF.join(actorsDF, $"actor" === $"actorName").show(5,false)
# Union example
val df1 = movies.select("title").where( $"releaseYear" > 2010).limit(2)
val df2 = movies.select("title").where( $"releaseYear" < 2010).limit(2)
df1.union(df2).show(false)
# Window function example
spark.sql("SELECT title, hitFlop, releaseYear, dense_rank() OVER (PARTITION BY releaseYear ORDER BY hitFlop DESC) AS rank FROM movies").show(3)
spark.sql("SELECT title, hitFlop, releaseYear, dense_rank() OVER (PARTITION BY releaseYear ORDER BY hitFlop DESC) AS rank FROM movies WHERE releaseYear=2013").show(3)
spark.sql("SELECT * FROM (SELECT title, hitFlop, releaseYear, dense_rank() OVER (PARTITION BY releaseYear ORDER BY hitFlop DESC) AS rank FROM movies) tmp WHERE rank <= 2 ORDER BY releaseYear").show(5)

We can now begin to appreciate the versatility and breadth of Spark SQL, given the span of built-in and higher-order functions it offers.