The User Defined Function `udf()` acts as a wrapper that turns a regular Python function into one that can be used on DataFrames and in SQL. UDFs extend the framework's built-in functions so they can be reused across various DataFrames. If we want to perform an operation on a data structure and PySpark doesn't provide that function, we can write it as a UDF and reuse it as many times as needed on multiple DataFrames.
The syntax for using `udf()` is as follows:

udf_function = udf(lambda arg: func(arg), returnType=Type())
The `udf()` function is built into `pyspark.sql.functions`. It accepts the following two arguments:
- `lambda arg: func(arg)`: This argument accepts the function `func()` to apply to the data; the lambda's argument `arg` becomes the input of the function that we want to turn into a UDF.
- `returnType`: This is the intended return type of the function `func()`. By default, the return type is `pyspark.sql.types.StringType()`. We can use other return types from `pyspark.sql.types`.
In the example below, `udf()` takes the following arguments as input and returns a wrapped function stored in the variable `countUDF`:

- `lambda z: countProducts(z)` provides the Python function named `countProducts`.
- `IntegerType()` is the intended return type of `countUDF`, overriding the default `pyspark.sql.types.StringType()`.
To create a UDF, we follow these steps:

1. Create a DataFrame.
2. Define a Python function.
3. Convert the function to a UDF.
4. Use the UDF on the DataFrame.

Let's look at the code below:
```python
from pyspark.sql.types import IntegerType, ArrayType, StringType
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession

# Step 1: Creating a DataFrame
spark = SparkSession.builder.appName('PySpark UDF').getOrCreate()
columns = ["ID", "Brand"]
data = [("1", "Samsung"), ("2", "Apple"), ("3", "Huawei")]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

# Step 2: Defining a function
def countProducts(brand):
    return len(brand) * 2

# Step 3: Converting the function to a UDF
countUDF = udf(lambda z: countProducts(z), IntegerType())

# Step 4: Using the UDF on a DataFrame
df.withColumn("Count", countUDF("Brand")).show()
print('UDF Created')
```
Note: Some warnings will appear in the terminal after running the code. Please ignore them.
In the code above:

- We import the required types, the `udf()` function, and `SparkSession` from the `pyspark` library.
- Step 1: We create a DataFrame `df`.
- Step 2: We define a function `countProducts(brand)` for counting the number of products using the length of the brand name and doubling it.
- Step 3: We convert the `countProducts` function to a UDF and define the return type as `IntegerType()` because the count will be an integer. The converted function is `countUDF`.
- Step 4: We call `countUDF` on every row of the DataFrame and pass the brand name to this UDF for counting products. The count of the products appears in the `Count` column of the DataFrame.