Lesson 3: Spark Tutorial
This tutorial covers advanced Spark functionalities, including handling JSON, using UDFs, ranking with window functions, and writing to SQL databases.
Example 1: Reading and Writing JSON Data
This example demonstrates loading JSON data, writing it to a file, and viewing the schema.
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("Lesson3JSON").getOrCreate()
# Read JSON file into DataFrame
df = spark.read.option("multiline", True).json("data.json")
# Show the data
df.show()
# Print schema
df.printSchema()
# Write to a new JSON file
df.write.mode("overwrite").json("output.json")
Explanation:
Initializing SparkSession: spark = SparkSession.builder.appName("Lesson3JSON").getOrCreate() creates a Spark session, which is the entry point for processing the JSON data.
Reading JSON Data: df = spark.read.option("multiline", True).json("data.json") loads the JSON file, with the multiline option ensuring that multi-line JSON structures are parsed correctly.
Displaying Schema and Data: df.show() prints the records, while df.printSchema() reveals the column names and data types.
Writing JSON Output: df.write.mode("overwrite").json("output.json") saves the DataFrame under output.json (Spark writes a directory of part files), replacing any existing content.
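When the structure of the JSON is known up front, the schema can be supplied explicitly instead of letting Spark infer it, which avoids an extra pass over the file. The sketch below is a minimal example and assumes a hypothetical data.json with name and age fields; adjust the fields to match the actual file.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Hypothetical schema: a "name" string and an "age" integer per record
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
# Supplying the schema skips schema inference when reading
df_typed = spark.read.option("multiline", True).schema(schema).json("data.json")
df_typed.printSchema()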
Example 2: Using UDFs (User-Defined Functions) in Spark
This example defines a custom transformation function and applies it using Spark's UDF (User-Defined Function) feature.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Sample DataFrame
data = [("Alice", "NY"), ("Bob", "LA"), ("Charlie", "SF")]
df = spark.createDataFrame(data, ["Name", "City"])
# Define a UDF to add 'City: ' prefix
def format_city(city):
    return f"City: {city}"
# Register UDF
city_udf = udf(format_city, StringType())
# Apply UDF
df_transformed = df.withColumn("FormattedCity", city_udf(df.City))
# Show results
df_transformed.show()
Explanation:
Creating Sample Data: data = [("Alice", "NY"), ("Bob", "LA"), ("Charlie", "SF")] is a list of tuples used to create the DataFrame.
Defining a UDF: def format_city(city): return f"City: {city}" prepends "City: " to each city name.
Registering and Applying UDF: city_udf = udf(format_city, StringType()) wraps the function as a UDF, which is applied with df.withColumn("FormattedCity", city_udf(df.City)).
Displaying Transformed Data: df_transformed.show() shows the new "FormattedCity" column, demonstrating the custom transformation.
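The same UDF can also be declared with the decorator form, or registered so it can be called from Spark SQL. A minimal sketch reusing the format_city logic; the view name people_cities and the SQL function name format_city_sql are illustrative.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Decorator form: the decorated function is itself usable as a column expression
@udf(returnType=StringType())
def format_city_udf(city):
    return f"City: {city}"
df_decorated = df.withColumn("FormattedCity", format_city_udf(df.City))
df_decorated.show()
# Register the plain Python function for use in SQL queries
spark.udf.register("format_city_sql", format_city, StringType())
df.createOrReplaceTempView("people_cities")
spark.sql("SELECT Name, format_city_sql(City) AS FormattedCity FROM people_cities").show()
Note that Python UDFs run row by row in a Python worker, so they are slower than built-in column functions; prefer built-ins such as concat or format_string when they can express the same logic.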
Example 3: Performing Window Functions in Spark
This example demonstrates how to use window functions for rank-based operations over a dataset.
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Sample DataFrame
data = [("Alice", "HR", 5000), ("Bob", "HR", 4500), ("Charlie", "IT", 6000)]
df = spark.createDataFrame(data, ["Name", "Department", "Salary"])
# Define window specification
window_spec = Window.partitionBy("Department").orderBy(df.Salary.desc())
# Apply rank function
df_ranked = df.withColumn("Rank", rank().over(window_spec))
# Show results
df_ranked.show()
Explanation:
Creating Sample Data: data = [("Alice", "HR", 5000), ("Bob", "HR", 4500), ("Charlie", "IT", 6000)] represents employees, their departments, and salaries.
Defining Window Specification: window_spec = Window.partitionBy("Department").orderBy(df.Salary.desc()) partitions the rows by department and orders them by descending salary.
Applying Rank Function: df_ranked = df.withColumn("Rank", rank().over(window_spec)) assigns each employee a rank based on salary within their department.
Displaying Results: df_ranked.show() shows the ranked DataFrame, highlighting the salary-based ranking per department.
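rank() leaves gaps in the numbering after ties, and Spark offers related window functions with different tie behavior. A minimal sketch comparing rank, dense_rank, and row_number over the same window specification:
from pyspark.sql.functions import dense_rank, row_number
# rank: gaps after ties; dense_rank: no gaps; row_number: unique sequence regardless of ties
df_compared = (
    df.withColumn("Rank", rank().over(window_spec))
      .withColumn("DenseRank", dense_rank().over(window_spec))
      .withColumn("RowNumber", row_number().over(window_spec))
)
df_compared.show()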
Example 4: Writing Data to a SQL Database
This example demonstrates writing DataFrame data into an SQL database, useful for integrating Spark with relational databases.
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("Lesson3SQL").getOrCreate()
# Sample DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Define database connection properties
db_properties = {
"user": "root",
"password": "password123",
"driver": "com.mysql.cj.jdbc.Driver"
}
# JDBC URL
jdbc_url = "jdbc:mysql://localhost:3306/mydatabase"
# Write DataFrame to SQL table
df.write.jdbc(url=jdbc_url, table="people", mode="overwrite", properties=db_properties)
Explanation:
Creating Sample Data: data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)] initializes a DataFrame for SQL insertion.
Setting Database Connection: db_properties = {"user": "root", "password": "password123", "driver": "com.mysql.cj.jdbc.Driver"} provides the MySQL credentials and the JDBC driver class.
Defining JDBC URL: jdbc_url = "jdbc:mysql://localhost:3306/mydatabase" specifies the database host, port, and database name.
Writing Data to SQL: df.write.jdbc(url=jdbc_url, table="people", mode="overwrite", properties=db_properties) inserts the DataFrame into the "people" table, replacing it if it already exists.
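Reading the table back verifies the write and shows the reverse direction of the same JDBC integration. A minimal sketch, assuming the same connection details as above and that the MySQL Connector/J driver JAR is available to Spark (for example via the --jars option or the spark.jars configuration); the filter query is illustrative.
# Read the "people" table back into a DataFrame over the same JDBC connection
df_people = spark.read.jdbc(url=jdbc_url, table="people", properties=db_properties)
df_people.show()
# Push a query instead of a whole table (the subquery alias is required by JDBC)
df_adults = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT Name, Age FROM people WHERE Age >= 30) AS adults",
    properties=db_properties,
)
df_adults.show()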