Lesson 3: Spark Tutorial

This tutorial covers advanced Spark functionality: handling JSON data, using UDFs, ranking with window functions, and writing to SQL databases.


Example 1: Reading and Writing JSON Data

This example demonstrates loading JSON data, writing it to a file, and viewing the schema.

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("Lesson3JSON").getOrCreate()

# Read JSON file into DataFrame
df = spark.read.option("multiLine", True).json("data.json")

# Show the data
df.show()

# Print schema
df.printSchema()

# Write to a JSON output path (Spark writes a directory of part files)
df.write.mode("overwrite").json("output.json")

Explanation:

  1. Initializing SparkSession: spark = SparkSession.builder.appName("Lesson3JSON").getOrCreate() creates a Spark session (or returns an existing one), the entry point for reading and processing data.

  2. Reading JSON Data: df = spark.read.option("multiLine", True).json("data.json") loads the JSON file; the multiLine option lets Spark parse records that span multiple lines, since by default it expects one JSON object per line. A schema-based variant is sketched after this list.

  3. Displaying Schema and Data: df.show() prints records, while df.printSchema() reveals column data types.

  4. Writing JSON Output: df.write.mode("overwrite").json("output.json") saves the DataFrame under the output.json path, replacing existing content if present. Note that Spark writes a directory of part files, not a single JSON file.
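
When the input's structure is known in advance, supplying an explicit schema avoids the cost and surprises of schema inference. A minimal sketch, assuming data.json holds objects with name and age fields; these field names are illustrative assumptions, not taken from an actual file:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema -- field names are assumptions for illustration
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Passing a schema skips inference and surfaces mismatched records early
df_typed = spark.read.schema(schema).option("multiLine", True).json("data.json")
df_typed.printSchema()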


Example 2: Using UDFs (User-Defined Functions) in Spark

This example defines a custom transformation function and applies it to a DataFrame column as a UDF.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
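# Note: this example reuses the SparkSession (spark) created in Example 1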

# Sample DataFrame
data = [("Alice", "NY"), ("Bob", "LA"), ("Charlie", "SF")]
df = spark.createDataFrame(data, ["Name", "City"])

# Define a function that adds a 'City: ' prefix
def format_city(city):
    return f"City: {city}"

# Register UDF
city_udf = udf(format_city, StringType())

# Apply UDF
df_transformed = df.withColumn("FormattedCity", city_udf(df.City))

# Show results
df_transformed.show()

Explanation:

  1. Creating Sample Data: data = [("Alice", "NY"), ("Bob", "LA"), ("Charlie", "SF")] represents a list of tuples, used to create a DataFrame.

  2. Defining the Function: def format_city(city): return f"City: {city}" prefixes each city name with "City: ".

  3. Wrapping and Applying the UDF: city_udf = udf(format_city, StringType()) wraps the function as a UDF with a declared return type, applied with df.withColumn("FormattedCity", city_udf(df.City)). A UDF-free alternative is sketched after this list.

  4. Displaying Transformed Data: df_transformed.show() shows the new column "FormattedCity", demonstrating the custom transformation.
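
Python UDFs serialize data between the JVM and the Python interpreter, which adds overhead; for a simple prefix, Spark's built-in column functions achieve the same result natively. A minimal sketch of the UDF-free equivalent:

from pyspark.sql.functions import concat, lit

# Same result using built-in functions -- no Python serialization overhead
df_builtin = df.withColumn("FormattedCity", concat(lit("City: "), df.City))
df_builtin.show()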


Example 3: Performing Window Functions in Spark

This example demonstrates how to use window functions for rank-based operations over a dataset.

from pyspark.sql.window import Window
from pyspark.sql.functions import rank
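# Note: this example reuses the SparkSession (spark) created in Example 1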

# Sample DataFrame
data = [("Alice", "HR", 5000), ("Bob", "HR", 4500), ("Charlie", "IT", 6000)]
df = spark.createDataFrame(data, ["Name", "Department", "Salary"])

# Define window specification
window_spec = Window.partitionBy("Department").orderBy(df.Salary.desc())

# Apply rank function
df_ranked = df.withColumn("Rank", rank().over(window_spec))

# Show results
df_ranked.show()

Explanation:

  1. Creating Sample Data: data = [("Alice", "HR", 5000), ("Bob", "HR", 4500), ("Charlie", "IT", 6000)] represents employees, their departments, and salaries.

  2. Defining the Window Specification: window_spec = Window.partitionBy("Department").orderBy(df.Salary.desc()) partitions rows by department and orders them by descending salary, defining the frame over which each rank is computed.

  3. Applying the Rank Function: df_ranked = df.withColumn("Rank", rank().over(window_spec)) assigns a rank based on salary; related ranking functions are compared after this list.

  4. Displaying Results: df_ranked.show() shows the ranked DataFrame, highlighting salary-based rankings per department.
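
Spark provides related ranking functions that differ only in how they handle ties. A short sketch comparing them on the same window, reusing the df and window_spec defined above:

from pyspark.sql.functions import rank, dense_rank, row_number

# rank() leaves gaps after ties, dense_rank() does not, row_number() is always unique
df_compared = (
    df.withColumn("Rank", rank().over(window_spec))
      .withColumn("DenseRank", dense_rank().over(window_spec))
      .withColumn("RowNumber", row_number().over(window_spec))
)
df_compared.show()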


Example 4: Writing Data to a SQL Database

This example demonstrates writing DataFrame data to a SQL database over JDBC, useful for integrating Spark with relational databases.

from pyspark.sql import SparkSession

# Note: the MySQL JDBC driver (Connector/J) must be on Spark's classpath,
# e.g. via the spark.jars or spark.jars.packages configuration

# Initialize Spark Session
spark = SparkSession.builder.appName("Lesson3SQL").getOrCreate()

# Sample DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Define database connection properties
db_properties = {
    "user": "root",
    "password": "password123",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# JDBC URL
jdbc_url = "jdbc:mysql://localhost:3306/mydatabase"

# Write DataFrame to SQL table
df.write.jdbc(url=jdbc_url, table="people", mode="overwrite", properties=db_properties)

Explanation:

  1. Creating Sample Data: data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)] initializes a DataFrame for SQL insertion.

  2. Setting Database Connection: db_properties = {"user": "root", "password": "password123", "driver": "com.mysql.cj.jdbc.Driver"} provides authentication details for MySQL.

  3. Defining JDBC URL: jdbc_url = "jdbc:mysql://localhost:3306/mydatabase" specifies the database address.

  4. Writing Data to SQL: df.write.jdbc(url=jdbc_url, table="people", mode="overwrite", properties=db_properties) inserts the DataFrame into the "people" table.
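
To confirm the write succeeded, the table can be read back over the same JDBC connection. A minimal sketch, reusing the jdbc_url and db_properties defined above:

# Read the table back to verify the write
df_check = spark.read.jdbc(url=jdbc_url, table="people", properties=db_properties)
df_check.show()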