Add a New Pipeline
This guide explains how to add a new data pipeline to the template.
The pipeline architecture includes:
Data ingestion using serverless functions (AWS Lambda) and an ELT tool (dlt)
Staging in cloud object storage (Amazon S3)
Automated data loading into Snowflake landing tables
Data transformation using SQL analytics with dbt
The boringdata CLI automates many steps along the way.
Before you start, make sure you have installed the boringdata CLI:
uv tool install git+ssh://git@github.com/boringdata/boringdata-cli.git --python 3.12
You can then use the boringdata CLI from any directory:
uvx boringdata --help
Step 1: Add a New Data Source
Let's start by adding a new data source for ingestion.
The template uses dlt as the ingestion framework. Check the dlt ecosystem to find the connector you want.
You can then generate a full ingestion pipeline for this connector by running:
cd pipelines && uvx boringdata dlt add-source <connector_name>
This command will create the following files:
pipelines/<source_name>_lambda.tf = serverless function (AWS Lambda) infrastructure
pipelines/ingest/<source_name>-ingestion/* = the Lambda function's Dockerized code
The boringdata CLI also performs the following operations:
Set up a Python virtual environment and install the necessary dependencies
Copy .env.example to .env.local
Initialize the dlt data connector
Parse required secrets from configuration files and update both environment variables and infrastructure configurations
Example using the Notion API as a source:
cd pipelines && uvx boringdata dlt add-source notion
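As a quick sanity check after scaffolding, the paths listed above can be verified with a few lines of Python. This is a minimal sketch, not part of the boringdata CLI: the helper names are hypothetical, and it assumes the connector name is also used as the source name and that the script runs from the repository root.

```python
from pathlib import Path


def expected_scaffold_paths(source_name: str, root: Path = Path(".")) -> list[Path]:
    """Return the paths this guide says `dlt add-source` creates."""
    pipelines = root / "pipelines"
    return [
        pipelines / f"{source_name}_lambda.tf",             # Lambda infrastructure
        pipelines / "ingest" / f"{source_name}-ingestion",  # Lambda code folder
    ]


def missing_scaffold_paths(source_name: str, root: Path = Path(".")) -> list[Path]:
    """Return the expected paths that do not exist on disk."""
    return [p for p in expected_scaffold_paths(source_name, root) if not p.exists()]


if __name__ == "__main__":
    # Report anything the scaffolding step did not produce.
    for path in missing_scaffold_paths("notion"):
        print(f"missing: {path}")
```

An empty result suggests the scaffolding step completed; any printed path is worth checking before moving on to Step 2.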
Step 2: Configure Secrets
If your source requires secrets (for example, an API key), update the .env.example file.
After deployment, update these secrets manually in AWS Secrets Manager if needed.
Example for Notion integration:
The following line should be present in the .env file:
SOURCES__NOTION__API_KEY="your_api_key_here"
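The variable name follows dlt's convention for environment variables: the configuration path is upper-cased and its sections are joined with double underscores. The sketch below illustrates that mapping and checks whether the variable is set. The helper `dlt_env_key` is hypothetical and is not part of dlt or the template.

```python
import os


def dlt_env_key(*sections: str) -> str:
    """Join config sections into an upper-case, double-underscore-separated name."""
    return "__".join(section.upper() for section in sections)


key = dlt_env_key("sources", "notion", "api_key")
print(key)  # SOURCES__NOTION__API_KEY

# Warn early if the secret has not been exported in the current shell.
if not os.environ.get(key):
    print(f"{key} is not set")
```

The same pattern should apply to other connectors: replace "notion" and "api_key" with the section and field names the connector expects.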
Step 3: Customize the Ingestion Logic
Edit pipelines/ingest/<source_name>-ingestion/lambda_handler.py
# Add missing imports
from <source_name> import <source_functions>
...
# Update the scope of data to be loaded
load_data =
Example for Notion integration:
from notion import notion_databases
...
# Update the scope of data to be loaded
load_data = notion_databases(database_ids=["your_database_id"])
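Rather than hardcoding the scope, the database IDs could be read from configuration. The sketch below assumes a hypothetical environment variable, NOTION_DATABASE_IDS, holding a comma-separated list. Neither the variable nor the helper comes from the template, so adapt the names to your setup.

```python
import os


def parse_database_ids(raw: str) -> list[str]:
    """Split a comma-separated string into clean, non-empty IDs."""
    return [part.strip() for part in raw.split(",") if part.strip()]


# NOTION_DATABASE_IDS is an assumed variable name, not one the template defines.
database_ids = parse_database_ids(os.environ.get("NOTION_DATABASE_IDS", ""))

# In lambda_handler.py, this list would replace the hardcoded value:
# load_data = notion_databases(database_ids=database_ids)
```

Keeping the scope in configuration lets the same function code serve several environments without an edit and rebuild for each one.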
Step 4: Test the Ingestion Function Locally
To verify your changes, run the function locally (using DuckDB as a local target):
cd pipelines/ingest/<source_name>-ingestion/ && make run-local
This step allows you to test the function and inspect the output data format.
Step 5: Generate the Source Schema
Generate a YAML file that defines your source's data structure (used to create data warehouse tables in Snowflake):
uvx boringdata dlt get-schema <source_name> \
--engine snowflake \
--output-folder pipelines/ingest/
Step 6: Create Transformation Models
Based on the YAML file generated in Step 5, boringdata can generate a SQL transformation model for each table:
uvx boringdata dbt import-source \
--source-yml pipelines/ingest/<source_name>_source_schema.yml \
--output-folder pipelines/transform
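Steps 5 and 6 depend on each other: the schema file written by the first command is the input to the second. The sketch below builds both commands for a given source name so the file path stays consistent. The helper names are hypothetical; the command arguments are copied from this guide, and the script is assumed to run from the repository root.

```python
import shlex
import subprocess


def schema_and_model_commands(source_name: str) -> list[list[str]]:
    """Build the Step 5 and Step 6 commands for one source."""
    schema_yml = f"pipelines/ingest/{source_name}_source_schema.yml"
    return [
        ["uvx", "boringdata", "dlt", "get-schema", source_name,
         "--engine", "snowflake", "--output-folder", "pipelines/ingest/"],
        ["uvx", "boringdata", "dbt", "import-source",
         "--source-yml", schema_yml, "--output-folder", "pipelines/transform"],
    ]


def run_all(source_name: str) -> None:
    """Run both commands in order, stopping at the first failure."""
    for cmd in schema_and_model_commands(source_name):
        subprocess.run(cmd, check=True)


# Print the commands for review; call run_all("notion") to execute them.
for cmd in schema_and_model_commands("notion"):
    print(shlex.join(cmd))
```

Using check=True means the model generation is skipped when schema generation fails, which avoids building models from a stale schema file.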
Step 7: (Optional) Add Workflow Automation
To coordinate the ingestion and transformation steps, add workflow automation using AWS Step Functions:
uvx boringdata aws step-function lambda-dbt \
--output-folder pipelines \
--source-name <source_name>
Step 8: Deploy the Infrastructure
Finally, deploy the project:
export AWS_PROFILE=your_aws_profile
export SNOWFLAKE_PROFILE=your_snowflake_profile
export ENVIRONMENT=dev
make deploy
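A missing variable is easy to overlook before deploying. The sketch below is a hypothetical pre-flight check, not something the template ships: it reports which of the three variables exported above are unset or empty.

```python
import os
from collections.abc import Mapping

# The variables this guide exports before running `make deploy`.
REQUIRED_DEPLOY_VARS = ("AWS_PROFILE", "SNOWFLAKE_PROFILE", "ENVIRONMENT")


def missing_deploy_vars(env: Mapping[str, str]) -> list[str]:
    """Return the required variable names that are unset or empty in `env`."""
    return [name for name in REQUIRED_DEPLOY_VARS if not env.get(name)]


missing = missing_deploy_vars(os.environ)
if missing:
    print("Set these variables before deploying: " + ", ".join(missing))
else:
    print("All deployment variables are set.")
```

Passing the environment in as a mapping keeps the check easy to test with a plain dictionary.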