# pipelines/

Data pipelines are built using a two-layer architecture:

* `ingest/` for data ingestion from source to S3
* `transform/` for data transformation in Snowflake via dbt

For each layer, the application code lives in its own folder, while the underlying infrastructure is defined in Terraform files in `pipelines/`.

```
pipelines/
├── ingest/
├── transform/
├── *.tf
```

A typical data flow looks like this:

{% @mermaid/diagram content="%%{init: {'theme':'dark'}}%%
graph LR
subgraph Ingest Layer
    Source --> Lambda --> S3
end

subgraph Transform Layer
    S3 --> Snowpipes --> Snowflake+dbt
end

%% Styling
classDef default fill:#f9f9f9,stroke:#333,stroke-width:2px;
classDef layer fill:#e1f7d5,stroke:#333,stroke-width:2px;

class Ingest,Transform layer;" %}

## Ingestion Layer

The ingest layer is composed of three artifacts:

* the ingestion code in `pipelines/ingest/{SOURCE_NAME}-ingestion/`
* the data source schema in `pipelines/ingest/{SOURCE_NAME}_source_schema.yml`
* the infrastructure code (Terraform) in `pipelines/*.tf`

Let's take the Chess.com pipeline provided in this repo as an example:

```
pipelines/
├── ingest/
│   ├── chess-lambda/
│   │   ├── lambda_handler.py     # Lambda code embedding dlt for Chess.com ingestion
│   │   └── ...
│   └── chess_source_schema.yml   # YAML file defining the Chess.com data schema
├── chess_lambda.tf               # Terraform creating the lambda function
├── ingestion_bucket.tf           # Terraform creating target S3 bucket
...
```

The ingestion runs in a Lambda function embedding dlt, with:

* Source code in `pipelines/ingest/chess-lambda`
* Terraform in `pipelines/chess_lambda.tf`

This Lambda writes to a bucket defined in `ingestion_bucket.tf`.

{% hint style="info" %}
Get more info on how to run/test this lambda [here](https://github.com/boringdata/boringdata-template-aws-snowflake/blob/main/pipelines/ingest/chess-lambda/README.md)
{% endhint %}

We maintain one YAML file per data source (`ingest/{source_name}_source_schema.yml`) to track the source schema and automatically create the landing tables in the Snowflake warehouse.

{% hint style="info" %}
[Why do you need to generate a YAML file for each source?](https://docs.boringdata.io/template-aws-snowflake/project-structure/pages/mvzXlWASNIL69qnoS81s#what-are-less-than-source-greater-than-source_schema.yml-files)
{% endhint %}
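
The naming convention above makes schema files easy to discover programmatically. As a minimal sketch (the helper name `find_source_schemas` is hypothetical and not part of this repo), the following Python lists every `*_source_schema.yml` file in a folder and derives the source name from the file name:

```python
from pathlib import Path

SCHEMA_SUFFIX = "_source_schema.yml"


def find_source_schemas(ingest_dir: str) -> dict[str, Path]:
    """Map each source name to its schema file, based on the file name only."""
    schemas = {}
    for path in sorted(Path(ingest_dir).glob(f"*{SCHEMA_SUFFIX}")):
        # "chess_source_schema.yml" -> "chess"
        source_name = path.name[: -len(SCHEMA_SUFFIX)]
        schemas[source_name] = path
    return schemas
```

Called as `find_source_schemas("pipelines/ingest")` from the repo root, this should return one entry per source, keyed by source name (for example `chess` for the Chess.com pipeline).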

## Transform Layer

The transform layer is composed of two artifacts:

* The transformation code in `transform/` (typically a dbt project)
* The infrastructure code (terraform) in `pipelines/*.tf`

### S3 -> Snowflake

`ingestion_snowpipe.tf` automatically reads all the schema YAML files in the `ingest/` folder and creates:

* all landing tables in Snowflake
* all Snowflake pipes (Snowpipe) that automatically copy the data from S3 into these tables
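
To make the mapping from schema files to Snowflake objects more concrete, here is a rough Python sketch of the kind of SQL that corresponds to one landing table and its pipe. It is illustrative only: the real objects are created by Terraform in `ingestion_snowpipe.tf`, and the schema layout (column names mapped to types), the stage name `LANDING_STAGE`, the Parquet file format, and the S3 prefix layout are all assumptions rather than values taken from this repo.

```python
def landing_ddl(source: str, table: str, columns: dict[str, str],
                stage: str = "LANDING_STAGE") -> tuple[str, str]:
    """Render CREATE TABLE and CREATE PIPE statements for one landing table."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in columns.items())
    create_table = f"CREATE TABLE IF NOT EXISTS {table} (\n  {cols}\n);"
    # AUTO_INGEST lets S3 event notifications trigger the COPY automatically.
    create_pipe = (
        f"CREATE PIPE IF NOT EXISTS {table}_PIPE AUTO_INGEST = TRUE AS\n"
        f"COPY INTO {table}\n"
        f"FROM @{stage}/{source}/{table}/\n"  # assumed prefix layout in the bucket
        f"FILE_FORMAT = (TYPE = 'PARQUET')\n"  # assumed file format
        f"MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;"
    )
    return create_table, create_pipe


# Hypothetical schema for one table of the chess source
table_sql, pipe_sql = landing_ddl(
    "chess", "PLAYERS_GAMES", {"URL": "VARCHAR", "END_TIME": "TIMESTAMP_NTZ"}
)
print(table_sql)
print(pipe_sql)
```

Generating one table and one pipe per schema entry is what keeps the YAML files the single place where a source's structure is declared.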

### dbt

`transform/` is a standard dbt project with models split into two folders (schemas in Snowflake):

* STAGING: for the transformed data
* MART: for the data ready to be used by the business

The dbt project runs in an ECS task (`ecs_task_dbt.tf`).

{% hint style="info" %}
You can get more info on this project and how to run dbt locally and remotely [here](/template-aws-snowflake/project-structure/pipelines/transform.md)
{% endhint %}
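
As an illustration of running dbt remotely, the sketch below builds the container override that could be passed to the ECS `RunTask` API (for example through boto3's `ecs.run_task(overrides=...)`) to run only the chess staging models. The container name `dbt`, the assumption that the image accepts a `dbt ...` command, and the chosen selector are all hypothetical; check `ecs_task_dbt.tf` and the transform documentation for the actual values.

```python
def dbt_run_overrides(select: str, container_name: str = "dbt") -> dict:
    """Build ECS RunTask overrides so the container runs `dbt run --select ...`."""
    return {
        "containerOverrides": [
            {
                "name": container_name,  # assumed container name in the task definition
                "command": ["dbt", "run", "--select", select],
            }
        ]
    }


# Select the chess staging models by path (dbt's `path:` selector method)
overrides = dbt_run_overrides("path:models/staging/chess")
```

Keeping the command in an override means the same task definition can serve a full run or a targeted run of a single folder.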

Let's take the example of the chess pipeline provided in this repo:

```
pipelines/
├── transform/
│   ├── models/
│   │   └── staging/
│   │       └── chess/                    # Chess staging models
│   │           ├── stg_chess_games.sql
│   │           ├── stg_chess_players.sql
│   │           ├── stg_chess_players_games.sql
│   │           ├── stg_chess_...
│   │
│   └── sources/
│       └── chess.yml
├── ingestion_snowpipe.tf       # Terraform for Snowflake landing table + snowpipe creation
├── ecs_task_dbt.tf             # Terraform for creating the ECS task running dbt in AWS
...
```

## Terraform Module

### Example Usage

```hcl
module "chess_lambda" {
  source = "git::https://github.com/boringdata/boringdata-template-aws-snowflake.git//modules/chess_lambda"

  environment      = "prod"
  vpc_name         = "vpc-12345678"
  ecs_cluster_name = "ecs-cluster-12345678"
}
```

### Diagram

{% @mermaid/diagram content="%%{init: {'theme':'dark'}}%%
graph TB
%% Variables
env[environment<br/>variable] --> all
vpc_name[vpc_name<br/>variable] --> vpc_data
ecs_name[ecs_cluster_name<br/>variable] --> ecs_data

%% Data Sources
vpc_data[aws_vpc<br/>data source] --> subnets
subnets[aws_subnets<br/>data source] --> ecs_task
ecs_data[aws_ecs_cluster<br/>data source] --> ecs_task
region[aws_region<br/>data source] --> all
caller[aws_caller_identity<br/>data source] --> all

%% ECR Repositories
chess_ecr[Chess ECR<br/>Repository] --> chess_lambda
dbt_ecr[DBT ECR<br/>Repository] --> ecs_task

%% Lambda Resources
chess_secrets[Chess Secrets<br/>Manager] --> chess_lambda
chess_lambda[Chess Lambda<br/>Function] --> |writes to| s3_bucket

%% S3 and Snowflake Resources
s3_bucket[Ingestion S3<br/>Bucket] --> snowflake_stage
snowflake_stage[Snowflake<br/>External Stage] --> snowpipes
snowpipes[Snowflake<br/>Pipes] --> landing_tables[Snowflake<br/>Landing Tables]
storage_int[Storage<br/>Integration] --> snowflake_stage

%% ECS Resources
ecs_task[DBT ECS Task<br/>Definition]

%% Styling
classDef variable fill:#e1f7d5
classDef data fill:#c6e2ff
classDef resource fill:#f9d6ff
classDef storage fill:#ffebcc

class env,vpc_name,ecs_name variable
class vpc_data,subnets,ecs_data,region,caller data
class chess_secrets,chess_lambda,ecs_task resource
class chess_ecr,dbt_ecr,s3_bucket,snowflake_stage,snowpipes,landing_tables,storage_int storage" %}

## Requirements

| Name                                | Version  |
| ----------------------------------- | -------- |
| [terraform](#requirement_terraform) | >=1.5.7  |
| [aws](#requirement_aws)             | >=5.63.1 |
| [snowflake](#requirement_snowflake) | >=1.0.0  |

## Providers

| Name                             | Version |
| -------------------------------- | ------- |
| [aws](#provider_aws)             | 5.92.0  |
| [null](#provider_null)           | 3.2.3   |
| [snowflake](#provider_snowflake) | 1.0.4   |
| [time](#provider_time)           | 0.13.0  |

## Modules

| Name                                                                        | Source                                                    | Version |
| --------------------------------------------------------------------------- | --------------------------------------------------------- | ------- |
| [bucket\_ingestion\_read\_policies](#module_bucket_ingestion_read_policies) | terraform-aws-modules/iam/aws//modules/iam-policy         | 5.39.1  |
| [chess\_ecr](#module_chess_ecr)                                             | terraform-aws-modules/ecr/aws                             | n/a     |
| [chess\_lambda\_function](#module_chess_lambda_function)                    | terraform-aws-modules/lambda/aws                          | 7.2.1   |
| [chess\_pipeline](#module_chess_pipeline)                                   | terraform-aws-modules/step-functions/aws                  | 4.2.1   |
| [chess\_secrets](#module_chess_secrets)                                     | terraform-aws-modules/secrets-manager/aws                 | 1.1.2   |
| [dbt\_ecr](#module_dbt_ecr)                                                 | terraform-aws-modules/ecr/aws                             | n/a     |
| [dbt\_task\_definition](#module_dbt_task_definition)                        | terraform-aws-modules/ssm-parameter/aws                   | 1.1.1   |
| [ecs\_task\_definition\_dbt](#module_ecs_task_definition_dbt)               | terraform-aws-modules/ecs/aws//modules/service            | 5.11.2  |
| [iam\_role\_assumable\_snowflake](#module_iam_role_assumable_snowflake)     | terraform-aws-modules/iam/aws//modules/iam-assumable-role | 5.39.1  |
| [ingestion\_bucket](#module_ingestion_bucket)                               | terraform-aws-modules/s3-bucket/aws                       | 4.1.0   |
| [s3\_notifications](#module_s3_notifications)                               | terraform-aws-modules/s3-bucket/aws//modules/notification | n/a     |

## Resources

| Name                                                                                                                                                                    | Type        |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------- |
| [null\_resource.chess\_empty\_image](https://registry.terraform.io/providers/hashicorp/null/latest/docs/resources/resource)                                             | resource    |
| [snowflake\_pipe.ingestion\_pipes](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/pipe)                                         | resource    |
| [snowflake\_stage.snowflake\_landing\_stage](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/stage)                              | resource    |
| [snowflake\_storage\_integration.storage\_integration](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/storage_integration)      | resource    |
| [snowflake\_table.snowflake\_landing\_tables](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/table)                             | resource    |
| [time\_sleep.wait\_for\_iam\_role](https://registry.terraform.io/providers/hashicorp/time/latest/docs/resources/sleep)                                                  | resource    |
| [aws\_caller\_identity.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity)                                            | data source |
| [aws\_ecs\_cluster.ecs-cluster](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecs_cluster)                                             | data source |
| [aws\_iam\_policy\_document.bucket\_ingestion\_read\_write\_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws\_region.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region)                                                               | data source |
| [aws\_subnets.subnets](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/subnets)                                                          | data source |
| [aws\_vpc.vpc](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/vpc)                                                                      | data source |

## Inputs

| Name                                          | Description                                                          | Type     | Default | Required |
| --------------------------------------------- | -------------------------------------------------------------------- | -------- | ------- | :------: |
| [ecs\_cluster\_name](#input_ecs_cluster_name) | The name of the ECS cluster                                          | `string` | `null`  |    no    |
| [environment](#input_environment)             | The environment to deploy to - will prefix the name of all resources | `string` | n/a     |    yes   |
| [vpc\_name](#input_vpc_name)                  | The name of the VPC to deploy the ECS cluster in                     | `string` | `null`  |    no    |

## Outputs

No outputs.


