Delta Lake Tutorial with Code Examples
Example 1: Creating and Writing to a Delta Table
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Start a Spark session with Delta Lake support
spark = SparkSession.builder.appName("delta_example").getOrCreate()

# Load the raw CSV data and write it out as a Delta table
df = spark.read.csv("s3://data-source/sales.csv", header=True, inferSchema=True)
df.write.format("delta").mode("overwrite").save("s3://delta-lake/sales")
Explanation

Initialize Spark session –
spark = SparkSession.builder.appName("delta_example").getOrCreate()
starts a Spark session to enable processing of Delta Lake tables.

Read data from CSV –
df = spark.read.csv("s3://data-source/sales.csv", header=True, inferSchema=True)
loads raw CSV data from Amazon S3 into a structured DataFrame.

Write data to Delta –
df.write.format("delta").mode("overwrite").save("s3://delta-lake/sales")
stores the data in Delta format, ensuring transaction support and schema enforcement.

Ensure data consistency – Delta Lake enforces ACID transactions, preventing partial writes and ensuring reliable storage for analytics and machine learning workloads. A short sketch of schema enforcement is shown below.
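To make the schema-enforcement point concrete, here is a minimal sketch of what happens when an append does not match the table schema. The mismatched column name and the try/except handling are illustrative assumptions, not part of the original example.

# Hypothetical DataFrame whose schema deviates from the sales table
bad_df = spark.createDataFrame([("Widget", "not-a-number")], ["product", "amount_text"])

try:
    # Delta Lake rejects appends whose schema does not match the table
    bad_df.write.format("delta").mode("append").save("s3://delta-lake/sales")
except Exception as err:
    print(f"Append rejected by schema enforcement: {err}")

# Opting in to schema evolution merges the new column into the table schema
bad_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://delta-lake/sales")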
Example 2: Reading and Querying a Delta Table
# Load the Delta table and expose it to Spark SQL as a temporary view
df = spark.read.format("delta").load("s3://delta-lake/sales")
df.createOrReplaceTempView("sales_data")

# Aggregate sales per product and print the result
result = spark.sql("SELECT product, SUM(amount) FROM sales_data GROUP BY product")
result.show()
Explanation

Read Delta table –
df = spark.read.format("delta").load("s3://delta-lake/sales")
loads the structured Delta Lake data, ensuring consistency and optimized performance for queries.

Create SQL view –
df.createOrReplaceTempView("sales_data")
registers the dataset as a temporary SQL table, enabling querying with Spark SQL.

Execute SQL query –
result = spark.sql("SELECT product, SUM(amount) FROM sales_data GROUP BY product")
retrieves aggregated sales data by grouping products and summing sales amounts.

Display query output –
result.show()
prints the computed sales totals per product, making real-time analytics possible within a Databricks or Spark environment. The same aggregation can also be written with the DataFrame API, as sketched below.
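For readers who prefer the DataFrame API over SQL, here is a rough equivalent of the query above; the alias total_amount is an illustrative choice, not something defined in the original example.

from pyspark.sql import functions as F

# Same aggregation as the SQL query, expressed with the DataFrame API
result_df = df.groupBy("product").agg(F.sum("amount").alias("total_amount"))
result_df.show()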
Example 3: Implementing Time Travel in Delta Lake
# Read the table as of a specific commit version
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("s3://delta-lake/sales")
df_v1.show()

# Read the table as it existed at a specific point in time
df_date = spark.read.format("delta").option("timestampAsOf", "2024-02-24").load("s3://delta-lake/sales")
df_date.show()
Explanation

Retrieve past version –
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load("s3://delta-lake/sales")
loads an earlier version of the Delta table for historical analysis.

Display old records –
df_v1.show()
prints the dataset as it existed at version 1, enabling debugging and rollback capabilities in Delta Lake.

Query by timestamp –
df_date = spark.read.format("delta").option("timestampAsOf", "2024-02-24").load("s3://delta-lake/sales")
retrieves the data as it was on a specific date.

Verify historical state –
df_date.show()
allows comparing past dataset states, ensuring consistency and enabling audits in regulatory or analytical workflows. To find out which versions and timestamps exist, the table history can be inspected, as sketched below.
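Time travel requires knowing which versions or timestamps are available. A minimal sketch of inspecting the commit history with the DeltaTable API follows; the selected columns are standard fields of the history output, though the exact set may vary by Delta Lake version.

from delta.tables import DeltaTable

# List past commits so you know which version or timestamp to travel to
history_df = DeltaTable.forPath(spark, "s3://delta-lake/sales").history()
history_df.select("version", "timestamp", "operation").show(truncate=False)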
Example 4: Optimizing and Vacuuming a Delta Table
from delta.tables import DeltaTable

# Load the existing Delta table for maintenance operations
delta_table = DeltaTable.forPath(spark, "s3://delta-lake/sales")

# Compact small files, then remove obsolete files older than 168 hours (7 days)
delta_table.optimize().executeCompaction()
delta_table.vacuum(168)
Explanation

Initialize Delta table –
delta_table = DeltaTable.forPath(spark, "s3://delta-lake/sales")
loads a Delta Lake table for maintenance operations like optimization and cleanup.

Run compaction –
delta_table.optimize().executeCompaction()
merges smaller files into larger ones, reducing I/O operations and improving query performance.

Remove old files –
delta_table.vacuum(168)
deletes obsolete data files older than 168 hours (7 days), reclaiming storage space while maintaining access to recent records.

Boost query speed – optimization and vacuuming reduce data fragmentation, making analytics queries faster and more cost-efficient in large-scale data environments. Compaction can also cluster the data by commonly filtered columns, as sketched below.
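Beyond plain compaction, recent Delta Lake releases let the optimize step Z-order the data by columns that queries filter or group on; choosing product here is an assumption based on the query in example 2.

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://delta-lake/sales")

# Compact files and co-locate rows with similar "product" values (Z-ordering)
delta_table.optimize().executeZOrderBy("product")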
These four examples demonstrate essential Delta Lake functionality, covering data storage, querying, time travel, and performance optimization.