Friday, November 28, 2025

Spark in ETL


This blog provides an overview of using Apache Spark for Extract, Transform, Load (ETL) processes. It explores the benefits of Spark in ETL, common use cases, key components, and best practices for building efficient and scalable ETL pipelines. We will delve into how Spark's distributed processing capabilities and rich set of APIs can significantly improve the performance and manageability of data integration workflows.

Introduction to ETL and Spark

ETL (Extract, Transform, Load) is a critical process in data warehousing and business intelligence. It involves extracting data from various sources, transforming it into a consistent and usable format, and loading it into a target data warehouse or data lake for analysis and reporting. Traditional ETL tools often struggle to handle the increasing volume, velocity, and variety of data, leading to performance bottlenecks and scalability issues.



Apache Spark is a powerful open-source distributed processing engine designed for large-scale data processing. Its in-memory processing capabilities and support for various programming languages (Python, Scala, Java, R) make it an ideal choice for building high-performance ETL pipelines. Spark's ability to process data in parallel across a cluster of machines significantly reduces processing time and enables organizations to handle massive datasets efficiently.

Benefits of Using Spark for ETL

Using Spark for ETL offers several advantages over traditional ETL tools:

  • Performance: Spark's in-memory processing and distributed architecture enable it to process data much faster than traditional disk-based ETL tools. This is particularly beneficial for large datasets and complex transformations.

  • Scalability: Spark can easily scale to handle increasing data volumes by adding more nodes to the cluster. This ensures that the ETL pipeline can keep up with the growing data needs of the organization.

  • Flexibility: Spark supports a wide range of data sources and formats, including structured, semi-structured, and unstructured data. It also provides a rich set of APIs for data transformation, making it easy to implement complex business logic.

  • Cost-Effectiveness: Spark is an open-source tool, which eliminates the need for expensive commercial ETL software licenses. It can also be deployed on commodity hardware, further reducing costs.

  • Real-time Processing: Spark Streaming allows for real-time data ingestion and processing, enabling organizations to build ETL pipelines that can handle streaming data sources.

  • Fault Tolerance: Spark's resilient distributed dataset (RDD) abstraction provides fault tolerance, ensuring that the ETL pipeline can recover from failures without losing data.




Common Use Cases for Spark in ETL

Spark is well-suited for a variety of ETL use cases, including:

  • Data Warehousing: Building and maintaining data warehouses by extracting data from various sources, transforming it into a consistent format, and loading it into the data warehouse.



  • Data Migration: Migrating data from legacy systems to new platforms, such as cloud-based data warehouses or data lakes.

  • Data Cleansing: Cleaning and standardizing data by removing duplicates, correcting errors, and filling in missing values.

  • Data Enrichment: Enriching data by adding information from external sources, such as demographic data or market data.

  • Real-time Analytics: Processing streaming data in real-time to generate insights and trigger actions.

  • Log Processing: Analyzing log data to identify patterns, troubleshoot issues, and improve system performance.

  • Data Lake Ingestion: Ingesting raw data into a data lake for further processing and analysis.






Key Components of a Spark ETL Pipeline

A typical Spark ETL pipeline consists of the following components:

  • Data Sources: The sources from which data is extracted. These can include databases, files, APIs, and streaming data sources.

  • Data Extraction: The process of reading data from the data sources and loading it into Spark. Spark supports various data source connectors, such as JDBC, CSV, JSON, and Parquet.



  • Data Transformation: The process of transforming the data into a consistent and usable format. This can involve data cleansing, data enrichment, data aggregation, and data filtering. Spark provides a rich set of APIs for data transformation, including Spark SQL, DataFrames, and Datasets.

  • Data Loading: The process of writing the transformed data to the target data warehouse or data lake. Spark supports various data sink connectors, such as JDBC, CSV, JSON, Parquet, and cloud storage services.

  • Orchestration: The process of coordinating the execution of the ETL pipeline. This can be done using tools such as Apache Airflow, Apache NiFi, or cloud-based workflow services.

  • Monitoring: The process of monitoring the performance and health of the ETL pipeline. This can be done using tools such as Spark UI, Prometheus, and Grafana.
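Tying the components together: the sketch below is a framework-free, in-memory stand-in for such a pipeline, with extract/transform/load as plain functions, a run function as the orchestrator, and log messages as minimal monitoring. All names and the inline rows are invented for illustration.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("etl")

def extract():
    # Stand-in for a real source (JDBC, CSV, API); rows as dicts
    return [{"name": "alice", "age": 34}, {"name": "bob", "age": 17}]

def transform(rows):
    # Cleanse/filter: keep adults and normalize name casing
    return [{**r, "name": r["name"].title()} for r in rows if r["age"] > 18]

def load(rows, sink):
    # Stand-in for writing to Parquet/JDBC; appends to an in-memory sink
    sink.extend(rows)

def run_pipeline():
    # Orchestration: run the stages in order, logging counts for monitoring
    sink = []
    rows = extract()
    log.info("extracted %d rows", len(rows))
    rows = transform(rows)
    log.info("transformed down to %d rows", len(rows))
    load(rows, sink)
    return sink

print(run_pipeline())  # [{'name': 'Alice', 'age': 34}]
```

In a real deployment each stage would be a Spark job, and an orchestrator such as Airflow would own the sequencing, retries, and alerting instead of this single function.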





Best Practices for Building Efficient Spark ETL Pipelines

To build efficient and scalable Spark ETL pipelines, consider the following best practices:

  • Data Partitioning: Partition data based on relevant keys to improve parallelism and reduce data shuffling.

  • Data Serialization: Use efficient data serialization formats, such as Parquet or Avro, to reduce storage space and improve read/write performance.

  • Caching: Cache frequently accessed data in memory to reduce the need to read data from disk.

  • Broadcast Variables: Use broadcast variables to distribute small datasets to all nodes in the cluster.

  • User-Defined Functions (UDFs): Use UDFs sparingly, as they can be a performance bottleneck. If possible, use built-in Spark functions instead.

  • Optimize Joins: Optimize joins by using broadcast joins for small datasets and sort-merge joins for large datasets.

  • Monitor Performance: Monitor the performance of the ETL pipeline using Spark UI and other monitoring tools.

  • Use Appropriate Data Structures: Choose the appropriate data structures (RDDs, DataFrames, Datasets) based on the specific requirements of the ETL pipeline. DataFrames and Datasets offer better performance and optimization capabilities compared to RDDs.

  • Avoid Shuffling: Minimize data shuffling by optimizing the order of operations and using appropriate partitioning strategies. Shuffling is a costly operation that can significantly impact performance.

  • Proper Resource Allocation: Configure Spark's resource allocation parameters (e.g., number of executors, executor memory) to optimize performance and avoid resource contention.

  • Code Optimization: Write clean, efficient, and well-documented code. Use appropriate data structures and algorithms to minimize processing time.

  • Testing: Thoroughly test the ETL pipeline to ensure data quality and accuracy.
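To make the broadcast-join advice above concrete, here is a framework-free sketch of the idea in plain Python: the small lookup table is copied whole to every partition, so each partition joins locally and no rows of the large table are shuffled. The tables and names are invented for illustration.

```python
# Small dimension table: cheap to copy to every worker (the "broadcast")
region_by_country = {"US": "North America", "DE": "Europe", "JP": "Asia"}

def map_side_join(partition, lookup):
    # Each partition joins locally against its own copy of the lookup,
    # so no rows of the large side move between workers.
    return [(order_id, country, lookup.get(country, "Unknown"))
            for order_id, country in partition]

# "Large" fact table, already split into partitions
partitions = [[(1, "US"), (2, "DE")], [(3, "JP"), (4, "BR")]]

joined = [row for part in partitions
          for row in map_side_join(part, region_by_country)]
print(joined)
# [(1, 'US', 'North America'), (2, 'DE', 'Europe'),
#  (3, 'JP', 'Asia'), (4, 'BR', 'Unknown')]
```

In Spark itself the same effect comes from wrapping the small DataFrame in pyspark.sql.functions.broadcast() in a join, or automatically when the small side is under spark.sql.autoBroadcastJoinThreshold.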





Example Spark ETL Pipeline (Python)

Here's a simple example of a Spark ETL pipeline written in Python using PySpark:

from pyspark.sql import SparkSession

# Start (or reuse) a Spark session for the job
spark = SparkSession.builder.appName("SimpleETL").getOrCreate()

# Extract: read the source CSV, inferring column types from the data
df = spark.read.csv("input.csv", header=True, inferSchema=True)

# Transform: derive a new column, then keep only rows with age over 18
df = df.withColumn("age_plus_one", df["age"] + 1)
df = df.filter(df["age"] > 18)

# Load: write the transformed data out as Parquet
df.write.parquet("output.parquet")

spark.stop()


This example demonstrates a basic ETL pipeline that reads data from a CSV file, adds a new column, filters the data, and writes the transformed data to a Parquet file.

Conclusion

Spark is a powerful and versatile tool for building efficient and scalable ETL pipelines. Its in-memory processing capabilities, distributed architecture, and rich set of APIs make it an ideal choice for handling large datasets and complex transformations. By following best practices and leveraging Spark's features, organizations can significantly improve the performance and manageability of their data integration workflows. As data volumes continue to grow, Spark will play an increasingly important role in enabling organizations to extract value from their data.

Sunday, September 7, 2025

Purge logs older than 30 days from a directory using shell

 echo -e "\nAuto purge log files older than 30 days.\n"

  # Count log files older than 30 days (use -name instead of a shell glob,
  # which would error out when no file matches)
  list_logs=$(find /var/log -name 'weekly_bkp_job_*' -mtime +30 | wc -l)

  # -gt does a numeric comparison; > inside [[ ]] compares strings
  if [[ $list_logs -gt 0 ]]; then

    echo -e "$list_logs old log file(s) found\n"

    # -print0 / xargs -0 keeps filenames with spaces or newlines intact
    find /var/log -name 'weekly_bkp_job_*' -mtime +30 -print0 | xargs -0 rm -v

  else

    echo -e "\nNo logs older than 30 days\n"

  fi

Number of optimal Spark executors, cores and memory configuration with basic cluster resources

Basic Spark cluster resources shared:

3 Nodes, [16 Cores, 48GB RAM] <----- per node

Subtract 1 core and 1GB RAM per node for the operating system:

3 Nodes, [15 Cores, 47GB RAM] <----- per node

Cluster resources:  Total cores  = 15 cores x 3 nodes = 45 cores - 1 (for the ApplicationMaster/driver) => 44 cores

                    Total memory = 47GB x 3 nodes     = 141GB - 1 => 140GB RAM

[Basic rule of thumb: keep the number of cores per executor between 3 and 5]

Number of executors = total cores / cores per executor   = 44 / 4   = 11

Memory per executor = total memory / number of executors = 140 / 11 = ~12GB

Memory overhead = max(384MB, 10% of memory per executor) = max(384MB, 10% of ~12GB) = ~1.2GB

Actual memory per executor = 12GB - ~1.2GB = ~11GB

         spark-submit:  --num-executors 11

                        --executor-cores 4

                        --executor-memory 11G

Note: RAM available per partition for processing = 11GB / 4 cores = ~2.75GB
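The arithmetic above can be wrapped in a small helper for trying out other cluster shapes. This is only a sketch of the same rule of thumb; the function name and defaults are chosen here, not a Spark API.

```python
def executor_config(nodes, cores_per_node, ram_gb_per_node,
                    cores_per_executor=4, os_cores=1, os_ram_gb=1):
    """Rule-of-thumb executor sizing for a Spark-on-YARN cluster."""
    # Reserve OS resources per node, then 1 core / 1 GB for the AM/driver
    usable_cores = (cores_per_node - os_cores) * nodes - 1
    usable_ram_gb = (ram_gb_per_node - os_ram_gb) * nodes - 1
    num_executors = usable_cores // cores_per_executor
    mem_per_executor = usable_ram_gb // num_executors
    # Overhead = max(384MB, 10% of executor memory); heap is what's left
    overhead_gb = max(0.384, 0.10 * mem_per_executor)
    heap_gb = round(mem_per_executor - overhead_gb)
    return num_executors, cores_per_executor, heap_gb

print(executor_config(3, 16, 48))  # (11, 4, 11)
```

The returned tuple maps directly onto the spark-submit flags --num-executors, --executor-cores, and --executor-memory.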

Wednesday, August 13, 2025

Data Engineering role: scenario-based questions and responsibilities

 

🔧 1. SQL & Data Modeling

  • What is normalization and denormalization?

  • How do you handle slowly changing dimensions (SCD Type 1/2/3)?

  • Write a SQL query to find the second highest salary from an employee table.

  • What is a CTE (Common Table Expression)? When would you use it?

  • Difference between INNER JOIN, LEFT JOIN, FULL OUTER JOIN.


🏗️ 2. Data Warehousing

  • What is a star schema vs snowflake schema?

  • Explain ETL vs ELT – which one do you prefer and why?

  • What are fact tables and dimension tables?

  • How do you handle late arriving facts?

  • What are OLTP and OLAP?


🛠️ 3. Big Data & Distributed Systems

  • What is Hadoop? How does HDFS work?

  • Explain the MapReduce paradigm.

  • What is partitioning and bucketing in Hive?

  • Difference between Hive and Presto (or Athena)?

  • How do you handle large datasets that don’t fit in memory?


🔄 4. Data Pipelines & Orchestration

  • What tools have you used for data orchestration? (e.g., Airflow, AWS Glue, Step Functions)

  • How would you design a pipeline to process real-time data?

  • How do you handle failures in ETL pipelines?

  • How do you monitor pipeline health and performance?


☁️ 5. Cloud Services (AWS/Azure/GCP)

  • What is the difference between S3 and EBS?

  • How does AWS Glue work? Glue Job vs Glue Crawler?

  • How would you implement CI/CD for AWS Glue or Redshift?

  • What is Redshift Spectrum?

  • Compare AWS Redshift vs Snowflake.


⚡ 6. Programming & Data Transformation

  • What’s the difference between Pandas and PySpark?

  • How do you handle null values in Spark?

  • What are RDDs vs DataFrames in PySpark?

  • How do you optimize PySpark jobs?

  • Explain broadcast joins and when to use them.


🔐 7. Data Governance & Quality

  • How do you ensure data quality in pipelines?

  • What tools have you used for data cataloging and lineage?

  • How do you handle schema evolution?

  • What is GDPR and how does it affect data engineering?


📊 8. Scenario-Based Questions

  • How would you migrate an on-prem ETL pipeline to the cloud?

  • Design a data warehouse for a ride-sharing app (like Uber).

  • You need to deduplicate billions of rows from a clickstream – how would you approach it?

  • How do you handle schema changes in production pipelines?

Challenging Big Data SQL Questions with Solutions

 

1. Write a SQL query to get the daily count of active users (logged in at least once).

>SELECT
DATE(login_time) AS login_date,
COUNT(DISTINCT user_id) AS active_user_count
FROM
user_logins
GROUP BY
DATE(login_time)
ORDER BY
login_date;

2. Find the 2nd highest transaction per user without using LIMIT or TOP.

>SELECT
t1.user_id,
t1.transaction_amount AS second_highest_transaction
FROM
transactions t1
WHERE
2 = (
SELECT COUNT(DISTINCT t2.transaction_amount)
FROM transactions t2
WHERE t2.user_id = t1.user_id
AND t2.transaction_amount >= t1.transaction_amount
);


3. Identify data gaps in time-series event logs (e.g., missing hourly records).

>WITH expected_hours AS (
SELECT generate_series(
(SELECT MIN(date_trunc('hour', event_time)) FROM event_logs),
(SELECT MAX(date_trunc('hour', event_time)) FROM event_logs),
INTERVAL '1 hour'
) AS expected_time
)
SELECT
expected_time
FROM
expected_hours e
LEFT JOIN (
SELECT DISTINCT date_trunc('hour', event_time) AS logged_hour
FROM event_logs
) a ON e.expected_time = a.logged_hour
WHERE
a.logged_hour IS NULL
ORDER BY
expected_time;

4. Fetch the first purchase date per user and calculate days since then.

>SELECT
user_id,
MIN(purchase_date) AS first_purchase_date,
CURRENT_DATE - MIN(purchase_date) AS days_since_first_purchase
FROM
purchases
GROUP BY
user_id;


5. Detect schema changes in SCD Type 2 tables using Delta Lake.
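> A hedged sketch for this one: Delta Lake records the schema with every table version (readable via time travel or DESCRIBE HISTORY), so detection reduces to diffing two schema snapshots. The plain-Python comparison below stands in for diffing two StructType schemas; the column names are invented.

```python
def schema_diff(old_schema, new_schema):
    """Compare two schemas given as {column: type} dicts and report
    added, dropped, and retyped columns."""
    added = {c: t for c, t in new_schema.items() if c not in old_schema}
    dropped = {c: t for c, t in old_schema.items() if c not in new_schema}
    retyped = {c: (old_schema[c], t) for c, t in new_schema.items()
               if c in old_schema and old_schema[c] != t}
    return added, dropped, retyped

# Invented snapshots of an SCD Type 2 dimension at two table versions
v1 = {"customer_id": "bigint", "name": "string", "valid_from": "date"}
v2 = {"customer_id": "bigint", "name": "string", "valid_from": "date",
      "valid_to": "date"}
print(schema_diff(v1, v2))  # ({'valid_to': 'date'}, {}, {})
```

With Delta Lake itself, reading the table at two versions (e.g. option("versionAsOf", n)) and diffing the DataFrames' .schema fields gives the same result.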


6. Join product and transaction tables and filter out null foreign keys safely.

>SELECT 
t.transaction_id,
t.product_id,
t.transaction_date,
p.product_name
FROM
transaction t
JOIN
product p ON t.product_id = p.product_id
WHERE
t.product_id IS NOT NULL;



7. Get users who upgraded to premium within 7 days of signup.

> SELECT 
user_id,
signup_date,
premium_upgrade_date
FROM
users
WHERE
premium_upgrade_date IS NOT NULL
AND premium_upgrade_date <= signup_date + INTERVAL '7 days';


8. Calculate cumulative distinct product purchases per customer.

> WITH unique_products AS (
SELECT DISTINCT customer_id, product_id, purchase_date
FROM purchases
),
ranked AS (
SELECT
customer_id,
product_id,
purchase_date,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY purchase_date) AS rn
FROM unique_products
),
cumulative AS (
SELECT
r1.customer_id,
r1.purchase_date,
r1.product_id,
(
SELECT COUNT(DISTINCT r2.product_id)
FROM ranked r2
WHERE r2.customer_id = r1.customer_id AND r2.purchase_date <= r1.purchase_date
) AS cumulative_distinct_products
FROM ranked r1
)
SELECT * FROM cumulative
ORDER BY customer_id, purchase_date;


9. Retrieve customers who spent above average in their region.

> WITH customer_spend AS (
SELECT
customer_id,
region,
SUM(amount) AS total_spend
FROM
transactions
GROUP BY
customer_id, region
),
region_avg AS (
SELECT
region,
AVG(total_spend) AS avg_region_spend
FROM
customer_spend
GROUP BY region
)
SELECT
c.customer_id,
c.region,
c.total_spend
FROM
customer_spend c
JOIN
region_avg r ON c.region = r.region
WHERE
c.total_spend > r.avg_region_spend;


10. Find duplicate rows in an ingestion table (based on all columns).

>SELECT 
id, name, email, created_at,
COUNT(*) AS duplicate_count
FROM
ingestion_table
GROUP BY
id, name, email, created_at
HAVING
COUNT(*) > 1;

OR

> SELECT *
FROM ingestion_table
WHERE (id, name, email, created_at) IN (
SELECT
id, name, email, created_at
FROM ingestion_table
GROUP BY id, name, email, created_at
HAVING COUNT(*) > 1
);

11. Compute daily revenue growth % using the LAG window function.

>SELECT
revenue_date,
total_revenue,
LAG(total_revenue) OVER (ORDER BY revenue_date) AS previous_day_revenue,
ROUND(
(total_revenue - LAG(total_revenue) OVER (ORDER BY revenue_date))
/ NULLIF(LAG(total_revenue) OVER (ORDER BY revenue_date), 0) * 100,
2
) AS revenue_growth_percent
FROM
daily_revenue
ORDER BY
revenue_date;


12. Identify products with declining sales 3 months in a row.

>WITH sales_with_lags AS (
SELECT
product_id,
sale_month,
monthly_sales,
LAG(monthly_sales, 1) OVER (PARTITION BY product_id ORDER BY sale_month) AS prev_month_sales,
LAG(monthly_sales, 2) OVER (PARTITION BY product_id ORDER BY sale_month) AS prev_2_month_sales
FROM
monthly_sales
),
declining_products AS (
SELECT
product_id,
sale_month,
monthly_sales,
prev_month_sales,
prev_2_month_sales
FROM
sales_with_lags
WHERE
monthly_sales < prev_month_sales
AND prev_month_sales < prev_2_month_sales
)
SELECT DISTINCT product_id
FROM declining_products;


13. Get users with at least 3 logins per week over the last 2 months.

> WITH login_data AS (
SELECT
user_id,
DATE_TRUNC('week', login_time) AS week_start
FROM
user_logins
WHERE
login_time >= CURRENT_DATE - INTERVAL '2 months'
),
weekly_login_counts AS (
SELECT
user_id,
week_start,
COUNT(*) AS weekly_logins
FROM
login_data
GROUP BY
user_id, week_start
),
active_weeks AS (
SELECT
user_id,
COUNT(*) AS weeks_with_3_or_more_logins
FROM
weekly_login_counts
WHERE
weekly_logins >= 3
GROUP BY
user_id
),
weeks_total AS (
SELECT
COUNT(DISTINCT DATE_TRUNC('week', login_time)) AS total_weeks
FROM
user_logins
WHERE
login_time >= CURRENT_DATE - INTERVAL '2 months'
)
SELECT
a.user_id
FROM
active_weeks a
CROSS JOIN
weeks_total w
WHERE
a.weeks_with_3_or_more_logins = w.total_weeks;


14. Rank users by frequency of login in the current quarter.

> WITH current_quarter_logins AS (
SELECT
user_id
FROM
user_logins
WHERE
DATE_TRUNC('quarter', login_time) = DATE_TRUNC('quarter', CURRENT_DATE)
),
login_counts AS (
SELECT
user_id,
COUNT(*) AS login_count
FROM
current_quarter_logins
GROUP BY
user_id
),
ranked_users AS (
SELECT
user_id,
login_count,
RANK() OVER (ORDER BY login_count DESC) AS login_rank
FROM
login_counts
)
SELECT * FROM ranked_users
ORDER BY login_rank;


15. Fetch users who purchased the same product multiple times in one day.

> SELECT
user_id,
product_id,
DATE(purchase_time) AS purchase_date,
COUNT(*) AS purchase_count
FROM
purchases
GROUP BY
user_id,
product_id,
DATE(purchase_time)
HAVING
COUNT(*) > 1
ORDER BY
user_id, purchase_date;


16. Detect and delete late-arriving data for current month partitions.

> SELECT *
FROM events
WHERE
partition_month = DATE_TRUNC('month', CURRENT_DATE)
AND DATE_TRUNC('month', event_time) <> partition_month;

>DELETE FROM events
WHERE
partition_month = DATE_TRUNC('month', CURRENT_DATE)
AND DATE_TRUNC('month', event_time) <> partition_month;

17. Get top 5 products by profit margin across all categories.

> SELECT 
product_id,
category_id,
selling_price,
cost_price,
ROUND((selling_price - cost_price) / selling_price * 100, 2) AS profit_margin_percent
FROM
products
WHERE
selling_price > 0 -- to avoid divide-by-zero
ORDER BY
profit_margin_percent DESC
LIMIT 5;

18. Compare rolling 30-day revenue vs previous 30-day window.

> WITH daily_revenue AS (
SELECT
sale_date::date AS day,
SUM(revenue) AS daily_revenue
FROM
sales
GROUP BY
sale_date::date
),
rolling_30 AS (
SELECT
day,
SUM(daily_revenue) OVER (
ORDER BY day
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) AS rolling_30_day_revenue
FROM
daily_revenue
),
comparison AS (
SELECT
day,
rolling_30_day_revenue,
LAG(rolling_30_day_revenue) OVER (ORDER BY day) AS previous_30_day_revenue
FROM
rolling_30
)
SELECT
day,
rolling_30_day_revenue,
previous_30_day_revenue,
ROUND(
(rolling_30_day_revenue - previous_30_day_revenue) /
NULLIF(previous_30_day_revenue, 0) * 100, 2
) AS revenue_change_percent
FROM
comparison
WHERE
previous_30_day_revenue IS NOT NULL
ORDER BY
day;

19. Flag transactions happening outside business hours.

>SELECT 
transaction_id,
transaction_time,
CASE
WHEN CAST(transaction_time AS TIME) < TIME '09:00:00'
OR CAST(transaction_time AS TIME) > TIME '18:00:00'
THEN 'Outside Business Hours'
ELSE 'Within Business Hours'
END AS business_hours_flag
FROM
transactions;

20. Write an optimized SQL query using broadcast join hints for small lookup tables.

> SELECT /*+ BROADCAST(lkp) */ 
f.order_id,
f.customer_id,
lkp.region,
SUM(f.amount) AS revenue
FROM fact_orders f
JOIN small_customer_lookup lkp
ON f.customer_id = lkp.customer_id
WHERE f.order_date BETWEEN '2025-01-01' AND '2025-06-30'
GROUP BY f.order_id, f.customer_id, lkp.region;