Spark User Defined Functions

Learn how to create user-defined functions in Spark and use Spark's built-in functions.

We have previously seen and worked with Spark's built-in functions, but Spark also allows users to define their own functionality wrapped inside user-defined functions (UDFs) that can be invoked in Spark SQL. The major benefit of UDFs is reusability. UDFs exist per session and don't persist in the underlying metastore. Let's consider a simple function that returns the last two digits of the releaseYear value, e.g., if the function is passed 2021, it returns 21. The function definition and its use are presented below:

val movies = spark.read.format("csv")
                       .option("header", "true")
                       .option("samplingRatio", 0.001)
                       .option("inferSchema", "true")
                       .load("/data/BollywoodMovieDetail.csv")

scala> movies.write.saveAsTable("movies")

scala> val twoDigitYear = (year: Int) => { ((year / 10) % 10).toString + (year % 10).toString }

scala> spark.udf.register("shortYear", twoDigitYear)

scala> spark.sql("SELECT title, shortYear(releaseYear) FROM movies").show(3)
+--------------------+----------------------+
|               title|shortYear(releaseYear)|
+--------------------+----------------------+
|              Albela|                    01|
|Lagaan: Once Upon...|                    01|
|Meri Biwi Ka Jawa...|                    04|
+--------------------+----------------------+
only showing top 3 rows

Spark Built-in Functions

Spark offers a rich collection of built-in functions. Below are a few examples:

// Convert the movie title to upper case
scala> spark.sql("SELECT upper(title) FROM movies").show(3)
+--------------------+
|        upper(title)|
+--------------------+
|              ALBELA|
|LAGAAN: ONCE UPON...|
|MERI BIWI KA JAWA...|
+--------------------+
only showing top 3 rows

// Generate a random number
scala> spark.sql("SELECT random()").show(1)
+------------------+
|            rand()|
+------------------+
|0.5073736569856021|
+------------------+

// Retrieve the current timestamp
scala> spark.sql("SELECT now()").show(1, false)
+-----------------------+
|now()                  |
+-----------------------+
|2021-05-09 20:56:06.462|
+-----------------------+


scala> spark.sql("SELECT current_timestamp()").show(1, false)
+-----------------------+
|current_timestamp()    |
+-----------------------+
|2021-05-09 20:56:15.702|
+-----------------------+

Recall that the columns actors, directors, and writers contain pipe-separated names. We can use the split() function to tokenize the names as follows:

scala> spark.sql("""SELECT split(actors,"[|]") AS Names FROM movies""").show(3, false)
+-------------------------------------------------------------------------+
|Names                                                                    |
+-------------------------------------------------------------------------+
|[Govinda ,  Aishwarya Rai Bachchan ,  Jackie Shroff ,  Namrata Shirodkar]|
|[Aamir Khan ,  Gracy Singh ,  Rachel Shelley ,  Paul Blackthorne]        |
|[Akshay Kumar ,  Sridevi ,  Gulshan Grover ,  Laxmikant Berde]           |
+-------------------------------------------------------------------------+
only showing top 3 rows

UDFs in PySpark

UDFs can also be defined in PySpark. Initially, PySpark UDFs required data to be serialized and moved between the JVM and the Python worker process, which made them slower than Scala UDFs. This issue was addressed by Pandas UDFs, also known as vectorized UDFs, introduced in Spark 2.3. A Pandas UDF uses Apache Arrow to transfer data in batches and avoids row-by-row serialization and pickling, because data in the Arrow format can be consumed directly by the Python process.
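
As an illustration, here is a minimal PySpark sketch of a vectorized (Pandas) UDF that mirrors the shortYear example above. It is not part of the original lesson code; it assumes the same BollywoodMovieDetail.csv file with the title and releaseYear columns, and a Spark 3.x installation with PyArrow available.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("pandas-udf-demo").getOrCreate()

# Vectorized UDF: receives a whole batch of values as a pandas Series
# (transferred from the JVM via Apache Arrow) and returns a Series.
@pandas_udf(StringType())
def short_year(year: pd.Series) -> pd.Series:
    return (year % 100).astype(str).str.zfill(2)

# Assumed to be the same dataset used in the Scala examples above.
movies = (spark.read.format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .load("/data/BollywoodMovieDetail.csv"))

movies.select(col("title"), short_year(col("releaseYear"))).show(3)

# The vectorized UDF can also be registered and used from Spark SQL.
spark.udf.register("shortYear", short_year)
movies.createOrReplaceTempView("movies")
spark.sql("SELECT title, shortYear(releaseYear) FROM movies").show(3)

Because the function operates on whole batches rather than one row at a time, the per-row Python invocation overhead of classic PySpark UDFs is avoided.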

