En Keepler somos expertos en el diseño, implementación y refactorización de sistemas de procesamiento de grandes volúmenes de datos en entornos de nube pública para clientes corporativos de sectores como banca, seguros, telco, retail, farma o industria, que requieren de un cumplimiento estricto en materia de seguridad. Por tanto, nos especializamos en construir arquitecturas de datos que aseguran la aplicación de las buenas prácticas de securización y se fundamentan en la privacidad del dato por diseño.

Una de las principales preocupaciones de nuestros clientes es cumplir con los requisitos que establece el Reglamento General de Protección de Datos (GDPR, por sus siglas en inglés). Aspectos muy importantes del mismo son los denominados derecho de “olvido” y derecho de “rectificación”. Ejercitando estos derechos, una persona podría solicitar eliminar de todos los sistemas cualquier registro que contenga información personal relacionada con ella o rectificar datos que considere erróneos.

Cuando trabajamos con Data Warehouses implementados en motores de bases de datos, ya sean columnares, como Amazon Redshift, u orientados a filas, como Amazon RDS, estas tareas son relativamente sencillas de resolver, puesto que contamos con operaciones de tipo DELETE y UPDATE.

Sin embargo, en mucho casos, los datos residen en sistemas de almacenamiento de objetos como Amazon S3, ya que suelen ser más eficientes para grandes Data Lakes, por su menor coste y sencillez. En estos casos, en los que trabajamos con repositorios basados en el modelo WORM (Write Once, Read Many), la implementación de los derechos de olvido y rectificación es más complicada y, habitualmente, termina derivando en costosos procesos batch pesados que tienen que realizar tareas masivas de escaneo y generación de nuevos ficheros limpios y corregidos.

Apache Hudi es una tecnología Open Source soportada recientemente por Amazon EMR que puede ayudarnos a simplificar esta tarea, ya que implementa operaciones de tipo UPDATE y DELETE sobre conjuntos de datos alojados en sistemas como HDFS o S3 a través de herramientas analíticas como Spark y Hive.

A continuación, presentamos una prueba de concepto para la implementación de los derechos de olvido y rectificación utilizando Apache Hudi en EMR con Spark sobre datos almacenados en S3.

Premisas

La prueba de concepto que vamos a desarrollar parte de un dataset que contiene información sobre personajes del universo de Juego de Tronos y sus respectivas muertes. Los campos más relevantes de este dataset son los siguientes:

  • Name: nombre del personaje
  • Allegiances: casa del personaje
  • Death Year: año de la muerte
  • Book of Death: libro en el que se produce la muerte
  • Gender: género, 1 es hombre, 0 es mujer

Este dataset estará almacenado en formato CSV en un bucket de S3, al que accederemos desde un cluster de EMR 5.28.0 con soporte para Hudi y con las siguientes aplicaciones: Hive 2.3.6, Tez 0.9.2 y Spark 2.4.4.

Desde este cluster, realizaremos operaciones de actualización (UPDATE) y borrado (DELETE) para validar si Hudi cumple con las estrictas leyes de privacidad de Poniente.

Prueba de concepto

A partir de la versión 5.28.0 de EMR, Apache Hudi se encuentra soportado de forma nativa, por lo que se crea un nuevo cluster formado por:

  • 1 instancia Master.
  • 3 instancias Task.
  • 2 instancias Core.

La operativa se realizará desde la instancia Master, en la que utilizaremos las herramientas de línea de comandos de Spark, Hive y Hudi para comprobar el comportamiento de las instrucciones UPDATE y DELETE sobre los datos almacenados en S3.

El CSV del dataset de prueba se almacena en S3 para utilizarlo como fuente a la hora de importar el nuevo conjunto de datos de Hudi a través de un script de Scala, que proporciona una forma sencilla de aplicar los cambios.

De forma previa a la ejecución del script de Scala, es necesario iniciar la consola de Spark para que ésta esté configurada para utilizar Hudi. Para ello la invocamos a través del siguiente comando.


spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
--conf "spark.sql.hive.convertMetastoreParquet=false"
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Ya tenemos nuestro entorno listo para poder empezar a trabajar con Hudi.

El código de Scala para la generación del nuevo dataset configura las opciones de Hudi y realiza la lectura de los datos de personajes de Juego de Tronos y sus muertes, almacenados en S3 y, a continuación, crea sobre el directorio “character_deaths” los ficheros de configuración necesarios para mantener los datos de Hudi.


import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val fileName = "csv/character-deaths.csv"
val inputDataPath = "s3://apache-hudi-emr/" + fileName
val hudiTableName = "character_deaths"
val hudiTablePath = "s3://apache-hudi-emr/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "Name",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "Gender",
HoodieWriteConfig.TABLE_NAME -> hudiTableName,
"hoodie.cleaner.policy" -> "KEEP_LATEST_FILE_VERSIONS",
"hoodie.keep.max.commits" -> "2",
"hoodie.keep.min.commits" -> "1",
"hoodie.cleaner.commits.retained" -> "0",
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "Allegiances",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "Gender",
DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("csv").option("header", "true").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

Una vez generado el nuevo conjunto de datos, podemos convertirnos en jueces del devenir de los personajes de Juego de Tronos. Demasiadas muertes para que todos los fans queden contentos con el resultado, así que vamos a aprovechar las ventajas de Hudi para cambiar a nuestro antojo el destino de alguno de los protagonistas de la historia.

En primer lugar, con el objetivo de validar la correcta transformación de los datos, realizamos una consulta básica desde Spark para verificar que no se ha producido ningún fallo durante el proceso.

Perfecto, esto nos confirma que todos los datos del CSV han sido exportados correctamente a nuestro nuevo dataset de Hudi en S3. A continuación vamos a utilizar Hive para realizar algunas queries y así encontrar los datos sobre los que nos interesa operar para obtener una versión mejorada de Juego de Tronos.

Seleccionamos uno de los personajes cuyo final en la saga no fue demasiado alentador, Robb Stark, y analizamos cuál es la casa a la que está asociado y el libro en el que se produce la muerte.

En este caso, nuestro compañero Robb nos ha comunicado que decidió cambiar de casa poco antes de morir (por algún tipo de enemistad irremediable con su familia) y necesitamos que Hudi nos ayude a actualizar su información utilizando Scala para poder cumplir con su solicitud, amparada por el derecho de rectificación de la GDPR.


// Update table record
val deathToUpdate = "Robb Stark"

// Change Allegiances to "House Lannister"
val newAllegiance = "House Lannister"
val updateDF = inputDF.filter(col("name") === deathToUpdate).withColumn("Allegiances", lit(newAllegiance))

// The record was changed but we still need to write it to Hudi
updateDF.write.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(hudiTablePath)

Ejecutamos el script de actualización y validamos que la afiliación de Robb es correcta de nuevo y ya es todo un Lannister (una pena que no le podamos hacer rubio).

Sin embargo, sigue sin encontrarse a gusto con su situación (parece que preferiría seguir con vida) y nos hace una última solicitud en base a sus derechos descritos por la GDPR: debemos eliminar sus datos de nuestro sistema.


// Update table record
val deathToDelete = "Robb Stark"
val updateDF = inputDF.filter(col("name") === deathToDelete)

// Remove the entry from the character_deaths list
updateDF.write.format("org.apache.hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
.mode(SaveMode.Append).save(hudiTablePath)

Hive nos permite validar que hemos devuelto a la vida a nuestro cliente y hemos podido completar nuestra realidad alternativa en Juego de Tronos.

Una de las ventajas de la utilización de Hudi en entornos de procesado de grandes volúmenes de datos es la facilitación de la correcta aplicación de ciertas políticas de GDPR, como se ha introducido previamente. Sin embargo, si analizamos la tecnología en profundidad, nos encontramos con una característica que impide conseguir nuestro objetivo final en cuanto al borrado total de datos, puesto que Hudi implementa de forma nativa un sistema de registro de “Commits” que mantiene una copia de los elementos actualizados o eliminados y permite retroceder a un estado anterior.

Por tanto, es necesario implementar acciones adicionales para realizar borrados permanentes de los datos almacenados en S3, si realmente queremos que estos sean eliminados completamente de todo el sistema.

De forma nativa, Apache Hudi permite configurar la retención de commits y el número máximo y mínimo de estos que se mantienen almacenados en S3. A través de unas políticas de Clean, Hudi gestiona el archivado de los ficheros de commit obsoletos de forma que no sean accesible desde la consola.


"hoodie.cleaner.policy" -> "KEEP_LATEST_FILE_VERSIONS",
"hoodie.keep.max.commits" -> "2",
"hoodie.keep.min.commits" -> "1",
"hoodie.cleaner.commits.retained" -> "0",

Sin embargo, debido al funcionamiento de la herramienta, no es posible realizar una configuración en la que se suprima completamente el almacenamiento de commits, por lo que el borrado total de los datos debe apoyarse en un mecanismo que permita deshacerse de las copias de datos retenidos para el cumplimiento del “derecho al olvido” de la normativa GDPR.

La estructura de un dataset Hudi en S3, incluidos los archivos que representan cada estado de los datos en el momento en que se actualiza el conjunto de datos, se muestra en la siguiente captura de pantalla. En este caso, se emplea una configuración que en ningún caso mantiene más de dos commits de forma activa.

Con el objetivo de mantener sólo la última de las versiones del dataset, una de las opciones es la utilización de una función Lambda que sea invocada de forma periódica, para así eliminar todos aquellos archivos que referencien Commits previos, manteniendo únicamente el estado actual de los datos.


import os
from datetime import datetime
import boto3

# S3 target
s3_bucket = "apache-hudi-emr"
s3_prefix = "character_deaths/.hoodie"
hudi_reg = [".commit", ".clean"]

client = boto3.client('s3')
response = client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)

hudi_keys = []
for object in response['Contents']:
key = object['Key']
# Look for .commit and .clean files
if any(hudi_suffix in key for hudi_suffix in hudi_reg):
# Extract creation timestamp from filename
timestamp = key.split("/")[-1].split(".")[0]
epoch_time = datetime.strptime(timestamp, '%Y%m%d%H%M%S').timestamp()
hudi_keys.append({"Key": key, "Date": epoch_time})

# Sort based on timestamp
hudi_sorted = (sorted(hudi_keys, key = lambda i: i['Date']))
# Maintain latest .commit and .clean file
commit_files = [commit for commit in hudi_sorted if ".commit" in commit['Key']][:-1]
clean_files = [clean for clean in hudi_sorted if ".clean" in clean['Key']][:-1]
# Files to be deleted
to_delete = commit_files + clean_files

for delete_object in to_delete:
key = delete_object['Key']
client.delete_object(Bucket=s3_bucket, Key=key)

Para esta prueba de concepto hemos escrito un pequeño script de Python que, dado el nombre del bucket de S3 y el prefijo del dataset de Hudi, se encarga de mantener únicamente los archivos asociados al último commit. Sobre este código básico, se puede evolucionar para crear una herramienta totalmente parametrizada para mantener múltiples bases de datos de Hudi de forma simultánea y, además, implemente el borrado total de los datos archivados, en caso de que sea necesario. El resultado final del directorio de Apache Hudi, es el siguiente:

En este caso, si realizamos una consulta desde Hive sobre el dataset de Hudi, accederemos a la última versión de los datos en la que Robb ha ejercido su derecho GDPR a ser olvidado y perderemos la visibilidad sobre los commits eliminados en la interfaz de línea de comandos de Hudi, por lo que los datos no son recuperables mediante rollback.

Conclusiones

En esta prueba de concepto se han validado, desde un punto de vista funcional, algunas de las principales características de Apache Hudi, como son su capacidad para realizar operaciones UPDATE y DELETE sobre datasets que residen en S3.

Hemos comprobado que Hudi es capaz de llevar a cabo estas operaciones, sin embargo, para dar respuesta de manera efectiva a los derechos que tienen los usuarios finales como parte de la normativa GDPR, se debe tener en cuenta el funcionamiento interno de Hudi, en concreto la utilización de ficheros de commits que versionan el dataset cuando se llevan a cabo actualizaciones o borrados. Dicho comportamiento hace que Hudi, de manera independiente, no sea una herramienta válida para el cumplimiento de GDPR, aunque sí consideramos que puede ser muy útil, si se combina con otros mecanismos (como la función Lambda que se propone en este post) que realicen la limpieza de los ficheros históricos que no se deseen mantener, consiguiendo así borrados o actualizaciones efectivas.

Por otro lado, para poder considerar a Hudi como una solución viable para la implementación de los derechos de rectificación y olvido en entornos productivos, sería necesario llevar a cabo pruebas de rendimiento que nos permitan medir su eficacia cuando dicha herramienta se enfrenta a datasets de volumetrías considerables. Hemos considerado que estas pruebas tienen demasiada entidad para incluirlas en este post, así que seguiremos trabajando en ellas e intentaremos publicar los resultados en un nuevo artículo.

Imagen: unsplash | @markusspiske

Author

  • Carlos Salas

    Cloud Engineer en Keepler Data Tech. "I am passionate about Linux, Cloud Computing, Security and everything related to the Internet. My experience is within the cyber security ecosystem and as a cloud project engineer. I love to learn and face challenges every day".