Spark ETL Chapter 8 with Lakehouse | Apache HUDI

Previous blog/Context:

In an earlier blog post, we discussed Spark ETL with a lakehouse built on Delta Lake. Please refer to that post for more details.

Introduction:

In this blog, we will discuss Spark ETL with Apache HUDI. We will first understand what Apache HUDI is and why it is used for building a lakehouse. We will then read data from one of the source systems covered so far and load it into Apache HUDI format, creating an on-premises lakehouse and loading all the data into it.

What is Apache HUDI?

Apache Hudi is an open-source data management framework for Apache Hadoop-based data lakes. Hudi stands for “Hadoop Upserts Deletes and Incrementals.” It provides a way to manage data in a big data environment with features like data ingestion, data processing, and data serving. Hudi was originally developed by Uber and was later contributed to the Apache Software Foundation as an open-source project.

Hudi provides several key features that make it useful for managing big data, including:

  1. Upserts, deletes, and incrementals: Hudi supports efficient updates and deletes of existing data in a Hadoop-based data lake, allowing for incremental data processing.
  2. Transactional writes: Hudi supports ACID transactions, ensuring that data is consistent and reliable.
  3. Delta storage: Hudi can store changes as delta (log) files alongside the base parquet files, which allows for fast processing of data changes.
  4. Schema evolution: Hudi supports schema evolution, enabling changes to the schema without requiring a full reload of the data.
  5. Data indexing: Hudi provides indexing capabilities that make it easy to query data in a Hadoop-based data lake.

Overall, Hudi provides a flexible and efficient way to manage big data in a Hadoop-based data lake. It enables efficient data processing and querying while ensuring data consistency and reliability through ACID transactions. Hudi is used by a variety of companies and organizations, including Uber, Alibaba, and Verizon Media.

Today, we will perform the ETL operations below, and along the way we will learn about Apache HUDI and how to build a lakehouse.

  1. Read data from MySQL server into Spark
  2. Create HIVE temp view from data frame
  3. Load filtered data into HUDI format (create initial table)
  4. Load filtered data again into HUDI format into same table
  5. Read HUDI tables using Spark data frame
  6. Create HIVE temp view of HUDI tables
  7. Explore data

First, clone the GitHub repo below, which contains all the required sample files and the solution.

https://github.com/developershomes/SparkETL/tree/main/Chapter8

If you don’t have a Spark instance set up, follow the earlier blog for setting up the Data Engineering tools on your system. (The Data Engineering suite will set up Spark, MySQL, PostgreSQL and MongoDB on your system.) That Spark instance already has the packages installed for Azure Blob Storage and Azure Data Lake Storage.

Start Spark application with all required packages

First, we will start a Spark session with all the required packages and configuration for Apache HUDI. Our Spark instance does not ship with the Apache HUDI packages (jar files), so we need to specify them explicitly when we start the Spark session. We will also be using MySQL, so we specify the MySQL connector package as well.

With Apache HUDI, we also need to pass the configurations below.

spark.sql.extensions: org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog: org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.local.warehouse: warehouse

# Load the required library and start the Spark session
from pyspark.sql import SparkSession

# Start the Spark session with the HUDI and MySQL packages and the HUDI configuration
spark = SparkSession.builder.appName("chapter8")\
.config('spark.jars.packages', 'org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0,mysql:mysql-connector-java:8.0.32')\
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')\
.config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')\
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')\
.config('spark.sql.catalog.local.warehouse','warehouse')\
.getOrCreate()
# The later cells call sqlContext.sql(...); reuse the same session under that name
sqlContext = spark
# Show only errors, not warnings
spark.sparkContext.setLogLevel("ERROR")

Now, we have our spark session available with all the required packages and configuration, so we can start ETL process.
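
The package string passed to spark.jars.packages encodes three versions: the Spark line (3.2), the Scala binary version (2.12) and the HUDI release (0.13.0). If your Spark instance runs a different version, the coordinate has to change with it. Below is a minimal sketch of how the string can be assembled; the helper names are made up for this example, and you should check on Maven Central that the combination you build is actually published.

```python
def hudi_bundle_coordinate(spark_version, scala_version, hudi_version):
    """Build the Maven coordinate of the HUDI Spark bundle.

    spark_version is the major.minor Spark line (for example "3.2") and
    scala_version is the Scala binary version (for example "2.12").
    This helper is an illustration, not part of any HUDI or Spark API.
    """
    artifact = f"hudi-spark{spark_version}-bundle_{scala_version}"
    return f"org.apache.hudi:{artifact}:{hudi_version}"


def spark_packages(*coordinates):
    """Join Maven coordinates into the comma-separated spark.jars.packages value."""
    return ",".join(coordinates)


# Same versions as the session created in this chapter.
packages = spark_packages(
    hudi_bundle_coordinate("3.2", "2.12", "0.13.0"),
    "mysql:mysql-connector-java:8.0.32",
)
print(packages)
```

The resulting string can be passed as the value of 'spark.jars.packages' when building the session.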

Read data from MySQL server into Spark

(If you have already completed Chapter 7, you can skip reading data from MySQL and creating the HIVE table, and go directly to the section on creating the HUDI table.)

For this ETL, we use the same MySQL source system and load the same table. We will not go into detail on how to load data from MySQL and how to create the HIVE table, as we have already covered that in Chapter 7.

If you have not already uploaded the data into MySQL, please follow the earlier blog to do so.

We will read this data with Spark and create a Spark data frame and a HIVE temp view on it.

#Load MySQL table into DataFrame
mysqldf = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://192.168.1.104:3306/DATAENG") \
.option("dbtable", "genericfood") \
.option("user", "root") \
.option("password", "mysql") \
.load()
#Checking dataframe schema
mysqldf.printSchema()
#Show data
mysqldf.show(n=10)
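
The read above hardcodes the host, user and password. One possible alternative, sketched below, is to build the JDBC options from environment variables. The variable names (MYSQL_HOST, MYSQL_PORT, MYSQL_DB, MYSQL_USER, MYSQL_PASSWORD) and the helper itself are assumptions made for this example; they are not part of the chapter's repo.

```python
import os


def mysql_jdbc_options(table, env=None):
    """Return the option dict for spark.read.format("jdbc").

    The environment variable names are hypothetical, chosen for this sketch.
    Pass a plain dict as env to avoid touching the real environment.
    """
    env = os.environ if env is None else env
    host = env.get("MYSQL_HOST", "localhost")
    port = env.get("MYSQL_PORT", "3306")
    database = env.get("MYSQL_DB", "DATAENG")
    return {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": f"jdbc:mysql://{host}:{port}/{database}",
        "dbtable": table,
        "user": env.get("MYSQL_USER", "root"),
        # No default for the password: a missing variable raises KeyError early.
        "password": env["MYSQL_PASSWORD"],
    }


# Example with an explicit mapping instead of the real environment.
options = mysql_jdbc_options(
    "genericfood",
    env={"MYSQL_HOST": "192.168.1.104", "MYSQL_PASSWORD": "mysql"},
)
print(options["url"])
```

With a Spark session available, the dictionary can be passed in one go: spark.read.format("jdbc").options(**options).load().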

Create HIVE temp view from data frame

We will create a HIVE temp view from the data frame.

#Create Hive temp view from Dataframe
mysqldf.createOrReplaceTempView("tempFood")
#Write Spark SQL query on Temp view
sqlContext.sql("SELECT * FROM tempFood").show(n=20)

We will explore the data, find the food groups with the most items, and filter on one group.

# Get the count of food items per group, in descending order of count
sqlContext.sql("SELECT GROUP,count(*) FROM tempFood GROUP BY 1 ORDER BY 2 DESC").show(truncate=False)
# Create a dataframe that holds only one food group, plus a ts column with the current timestamp
newdf = sqlContext.sql("SELECT *, now() as ts FROM tempFood WHERE GROUP = 'Herbs and Spices'")
# Check the number of rows
newdf.count()

Load filtered data into HUDI format (create initial table)

We have the data frame “newdf” available, which holds only one food group. We will use it to create the first HUDI table.

hudi_options = {
'hoodie.table.name': 'hudi_food',
'hoodie.datasource.write.recordkey.field': 'FOODNAME',
# ts is the column we added with now()
'hoodie.datasource.write.precombine.field': 'ts'
}
basePath = "/opt/spark/SparkETL/Chapter8/hudi_food"
newdf.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)

With the HUDI format we need to pass a few options. Some of the options are mandatory; if we don’t pass them, the table will not be created. The mandatory options used in our example are:

hoodie.table.name: hudi_food
hoodie.datasource.write.recordkey.field: FOODNAME (name of the unique column)
hoodie.datasource.write.precombine.field: ts (we have created a column for this)

We also need to pass the base path.

Once we have prepared the options and the base path, we can create the HUDI table using the format “hudi”.
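
Since a missing option only shows up when the write fails, it can help to check the options dictionary first. The sketch below checks the three options this chapter treats as mandatory and verifies that the key and precombine fields name real columns. The helper names are made up for this example, and the column list is a partial, assumed list based on the columns used in this chapter.

```python
# The three options this chapter treats as mandatory.
REQUIRED_HUDI_OPTIONS = (
    "hoodie.table.name",
    "hoodie.datasource.write.recordkey.field",
    "hoodie.datasource.write.precombine.field",
)


def missing_hudi_options(options):
    """Return the required keys that are absent or empty in options."""
    return [key for key in REQUIRED_HUDI_OPTIONS if not options.get(key)]


def unknown_field_options(options, columns):
    """Return option keys whose value names a column that is not in columns."""
    field_keys = REQUIRED_HUDI_OPTIONS[1:]  # the two *.field options
    return [k for k in field_keys if k in options and options[k] not in columns]


hudi_options = {
    "hoodie.table.name": "hudi_food",
    "hoodie.datasource.write.recordkey.field": "FOODNAME",
    "hoodie.datasource.write.precombine.field": "ts",
}
# Partial column list, assumed for illustration; with Spark use newdf.columns.
columns = ["FOODNAME", "GROUP", "SUBGROUP", "ts"]

print(missing_hudi_options(hudi_options))
print(unknown_field_options(hudi_options, columns))
```

Both calls print an empty list when the options are complete and refer to existing columns.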

It will create a folder named “hudi_food” and, inside it, a parquet file that stores the data along with its record-level metadata.

Inside “hudi_food”, we have a “.hoodie” metadata folder and the parquet file that holds the data.

Inside the “.hoodie” folder, the hoodie.properties file holds all the properties of the HUDI table.

The parquet file holds the actual data.
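
To inspect the folder structure yourself, a small directory listing is enough. The sketch below uses only the Python standard library; list_tree is a helper written for this example, and the base path is the one used in this chapter (adjust it to your setup).

```python
import os


def list_tree(base_path, max_depth=2):
    """Return sorted relative paths under base_path, down to max_depth levels.

    Directories are returned with a trailing "/". A missing base_path
    simply yields an empty list.
    """
    base_path = os.path.abspath(base_path)
    entries = []
    for root, dirs, files in os.walk(base_path):
        rel_root = os.path.relpath(root, base_path)
        depth = 0 if rel_root == "." else rel_root.count(os.sep) + 1
        for name in dirs:
            entries.append(os.path.normpath(os.path.join(rel_root, name)) + "/")
        for name in files:
            entries.append(os.path.normpath(os.path.join(rel_root, name)))
        if depth + 1 >= max_depth:
            dirs[:] = []  # stop descending below max_depth
    return sorted(entries)


# Base path used in this chapter.
base_path = "/opt/spark/SparkETL/Chapter8/hudi_food"
for path in list_tree(base_path):
    print(path)
```

On a freshly written table you should see the “.hoodie” folder with hoodie.properties in it, next to the parquet data file.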

If we also specify a partition field, HUDI creates a folder for each partition value.

# with partition as Group
# also providing insert and upsert parallelism
hudi_options = {
'hoodie.table.name': 'hudi_food',
'hoodie.datasource.write.recordkey.field': 'FOODNAME',
'hoodie.datasource.write.partitionpath.field': 'GROUP',
'hoodie.datasource.write.table.name': 'hudi_food',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2
}
basePath = "/opt/spark/SparkETL/Chapter8/hudi_food"
newdf.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)

It will create a folder for each value of the GROUP column inside “hudi_food”.

Load filtered data again into hudi format into same table

We will create one more data frame by filtering one more food group and then appending data into the same hudi table.

# Filter data based on new group
newdf1 = sqlContext.sql("SELECT *,now() as ts FROM tempFood WHERE GROUP = 'Fruits'")
# Append new data into same hudi table
newdf1.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(basePath)

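It will add one more parquet file to the same table folder. With the partitioned options above, the new file should land in a separate folder for the 'Fruits' partition.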
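
The write above uses mode "append" together with the 'upsert' operation, so rows with a new record key are inserted and rows with an existing key are updated. The sketch below is a simplified, pure-Python model of that idea, using a record key and a precombine field to decide which row to keep. It is not HUDI's implementation (the actual merge behavior depends on the configured payload class), and the sample rows are made up.

```python
def upsert(table, incoming, record_key, precombine):
    """Merge incoming rows into table, a dict keyed by record key.

    In this sketch, for a key that already exists the row with the larger
    precombine value wins; on a tie the incoming row replaces the stored one.
    """
    for row in incoming:
        key = row[record_key]
        current = table.get(key)
        if current is None or row[precombine] >= current[precombine]:
            table[key] = row
    return table


table = {}

# First load: one made-up row from the 'Herbs and Spices' group.
upsert(table, [{"FOODNAME": "Basil", "GROUP": "Herbs and Spices", "ts": 1}],
       "FOODNAME", "ts")

# Second load: a new key is inserted, an existing key is updated.
upsert(table, [
    {"FOODNAME": "Apple", "GROUP": "Fruits", "ts": 2},
    {"FOODNAME": "Basil", "GROUP": "Herbs and Spices", "ts": 3},
], "FOODNAME", "ts")

print(len(table))            # 2
print(table["Basil"]["ts"])  # 3
```

This is why the record key should be unique per record and why the precombine column (ts in our case) matters when the same key arrives more than once.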

Read HUDI tables using Spark data frame

Now, we will read HUDI table into Spark data frame.

# Location of HUDI table
basePath = "/opt/spark/SparkETL/Chapter8/hudi_food"
#Load HUDI table into DataFrame
hudidf = spark.read.format("hudi").load(basePath)
#Print Schema
hudidf.printSchema()
#Print data
hudidf.show()

Here, we see that extra columns have been added. The first 5 columns were not added by us but by HUDI; they hold record-level metadata. HUDI does not create separate files for this record-level metadata; it stores it in the same parquet file as the data.

_hoodie_commit_time: This field contains the commit timestamp in the timeline that created this record. This enables granular, record-level history tracking on the table, much like database change-data-capture.
_hoodie_commit_seqno: This field contains a unique sequence number for each record within each transaction. This serves much like offsets in Apache Kafka topics, to enable generating streams out of tables.
_hoodie_record_key: Unique record key identifying the record within the partition. Key is materialized to avoid changes to key field(s) resulting in violating unique constraints maintained within a table. (We have passed the food name.)
_hoodie_partition_path: If we pass a partition column, HUDI creates a folder for each partition, so this field gives the partition path of each record.
_hoodie_file_name: Name of the file in which the record is stored.
Now, if we print the data with “truncate=False”, we can check the full commit time and commit sequence number.

Commit time is: 20230322103927766

Which is 2023/03/22 10:39:27.766 (YYYY/MM/DD HH:mm:SS.sss)

and commit sequence number: 20230322103927766_0_0
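
The commit time can also be decoded programmatically. The sketch below parses the 17-digit format shown above (year, month, day, hour, minute, second, milliseconds) and takes the commit time from the leading part of the sequence number. Both helper names are made up for this example, and the sketch handles only the 17-digit format seen in this output.

```python
from datetime import datetime


def parse_commit_time(commit_time):
    """Parse a 17-digit commit time (yyyyMMddHHmmssSSS) into a datetime."""
    if len(commit_time) != 17 or not commit_time.isdigit():
        raise ValueError(f"expected 17 digits, got {commit_time!r}")
    base = datetime.strptime(commit_time[:14], "%Y%m%d%H%M%S")
    millis = int(commit_time[14:])
    return base.replace(microsecond=millis * 1000)


def commit_time_of(seqno):
    """Return the leading commit time part of a commit sequence number."""
    return seqno.split("_", 1)[0]


ts = parse_commit_time(commit_time_of("20230322103927766_0_0"))
print(ts.isoformat(timespec="milliseconds"))  # 2023-03-22T10:39:27.766
```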

Create HIVE temp view of HUDI tables

We have the data available in a data frame. Now we will create a HIVE temp view so that we can write Spark SQL.

# Create HIVE temp view
hudidf.createOrReplaceTempView("tempHUDI")
# Select data from table
sqlContext.sql("SELECT * FROM tempHUDI").show()

Explore data

We can run Spark SQL queries and explore the data.

# Get count of rows
sqlContext.sql("SELECT count(*) FROM tempHUDI").show()
# Get subgroup
sqlContext.sql("SELECT DISTINCT(SUBGROUP) FROM tempHUDI").show()
# Get count by subgroup, ordered by count in descending order
sqlContext.sql("SELECT SUBGROUP,count(*) FROM tempHUDI GROUP BY SUBGROUP ORDER BY 2 DESC ").show()

Conclusion:

Here, we have learned the concepts below.

  • Understanding of Apache HUDI
  • How to install HUDI packages from Maven repo
  • How to configure Spark parameters for HUDI
  • How to create HUDI table and load data
  • How data is stored in HUDI format
  • How to read data from HUDI tables
  • How to write Spark SQL queries on HUDI
