Implemented a minimal but production-minded ingestion and normalization flow for a mock third‑party SaaS source.
Part 1: Custom Airbyte Python CDK source reading from local JSON fixtures (no external APIs or keys).
- Streams: `accounts`, `users`, `events`, `invoices`.
- Pagination: implemented via `next_cursor` fields in the fixtures.
- Incremental sync: based on `updated_at`, with proper Airbyte state.
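The pagination contract above can be sketched as a small reader that follows `next_cursor` links through local fixture pages. This is a minimal sketch; the actual file layout under `fixtures/` is an assumption here, not the connector's real naming scheme.

```python
import json
from pathlib import Path

def read_pages(fixtures_dir: str, stream: str):
    """Follow next_cursor links through local JSON fixture pages.

    Assumes each page is stored as <stream>_<cursor>.json with the first
    page at <stream>_page1.json -- the real fixture layout may differ.
    """
    cursor = "page1"  # hypothetical name of the first page
    while cursor:
        page = json.loads(Path(fixtures_dir, f"{stream}_{cursor}.json").read_text())
        yield from page.get("records", [])
        cursor = page.get("next_cursor")  # None on the last page ends the loop

# Usage: for rec in read_pages("fixtures", "accounts"): ...
```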
Part 2:
- Normalisation: Local pipeline producing analytics‑ready tables from the raw Airbyte output.
All components run locally and only depend on this repo and Python.
- Python: 3.11 (dependency issues occur with newer versions).
- Ensure the input JSON files are kept in the `fixtures/` folder.
Clone the repository to your machine:

```bash
git clone <your-repo-url> DataConnector_Airbyte
cd DataConnector_Airbyte
```

Create and activate a virtual environment (ensure Python 3.11):

```bash
python3 -m venv .venv
source .venv/bin/activate   # macOS / Linux
# .venv\Scripts\activate    # Windows PowerShell
```

Install dependencies:

```bash
pip install -r requirements.txt
```

The connector treats the `fixtures/` folder in this repo as the upstream API.
Create a minimal config file at `config.json` in the repo root:

```json
{
  "fixtures_base_path": "fixtures"
}
```

An empty `state.json` can be used to drive incremental syncs:

```json
{}
```

`main.py` exposes the standard Airbyte CLI interface.
Inspect the connector specification:

```bash
python main.py spec
```

This prints an Airbyte SPEC message describing the config schema (only `fixtures_base_path`).
Validate that the connector can read the local fixtures:

```bash
python main.py check --config config.json
```

You should see an Airbyte CONNECTION_STATUS message with `"status": "SUCCEEDED"`. If it fails, it will list missing fixture files or an invalid path.
List the available streams (accounts, users, events, invoices):

```bash
python main.py discover --config config.json
```

This prints a CATALOG message with stream schemas and supported sync modes (full refresh and incremental).
Run a read to fetch data from all streams:

```bash
python main.py read \
  --config config.json \
  --catalog configured_catalog.json \
  --state state.json \
  > raw_output.jsonl
```

Notes:
- Output is a stream of Airbyte messages, one JSON object per line (RECORD and STATE messages).
- Pagination is handled internally via the `next_cursor` fields inside the fixtures.
- Incremental sync:
  - The connector tracks the max `updated_at` per stream and emits it as STATE messages.
  - On subsequent runs, records with `updated_at` earlier than or equal to the stored cursor are skipped.
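The read output and incremental rule above can be consumed with a few lines of Python. A sketch only: RECORD lines follow the standard Airbyte protocol, but the exact shape of this connector's STATE payload is an assumption here.

```python
import json

# Two illustrative Airbyte protocol lines, as found in raw_output.jsonl.
SAMPLE = (
    '{"type": "RECORD", "record": {"stream": "accounts", '
    '"data": {"id": "a1", "updated_at": "2024-01-02T00:00:00Z"}, "emitted_at": 1704153600000}}\n'
    '{"type": "STATE", "state": {"data": {"accounts": {"updated_at": "2024-01-02T00:00:00Z"}}}}\n'
)

def iter_new_records(lines, state=None):
    """Yield (stream, data) for RECORDs newer than the stored per-stream cursor."""
    state = state or {}
    for line in lines:
        msg = json.loads(line)
        if msg.get("type") != "RECORD":
            continue  # skip STATE / LOG messages
        rec = msg["record"]
        cursor = state.get(rec["stream"], {}).get("updated_at", "")
        # ISO-8601 UTC timestamps compare correctly as plain strings
        if rec["data"].get("updated_at", "") > cursor:
            yield rec["stream"], rec["data"]
```

With an empty state every record is emitted; with a stored cursor equal to the record's `updated_at`, the record is skipped, matching the "earlier than or equal" rule above.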
The file `normalize.py` implements a small, local normalization step:
- Reads `raw_output.jsonl` (the Airbyte output).
- Extracts RECORD messages per stream.
- Builds:
  - `dim_accounts`: an account dimension with unified status and billing fields.
  - `fact_invoices`: an invoice fact table at the invoice grain.
- Exports these tables as CSV files into an `analytics/` folder.
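The extract-and-export steps above can be sketched as follows. This is a minimal illustration; `normalize.py` itself may structure the code differently.

```python
import csv
import json
from collections import defaultdict

def records_by_stream(lines):
    """Group Airbyte RECORD payloads from a JSONL stream by stream name."""
    grouped = defaultdict(list)
    for line in lines:
        msg = json.loads(line)
        if msg.get("type") == "RECORD":
            grouped[msg["record"]["stream"]].append(msg["record"]["data"])
    return grouped

def write_csv(rows, out_path):
    """Export one table as CSV, deriving the header from the first row."""
    with open(out_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
```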
After you have a `raw_output.jsonl`:

```bash
python normalize.py
```

Outputs:

- `analytics/dim_accounts.csv`
- `analytics/fact_invoices.csv`
You can inspect them with DuckDB directly:
```bash
duckdb
```

```sql
SELECT * FROM 'analytics/dim_accounts.csv' LIMIT 5;
SELECT * FROM 'analytics/fact_invoices.csv' LIMIT 5;
```
The normalization step intentionally smooths schema differences across fixture pages:
- Accounts
  - Some pages use `account_status` + `billing.country`/`billing.city`.
  - Later pages use `status` + `address.billing_country`/`address.billing_city`.
  - `normalize.py` maps these into a single shape: `status`, `billing_country`, `billing_city`.
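That mapping can be sketched as a single function. The field names below (`id`, and the nested `billing`/`address` objects) are assumptions based on the description above, not the exact code in `normalize.py`.

```python
def unify_account(raw: dict) -> dict:
    """Map either fixture page shape onto the single dim_accounts shape."""
    billing = raw.get("billing", {})   # older page shape
    address = raw.get("address", {})   # later page shape
    return {
        "id": raw["id"],
        "status": raw.get("status") or raw.get("account_status"),
        "billing_country": billing.get("country") or address.get("billing_country"),
        "billing_city": billing.get("city") or address.get("billing_city"),
    }
```

Because each page uses exactly one of the two shapes, the `or`-fallbacks pick whichever field is present.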
- Invoices
  - All invoices are kept at invoice grain (`id`, `account_id`, `amount`, `currency`, `status`, timestamps).
  - Line item detail is intentionally omitted from the first version of `fact_invoices` to keep the model simple.
These two tables (dim_accounts, fact_invoices) are analytics‑ready: they have stable schemas, consistent keys, and correct grain.
You now have both the raw Airbyte output as well as two curated tables under analytics/.
Assumptions
- The provided JSON fixtures are an accurate representation of the external SaaS API, including its pagination (`next_cursor`) and incremental contract (`updated_at`).
- We only need a single global cursor per stream (the max `updated_at`), and late‑arriving updates are acceptable as long as they increase `updated_at`.
- The local filesystem is an acceptable stand‑in for HTTP calls for this exercise.
Time‑boxed trade‑offs
- Normalisation is kept intentionally small:
  - Only two analytics tables (`dim_accounts`, `fact_invoices`) are produced, although more (e.g. `fact_events`, `dim_users`, `fact_invoice_line_items`) could be added.
  - No complex type‑casting or currency handling beyond what the fixtures provide.
- Robust error handling and logging are minimal, but the structure is ready for production‑grade enhancements.
- The connector encodes a fixture‑specific `page_map` rather than a fully generic paginator.
Possible next steps

- Richer modelling
  - Add `fact_events`, `dim_users`, and `fact_invoice_line_items` with proper foreign keys.
  - Introduce slowly changing dimension (SCD) handling for account attribute changes.
- Connector hardening
  - Add configuration for per‑stream start timestamps.
  - Add retry / backoff and HTTP client configuration for a real API.
  - Emit more structured logs and metrics (per‑stream row counts, latency).
- Testing & CI
  - Unit tests for incremental behaviour and schema‑drift handling.
  - A CI pipeline to run connector acceptance tests and normalisation regression tests.
This repo demonstrates the core ingestion + normalisation pattern using only local files and Python, which can be lifted into a production stack with relatively small changes.