# pipelines/

This directory contains the template's data pipelines. They run on AWS services, store data in Apache Iceberg tables, and keep ingestion and transformation in separate layers.

## Architecture Overview

Data pipelines are built using a 2-layer architecture:

* **Ingestion Layer** (`ingest/`): Extracts data from external sources and loads it into Apache Iceberg tables
* **Transformation Layer** (`transform/`): Transforms raw data into analytics-ready tables using dbt and AWS Athena

### Project Structure

```
pipelines/
├── ingest/                    # Ingestion layer code
│   ├── {source}-ingestion/    # Source-specific ingestion code (Lambda)
│   └── {source}-schema/       # Schema definitions for landing tables
├── transform/                 # Transformation layer code (dbt project)
│   ├── models/                # dbt models
│   ├── sources/               # dbt source definitions
│   └── ...
└── *.tf                       # Terraform infrastructure definitions
```

### Data Flow Diagram

The following diagram illustrates how data flows through the system:

```mermaid
%%{init: {'theme':'dark', 'themeVariables': {'primaryColor': '#2a9d8f', 'primaryTextColor': '#fff', 'primaryBorderColor': '#219287', 'lineColor': '#f4a261', 'secondaryColor': '#e76f51', 'tertiaryColor': '#264653'}}}%%
graph LR
    subgraph "Ingest Layer"
        api[External Data<br/>APIs & Sources] -->|Extract Data| lambda[AWS Lambda<br/>with DLT]
        lambda -->|Write Files| s3raw[(S3 Raw<br/>Parquet Files)]
        s3raw -->|Add to Tables| icelandZone[(Iceberg<br/>Landing Zone)]
    end

    subgraph "Transform Layer"
        icelandZone -->|Source Tables| dbt[dbt Models<br/>via ECS Task]
        dbt -->|Transform Data| athena[AWS Athena<br/>Query Engine]
        athena -->|Write Results| icebergAnalytics[(Iceberg<br/>Analytics Zone)]
    end

    %% Connectors
    stepFunction -.->|Triggers on schedule<br/>or event| lambda
    stepFunction -.->|Can Trigger| dbt

    %% Styling
    classDef default fill:#2a3d45,color:#fff,stroke:#333,stroke-width:1px;
    classDef source fill:#264653,color:#fff,stroke:#333,stroke-width:1px;
    classDef compute fill:#2a9d8f,color:#fff,stroke:#219287,stroke-width:1px;
    classDef storage fill:#e9c46a,color:#333,stroke:#e9b949,stroke-width:1px;
    classDef orchestration fill:#f4a261,color:#333,stroke:#f39c52,stroke-width:1px;

    class api source;
    class lambda,dbt,athena compute;
    class s3raw,icelandZone,icebergAnalytics storage;
    class stepFunction orchestration;
```
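The Step Function in this diagram chains the two layers. As a hedged sketch, assuming the state machine invokes the ingestion Lambda and then runs the dbt ECS task to completion, its Amazon States Language (ASL) definition could be generated as below. The state names, ARNs, and the Fargate launch type are assumptions; the template's `chess_pipeline` module may define the workflow differently.

```python
import json
from typing import Any, Dict, List


def build_pipeline_definition(
    lambda_arn: str,
    cluster_arn: str,
    task_definition_arn: str,
    subnets: List[str],
) -> Dict[str, Any]:
    """Return a two-state ASL definition: run the ingestion Lambda, then the dbt ECS task."""
    return {
        "Comment": "Sketch: ingestion Lambda followed by the dbt ECS task",
        "StartAt": "Ingest",
        "States": {
            "Ingest": {
                "Type": "Task",
                # Lambda service integration: invokes the function synchronously.
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {"FunctionName": lambda_arn},
                "Next": "Transform",
            },
            "Transform": {
                "Type": "Task",
                # The .sync suffix makes Step Functions wait until the ECS task stops.
                "Resource": "arn:aws:states:::ecs:runTask.sync",
                "Parameters": {
                    "Cluster": cluster_arn,
                    "TaskDefinition": task_definition_arn,
                    "LaunchType": "FARGATE",  # assumption: Fargate launch type
                    "NetworkConfiguration": {
                        "AwsvpcConfiguration": {"Subnets": subnets}
                    },
                },
                "End": True,
            },
        },
    }


if __name__ == "__main__":
    # Placeholder ARNs and subnet IDs, for illustration only.
    definition = build_pipeline_definition(
        "arn:aws:lambda:eu-west-1:123456789012:function:dev-chess",
        "arn:aws:ecs:eu-west-1:123456789012:cluster/dev",
        "arn:aws:ecs:eu-west-1:123456789012:task-definition/dev-dbt:1",
        ["subnet-0123"],
    )
    print(json.dumps(definition, indent=2))
```

Using `runTask.sync` instead of a plain `runTask` means the execution waits for the dbt task to stop before it completes, rather than finishing as soon as the task is launched.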

## Ingestion Layer

The ingestion layer extracts data from external sources and loads it into Apache Iceberg landing tables. It consists of three main components:

1. **Source-specific ingestion code** in `pipelines/ingest/{SOURCE_NAME}-ingestion/`
2. **Schema definitions** in `pipelines/ingest/{SOURCE_NAME}-schema/`
3. **Infrastructure as code** in Terraform files (`pipelines/*.tf`)
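Because every source follows the `{SOURCE_NAME}-ingestion` / `{SOURCE_NAME}-schema` naming convention, the configured sources can be derived from the folder names under `pipelines/ingest/`. The helper below is a hypothetical sketch, not part of the template: it groups the two folders per source and flags any source that is missing one of them.

```python
from pathlib import Path
from typing import Dict, Iterable, List, Optional

# Folder-name suffixes used under pipelines/ingest/ (from the project layout).
SUFFIXES = {"-ingestion": "ingestion", "-schema": "schema"}


def pair_sources(dir_names: Iterable[str]) -> Dict[str, Dict[str, Optional[str]]]:
    """Group '{source}-ingestion' and '{source}-schema' folder names by source."""
    sources: Dict[str, Dict[str, Optional[str]]] = {}
    for dir_name in sorted(dir_names):
        for suffix, kind in SUFFIXES.items():
            if dir_name.endswith(suffix):
                source = dir_name[: -len(suffix)]
                slot = sources.setdefault(source, {"ingestion": None, "schema": None})
                slot[kind] = dir_name
    return sources


def list_ingest_dirs(ingest_dir: Path) -> List[str]:
    """Return the names of the immediate subdirectories of ingest_dir."""
    return [p.name for p in ingest_dir.iterdir() if p.is_dir()]


def incomplete_sources(sources: Dict[str, Dict[str, Optional[str]]]) -> List[str]:
    """Return the sources missing either their ingestion or their schema folder."""
    return [name for name, dirs in sources.items() if None in dirs.values()]
```

For example, `pair_sources(list_ingest_dirs(Path("pipelines/ingest")))` would map `chess` to its `chess-ingestion` and `chess-schema` folders.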

### Example: Chess.com Pipeline

This repository includes an example pipeline that ingests data from Chess.com:

```
pipelines/
├── ingest/
│   ├── chess-ingestion/
│   │   ├── lambda_handler.py      # Lambda code using DLT for Chess.com ingestion
│   │   ├── Dockerfile             # Container image definition
│   │   └── ...
│   └── chess-schema/
│       ├── chess_players_games.py # Schema definition for players_games table
│       ├── chess_players.py       # Schema definition for players table
│       └── ...
├── chess_lambda.tf                # Terraform creating the Lambda function
├── ingestion_bucket.tf            # S3 bucket for landing zone
├── staging_bucket.tf              # S3 bucket for staging/analytics zone
└── ...
```

### Ingestion Process

The data ingestion process follows these steps:

1. **Extraction & Load**: A Lambda function uses [Data Load Tool (DLT)](https://dlthub.com/) to extract data from external sources and store it as Parquet files in S3
2. **Table Management**: The same Lambda then uses PyIceberg to add these files to Iceberg tables

{% hint style="info" %}
For detailed instructions on running and testing the Chess.com Lambda function, see the [chess-ingestion README](https://docs.boringdata.io/template-aws-iceberg/project-structure/pipelines/chess-ingestion).
{% endhint %}
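The two steps can be sketched as a handler skeleton. This is a hedged illustration, not the template's `lambda_handler.py`: the steps are injected as callables so the control flow can be read and tested without AWS. In the real function, `write_parquet` would correspond to running the DLT pipeline that writes Parquet files to S3, and `add_files` to PyIceberg's `Table.add_files`, which registers existing Parquet files with a table without rewriting them. The function name `run_ingestion`, the parameter names, and the skip-on-empty behavior are assumptions.

```python
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]


def run_ingestion(
    extract: Callable[[], Iterable[Record]],
    write_parquet: Callable[[List[Record]], List[str]],
    add_files: Callable[[List[str]], None],
) -> Dict[str, int]:
    """Extract records, write them as files, then register those files with the table."""
    # Step 1 (extraction and load): pull records and persist them as Parquet files.
    records = list(extract())
    if not records:
        # Nothing new: skip the table commit entirely (assumed behavior).
        return {"records": 0, "files": 0}
    file_paths = write_parquet(records)
    # Step 2 (table management): add the written files to the Iceberg table.
    add_files(file_paths)
    return {"records": len(records), "files": len(file_paths)}
```

Keeping the file write and the table commit as separate steps means the Iceberg table only sees files that were fully written.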

### Landing Table Management

The landing tables are defined and managed through schema scripts in `pipelines/ingest/{SOURCE_NAME}-schema/`. These scripts are automatically executed during deployment to:

* Create new tables if they don't exist
* Update existing table schemas when needed
* Maintain table properties and metadata

To change a landing table's schema, edit its definition file and redeploy.

{% hint style="info" %}
For more details on schema evolution, see [#iceberg-landing-table-schema-evolution](https://docs.boringdata.io/template-aws-iceberg/help/faq#iceberg-landing-table-schema-evolution "mention").
{% endhint %}
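The create-or-update behavior of the schema scripts can be sketched as a planning step that compares the desired schema with the current one. This stdlib-only sketch is hypothetical: schemas are plain `{column: type}` dicts rather than PyIceberg `Schema` objects, and only new columns are treated as safe; type changes and dropped columns are flagged for review. Iceberg itself supports more (renames, drops, and some type promotions), so consult the schema-evolution FAQ for the template's actual rules.

```python
from typing import Dict, List, Optional

Schema = Dict[str, str]  # column name -> type name, e.g. {"id": "long"}


def plan_schema_change(current: Optional[Schema], desired: Schema) -> List[str]:
    """Return the actions needed to bring a landing table to the desired schema."""
    if current is None:
        # Table does not exist yet: create it with the full desired schema.
        return [f"create table with columns {sorted(desired)}"]
    actions: List[str] = []
    for column, col_type in desired.items():
        if column not in current:
            # Additive change: safe to apply to an existing Iceberg table.
            actions.append(f"add column {column} {col_type}")
        elif current[column] != col_type:
            actions.append(f"review type change {column}: {current[column]} -> {col_type}")
    for column in current:
        if column not in desired:
            actions.append(f"review dropped column {column}")
    return actions
```

In PyIceberg, additive changes like these are what `table.update_schema().union_by_name(new_schema)` applies to an existing table.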

## Transformation Layer

The transformation layer processes data from landing tables into analytics-ready formats using dbt. It consists of:

1. **dbt project** in `pipelines/transform/`
2. **Infrastructure code** in `pipelines/*.tf` (especially `ecs_task_dbt.tf`)

{% hint style="info" %}
For details on developing and running dbt models, see the [transform README](https://docs.boringdata.io/template-aws-iceberg/project-structure/pipelines/transform).
{% endhint %}
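The dbt task defined in `ecs_task_dbt.tf` can also be started on its own, for example to rebuild a subset of models. A minimal sketch that builds the keyword arguments for boto3's `ECS.Client.run_task`, assuming a Fargate task launched in the VPC subnets the module looks up; the container name `dbt` and the `dbt build --select ...` command are hypothetical and depend on how the task definition and image are set up.

```python
from typing import Dict, List, Optional


def build_dbt_run_task_kwargs(
    cluster: str,
    task_definition: str,
    subnets: List[str],
    select: Optional[str] = None,
    container_name: str = "dbt",  # hypothetical: must match the task definition
) -> Dict[str, object]:
    """Build kwargs for ecs_client.run_task(**kwargs) that launch a dbt run."""
    command = ["dbt", "build"]
    if select:
        # Restrict the run to a dbt node selection, e.g. "staging+" or "tag:daily".
        command += ["--select", select]
    return {
        "cluster": cluster,
        "taskDefinition": task_definition,
        "launchType": "FARGATE",  # assumption: the task runs on Fargate
        "networkConfiguration": {"awsvpcConfiguration": {"subnets": subnets}},
        "overrides": {
            # Replace the container's default command for this run only.
            "containerOverrides": [{"name": container_name, "command": command}]
        },
    }
```

With AWS credentials configured, the call would be `boto3.client("ecs").run_task(**build_dbt_run_task_kwargs(...))`. Overriding the command per run keeps a single task definition usable for full and partial builds.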

## Infrastructure Overview

The following diagram shows the AWS infrastructure components and their relationships:

```mermaid
%%{init: {'theme':'dark', 'themeVariables': {'primaryColor': '#06d6a0', 'primaryTextColor': '#fff', 'primaryBorderColor': '#05c491', 'lineColor': '#ef476f', 'secondaryColor': '#118ab2', 'tertiaryColor': '#073b4c'}}}%%
graph TB
    %% INFRASTRUCTURE COMPONENTS
    subgraph "AWS Infrastructure"
        %% Data Storage
        subgraph "Data Storage"
            ingestion_bucket[(Ingestion<br/>S3 Bucket)]
            staging_bucket[(Staging<br/>S3 Bucket)]
            glue_catalog[AWS Glue<br/>Catalog]
        end

        %% Compute Resources
        subgraph "Compute Resources"
            chess_lambda[Chess Lambda<br/>Function]
            athena[AWS Athena]
            ecs_task[dbt ECS<br/>Task]
        end

        %% Container Registry
        subgraph "Container Registry"
            chess_ecr[Chess ECR<br/>Repository]
            dbt_ecr[dbt ECR<br/>Repository]
        end

        %% Supporting Services
        subgraph "Supporting Services"
            chess_secrets[Chess Secrets<br/>Manager]
            chess_step_function[Chess Step<br/>Function]
        end
    end

    %% CONFIGURATION
    subgraph "Configuration"
        env[Environment<br/>Variable]
        vpc_name[VPC Name<br/>Variable]
        ecs_name[ECS Cluster<br/>Variable]
    end

    %% DATA SOURCES
    subgraph "AWS Data Sources"
        vpc_data[VPC<br/>Data Source]
        subnets[Subnets<br/>Data Source]
        ecs_data[ECS Cluster<br/>Data Source]
        region[AWS Region]
        caller[AWS Caller<br/>Identity]
    end

    %% RELATIONSHIPS
    %% Configuration to Data Sources
    env --> |Prefixes names| all[All<br/>Resources]
    vpc_name --> vpc_data
    ecs_name --> ecs_data

    %% Data Sources to Resources
    vpc_data --> subnets
    subnets --> ecs_task
    ecs_data --> ecs_task
    region --> all
    caller --> all

    %% Registry to Compute
    chess_ecr --> |Image Source| chess_lambda
    dbt_ecr --> |Image Source| ecs_task

    %% Orchestration Flow
    chess_secrets --> |Credentials| chess_lambda
    chess_lambda --> |Writes Data| ingestion_bucket
    ingestion_bucket --> |Glue Table Ref| glue_catalog
    staging_bucket --> |Glue Table Ref| glue_catalog
    glue_catalog --> |Table Metadata| athena
    athena --> |Query Execution| ecs_task
    chess_step_function --> |Triggers| chess_lambda
    chess_step_function --> |Can Trigger| ecs_task
    ecs_task --> |Writes Data| staging_bucket

    %% Styling
    classDef variables fill:#06d6a0,color:#fff,stroke:#05c491,stroke-width:1px;
    classDef datasources fill:#118ab2,color:#fff,stroke:#1179a1,stroke-width:1px;
    classDef compute fill:#ef476f,color:#fff,stroke:#de3660,stroke-width:1px;
    classDef storage fill:#ffd166,color:#073b4c,stroke:#ffcc57,stroke-width:1px;
    classDef services fill:#073b4c,color:#fff,stroke:#062a37,stroke-width:1px;
    classDef registry fill:#f78c6b,color:#073b4c,stroke:#f67b5c,stroke-width:1px;
    classDef grouping fill:none,stroke:#aaa,stroke-width:1px,color:#fff;

    class env,vpc_name,ecs_name variables;
    class vpc_data,subnets,ecs_data,region,caller datasources;
    class chess_lambda,athena,ecs_task compute;
    class ingestion_bucket,staging_bucket,glue_catalog storage;
    class chess_secrets,chess_step_function services;
    class chess_ecr,dbt_ecr registry;
    class "AWS Infrastructure","Configuration","AWS Data Sources","Data Storage","Compute Resources","Container Registry","Supporting Services" grouping;
```

## Module documentation

The sections below document the Terraform configuration defined in `pipelines/*.tf`.

## Requirements

| Name      | Version  |
| --------- | -------- |
| terraform | >=1.5.7  |
| aws       | >=5.63.1 |

## Providers

| Name | Version |
| ---- | ------- |
| aws  | 5.92.0  |
| null | 3.2.3   |

## Modules

| Name                             | Source                                            | Version |
| -------------------------------- | ------------------------------------------------- | ------- |
| `bucket_ingestion_read_policies` | terraform-aws-modules/iam/aws//modules/iam-policy | 5.39.1  |
| `bucket_staging_read_policies`   | terraform-aws-modules/iam/aws//modules/iam-policy | 5.39.1  |
| `chess_ecr`                      | terraform-aws-modules/ecr/aws                     | n/a     |
| `chess_lambda_function`          | terraform-aws-modules/lambda/aws                  | 7.2.1   |
| `chess_pipeline`                 | terraform-aws-modules/step-functions/aws          | 4.2.1   |
| `chess_secrets`                  | terraform-aws-modules/secrets-manager/aws         | 1.1.2   |
| `dbt_ecr`                        | terraform-aws-modules/ecr/aws                     | n/a     |
| `dbt_task_definition`            | terraform-aws-modules/ssm-parameter/aws           | 1.1.1   |
| `ecs_task_definition_dbt`        | terraform-aws-modules/ecs/aws//modules/service    | 5.11.2  |
| `ingestion_bucket`               | terraform-aws-modules/s3-bucket/aws               | 4.1.0   |
| `staging_bucket`                 | terraform-aws-modules/s3-bucket/aws               | 4.1.0   |

## Resources

| Name                                                                                                                                                                    | Type        |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------- |
| [null\_resource.chess\_empty\_image](https://registry.terraform.io/providers/hashicorp/null/latest/docs/resources/resource)                                             | resource    |
| [aws\_caller\_identity.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity)                                            | data source |
| [aws\_ecs\_cluster.ecs-cluster](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecs_cluster)                                             | data source |
| [aws\_iam\_policy\_document.bucket\_ingestion\_read\_write\_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws\_iam\_policy\_document.bucket\_staging\_read\_write\_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document)   | data source |
| [aws\_region.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region)                                                               | data source |
| [aws\_subnets.subnets](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/subnets)                                                          | data source |
| [aws\_vpc.vpc](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/vpc)                                                                      | data source |

## Inputs

| Name               | Description                                                          | Type     | Default | Required |
| ------------------ | -------------------------------------------------------------------- | -------- | ------- | :------: |
| `ecs_cluster_name` | The name of the ECS cluster                                          | `string` | `null`  |    no    |
| `environment`      | The environment to deploy to; used as a prefix for all resource names | `string` | n/a     |    yes   |
| `vpc_name`         | The name of the VPC to deploy the ECS cluster in                     | `string` | `null`  |    no    |
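Only `environment` is required; the other two inputs default to `null`. The sketch below mirrors those inputs as a Python dataclass to show how the environment prefix is applied. The hyphen separator and the `resource_name` helper are assumptions for illustration; the actual naming scheme lives in the `.tf` files.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PipelineInputs:
    environment: str                        # required, no default
    vpc_name: Optional[str] = None          # mirrors the Terraform default of null
    ecs_cluster_name: Optional[str] = None  # mirrors the Terraform default of null

    def resource_name(self, base: str) -> str:
        """Prefix a resource name with the environment (hyphen separator assumed)."""
        if not self.environment:
            raise ValueError("environment is required")
        return f"{self.environment}-{base}"
```

Under this assumed scheme, deploying with `environment = "dev"` would name a resource with base name `chess-lambda` as `dev-chess-lambda`, which keeps several environments apart in one AWS account.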

## Outputs

No outputs.
