New – Insert, Update, Delete Data on S3 with Amazon EMR and Apache Hudi

Storing your data in Amazon S3 provides lots of benefits in terms of scale, reliability, and cost effectiveness. On top of that, you can leverage Amazon EMR to process and analyze your data using open source tools like Apache Spark, Hive, and Presto. As powerful as these tools are, it can still be challenging to handle use cases that require incremental data processing and record-level inserts, updates, and deletes.

Talking with customers, we found that there are use cases that need to handle incremental changes to individual records, for example:

  • Complying with data privacy regulations, where their users choose to exercise their right to be forgotten, or change their consent as to how their data can be used.
  • Working with streaming data, when you have to handle specific data insertion and update events.
  • Using change data capture (CDC) architectures to track and ingest database change logs from enterprise data warehouses or operational data stores.
  • Reinstating late arriving data, or analyzing data as of a specific point in time.

Starting today, EMR release 5.28.0 includes Apache Hudi (incubating), so that you no longer need to build custom solutions to perform record-level insert, update, and delete operations. Hudi development started at Uber in 2016 to address inefficiencies across ingest and ETL pipelines. In recent months the EMR team has worked closely with the Apache Hudi community, contributing patches that include updating Hudi to Spark 2.4.4 (HUDI-12), supporting Spark Avro (HUDI-91), adding support for the AWS Glue Data Catalog (HUDI-306), as well as multiple bug fixes.

Using Hudi, you can perform record-level inserts, updates, and deletes on S3, allowing you to comply with data privacy laws, consume real-time streams and change data capture feeds, reinstate late-arriving data, and track history and rollbacks in an open, vendor-neutral format. You create datasets and tables and Hudi manages the underlying data format. Hudi uses Apache Parquet and Apache Avro for data storage, and includes built-in integrations with Spark, Hive, and Presto, enabling you to query Hudi datasets using the same tools that you use today with near real-time access to fresh data.

When launching an EMR cluster, the libraries and tools for Hudi are installed and configured automatically any time at least one of the following components is selected: Hive, Spark, or Presto. You can use Spark to create new Hudi datasets, and insert, update, and delete data. Each Hudi dataset is registered in your cluster’s configured metastore (including the AWS Glue Data Catalog), and appears as a table that can be queried using Spark, Hive, and Presto.

Hudi supports two storage types that define how data is written, indexed, and read from S3:

  • Copy on Write – data is stored in columnar format (Parquet) and updates create a new version of the files during writes. This storage type is best used for read-heavy workloads, because the latest version of the dataset is always available in efficient columnar files.
  • Merge on Read – data is stored with a combination of columnar (Parquet) and row-based (Avro) formats; updates are logged to row-based “delta files” and compacted later, creating a new version of the columnar files. This storage type is best used for write-heavy workloads, because new commits are written quickly as delta files, but reading the dataset requires merging the compacted columnar files with the delta files.

Let’s do a quick overview of how you can set up and use Hudi datasets in an EMR cluster.

Using Apache Hudi with Amazon EMR
I start by creating a cluster from the EMR console. In the advanced options I select EMR release 5.28.0 (the first release that includes Hudi) and the following applications: Spark, Hive, and Tez. In the hardware options, I add 3 task nodes to ensure I have enough capacity to run both Spark and Hive.

When the cluster is ready, I use the key pair I selected in the security options to SSH into the master node and access the Spark Shell. I use the following command to start the Spark Shell to use it with Hudi:

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
              --conf "spark.sql.hive.convertMetastoreParquet=false" \
              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

There, I use the following Scala code to import some sample ELB logs in a Hudi dataset using the Copy on Write storage type:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", 
    DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiTablePath)

In the Spark Shell, I can now count the records in the Hudi dataset:

scala> inputDF.count()
res1: Long = 10491958
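
As a quick sanity check, the same records can also be counted through the table that Hive sync registered, directly from the Spark Shell; a minimal sketch that relies on the Hive sync options set above:

// Query the Hive-synced Hudi table; the count should match the input DataFrame.
spark.sql("SELECT COUNT(*) FROM elb_logs_hudi_cow").show()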

In the options, I used the integration with the Hive metastore configured for the cluster, so that the table is created in the default database. In this way, I can use Hive to query the data in the Hudi dataset:

hive> use default;
hive> select count(*) from elb_logs_hudi_cow;
...
OK
10491958
...

I can now update or delete a single record in the dataset. In the Spark Shell, I prepare some variables to find the record I want to update, and a SQL statement to select the value of the column I want to change:

val requestIpToUpdate = "243.80.62.181"
val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"

I execute the SQL statement to see the current value of the column:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_003|
+------------+

Then, I select and update the record:

// Create a DataFrame with a single record and update column value
val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)
                      .withColumn("elb_name", lit("elb_demo_001"))

Now I update the Hudi dataset with a syntax similar to the one I used to create it. But this time, the DataFrame I am writing contains only one record:

// Write the DataFrame as an update to existing Hudi dataset
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(hudiTablePath)

In the Spark Shell, I check the result of the update:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

Now I want to delete the same record. To delete it, I pass the EmptyHoodieRecordPayload payload in the write options:

// Write the DataFrame with an EmptyHoodieRecordPayload for deleting a record
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
                "org.apache.hudi.EmptyHoodieRecordPayload")
        .mode(SaveMode.Append)
        .save(hudiTablePath)

In the Spark Shell, I see that the record is no longer available:

scala> spark.sql(sqlStatement).show()
+--------+                                                                      
|elb_name|
+--------+
+--------+

How are all those updates and deletes managed by Hudi? Let’s use the Hudi Command Line Interface (CLI) to connect to the dataset and see how those changes are interpreted as commits.

This dataset is a Copy on Write dataset, which means that each time a record is updated, the file containing that record is rewritten with the updated values. In the CLI’s view of the commits, you can see how many records have been written for each commit: the bottom row describes the initial creation of the dataset, above it is the single-record update, and at the top the single-record delete.
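
A rough view of the same information is also available from the Spark Shell, since Hudi stores the commit that last wrote each record in the _hoodie_commit_time metadata column; a minimal sketch against the Hive-synced table:

// Group records by the commit that last wrote them, using the _hoodie_commit_time
// metadata column that Hudi adds to every record.
spark.sql("""SELECT _hoodie_commit_time, COUNT(*) AS records
             FROM elb_logs_hudi_cow
             GROUP BY _hoodie_commit_time
             ORDER BY _hoodie_commit_time""").show()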

With Hudi, you can roll back to each commit. For example, I can roll back the delete operation with:

hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031

In the Spark Shell, the record is now back to where it was, just after the update:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

Copy on Write is the default storage type. I can repeat the steps above to create and update a Merge on Read dataset by adding this to my hudiOptions:

DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"
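
For example, here is a minimal sketch of writing the same input data as a Merge on Read dataset; the elb_logs_hudi_mor table name and its S3 path are placeholders, not values from the walkthrough above:

// Reuse the earlier options, switching the storage type to Merge on Read and
// pointing at a separate (placeholder) table name and S3 path.
val hudiMorTableName = "elb_logs_hudi_mor"
val hudiMorTablePath = "s3://MY-BUCKET/PATH/" + hudiMorTableName

val hudiMorOptions = hudiOptions ++ Map(
    HoodieWriteConfig.TABLE_NAME -> hudiMorTableName,
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiMorTableName,
    DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ")

inputDF.write
       .format("org.apache.hudi")
       .options(hudiMorOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiMorTablePath)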

If you update a Merge on Read dataset and look at the commits with the Hudi CLI, you can see how different Merge on Read is compared to Copy on Write. With Merge on Read, you write only the updated rows, not whole files as with Copy on Write. This is why Merge on Read is helpful for use cases with more writes, or update/delete-heavy workloads, and fewer reads. Delta commits are written to disk as Avro records (row-based storage), and compacted data is written as Parquet files (columnar storage). To avoid creating too many delta files, Hudi automatically compacts your dataset so that your reads are as performant as possible.
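
How often that automatic compaction runs can be tuned through Hudi’s inline compaction configuration; a minimal sketch with illustrative values, which can be merged into the write options (for example, the hudiMorOptions map sketched above) like any other entry:

// Illustrative compaction tuning for a Merge on Read dataset: enable inline
// compaction and trigger it after every 10 delta commits (values are examples).
val compactionOptions = Map(
    "hoodie.compact.inline" -> "true",
    "hoodie.compact.inline.max.delta.commits" -> "10")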

When a Merge on Read dataset is created, two Hive tables are created:

  • The first table matches the name of the dataset.
  • The second table has the characters _rt appended to its name; the _rt postfix stands for real-time.

When queried, the first table returns the data that has been compacted, and does not show the latest delta commits. Using this table provides the best performance, but omits the freshest data. Querying the real-time table merges the compacted data with the delta commits on read, hence the name “Merge on Read” for this dataset type. This results in the freshest data being available, but incurs a performance overhead compared with querying the compacted data. In this way, data engineers and analysts have the flexibility to choose between performance and data freshness.
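
For example, assuming a Merge on Read dataset synced to Hive under the placeholder name elb_logs_hudi_mor as sketched earlier, the two views can be compared side by side from the Spark Shell:

// Read-optimized view: compacted columnar data only; fastest, but may omit the
// latest delta commits.
spark.sql("SELECT COUNT(*) FROM elb_logs_hudi_mor").show()

// Real-time view: merges delta commits with the compacted files on read;
// freshest data, at a higher query cost.
spark.sql("SELECT COUNT(*) FROM elb_logs_hudi_mor_rt").show()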

Available Now
This new feature is available now in all regions with EMR release 5.28.0. There is no additional cost for using Hudi with EMR. You can learn more about Hudi in the EMR documentation. This new tool can simplify the way you process, update, and delete data in S3. Let me know which use cases you are going to use it for!

Danilo


Source: AWS News
