Add a New Pipeline

This guide explains how to add a new data pipeline to the template.

The pipeline architecture includes:

  1. Data ingestion using serverless functions (AWS Lambda) and an ELT tool (dlt)

  2. Staging in cloud object storage (Amazon S3)

  3. Automated data loading into Snowflake landing tables

  4. Data transformation in SQL with dbt

The boringdata CLI automates many steps along the way.
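
To make the flow concrete, here is a minimal, illustrative dlt pipeline that ingests data and stages it in S3 through dlt's filesystem destination. The resource, names, and configuration are placeholders for illustration only, not the template's actual code:

import dlt

# Hypothetical resource standing in for a real connector
@dlt.resource(name="events")
def events():
    yield [{"id": 1, "name": "example"}]

# The filesystem destination writes to S3 when its bucket_url is configured
# (for example via DESTINATION__FILESYSTEM__BUCKET_URL=s3://your-bucket/...)
pipeline = dlt.pipeline(
    pipeline_name="my_source_to_s3",
    destination="filesystem",
    dataset_name="landing",
)

load_info = pipeline.run(events())
print(load_info)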

Before you start, make sure you have installed the boringdata CLI:

uv tool install git+ssh://git@github.com/boringdata/boringdata-cli.git --python 3.12

You can then use the boringdata CLI from any directory:

uvx boringdata --help

Step 1: Add a New Data Source

Let's start by adding a new data source for ingestion.

The template uses dlt as the ingestion framework. Check the dlt ecosystem to find the connector you want.

You can then generate a full ingestion pipeline for this connector by running:

cd pipelines && uvx boringdata dlt add-source <connector_name>

This command will create the following files:

pipelines/<source_name>_lambda.tf: the serverless function (AWS Lambda) infrastructure

pipelines/ingest/<source_name>-ingestion/*: the Lambda function's Dockerized code

The boringdata CLI also performs several helpful operations:

  • Set up a Python virtual environment and install the necessary dependencies

  • Copy .env.example to .env.local

  • Initialize the dlt data connector

  • Parse required secrets from configuration files and update both environment variables and infrastructure configurations

Example using the Notion API as a source:

cd pipelines && uvx boringdata dlt add-source notion

You can give your source a different name than the connector name by adding the CLI option --source-name <source_name>.

Step 2: Configure Secrets

If your source requires secrets (for example, an API key), update the .env.example file accordingly.

After deployment, update these secrets manually in AWS Secrets Manager if needed.

Example for Notion integration:

The following line should be present in the .env file:

SOURCES__NOTION__API_KEY="your_api_key_here"
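
Assuming the deployed Lambda reads its secrets from AWS Secrets Manager, a function can fetch them at runtime along the lines of this sketch (the secret name is a placeholder, not the one the template creates):

import json
import os
import boto3

SECRET_ID = "dev/notion-ingestion"  # placeholder secret name

def load_secrets_into_env() -> None:
    # Fetch the secret payload and expose each key/value pair as an
    # environment variable so dlt can resolve SOURCES__NOTION__API_KEY.
    client = boto3.client("secretsmanager")
    payload = client.get_secret_value(SecretId=SECRET_ID)["SecretString"]
    for key, value in json.loads(payload).items():
        os.environ[key] = value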

Step 3: Customize the Ingestion Logic

Edit pipelines/ingest/<source_name>-ingestion/lambda_handler.py:
# Add the missing imports
from <source_name> import <source_functions>
...

# Update the scope of data to be loaded
load_data =

Example for Notion integration:

from notion import notion_databases
...

# Update the scope of data to be loaded
load_data = notion_databases(database_ids=["your_database_id"])

Use the <connector_name>_pipeline.py file generated by dlt as inspiration.
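
For orientation, a trimmed-down handler for the Notion example might look like the sketch below. This assumes dlt's filesystem destination is used for S3 staging and is only an illustration; the handler generated by the CLI will differ:

import dlt
from notion import notion_databases

def lambda_handler(event, context):
    # Placeholder pipeline settings; the generated handler defines its own
    pipeline = dlt.pipeline(
        pipeline_name="notion_ingestion",
        destination="filesystem",
        dataset_name="notion",
    )

    # Scope of data to be loaded
    load_data = notion_databases(database_ids=["your_database_id"])

    load_info = pipeline.run(load_data)
    return {"status": "ok", "load_info": str(load_info)}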

Step 4: Test the Ingestion Function Locally

To verify your changes, run the function locally (using DuckDB as the local destination):

cd pipelines/ingest/<source_name>-ingestion/ && make run-local

This step allows you to test the function and inspect the output data format.
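
If you want to inspect the output directly, the local run produces a DuckDB file you can open with the duckdb Python package. The file name below assumes dlt's default of naming the database after the pipeline; adjust it to whatever your local run actually creates:

import duckdb

# Open the DuckDB file produced by the local run (file name is an assumption)
con = duckdb.connect("<source_name>_ingestion.duckdb", read_only=True)

# List the schemas and tables dlt created, then sample one of them if needed
print(con.sql("SHOW ALL TABLES"))
# print(con.sql("SELECT * FROM <dataset_name>.<table_name> LIMIT 5"))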

Step 5: Generate the Source Schema

Generate a YAML file that defines your source's data structure (used to create data warehouse tables in Snowflake):

uvx boringdata dlt get-schema <source_name> \
    --engine snowflake \
    --output-folder pipelines/ingest/

Step 6: Create Transformation Models

Based on the YAML file generated in Step 5, boringdata can automatically generate a corresponding SQL transformation model for each table:

uvx boringdata dbt import-source \
    --source-yml pipelines/ingest/<source_name>_source_schema.yml \
    --output-folder pipelines/transform

Step 7: (Optional) Add Workflow Automation

To coordinate the ingestion and transformation steps, add workflow automation using AWS Step Functions:

uvx boringdata aws step-function lambda-dbt \
    --output-folder pipelines \
    --source-name <source_name>
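
Once the state machine is deployed, you can trigger a test run manually, for example with boto3 (the state machine ARN below is a placeholder; use the one from your deployment outputs):

import json
import boto3

sfn = boto3.client("stepfunctions")

# Start a test execution of the deployed workflow
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:<region>:<account_id>:stateMachine:<source_name>-pipeline",
    input=json.dumps({}),
)
print(response["executionArn"])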

Step 8: Deploy the Infrastructure

Finally, deploy the project:

export AWS_PROFILE=your_aws_profile
export SNOWFLAKE_PROFILE=your_snowflake_profile
export ENVIRONMENT=dev
make deploy
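
Once the deployment finishes, you can check that the landing tables exist, for instance with the Snowflake Python connector (all connection parameters and the table name pattern are placeholders for your own setup):

import snowflake.connector

# All connection parameters are placeholders for your own Snowflake account
conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    database="your_database",
    schema="your_landing_schema",
)

with conn.cursor() as cur:
    cur.execute("SHOW TABLES LIKE '<source_name>%'")
    for row in cur.fetchall():
        print(row[1])  # table name is the second column of SHOW TABLES output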
