Orchestrating and Observing Data Pipelines: A Guide to Apache Airflow, PostgreSQL, and Polar
Building reliable data pipelines is a foundational task in modern software engineering, but ensuring they run efficiently and identifying performance bottlenecks can be a significant challenge. Orchestration tools solve part of the problem, but true operational excellence requires deep visibility into resource consumption.
This tutorial walks you through building a complete, observable data pipeline ecosystem using a powerful, open-source stack:
- Apache Airflow: For robust workflow orchestration.
- PostgreSQL: As our reliable data store.
- Polar: For continuous, low-overhead performance profiling.
- Docker: To containerize and link all our services seamlessly.
By the end, you will have a running pipeline and the tools to monitor its performance characteristics in real-time.
Prerequisites
Before we begin, ensure you have the following installed on your system:
- Docker
- Docker Compose

Basic familiarity with Python and SQL will help you follow along, but is not strictly required.
A Scalable Project Structure
A clean project structure is crucial for maintainability. We’ll use a standard, scalable layout for our Airflow project.
.
├── dags/
│   └── simple_etl_dag.py
├── docker-compose.yml
└── .env
- dags/: This directory holds all our Airflow DAG (Directed Acyclic Graph) files. Airflow automatically discovers and loads any DAGs placed here.
- docker-compose.yml: This file is the heart of our infrastructure, defining and connecting all our services.
- .env: This file stores environment variables, helping us keep configuration separate from our code.
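If you prefer to script this scaffold instead of creating the files by hand, a small optional helper like the one below does the job (the file name scaffold.py is just an illustration, not part of the final project):

```python
# scaffold.py: optional helper that creates the layout shown above
from pathlib import Path

root = Path(".")
(root / "dags").mkdir(exist_ok=True)           # Airflow discovers DAGs in this folder
(root / "dags" / "simple_etl_dag.py").touch()  # placeholder for the DAG we write below
(root / "docker-compose.yml").touch()          # service definitions
(root / ".env").touch()                        # environment variables
print("Project skeleton ready.")
```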
Orchestrating Services with Docker Compose
Our docker-compose.yml file will define and configure the necessary services: a PostgreSQL database, the Airflow webserver (a scheduler and, for distributed executors, workers would be added in the same way), and the Polar agent for profiling.
First, create a .env file that sets the Airflow UID, which helps avoid file-permission issues on the volumes we mount into the containers.
# .env
AIRFLOW_UID=50000
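On Linux or macOS you can substitute your own user ID so files created in the mounted dags/ folder stay owned by you. A tiny optional helper (the file name generate_env.py is hypothetical) could look like this:

```python
# generate_env.py: optional helper that writes .env using the current user's UID (Unix only)
import os
from pathlib import Path

uid = os.getuid()  # not available on Windows; keep the default 50000 there
Path(".env").write_text(f"AIRFLOW_UID={uid}\n")
print(f"Wrote .env with AIRFLOW_UID={uid}")
```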
Now, let’s create the docker-compose.yml file. It is a simplified version of the official Airflow Docker Compose setup, extended with a Polar agent service.
# docker-compose.yml
version: '3'

services:
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    image: apache/airflow:2.8.1
    depends_on:
      - postgres
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__LOAD_EXAMPLES=false
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "curl --fail http://localhost:8080/health"]
      interval: 30s
      timeout: 30s
      retries: 3

  # Add the other Airflow services you need (at minimum a scheduler,
  # since DAGs will not actually run without one). For brevity, only
  # the webserver is shown here.

  polar-agent:
    image: polar-agent:latest  # Replace with the actual Polar agent image
    command:
      - "agent"
      - "--config-file=/etc/polar/agent.yaml"
    volumes:
      - ./polar-agent-config.yaml:/etc/polar/agent.yaml
    depends_on:
      - airflow-webserver
    # You would typically attach the Polar profiler to your application containers.
    # This is a simplified representation; refer to the Polar documentation for specifics.
Note: The Polar integration shown is conceptual. Please refer to the official Polar documentation for the correct way to attach profilers to your running services, which often involves a sidecar container or an agent running on the host.
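Once you run docker-compose up, the webserver can take a minute or two to become responsive. The sketch below polls the same /health endpoint used by the healthcheck above, relying only on the Python standard library (the file name wait_for_airflow.py is just an example):

```python
# wait_for_airflow.py: poll the webserver health endpoint until it responds
import time
import urllib.error
import urllib.request

URL = "http://localhost:8080/health"

for attempt in range(30):
    try:
        with urllib.request.urlopen(URL, timeout=5) as resp:
            if resp.status == 200:
                print("Airflow webserver is healthy.")
                break
    except (urllib.error.URLError, ConnectionError):
        pass  # not ready yet
    time.sleep(10)
else:
    raise SystemExit("Airflow webserver did not become healthy in time.")
```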
Creating Your First Airflow DAG
Now for the fun part. Let’s create a simple ETL pipeline. This DAG will have two tasks: one to create a table in our PostgreSQL database and another to insert some data into it.
Create the file dags/simple_etl_dag.py:
# dags/simple_etl_dag.py
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime


@dag(
    dag_id="simple_postgres_etl",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["etl", "postgres"],
)
def simple_postgres_etl():
    @task
    def create_customers_table():
        """Creates a customers table if it doesn't exist."""
        pg_hook = PostgresHook(postgres_conn_id="postgres_default")
        sql = """
            CREATE TABLE IF NOT EXISTS customers (
                customer_id SERIAL PRIMARY KEY,
                name VARCHAR NOT NULL,
                signup_date DATE
            );
        """
        pg_hook.run(sql)

    @task
    def insert_new_customer():
        """Inserts a new customer record into the customers table."""
        pg_hook = PostgresHook(postgres_conn_id="postgres_default")
        sql = """
            INSERT INTO customers (name, signup_date)
            VALUES ('John Doe', '2025-09-26');
        """
        pg_hook.run(sql)

    create_customers_table() >> insert_new_customer()


simple_postgres_etl()
This DAG uses the TaskFlow API for a clean, functional approach. It references the postgres_default connection ID, so make sure that connection points at the postgres service from our docker-compose.yml, for example by setting the AIRFLOW_CONN_POSTGRES_DEFAULT environment variable on the Airflow containers to postgres://airflow:airflow@postgres:5432/airflow, or by creating the connection in the Airflow UI under Admin → Connections.
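If you want the pipeline to verify its own result, one option is to append a third task that reads the rows back with the same hook. This is a sketch of an addition to the DAG above (it reuses the imports already at the top of dags/simple_etl_dag.py), not part of the original file:

```python
    # Extra task to define inside simple_postgres_etl()
    @task
    def count_customers():
        """Logs how many rows the customers table now holds."""
        pg_hook = PostgresHook(postgres_conn_id="postgres_default")
        rows = pg_hook.get_records("SELECT COUNT(*) FROM customers;")
        print(f"customers table now holds {rows[0][0]} row(s)")

    # Replace the existing dependency line with:
    create_customers_table() >> insert_new_customer() >> count_customers()
```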
Observing with Polar
Once your services are running (docker-compose up) and you have triggered the DAG from the Airflow UI (available at http://localhost:8080), the Polar agent will begin profiling the resource consumption of the Airflow components.
Navigate to your Polar UI. You will be able to:
- Filter by Service: Isolate the airflow-webserver or airflow-scheduler to see their specific performance profiles.
- Analyze CPU and Memory Usage: See exactly how much CPU and memory each process used during the DAG run.
- Identify Bottlenecks: If a particular task is resource-intensive, it will be immediately visible in the profiling data, allowing you to optimize your code or provision resources more effectively.
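Polar captures system-level CPU and memory profiles. If you also want coarse, task-level timings to appear directly in the Airflow task logs, a small generic decorator like the one below can complement it; this is a plain Python sketch and has nothing to do with Polar's own tooling:

```python
# timing.py: optional helper that logs how long a task body takes
import functools
import logging
import time

log = logging.getLogger(__name__)

def timed(fn):
    """Wrap a callable and log its wall-clock duration."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            log.info("%s took %.3f s", fn.__name__, time.perf_counter() - start)
    return wrapper

# Usage in the DAG: place it under the @task decorator, e.g.
#   @task
#   @timed
#   def insert_new_customer(): ...
```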
Conclusion
You have successfully built a modern, observable data pipeline. This setup provides a robust foundation for developing complex workflows in Airflow while maintaining clear visibility into their performance with Polar. By integrating orchestration with continuous profiling, you move from reactive problem-solving to proactive optimization, ensuring your data pipelines are not just reliable but also efficient.