
Scenario Series 19: Working With JSON Files in PySpark Across Different Scenarios
1. Reading Basic JSON Files

Challenge: Loading a single JSON file is straightforward, but you need to ensure the path is
correct.

Solution:

# Load a single JSON file into a DataFrame
df = spark.read.json("path/to/file.json")

# Show the DataFrame
df.show(truncate=False)
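Note: by default, spark.read.json expects JSON Lines input (one JSON object per line). If the file is a single pretty-printed object spanning several lines, enable the multiline option. A minimal sketch, reusing the same placeholder path:

# Read a pretty-printed (multi-line) JSON file
df_multiline = spark.read.option("multiline", "true").json("path/to/file.json")
df_multiline.printSchema()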

2. Handling Multiple JSON Files

Challenge: Reading multiple JSON files from a directory at once.

Solution: Point to the directory with a wildcard path:

# Load multiple JSON files from a directory into a DataFrame
df = spark.read.json("path/to/directory/*.json")

# Show the DataFrame
df.show(truncate=False)
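You can also pass an explicit list of paths instead of a wildcard; a small sketch with placeholder file names:

# Load an explicit list of JSON files into one DataFrame
df = spark.read.json(["path/to/file1.json", "path/to/file2.json"])
df.show(truncate=False)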

3. Handling Deeply Nested JSON

Challenge: Extracting and manipulating deeply nested JSON data can be complex.

Example JSON Data:

{
  "id": 1,
  "name": "John",
  "address": {
    "street": "123 Elm St",
    "city": "Springfield"
  },
  "orders": [
    {"order_id": 1001, "amount": 250.0},
    {"order_id": 1002, "amount": 150.0}
  ]
}

Solution: Flatten the nested structure.

# Load JSON file
df = spark.read.format("json") \
    .option("multiline", "true") \
    .load("dbfs:/FileStore/shared_uploads/Deeply_Nested_JSON-1.json")
df.show(truncate=False)

from pyspark.sql.functions import explode, col

# Flatten nested fields
flattened_df = df.select(
    col("id"),
    col("name"),
    col("address.street"),
    col("address.city"),
    explode(col("orders")).alias("order")
)

flattened_df.show(truncate=False)



# Further flatten order details
flattened_df = flattened_df.select(
"id", "name", "street", "city",
col("order.order_id").alias("order_id"),
col("order.amount").alias("order_amount")
)

flattened_df.show(truncate=False)
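If some records may have a null or empty orders array, explode() silently drops those rows; explode_outer() keeps them with null order fields instead. A hedged variant of the same flattening step:

from pyspark.sql.functions import explode_outer

# Keep records even when "orders" is null or empty
flattened_outer_df = df.select(
    col("id"),
    col("name"),
    explode_outer(col("orders")).alias("order")
)
flattened_outer_df.show(truncate=False)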

4. Handling Arrays and Maps in JSON

Challenge: Working with arrays and maps in JSON data.

Example JSON Data:

{
  "id": 1,
  "tags": ["spark", "big data"],
  "info": {"key": "value", "count": 10}
}



Solution: Explode the array and access the nested fields with dot notation.

# Load JSON file
df1 = spark.read.format("json").option("multiline", "true") \
    .load("dbfs:/FileStore/shared_uploads/[email protected]/complexjson2.json")
df1.display()

from pyspark.sql.functions import explode, col

# Handle arrays
tags_df = df1.withColumn("tag", explode(col("tags")))
tags_df.show(truncate=False)

# Handle maps
info_df = tags_df.select(
col("id"),
col("tag"),
col("info.key").alias("info_key"),
col("info.count").alias("info_count")
)
info_df.show(truncate=False)
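Note that, with schema inference, Spark reads the info object as a struct, which is why dot notation works above. To treat it as a true MapType (for example, when its keys vary between records), you can supply an explicit schema. A sketch assuming the same file, with string values for simplicity:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, MapType

# Explicit schema: "info" as a map instead of an inferred struct
map_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("info", MapType(StringType(), StringType()), True)
])

df_map = spark.read.schema(map_schema).option("multiline", "true") \
    .json("dbfs:/FileStore/shared_uploads/[email protected]/complexjson2.json")
df_map.select(col("id"), col("info")["key"].alias("info_key")).show(truncate=False)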



5. Handling Malformed JSON in PySpark

To handle malformed JSON and capture corrupt records in PySpark, you can use the
PERMISSIVE mode when reading the JSON file. In this mode, Spark stores any corrupt or
invalid records in a special column called _corrupt_record. This is especially useful for
detecting and handling malformed JSON entries.

Here's an example of how to handle malformed JSON and capture corrupt records. Sample input file (note that the first record is missing its closing brace, so it is malformed):

{"id": 1, "name": "Alice", "age": 30


{"id": 2, "name": "Bob", "age": 25}
{"id": 3, "name": "Charlie", "age": "thirty"}

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Path to the malformed JSON file
json_file_path = "dbfs:/FileStore/shared_uploads/corruptjson-1.json"

# Read the JSON file with PERMISSIVE mode to handle malformed data
df_with_corrupt = spark.read.format("json") \
    .option("mode", "PERMISSIVE") \
    .load(json_file_path)

# Show the DataFrame including the corrupt record column
df_with_corrupt.show(truncate=False)

# Filter the rows where _corrupt_record is not null (i.e., the corrupt records)
corrupt_records_df = df_with_corrupt.filter(col("_corrupt_record").isNotNull())

# Show only the corrupt records
corrupt_records_df.show(truncate=False)

# Filter the rows where _corrupt_record is null (i.e., the valid records)
# and drop the _corrupt_record column
correct_records_df = df_with_corrupt.filter(col("_corrupt_record").isNull()) \
    .drop("_corrupt_record")

# Show only the valid records
correct_records_df.show(truncate=False)
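Two related notes. First, PERMISSIVE is not the only mode: DROPMALFORMED silently drops bad rows, and FAILFAST raises an error on the first malformed record. Second, on recent Spark versions a query that references only the internal _corrupt_record column of a raw JSON source is disallowed; caching the DataFrame first is the documented workaround. A minimal sketch:

# Cache before aggregating on _corrupt_record alone (e.g., counting bad rows)
df_with_corrupt.cache()
corrupt_count = df_with_corrupt.filter(col("_corrupt_record").isNotNull()).count()
print(f"Corrupt records: {corrupt_count}")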

6. Handling Incorrect Schema Inference

Challenge: PySpark may infer incorrect data types.


Example JSON Data:
{
  "id": "1",        # should be an Integer
  "value": "100.5"  # should be a Double
}



from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

# Define schema
schema = StructType([
StructField("id", IntegerType(), True),
StructField("value", DoubleType(), True)
])

# Load JSON file with the defined schema
df = spark.read.schema(schema).json("path/to/file.json")

df.show(truncate=False)
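If you prefer not to hand-write a schema, another common approach is to read with the inferred schema and cast the columns afterwards; a small sketch using cast():

from pyspark.sql.functions import col

# Read with inferred types, then cast to the intended ones
df_cast = spark.read.json("path/to/file.json") \
    .withColumn("id", col("id").cast("int")) \
    .withColumn("value", col("value").cast("double"))
df_cast.printSchema()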

7. Handling JSON with Missing or Null Fields


Challenge: Missing or null fields can cause issues in downstream processing.

Example JSON Data:


{
  "id": 1,
  "name": null,
  "age": 28
}

# Load JSON file
df = spark.read.json("path/to/file.json")

# Fill null values
df_filled = df.fillna({"name": "Unknown"})

# Drop rows with null values in specific fields
df_cleaned = df.dropna(subset=["name"])

df_filled.show(truncate=False)
df_cleaned.show(truncate=False)
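A per-column default can also be applied with coalesce(), which keeps the original value when present and substitutes a literal otherwise; a short sketch:

from pyspark.sql.functions import coalesce, col, lit

# Replace null names with a default while leaving other columns untouched
df_defaults = df.withColumn("name", coalesce(col("name"), lit("Unknown")))
df_defaults.show(truncate=False)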



8. Complex Nested Structures Impacting Performance

Challenge: Deeply nested structures can degrade parsing and query performance.


Example JSON Data:
{
  "id": 1,
  "items": [
    {"item_id": 100, "details": {"price": 10.5}},
    {"item_id": 101, "details": {"price": 20.75}}
  ]
}

Solution: Flatten the nested structure early and consider converting to Parquet.

from pyspark.sql.functions import explode, col

# Load JSON file
df = spark.read.json("path/to/file.json")

# Flatten nested structure
flattened_df = df.select(
    col("id"),
    explode(col("items")).alias("item")
).select(
    "id",
    col("item.item_id").alias("item_id"),
    col("item.details.price").alias("price")
)

# Convert to Parquet for better performance
flattened_df.write.mode("overwrite").parquet("path/to/output_directory")

flattened_df.show(truncate=False)
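Downstream jobs can then read the flattened Parquet output directly, which avoids re-parsing the JSON on every run; a minimal sketch with the same placeholder path:

# Read the flattened data back from Parquet for subsequent processing
parquet_df = spark.read.parquet("path/to/output_directory")
parquet_df.show(truncate=False)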

Follow me on LinkedIn – Shivakiran kotur
