I am specifically looking to optimize the performance of updating and inserting data into a Delta Lake base table with about 4 trillion records.
Environment:
- Spark 3.0.0
- DeltaLake 0.7.0
For context, this is about building an incremental table with Delta Lake. To be more detailed, I'll summarize it in steps:
- Creation of the base table (delta)
- Obtaining periodic data
- Add the data to the base table
Steps 1 and 2 have already been done, but adding the data is notably slow: for example, adding a 9 GB CSV takes about 6 hours. This is mainly because Delta needs to rewrite the affected data for each update, and it also needs to read all the data in the table to find the matches.
The table is also partitioned (PARTITIONED BY) and stored on the cluster's HDFS so that the Spark nodes can perform the operations.
The fields of the base table:
| Name | Type | Cardinality | Comment |
|---|---|---|---|
| ID | int | 10000 | Identifier |
| TYPE | string | 30 | |
| LOCAL_DATE | date | | Local date of the record |
| DATE_UTC | date | | UTC date of the record |
| VALUE | int | | Record value |
| YEAR | int | 4 | Calculated column |
| MONTH | int | 12 | Calculated column |
| DAY | int | 31 | Calculated column |
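The YEAR, MONTH and DAY columns are simply the date parts of LOCAL_DATE. A minimal sketch of how they can be derived with the built-in date functions when preparing a batch (here `df` is assumed to be a DataFrame that already contains LOCAL_DATE):

```python
from pyspark.sql import functions as F

# YEAR / MONTH / DAY are derived from the local date of the record
df = (df
      .withColumn("YEAR",  F.year("LOCAL_DATE"))
      .withColumn("MONTH", F.month("LOCAL_DATE"))
      .withColumn("DAY",   F.dayofmonth("LOCAL_DATE")))
```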
Since queries are generally filtered by time, it was decided to partition by YEAR, MONTH and DAY, derived from the LOCAL_DATE column. Partitioning by the ID and LOCAL_DATE columns was ruled out because of their high cardinality (which is worse for performance). Finally, TYPE was added as well, resulting in the following:
spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
ID INT,
FECHA_LOCAL TIMESTAMP,
FECHA_UTC TIMESTAMP,
TIPO STRING,
VALUE DOUBLE,
YEAR INT,
MONTH INT,
DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, TIPO)
LOCATION '{location}'
""")
From here on, the incremental load consists of periodically adding one of these CSV files of approximately 9 GB every 5 days. Currently the MERGE operation is as follows:
spark.sql(f"""
MERGE INTO {BASE_TABLE_NAME}
USING {INCREMENTAL_TABLE_NAME} ON
--partitioned cols
{BASE_TABLE_NAME}.YEAR = {INCREMENTAL_TABLE_NAME}.YEAR AND
{BASE_TABLE_NAME}.MONTH = {INCREMENTAL_TABLE_NAME}.MONTH AND
{BASE_TABLE_NAME}.DAY = {INCREMENTAL_TABLE_NAME}.DAY AND
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO AND
{BASE_TABLE_NAME}.FECHA_LOCAL = {INCREMENTAL_TABLE_NAME}.FECHA_LOCAL AND
{BASE_TABLE_NAME}.ID = {INCREMENTAL_TABLE_NAME}.ID
WHEN MATCHED THEN UPDATE
SET {BASE_TABLE_NAME}.VALUE = {INCREMENTAL_TABLE_NAME}.VALUE,
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO
WHEN NOT MATCHED THEN INSERT *
""")
Some facts to consider:
- This MERGE operation takes about 6 hours
- The base table was created from 230 GB of CSV data (now 55 GB in Delta!)
- The Spark application runs in cluster mode with the parameters shown below
- The infrastructure consists of 3 nodes with 32 cores and 250 GB of RAM each, although for safety towards the other existing applications the Spark app is limited to roughly 50% of those resources
Spark app:
```python
from pyspark.sql import SparkSession

mode = 'spark://spark-master:7077'
# mode = 'local[*]'

spark = (
    SparkSession.builder
    .master(mode)
    .appName("SparkApp")
    .config('spark.cores.max', '45')
    .config('spark.executor.cores', '5')
    .config('spark.executor.memory', '11g')
    .config('spark.driver.memory', '120g')
    .config("spark.sql.shuffle.partitions", "200")  # 200 only for 200GB delta table reads
    .config("spark.storage.memoryFraction", "0.8")  # legacy (pre-1.6) setting; has no effect on Spark 3.x
    # DeltaLake configs
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Delta optimization
    .config("spark.databricks.delta.optimizeWrite.enabled", "true")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .getOrCreate()
)
```
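Finally, since every merge rewrites files in the partitions it touches, those partitions accumulate small files over time, which slows down the read side of the next merge. This is what I understand to be the compaction pattern from the Delta Lake docs for OSS Delta, sketched for a single partition (the path, partition predicate and file count below are placeholders):

```python
path = "hdfs:///path/to/delta/table"   # placeholder for the table location
partition = "YEAR = 2020 AND MONTH = 1 AND DAY = 5"
num_files = 16                         # target number of files for this partition

(spark.read
    .format("delta")
    .load(path)
    .where(partition)
    .repartition(num_files)
    .write
    .option("dataChange", "false")     # compaction only, no logical change to the data
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", partition)
    .save(path))
```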