# pipelines/

Data pipelines are built on a two-layer architecture:

* `ingest/` for data ingestion from source to S3
* `transform/` for data transformation in Snowflake via dbt

For each layer, the application code lives in its own folder, while the underlying infrastructure is defined in Terraform files at the root of `pipelines/`.

```
pipelines/
├── ingest/
├── transform/
└── *.tf
```

A typical data flow looks like this:

{% @mermaid/diagram content="%%{init: {'theme':'dark'}}%%
graph LR
subgraph Ingest [Ingest Layer]
Source --> Lambda --> S3
end

subgraph Transform [Transform Layer]
S3 --> Snowpipes --> Snowflake[Snowflake + dbt]
end

%% Styling
classDef default fill:#f9f9f9,stroke:#333,stroke-width:2px;
classDef layer fill:#e1f7d5,stroke:#333,stroke-width:2px;

class Ingest,Transform layer;" %}

## Ingestion Layer

The ingest layer is composed of three artifacts:

* the ingestion code in `pipelines/ingest/{SOURCE_NAME}-lambda/`
* the data source schema in `pipelines/ingest/{SOURCE_NAME}_source_schema.yml`
* the infrastructure code (Terraform) in `pipelines/*.tf`

Let's take the Chess.com pipeline provided in this repo as an example:

```
pipelines/
├── ingest/
│   ├── chess-lambda/
│   │   ├── lambda_handler.py     # Lambda code embedding dlt for Chess.com ingestion
│   │   └── ...
│   └── chess_source_schema.yml   # YAML file defining the Chess.com data schema
├── chess_lambda.tf               # Terraform creating the lambda function
├── ingestion_bucket.tf           # Terraform creating target S3 bucket
...
```

Ingestion is performed by a Lambda function that embeds dlt:

* Source code in `pipelines/ingest/chess-lambda`
* Terraform in `pipelines/chess_lambda.tf`

This Lambda writes to the S3 bucket defined in `ingestion_bucket.tf`.
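As a rough sketch of how `chess_lambda.tf` might connect the function to that bucket, the block below deploys the Lambda from the Chess ECR image and points dlt at the ingestion bucket. The module names and the lambda module version come from the Modules table below; the function name, the `latest` image tag, passing the bucket through dlt's `DESTINATION__FILESYSTEM__BUCKET_URL` environment variable and the single `s3:PutObject` permission are assumptions for illustration, not the repo's actual file.

```hcl
# Hypothetical sketch of chess_lambda.tf; not the file shipped in this repo.
module "chess_lambda_function" {
  source  = "terraform-aws-modules/lambda/aws"
  version = "7.2.1"

  # Assumed naming scheme: environment prefix + source name
  function_name = "${var.environment}-chess-lambda"

  # Deploy the function as a container image pushed to the Chess ECR repository
  create_package = false
  package_type   = "Image"
  image_uri      = "${module.chess_ecr.repository_url}:latest" # assumed tag

  # Assumption: dlt's filesystem destination picks up its target bucket from this variable
  environment_variables = {
    DESTINATION__FILESYSTEM__BUCKET_URL = "s3://${module.ingestion_bucket.s3_bucket_id}"
  }

  # Minimal write access to the ingestion bucket (dlt may need more, e.g. s3:ListBucket)
  attach_policy_statements = true
  policy_statements = {
    s3_write = {
      effect    = "Allow"
      actions   = ["s3:PutObject"]
      resources = ["${module.ingestion_bucket.s3_bucket_arn}/*"]
    }
  }
}
```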

{% hint style="info" %}
For details on how to run and test this Lambda, see its [README](https://github.com/boringdata/boringdata-template-aws-snowflake/blob/main/pipelines/ingest/chess-lambda/README.md).
{% endhint %}

We maintain one YAML file per data source, `ingest/{SOURCE_NAME}_source_schema.yml`, which tracks the source schema and is used to automatically create the landing tables in the Snowflake warehouse.

{% hint style="info" %}
[Why do you need to generate a YAML file for each source?](https://docs.boringdata.io/template-aws-snowflake/help/faq#what-are-less-than-source-greater-than-source_schema.yml-files)
{% endhint %}

## Transform Layer

The transform layer is composed of two artifacts:

* The transformation code in `transform/` (typically a dbt project)
* The infrastructure code (terraform) in `pipelines/*.tf`

### S3 -> Snowflake

`ingestion_snowpipe.tf` reads every YAML file in the `ingest/` folder and automatically creates:

* all the landing tables in Snowflake
* the Snowflake pipes (Snowpipe) that automatically copy the data from S3 into these tables
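To make the mechanism concrete, here is a minimal sketch of how a Terraform file can turn the schema YAML files into landing tables and pipes using `fileset`, `yamldecode` and `for_each`. The YAML layout (a `tables` list with `name` and `columns`), the `RAW`/`LANDING` database and schema names, the `<source>/<table>/` S3 prefix and the Parquet file format are all assumptions for illustration. The resource names match the Resources table below, and `snowflake_stage.snowflake_landing_stage` is assumed to be declared elsewhere in the module; refer to `ingestion_snowpipe.tf` for the actual logic.

```hcl
# Hypothetical sketch of the pattern behind ingestion_snowpipe.tf.
# Assumed layout of ingest/<source>_source_schema.yml:
#   tables:
#     - name: games
#       columns:
#         - name: url
#           type: VARCHAR

locals {
  # Placeholder names, not the template's actual database/schema
  landing_database = "RAW"
  landing_schema   = "LANDING"

  schema_files = fileset("${path.module}/ingest", "*_source_schema.yml")

  # Flatten every table of every source into one map keyed "<source>_<table>"
  landing_tables = merge([
    for f in local.schema_files : {
      for t in yamldecode(file("${path.module}/ingest/${f}")).tables :
      "${trimsuffix(f, "_source_schema.yml")}_${t.name}" => {
        source  = trimsuffix(f, "_source_schema.yml")
        table   = t.name
        columns = t.columns
      }
    }
  ]...)
}

# One landing table per entry, with columns taken from the YAML file
resource "snowflake_table" "snowflake_landing_tables" {
  for_each = local.landing_tables

  database = local.landing_database
  schema   = local.landing_schema
  name     = upper(each.key)

  dynamic "column" {
    for_each = each.value.columns
    content {
      name = upper(column.value.name)
      type = column.value.type
    }
  }
}

# One auto-ingest pipe per table, loading files from its S3 prefix through the stage
resource "snowflake_pipe" "ingestion_pipes" {
  for_each = local.landing_tables

  database    = local.landing_database
  schema      = local.landing_schema
  name        = "${upper(each.key)}_PIPE"
  auto_ingest = true

  # Assumes Parquet files written under <source>/<table>/ in the bucket
  copy_statement = <<-SQL
    COPY INTO ${local.landing_database}.${local.landing_schema}.${upper(each.key)}
    FROM @${local.landing_database}.${local.landing_schema}.${snowflake_stage.snowflake_landing_stage.name}/${each.value.source}/${each.value.table}/
    FILE_FORMAT = (TYPE = 'PARQUET')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  SQL

  depends_on = [snowflake_table.snowflake_landing_tables]
}
```

With `auto_ingest = true`, each pipe exposes an SQS queue (its `notification_channel` attribute) that the bucket's event notifications must target, which is presumably what the `s3_notifications` module listed in the Modules table takes care of.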

### dbt

`transform/` is a standard dbt project whose models are split into two folders, each mapped to a Snowflake schema:

* `STAGING`: for the transformed data
* `MART`: for the data ready to be used by the business

The dbt project runs in an ECS task defined in `ecs_task_dbt.tf`.

{% hint style="info" %}
For more details on this project, including how to run dbt locally and remotely, see the [transform documentation](https://docs.boringdata.io/template-aws-snowflake/project-structure/pipelines/transform).
{% endhint %}

Here is the transform side of the chess pipeline provided in this repo:

```
pipelines/
├── transform/
│   ├── models/
│   │   └── staging/
│   │       └── chess/                    # Chess staging models
│   │           ├── stg_chess_games.sql
│   │           ├── stg_chess_players.sql
│   │           ├── stg_chess_players_games.sql
│   │           └── stg_chess_...
│   └── sources/
│       └── chess.yml
├── ingestion_snowpipe.tf       # Terraform creating the Snowflake landing tables and Snowpipes
├── ecs_task_dbt.tf             # Terraform creating the ECS task that runs dbt in AWS
...
```

## Terraform Module

### Example Usage

```hcl
module "chess_lambda" {
  source           = "git::https://github.com/boringdata/boringdata-template-aws-snowflake.git//modules/chess_lambda"
  environment      = "prod"
  vpc_name         = "vpc-12345678"
  ecs_cluster_name = "ecs-cluster-12345678"
}
```

### Diagram

{% @mermaid/diagram content="%%{init: {'theme':'dark'}}%%
graph TB
%% Variables
env[environment<br/>variable] --> all
vpc_name[vpc_name<br/>variable] --> vpc_data
ecs_name[ecs_cluster_name<br/>variable] --> ecs_data

%% Data Sources
vpc_data[aws_vpc<br/>data source] --> subnets
subnets[aws_subnets<br/>data source] --> ecs_task
ecs_data[aws_ecs_cluster<br/>data source] --> ecs_task
region[aws_region<br/>data source] --> all
caller[aws_caller_identity<br/>data source] --> all

%% ECR Repositories
chess_ecr[Chess ECR<br/>Repository] --> chess_lambda
dbt_ecr[DBT ECR<br/>Repository] --> ecs_task

%% Lambda Resources
chess_secrets[Chess Secrets<br/>Manager] --> chess_lambda
chess_lambda[Chess Lambda<br/>Function] --> |writes to| s3_bucket

%% S3 and Snowflake Resources
s3_bucket[Ingestion S3<br/>Bucket] --> snowflake_stage
snowflake_stage[Snowflake<br/>External Stage] --> snowpipes
snowpipes[Snowflake<br/>Pipes] --> landing_tables[Snowflake<br/>Landing Tables]
storage_int[Storage<br/>Integration] --> snowflake_stage

%% ECS Resources
ecs_task[DBT ECS Task<br/>Definition]

%% Styling
classDef variable fill:#e1f7d5
classDef data fill:#c6e2ff
classDef resource fill:#f9d6ff
classDef storage fill:#ffebcc

class env,vpc_name,ecs_name variable
class vpc_data,subnets,ecs_data,region,caller data
class chess_secrets,chess_lambda,ecs_task resource
class chess_ecr,dbt_ecr,s3_bucket,snowflake_stage,snowpipes,landing_tables,storage_int storage" %}

## Requirements

| Name                                | Version  |
| ----------------------------------- | -------- |
| [terraform](#requirement_terraform) | >=1.5.7  |
| [aws](#requirement_aws)             | >=5.63.1 |
| [snowflake](#requirement_snowflake) | >=1.0.0  |
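Expressed in Terraform, these constraints correspond to a `terraform` block along the following lines. The provider source addresses are taken from the registry links in the Resources table; the block is a reconstruction from the table, not a copy of the module's own file.

```hcl
# Version constraints reconstructed from the Requirements table
terraform {
  required_version = ">= 1.5.7"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = ">= 5.63.1"
    }
    snowflake = {
      source  = "Snowflake-Labs/snowflake"
      version = ">= 1.0.0"
    }
  }
}
```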

## Providers

| Name                             | Version |
| -------------------------------- | ------- |
| [aws](#provider_aws)             | 5.92.0  |
| [null](#provider_null)           | 3.2.3   |
| [snowflake](#provider_snowflake) | 1.0.4   |
| [time](#provider_time)           | 0.13.0  |

## Modules

| Name                                                                        | Source                                                    | Version |
| --------------------------------------------------------------------------- | --------------------------------------------------------- | ------- |
| [bucket\_ingestion\_read\_policies](#module_bucket_ingestion_read_policies) | terraform-aws-modules/iam/aws//modules/iam-policy         | 5.39.1  |
| [chess\_ecr](#module_chess_ecr)                                             | terraform-aws-modules/ecr/aws                             | n/a     |
| [chess\_lambda\_function](#module_chess_lambda_function)                    | terraform-aws-modules/lambda/aws                          | 7.2.1   |
| [chess\_pipeline](#module_chess_pipeline)                                   | terraform-aws-modules/step-functions/aws                  | 4.2.1   |
| [chess\_secrets](#module_chess_secrets)                                     | terraform-aws-modules/secrets-manager/aws                 | 1.1.2   |
| [dbt\_ecr](#module_dbt_ecr)                                                 | terraform-aws-modules/ecr/aws                             | n/a     |
| [dbt\_task\_definition](#module_dbt_task_definition)                        | terraform-aws-modules/ssm-parameter/aws                   | 1.1.1   |
| [ecs\_task\_definition\_dbt](#module_ecs_task_definition_dbt)               | terraform-aws-modules/ecs/aws//modules/service            | 5.11.2  |
| [iam\_role\_assumable\_snowflake](#module_iam_role_assumable_snowflake)     | terraform-aws-modules/iam/aws//modules/iam-assumable-role | 5.39.1  |
| [ingestion\_bucket](#module_ingestion_bucket)                               | terraform-aws-modules/s3-bucket/aws                       | 4.1.0   |
| [s3\_notifications](#module_s3_notifications)                               | terraform-aws-modules/s3-bucket/aws//modules/notification | n/a     |

## Resources

| Name                                                                                                                                                                    | Type        |
| ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------- |
| [null\_resource.chess\_empty\_image](https://registry.terraform.io/providers/hashicorp/null/latest/docs/resources/resource)                                             | resource    |
| [snowflake\_pipe.ingestion\_pipes](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/pipe)                                         | resource    |
| [snowflake\_stage.snowflake\_landing\_stage](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/stage)                              | resource    |
| [snowflake\_storage\_integration.storage\_integration](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/storage_integration)      | resource    |
| [snowflake\_table.snowflake\_landing\_tables](https://registry.terraform.io/providers/Snowflake-Labs/snowflake/latest/docs/resources/table)                             | resource    |
| [time\_sleep.wait\_for\_iam\_role](https://registry.terraform.io/providers/hashicorp/time/latest/docs/resources/sleep)                                                  | resource    |
| [aws\_caller\_identity.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity)                                            | data source |
| [aws\_ecs\_cluster.ecs-cluster](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecs_cluster)                                             | data source |
| [aws\_iam\_policy\_document.bucket\_ingestion\_read\_write\_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws\_region.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region)                                                               | data source |
| [aws\_subnets.subnets](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/subnets)                                                          | data source |
| [aws\_vpc.vpc](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/vpc)                                                                      | data source |

## Inputs

| Name                                          | Description                                                          | Type     | Default | Required |
| --------------------------------------------- | -------------------------------------------------------------------- | -------- | ------- | :------: |
| [ecs\_cluster\_name](#input_ecs_cluster_name) | The name of the ECS cluster                                          | `string` | `null`  |    no    |
| [environment](#input_environment)             | The environment to deploy to - will prefix the name of all resources | `string` | n/a     |    yes   |
| [vpc\_name](#input_vpc_name)                  | The name of the VPC to deploy the ECS cluster in                     | `string` | `null`  |    no    |
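The Inputs table corresponds to variable declarations along these lines. This is reconstructed from the table, so the module's own declarations may differ (for example by adding validation rules).

```hcl
# Required: prefixes the name of every resource
variable "environment" {
  description = "The environment to deploy to - will prefix the name of all resources"
  type        = string
}

# Optional: looked up through the aws_vpc data source
variable "vpc_name" {
  description = "The name of the VPC to deploy the ECS cluster in"
  type        = string
  default     = null
}

# Optional: looked up through the aws_ecs_cluster data source
variable "ecs_cluster_name" {
  description = "The name of the ECS cluster"
  type        = string
  default     = null
}
```

Because `vpc_name` and `ecs_cluster_name` default to `null`, only `environment` must be set by the caller.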

## Outputs

No outputs.
