
Ingestion Job: Part II

Explore how to process nested JSON files in Apache Spark using Java by applying flatMap transformations to flatten complex data structures. Understand the implementation of a JsonFlatMapper to convert nested sales data into rows suitable for database insertion. Learn how to efficiently persist these rows using mapPartitions with batch updates in JDBC, completing the ingestion job workflow in a Spark batch application.

Processing the input

The project and codebase for this lesson are the same as in the previous lesson:

mvn install -DskipTests
java -jar /usercode/target/batch-app-0.0.1-SNAPSHOT.jar jobName=ingesterJob clientId=client1 readFormat=json fileName=sales
Project with the IngesterJob code implementation
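
As a quick refresher on the previous lesson’s pre-processing step, the job first reads the JSON file into a DataFrame before handing it over to the processor. The exact read code lives in the shared batch framework; the snippet below is only a minimal sketch, where the class name, the multiLine option, and the file path (derived here from the clientId and fileName arguments) are assumptions for illustration:

Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadStepSketch {
    public static void main(String[] args) {
        // Hypothetical sketch of the read step; not the actual framework code.
        SparkSession spark = SparkSession.builder()
                .appName("ingesterJob")
                .master("local[*]")
                .getOrCreate();

        // multiLine is typically needed when each JSON document spans several lines;
        // the path below is assumed from the clientId/fileName job arguments.
        Dataset<Row> preProcessOutput = spark.read()
                .option("multiLine", true)
                .json("/usercode/data/client1/sales.json");

        // Prints the nested schema discussed later in this lesson.
        preProcessOutput.printSchema();
    }
}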

Once Spark is instructed to read the input (the JSON file contents) into our favorite logical abstraction, the DataFrame, this object is then “passed” to the job’s process method in the following manner:

Java
@Override
protected Dataset<Row> process(Dataset<Row> preProcessOutput) {
    return (Dataset<Row>) ingesterProcessor.process(preProcessOutput);
}
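
The @Override annotation hints that IngesterJob extends a template-method style base class that drives the whole batch workflow. That base class is not shown here; the following is a purely hypothetical sketch, where the class name SparkJob and the preProcess/postProcess method names are assumptions, not the framework’s actual API:

Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical sketch of a template-method base class; names and signatures
// are assumptions, not the actual framework code.
public abstract class SparkJob {

    // Reads the input (e.g., the JSON file) into a DataFrame.
    protected abstract Dataset<Row> preProcess();

    // Transforms the pre-processed DataFrame; IngesterJob overrides this step.
    protected abstract Dataset<Row> process(Dataset<Row> preProcessOutput);

    // Persists the processed rows (e.g., via JDBC batch writes).
    protected abstract void postProcess(Dataset<Row> processOutput);

    // Orchestrates the three steps of the batch workflow.
    public final void run() {
        Dataset<Row> input = preProcess();
        Dataset<Row> output = process(input);
        postProcess(output);
    }
}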

The ingesterProcessor object receives this DataFrame as the argument to its process(preProcessOutput) method. What goes on inside it? Let’s inspect the following code snippet:

Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class IngesterProcessor implements Processor<Dataset<Row>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(IngesterProcessor.class);

    @Override
    public Dataset<Row> process(Dataset<Row> inputDf) {
        LOGGER.info("Flattening JSON records...");
        // Apply a Spark transformation that flattens each nested JSON record
        Dataset<Row> parsedResults = inputDf.flatMap(new IngesterJsonFlatMapper(),
                RowEncoder.apply(SalesSchema.getSparkSchema()));
        return parsedResults;
    }
}
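
The transformation relies on SalesSchema.getSparkSchema() to tell the encoder what the flattened rows look like. Its actual definition is not shown in this snippet; the following is only a minimal sketch, where the flat column names and types (Seller_Id, Date, Product, Quantity) are assumptions derived from the fields present in the input JSON:

Java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch of SalesSchema; the exact column names and types are
// assumptions based on the fields observed in the input JSON.
public final class SalesSchema {

    public static StructType getSparkSchema() {
        return new StructType(new StructField[] {
                DataTypes.createStructField("Seller_Id", DataTypes.StringType, true),
                DataTypes.createStructField("Date", DataTypes.StringType, true),
                DataTypes.createStructField("Product", DataTypes.StringType, true),
                DataTypes.createStructField("Quantity", DataTypes.LongType, true)
        });
    }
}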

The line applying the flatMap transformation inside IngesterProcessor.process packs a lot of interesting things:

  • On the inputDf, a flatMap transformation is applied to parse each JSON record into one or more flat rows, each represented by a Row object. This “translation” step is needed because, when Spark reads the JSON file, its contents are internally structured in the following way:
Java
root
 |-- Sales: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Date: string (nullable = true)
 |    |    |-- Items: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Product: string (nullable = true)
 |    |    |    |    |-- Quantity: long (nullable = true)
 |-- Seller_Id: string (nullable = true)

Interestingly enough, Spark’s internal representation of a JSON record it has read (the schema) is quite similar to the JSON structure itself.

It contains a Sales array and a Seller_Id string field as root properties. The Sales array is a collection of elements, each named element, of type struct.
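
To make the flattening concrete, here is a minimal sketch of how a flat-mapper could walk this nested structure and emit one flat Row per sold item. It assumes the flat column order sketched above (Seller_Id, Date, Product, Quantity); the actual IngesterJsonFlatMapper implementation may well differ:

Java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Hypothetical sketch of a flat-mapper over the nested schema shown above;
// the real IngesterJsonFlatMapper may differ in naming and details.
public class JsonFlatMapperSketch implements FlatMapFunction<Row, Row> {

    @Override
    public Iterator<Row> call(Row record) {
        List<Row> flatRows = new ArrayList<>();
        String sellerId = record.getAs("Seller_Id");
        // Each input record holds an array of sale structs...
        List<Row> sales = record.getList(record.fieldIndex("Sales"));
        for (Row sale : sales) {
            String date = sale.getAs("Date");
            // ...and each sale holds an array of item structs.
            List<Row> items = sale.getList(sale.fieldIndex("Items"));
            for (Row item : items) {
                flatRows.add(RowFactory.create(
                        sellerId,
                        date,
                        item.getAs("Product"),
                        item.getAs("Quantity")));
            }
        }
        return flatRows.iterator();
    }
}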

Struct ...