PySpark Questions & Scenarios
5. What is a SparkContext?
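A SparkContext is the entry point to core Spark functionality. It represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables; in the PySpark shell one is created automatically and exposed as sc. A minimal sketch of creating one yourself (the master URL and app name below are just examples):
from pyspark import SparkContext
# Connect to a local "cluster" using all available cores
sc = SparkContext(master="local[*]", appName="InterviewPrep")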
6. What is an RDD?
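An RDD (Resilient Distributed Dataset) is Spark's fundamental data abstraction: an immutable, fault-tolerant collection of elements partitioned across the nodes of the cluster and processed in parallel. For example: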
# Create an RDD
words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "akka",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark",
])
counts = words.count()
print("Count is : {}".format(counts))
8. What is a DataFrame?
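A DataFrame is a distributed collection of data organized into named columns, conceptually equivalent to a table in a relational database, and optimized by Spark's Catalyst query optimizer. For example: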
from datetime import date
# PySpark DataFrame (column names assumed; the original snippet was truncated)
df = spark.createDataFrame([
    ("Lorem", date(2000, 8, 1)),
], ["name", "created"])
9. What is a Dataset?
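A Dataset is a strongly typed, distributed collection of objects that combines the compile-time type safety of RDDs with the optimized execution engine of DataFrames. The typed Dataset API exists only in Scala and Java; PySpark exposes the untyped DataFrame API (a Dataset of Row) instead. The snippet below, for instance, builds a DataFrame and caches it in memory: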
# Create DataFrame
df = spark.range(1)
# Cache in memory
df.cache()
repartition()
coalesce()
groupByKey()
reduceByKey()
cogroup()
join()
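All of these operations redistribute data across partitions, which usually means a shuffle. A minimal sketch of a few of them:
# repartition triggers a full shuffle; coalesce merges partitions and avoids one
rdd = sc.parallelize(range(100), 8)
print(rdd.repartition(4).getNumPartitions())  # 4
print(rdd.coalesce(4).getNumPartitions())     # 4
# reduceByKey combines values per key before shuffling the reduced data
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(pairs.reduceByKey(lambda x, y: x + y).collect())  # e.g. [('a', 4), ('b', 2)]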
Both cache() and persist() are used to keep an RDD in memory across operations. The key difference is that persist() lets you specify the storage level, whereas cache() always uses the default strategy, MEMORY_ONLY.
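A minimal sketch of the difference (the storage level chosen here is just an example):
from pyspark import StorageLevel

rdd = sc.parallelize(range(100))
rdd.cache()  # equivalent to rdd.persist(StorageLevel.MEMORY_ONLY)

rdd2 = sc.parallelize(range(100))
rdd2.persist(StorageLevel.MEMORY_AND_DISK)  # explicitly chosen storage level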
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder.appName("InterviewQuestions").getOrCreate()
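The step that loads the data was lost in extraction; a minimal sketch, assuming a CSV file of transactions (the file name and columns such as transaction_date, product_name, quantity, and price are assumptions):
# Load the transactions data (path and schema assumed)
df = spark.read.csv("transactions.csv", header=True, inferSchema=True)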
df.show(5)
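The aggregation that produces average_price_df did not survive extraction; a plausible sketch, assuming the average price per product:
# Average price per product (reconstruction; grouping column assumed)
average_price_df = df.groupBy("product_name").agg(F.avg("price").alias("average_price"))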
average_price_df.show()
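# Replace missing quantity values with 0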
df = df.fillna(0, subset=["quantity"])
df.show()
# Daily revenue: sum of quantity * price per transaction date
daily_revenue = df.groupBy("transaction_date").agg(
    F.sum(F.col("quantity") * F.col("price")).alias("total_revenue"))
daily_revenue.show()
window_spec = Window.partitionBy("product_name").orderBy(F.desc("quantity"))
df = df.withColumn("rank", F.rank().over(window_spec))
# Keep only the top-ranked row per product (filter step assumed; original was lost)
top_products = df.filter(F.col("rank") == 1)
top_products.show()
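The join that builds joined_df was lost; a minimal sketch, assuming hypothetical customers_df and orders_df DataFrames sharing a customer_id key:
# Join customers to their orders (DataFrame names and join key assumed)
joined_df = customers_df.join(orders_df, on="customer_id", how="inner")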
joined_df.select("customer_name", "order_total").show()
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# User-Defined Function: average sentence length in characters
# (the averaging logic below is assumed; the original body was lost)
def calculate_avg_length(text):
    sentences = [s for s in text.split(".") if s.strip()]
    avg_length = sum(len(s) for s in sentences) / max(len(sentences), 1)
    return int(avg_length)

# Register UDF
avg_length_udf = udf(calculate_avg_length, IntegerType())
df = df.withColumn("avg_length", avg_length_udf("text"))
df.show()
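The sampling step that creates sampled_df was lost; a minimal sketch, assuming a 10% random sample:
# Random 10% sample without replacement (fraction and seed assumed)
sampled_df = df.sample(fraction=0.1, seed=42)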
sampled_df.show()
# Data Validation: flag rows that break basic constraints
# (the specific rules below are assumed; the original logic was lost)
invalid_data = df.filter((F.col("quantity") < 0) | F.col("price").isNull())
invalid_data.show()
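# Cross join: Cartesian product pairing every row of df1 with every row of df2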
cross_joined_df = df1.crossJoin(df2)
cross_joined_df.show()
deduplicated_df = df.dropDuplicates()
deduplicated_df.show()
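The step that computes summary_stats was lost; a minimal sketch using describe() (column names assumed):
# Basic summary statistics for the numeric columns
summary_stats = df.describe("quantity", "price")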
summary_stats.show()
# IQR-based outlier filter (quantile step assumed; the original was lost)
q1, q3 = df.approxQuantile("price", [0.25, 0.75], 0.0)
iqr = q3 - q1
df = df.filter(F.col("price").between(q1 - 1.5 * iqr, q3 + 1.5 * iqr))
df.show()
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assemble feature columns into a single vector (input column names assumed)
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
df = assembler.transform(df)
# Split into training and test sets (split ratio assumed)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
lr = LogisticRegression(labelCol="target", featuresCol="features")
model = lr.fit(train_data)
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="target")
auc = evaluator.evaluate(predictions)
print("AUC:", auc)
df = df.withColumn("city", df["address.city"])
df = df.withColumn("zipcode", df["address.zipcode"])
df.show()
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Tokenize the raw text and remove stop words (column names assumed)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df = remover.transform(df)
# Term frequencies, then TF-IDF weighting
cv = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
model = cv.fit(df)
df = model.transform(df)
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(df)
df = idf_model.transform(df)
# Split data into training and testing sets (split ratio assumed)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
nb = NaiveBayes(labelCol="target", featuresCol="features")
model = nb.fit(train_data)
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="target")
auc = evaluator.evaluate(predictions)
print("AUC:", auc)
df.write.parquet("data.parquet")
# Read the Parquet file back (read-back step assumed; the original was lost)
df = spark.read.parquet("data.parquet")
df.show()
# Broadcast a small list of filter values to all executors
# (the list contents are assumed; the original definition was lost)
filter_values = ["electronics", "clothing"]
broadcast_values = spark.sparkContext.broadcast(filter_values)
filtered_df = df.filter(df["column_name"].isin(broadcast_values.value))
filtered_df.show()
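# Read nested JSON and project fields of an inner struct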
df = spark.read.json("data.json")
df = df.select("outer_field.inner_field1", "outer_field.inner_field2")
df.show()
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Assemble feature columns (input column names assumed)
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
df = assembler.transform(df)
# Initialize classifiers
lr = LogisticRegression(labelCol="label", featuresCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
gbt = GBTClassifier(labelCol="label", featuresCol="features")
# Empty parameter grids; add hyperparameters here to actually tune
param_grid_lr = ParamGridBuilder().build()
param_grid_rf = ParamGridBuilder().build()
param_grid_gbt = ParamGridBuilder().build()
# Initialize evaluator
evaluator = BinaryClassificationEvaluator()
# Perform k-fold cross-validation for each classifier (CrossValidator setup assumed)
cv_lr = CrossValidator(estimator=lr, estimatorParamMaps=param_grid_lr,
                       evaluator=evaluator, numFolds=5)
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=param_grid_rf,
                       evaluator=evaluator, numFolds=5)
cv_gbt = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid_gbt,
                        evaluator=evaluator, numFolds=5)
cv_lr_model = cv_lr.fit(df)
cv_rf_model = cv_rf.fit(df)
cv_gbt_model = cv_gbt.fit(df)
# Select the best model from each cross-validated run
best_lr_model = cv_lr_model.bestModel
best_rf_model = cv_rf_model.bestModel
best_gbt_model = cv_gbt_model.bestModel