pipelines/

Contents of the pipelines/ folder

This template implements a modern data architecture using AWS services and Apache Iceberg, featuring a clean separation between ingestion and transformation layers.

Architecture Overview

Data pipelines are built using a two-layer architecture:

  • Ingestion Layer (ingest/): Extracts data from external sources and loads it into Apache Iceberg tables

  • Transformation Layer (transform/): Transforms raw data using dbt and AWS Athena into analytics-ready tables

Project Structure

pipelines/
├── ingest/                    # Ingestion layer code
│   ├── {source}-ingestion/    # Source-specific ingestion code (Lambda)
│   └── {source}-schema/       # Schema definitions for landing tables
├── transform/                 # Transformation layer code (dbt project)
│   ├── models/                # dbt models
│   ├── sources/               # dbt source definitions
│   └── ...
└── *.tf                       # Terraform infrastructure definitions

Data Flow Diagram

The following diagram illustrates how data flows through the system:

%%{init: {'theme':'dark', 'themeVariables': {'primaryColor': '#2a9d8f', 'primaryTextColor': '#fff', 'primaryBorderColor': '#219287', 'lineColor': '#f4a261', 'secondaryColor': '#e76f51', 'tertiaryColor': '#264653'}}}%%
graph LR
    subgraph "Ingest Layer"
        api[External Data<br/>APIs & Sources] -->|Extract Data| lambda[AWS Lambda<br/>with DLT]
        lambda -->|Write Files| s3raw[(S3 Raw<br/>Parquet Files)]
        s3raw -->|Add to Tables| icebergLanding[(Iceberg<br/>Landing Zone)]
    end

    subgraph "Transform Layer"
        icebergLanding -->|Source Tables| dbt[dbt Models<br/>via ECS Task]
        dbt -->|Transform Data| athena[AWS Athena<br/>Query Engine]
        athena -->|Write Results| icebergAnalytics[(Iceberg<br/>Analytics Zone)]
    end

    %% Connectors
    stepFunction[AWS Step<br/>Function] -.->|Scheduled or<br/>Event-triggered| lambda
    stepFunction -.->|Can Trigger| dbt

    %% Styling
    classDef default fill:#2a3d45,color:#fff,stroke:#333,stroke-width:1px;
    classDef source fill:#264653,color:#fff,stroke:#333,stroke-width:1px;
    classDef compute fill:#2a9d8f,color:#fff,stroke:#219287,stroke-width:1px;
    classDef storage fill:#e9c46a,color:#333,stroke:#e9b949,stroke-width:1px;
    classDef orchestration fill:#f4a261,color:#333,stroke:#f39c52,stroke-width:1px;

    class api source;
    class lambda,dbt,athena compute;
    class s3raw,icebergLanding,icebergAnalytics storage;
    class stepFunction orchestration;

Ingestion Layer

The ingestion layer extracts data from external sources and loads it into Apache Iceberg landing tables. It consists of three main components:

  1. Source-specific ingestion code in pipelines/ingest/{SOURCE_NAME}-ingestion/

  2. Schema definitions in pipelines/ingest/{SOURCE_NAME}-schema/

  3. Infrastructure as code in Terraform files (pipelines/*.tf)

Example: Chess.com Pipeline

This repository includes an example pipeline that ingests data from Chess.com:

pipelines/
├── ingest/
│   ├── chess-ingestion/
│   │   ├── lambda_handler.py      # Lambda code using DLT for Chess.com ingestion
│   │   ├── Dockerfile             # Container image definition
│   │   └── ...
│   └── chess-schema/
│       ├── chess_players_games.py # Schema definition for players_games table
│       ├── chess_players.py       # Schema definition for players table
│       └── ...
├── chess_lambda.tf                # Terraform creating the Lambda function
├── ingestion_bucket.tf            # S3 bucket for landing zone
├── staging_bucket.tf              # S3 bucket for staging/analytics zone
└── ...

Ingestion Process

The data ingestion process follows these steps, sketched in code after the list:

  1. Extraction & Load: A Lambda function uses Data Load Tool (DLT) to extract data from external sources and store it as Parquet files in S3

  2. Table Management: The same Lambda then uses PyIceberg to add these files to Iceberg tables
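
The sketch below illustrates both steps, assuming the dlt and PyIceberg APIs; the bucket, prefix, catalog, and table names are hypothetical placeholders rather than the template's actual values:

import boto3
import dlt
from pyiceberg.catalog import load_catalog

@dlt.resource(name="players")
def players():
    # Hypothetical extraction step; the real resource pulls from the Chess.com API.
    yield {"username": "hikaru", "country": "US"}

def handler(event, context):
    # Step 1 - Extraction & Load: dlt writes Parquet files into the raw S3 prefix.
    pipeline = dlt.pipeline(
        pipeline_name="chess",
        destination=dlt.destinations.filesystem("s3://example-ingestion-bucket/raw"),
        dataset_name="chess",
    )
    pipeline.run(players(), loader_file_format="parquet")

    # Step 2 - Table Management: register the freshly written Parquet files
    # with the Iceberg landing table through the Glue catalog.
    s3 = boto3.client("s3")
    listing = s3.list_objects_v2(
        Bucket="example-ingestion-bucket", Prefix="raw/chess/players"
    )
    parquet_paths = [
        f"s3://example-ingestion-bucket/{obj['Key']}"
        for obj in listing.get("Contents", [])
        if obj["Key"].endswith(".parquet")
    ]
    catalog = load_catalog("default", **{"type": "glue"})
    table = catalog.load_table("landing.chess_players")
    table.add_files(parquet_paths)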

For detailed instructions on running and testing the Chess.com Lambda function, see the chess-ingestion README.

Landing Table Management

The landing tables are defined and managed through schema scripts in pipelines/ingest/{SOURCE_NAME}-schema/. These scripts are automatically executed during deployment to:

  • Create new tables if they don't exist

  • Update existing table schemas when needed

  • Maintain table properties and metadata

When schema changes are required, you modify and redeploy these definition files.
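
As an illustration, a schema script might look like the following minimal sketch, assuming PyIceberg with a Glue catalog; the field definitions and table name are hypothetical:

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, TimestampType

# Hypothetical shape of a schema script such as chess_players.py.
SCHEMA = Schema(
    NestedField(field_id=1, name="username", field_type=StringType(), required=True),
    NestedField(field_id=2, name="country", field_type=StringType(), required=False),
    NestedField(field_id=3, name="last_online", field_type=TimestampType(), required=False),
)

def main():
    catalog = load_catalog("default", **{"type": "glue"})
    if not catalog.table_exists("landing.chess_players"):
        # First deploy: create the landing table.
        catalog.create_table("landing.chess_players", schema=SCHEMA)
    else:
        # Later deploys: evolve the existing schema in place.
        table = catalog.load_table("landing.chess_players")
        with table.update_schema() as update:
            update.union_by_name(SCHEMA)  # picks up newly defined columns

if __name__ == "__main__":
    main()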

For more details on schema evolution, see Iceberg Landing Table Schema Evolution.

Transformation Layer

The transformation layer processes data from landing tables into analytics-ready formats using dbt. It consists of:

  1. dbt project in transform/

  2. Infrastructure code in pipelines/*.tf (especially ecs_task_dbt.tf)

For details on developing and running dbt models, see the transform README.
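
For orientation, here is a hedged boto3 sketch of how an orchestrator (in this template, the Step Function) could launch the dbt ECS task; the cluster, task definition, container name, and subnet ID are hypothetical placeholders for values defined in the Terraform code (ecs_task_dbt.tf):

import boto3

def trigger_dbt_run():
    # Launch the dbt container as a one-off Fargate task.
    ecs = boto3.client("ecs")
    response = ecs.run_task(
        cluster="example-analytics-cluster",
        taskDefinition="dbt-task",
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": ["subnet-0123456789abcdef0"],
                "assignPublicIp": "DISABLED",
            }
        },
        overrides={
            "containerOverrides": [
                {"name": "dbt", "command": ["dbt", "run", "--target", "prod"]}
            ]
        },
    )
    return response["tasks"][0]["taskArn"]

Once the task is running, dbt compiles the models and submits the resulting SQL to Athena, which writes the transformed tables into the Iceberg analytics zone.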

Infrastructure Overview

The following diagram shows the AWS infrastructure components and their relationships:

%%{init: {'theme':'dark', 'themeVariables': {'primaryColor': '#06d6a0', 'primaryTextColor': '#fff', 'primaryBorderColor': '#05c491', 'lineColor': '#ef476f', 'secondaryColor': '#118ab2', 'tertiaryColor': '#073b4c'}}}%%
graph TB
    %% INFRASTRUCTURE COMPONENTS
    subgraph "AWS Infrastructure"
        %% Data Storage
        subgraph "Data Storage"
            ingestion_bucket[(Ingestion<br/>S3 Bucket)]
            staging_bucket[(Staging<br/>S3 Bucket)]
            glue_catalog[AWS Glue<br/>Catalog]
        end

        %% Compute Resources
        subgraph "Compute Resources"
            chess_lambda[Chess Lambda<br/>Function]
            athena[AWS Athena]
            ecs_task[dbt ECS<br/>Task]
        end

        %% Container Registry
        subgraph "Container Registry"
            chess_ecr[Chess ECR<br/>Repository]
            dbt_ecr[dbt ECR<br/>Repository]
        end

        %% Supporting Services
        subgraph "Supporting Services"
            chess_secrets[Chess Secrets<br/>Manager]
            chess_step_function[Chess Step<br/>Function]
        end
    end

    %% CONFIGURATION
    subgraph "Configuration"
        env[Environment<br/>Variable]
        vpc_name[VPC Name<br/>Variable]
        ecs_name[ECS Cluster<br/>Variable]
    end

    %% DATA SOURCES
    subgraph "AWS Data Sources"
        vpc_data[VPC<br/>Data Source]
        subnets[Subnets<br/>Data Source]
        ecs_data[ECS Cluster<br/>Data Source]
        region[AWS Region]
        caller[AWS Caller<br/>Identity]
    end

    %% RELATIONSHIPS
    %% Configuration to Data Sources
    %% env prefixes the names of all resources
    vpc_name --> vpc_data
    ecs_name --> ecs_data

    %% Data Sources to Resources
    vpc_data --> subnets
    subnets --> ecs_task
    ecs_data --> ecs_task
    %% region and caller identity are referenced by all resources

    %% Registry to Compute
    chess_ecr --> |Image Source| chess_lambda
    dbt_ecr --> |Image Source| ecs_task

    %% Orchestration Flow
    chess_secrets --> |Credentials| chess_lambda
    chess_lambda --> |Writes Data| ingestion_bucket
    ingestion_bucket --> |Glue Table Ref| glue_catalog
    staging_bucket --> |Glue Table Ref| glue_catalog
    glue_catalog --> |Table Metadata| athena
    athena --> |Query Execution| ecs_task
    chess_step_function --> |Triggers| chess_lambda
    chess_step_function --> |Can Trigger| ecs_task
    ecs_task --> |Writes Data| staging_bucket

    %% Styling
    classDef variables fill:#06d6a0,color:#fff,stroke:#05c491,stroke-width:1px;
    classDef datasources fill:#118ab2,color:#fff,stroke:#1179a1,stroke-width:1px;
    classDef compute fill:#ef476f,color:#fff,stroke:#de3660,stroke-width:1px;
    classDef storage fill:#ffd166,color:#073b4c,stroke:#ffcc57,stroke-width:1px;
    classDef services fill:#073b4c,color:#fff,stroke:#062a37,stroke-width:1px;
    classDef registry fill:#f78c6b,color:#073b4c,stroke:#f67b5c,stroke-width:1px;
    classDef grouping fill:none,stroke:#aaa,stroke-width:1px,color:#fff;

    class env,vpc_name,ecs_name variables;
    class vpc_data,subnets,ecs_data,region,caller datasources;
    class chess_lambda,athena,ecs_task compute;
    class ingestion_bucket,staging_bucket,glue_catalog storage;
    class chess_secrets,chess_step_function services;
    class chess_ecr,dbt_ecr registry;
    class "AWS Infrastructure","Configuration","AWS Data Sources","Data Storage","Compute Resources","Container Registry","Supporting Services" grouping;

Module Documentation

Requirements

| Name      | Version  |
|-----------|----------|
| terraform | >=1.5.7  |
| aws       | >=5.63.1 |

Providers

| Name | Version |
|------|---------|
| aws  | 5.92.0  |
|      | 3.2.3   |

Modules

| Name | Source | Version |
|------|--------|---------|
|      | terraform-aws-modules/iam/aws//modules/iam-policy | 5.39.1 |
|      | terraform-aws-modules/iam/aws//modules/iam-policy | 5.39.1 |
|      | terraform-aws-modules/ecr/aws | n/a |
|      | terraform-aws-modules/lambda/aws | 7.2.1 |
|      | terraform-aws-modules/step-functions/aws | 4.2.1 |
|      | terraform-aws-modules/secrets-manager/aws | 1.1.2 |
|      | terraform-aws-modules/ecr/aws | n/a |
|      | terraform-aws-modules/ssm-parameter/aws | 1.1.1 |
|      | terraform-aws-modules/ecs/aws//modules/service | 5.11.2 |
|      | terraform-aws-modules/s3-bucket/aws | 4.1.0 |
|      | terraform-aws-modules/s3-bucket/aws | 4.1.0 |

Resources

Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|----------|
|      | The name of the ECS cluster | string | null | no |
|      | The environment to deploy to - will prefix the name of all resources | string | n/a | yes |
|      | The name of the VPC to deploy the ECS cluster in | string | null | no |

Outputs

No outputs.
