At Keepler we are experts in the design, implementation and refactoring of high-volume data processing systems in public cloud environments for corporate clients in sectors such as banking, insurance, telecoms, retail, pharmaceuticals and industry, all of which have strict security requirements. We therefore specialise in building data architectures that apply good security practices and are based on data privacy by design.

One of the main concerns of our clients is complying with the requirements of the General Data Protection Regulation (GDPR). The right to be forgotten and the right to rectification are two very important aspects of the GDPR: a person may request the removal from all systems of any record containing their personal information, or the rectification of data they consider incorrect.

These tasks are relatively easy to solve when working with Data Warehouses implemented on database engines, whether columnar, such as Amazon Redshift, or row-oriented, such as Amazon RDS, using DELETE and UPDATE statements.

However, in many cases the data resides in object storage systems such as Amazon S3, which are usually more efficient for large Data Lakes because of their lower cost and simplicity. In these cases, where repositories follow the WORM model (Write Once, Read Many), implementing the rights to be forgotten and to rectification is more complicated and usually ends up requiring expensive batch processes that perform massive scans and generate new, clean, corrected files.

Apache Hudi is an Open Source technology, recently supported by Amazon EMR, that can help us simplify this task, since it implements UPDATE and DELETE operations on datasets hosted in systems such as HDFS or S3 through analytical tools such as Spark and Hive.

The following is a proof of concept implementing the rights to be forgotten and to rectification using Apache Hudi on EMR with Spark, on data stored in S3.

Premises

The proof of concept we develop here is based on a dataset containing information about characters from the Game of Thrones universe and their respective deaths. The most relevant fields of this dataset are the following:

  • Name: name of the character
  • Allegiances: house of the character
  • Year of Death: year in which the character dies
  • Book of Death: book in which the character dies
  • Gender: gender, 1 is male, 0 is female

This dataset is stored in CSV format in an S3 bucket, which we access from an EMR 5.28.0 cluster with Hudi support and the following applications: Hive 2.3.6, Tez 0.9.2 and Spark 2.4.4.

From this cluster, we perform update (UPDATE) and delete (DELETE) operations to confirm whether Hudi complies with the strict privacy laws of Westeros.

Proof of concept

As of release 5.28.0 of EMR, Apache Hudi is supported natively, so we create a new cluster consisting of:

  • 1 Master instance.
  • 3 Task instances.
  • 2 Core instances.


The operation is performed from the Master instance, where we use the Spark, Hive and Hudi command line tools to check the behaviour of the UPDATE and DELETE instructions on the data stored in S3.

The CSV of the test dataset is stored in S3 and used as the source when importing it into the new Hudi dataset through a Scala script, which provides an easy way to apply the changes.

Before running the Scala script, the Spark console must be started so that it is configured to use Hudi. This is done with the following command:

spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Now our environment is ready to start working with Hudi.

The following Scala code configures the Hudi options, reads the data about the Game of Thrones characters and their deaths stored in S3, and generates the new dataset. The configuration files needed to manage the Hudi data are then created under the “character_deaths” directory.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val fileName = "csv/character-deaths.csv"
val inputDataPath = "s3://apache-hudi-emr/" + fileName
val hudiTableName = "character_deaths"
val hudiTablePath = "s3://apache-hudi-emr/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "Name",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "Gender",
  HoodieWriteConfig.TABLE_NAME -> hudiTableName,
  "hoodie.cleaner.policy" -> "KEEP_LATEST_FILE_VERSIONS",
  "hoodie.keep.max.commits" -> "2",
  "hoodie.keep.min.commits" -> "1",
  "hoodie.cleaner.commits.retained" -> "0",
  DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "Allegiances",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "Gender",
  DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("csv").option("header", "true").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write.format("org.apache.hudi")
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save(hudiTablePath)

Once the new dataset has been generated, we can become the judges deciding the future of the Game of Thrones characters. There are too many deaths for all the fans to be happy with the outcome, so let’s take advantage of Hudi to change the destiny of some of the main characters in the story.

First of all, to confirm the correct transformation of the data, we run a basic query from Spark to verify that no errors occurred during the process.
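From the Spark shell, a quick way to run such a check is to read the Hudi dataset back and compare row counts. The snippet below is only a sketch: it reuses the inputDF and hudiTablePath variables from the ingest script, and the glob depth assumes the single Gender partition level used above.

// Read the Hudi dataset back from S3 (one glob per partition level, plus one for the data files)
val hudiDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*")

// Compare row counts between the source CSV and the Hudi dataset
println(s"CSV rows: ${inputDF.count()}, Hudi rows: ${hudiDF.count()}")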

Perfect, this confirms that all CSV data has been correctly exported to our new Hudi dataset in S3. Next we will use Hive to perform some queries in order to find the data we are interested in working on to obtain an improved version of Game of Thrones.

We select Robb Stark, one of the characters whose end in the saga was not too encouraging, and analyse which house he is associated with and the book in which he dies.
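As a sketch, this lookup can be run from the same Spark shell against the Hive-synced table; the lower-cased column names below are an assumption about how the sync registers the CSV headers.

// Find Robb's current house and the book in which he dies
spark.sql(
  "SELECT name, allegiances, book_of_death " +
  "FROM character_deaths WHERE name = 'Robb Stark'").show()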

In this case, our colleague Robb informed us that he decided to move house shortly before his death (due to some kind of irreconcilable differences with his family) and we need Hudi to help us update his information using Scala in order to comply with his request, which is covered by the GDPR’s right to rectification.

// Update table record
val deathToUpdate = "Robb Stark"

// Change Allegiances to "House Lannister"
val newAllegiance = "House Lannister"
val updateDF = inputDF.filter(col("Name") === deathToUpdate).withColumn("Allegiances", lit(newAllegiance))

// The record was changed but we still need to write it to Hudi
updateDF.write.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(hudiTablePath)

We run the update script and confirm that Robb’s allegiance is correct again and he is now a Lannister (too bad we can’t make him blonde).

However, he is still not comfortable with his situation (it seems he would prefer to stay alive) and makes one final request, based on his rights as described by the GDPR: we must remove his data from our system.

// Delete table record
val deathToDelete = "Robb Stark"
val deleteDF = inputDF.filter(col("Name") === deathToDelete)

// Remove the entry from the character_deaths list
deleteDF.write.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
.mode(SaveMode.Append)
.save(hudiTablePath)

Hive allows us to confirm that we have brought our client back to life and have been able to complete our alternative reality in Game of Thrones.
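The same check can be sketched from the Spark shell by re-reading the dataset and filtering on the record key (again reusing the hudiTablePath variable from the ingest script):

// Re-read the dataset and confirm that Robb's record is gone
val afterDeleteDF = spark.read.format("org.apache.hudi").load(hudiTablePath + "/*/*")
// Should print 0 once the delete has been applied
println(afterDeleteDF.filter(col("Name") === "Robb Stark").count())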

As mentioned previously, one of the advantages of using Hudi in high-volume data processing environments is that it facilitates the correct application of certain GDPR policies. However, if we analyse the technology in depth, we discover a feature that prevents us from achieving our ultimate goal of total data erasure: Hudi natively implements a registry of “commits” that keeps a copy of updated or deleted elements and allows rolling back to a previous state.

Therefore, if we want these operations to be completely propagated through the whole system, additional actions must be taken to permanently delete the data stored in S3.

Natively, Apache Hudi allows you to configure commit retention and the maximum and minimum number of commits stored in S3. Through its clean policies, Hudi manages the archiving of obsolete commit files so that they are no longer accessible from the console.

"hoodie.cleaner.policy" -> "KEEP_LATEST_FILE_VERSIONS",
"hoodie.keep.max.commits" -> "2",
"hoodie.keep.min.commits" -> "1",
"hoodie.cleaner.commits.retained" -> "0",

However, due to the way the tool works, it is not possible to configure the complete deletion of stored commits, so in order to comply with the GDPR right to be forgotten, total deletion of the data must be supported by a mechanism that disposes of the retained copies.

The structure of a Hudi dataset in S3, including files that represent each state of the data at the time the dataset is updated, is shown in the following screenshot. In this case, a configuration is used that never actively keeps more than two commits.
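For reference, the timeline under the .hoodie prefix contains one metadata file per operation, named after the commit timestamp; the file names below are purely illustrative:

character_deaths/.hoodie/hoodie.properties
character_deaths/.hoodie/20191201103000.clean
character_deaths/.hoodie/20191201103000.commit
character_deaths/.hoodie/20191201110000.clean
character_deaths/.hoodie/20191201110000.commit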

In order to keep only the last version of the dataset, one option is a Lambda function, invoked periodically, that deletes all files referencing previous commits, keeping only the current state of the data.

import boto3
from datetime import datetime

# S3 target
s3_bucket = "apache-hudi-emr"
s3_prefix = "character_deaths/.hoodie"
hudi_reg = [".commit", ".clean"]

client = boto3.client('s3')
response = client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)

hudi_keys = []
for obj in response['Contents']:
    key = obj['Key']
    # Look for .commit and .clean files
    if any(hudi_suffix in key for hudi_suffix in hudi_reg):
        # Extract the creation timestamp from the filename
        timestamp = key.split("/")[-1].split(".")[0]
        epoch_time = datetime.strptime(timestamp, '%Y%m%d%H%M%S').timestamp()
        hudi_keys.append({"Key": key, "Date": epoch_time})

# Sort by timestamp
hudi_sorted = sorted(hudi_keys, key=lambda i: i['Date'])
# Keep only the latest .commit and .clean file
commit_files = [commit for commit in hudi_sorted if ".commit" in commit['Key']][:-1]
clean_files = [clean for clean in hudi_sorted if ".clean" in clean['Key']][:-1]
# Files to be deleted
to_delete = commit_files + clean_files

for delete_object in to_delete:
    client.delete_object(Bucket=s3_bucket, Key=delete_object['Key'])

For this proof of concept we have written a small Python script that, given the name of the S3 bucket and the prefix of the Hudi dataset, takes care of keeping only the files associated with the last commit. This basic code can be evolved into a fully parametrised tool that maintains multiple Hudi datasets simultaneously and, in addition, implements total deletion of archived data if necessary. The final result of the Apache Hudi directory is as follows:

In this case, if we query the Hudi dataset from Hive, we access the latest version of the data, in which Robb has exercised his GDPR right to be forgotten; the deleted commits no longer appear in the Hudi command line interface, so the data cannot be recovered by a rollback.

Conclusions

This proof of concept validates some of the main features of Apache Hudi from a functional point of view, such as its ability to perform UPDATE and DELETE operations on datasets residing in S3.

We have verified that Hudi is able to carry out these operations. However, to respond effectively to the rights that end users have under the GDPR, we must take into account Hudi’s internal workings, specifically the commit files that version the dataset on every update or deletion. This behaviour means that, on its own, Hudi is not a valid tool for GDPR compliance, although we consider it very useful when combined with other mechanisms (such as the Lambda function proposed in this post) that clean up the unwanted historical files, thus achieving effective deletions and updates.

On the other hand, in order to consider Hudi a viable solution for implementing the rights to rectification and to be forgotten in production environments, it would be necessary to carry out performance tests that measure its effectiveness on datasets of considerable volume. These tests are too extensive to include in this post, so we will continue working on them and aim to publish the results in a future article.

Image: unsplash | @markusspiske

Author

  • Cloud Engineer at Keepler Data Tech. "I am passionate about Linux, Cloud Computing, Security and everything related to the Internet. My experience is within the cyber security ecosystem and as a cloud project engineer. I love to learn and face challenges every day."