Data Lake Architecture

This document defines how our data lake is structured, how ingestion workers write data, and how ingestion state is tracked.
The design is vendor-neutral: it makes no assumptions about the analytics or transformation layer.


Goals

  • Unified raw storage for all external integrations.
  • Separate data domains (accounting vs trigger providers).
  • Cheap, append-only, replayable storage.
  • Tool-agnostic control plane for tracking ingestion progress (no database dependency).
  • Simple, discoverable directory layout.

High-level flow

[Xero / QBO / Google Drive / SharePoint / etc.]
          │
          ▼
     Ingestion Workers (Rust/Python)
          │
          ▼
     s3://tofu-datalake-<env>/
          ├── accounting/
          │     ├── xero/
          │     └── qbo/
          └── trigger/
                ├── google_drive/
                └── sharepoint/
          │
          ▼
     (Downstream consumers: data warehouse, analytics, ML, or other pipelines)

Bucket structure

Each environment/account has its own bucket, e.g. s3://tofu-datalake-prod/.

Accounting data

accounting/
  xero/
    invoices/ingest_date=YYYY-MM-DD/<entity_id>-<ulid>.json.gz
    contacts/ingest_date=YYYY-MM-DD/<entity_id>-<ulid>.json.gz
  qbo/
    invoices/ingest_date=YYYY-MM-DD/<entity_id>-<ulid>.json.gz
    vendor/ingest_date=YYYY-MM-DD/<entity_id>-<ulid>.json.gz
    customer/ingest_date=YYYY-MM-DD/<entity_id>-<ulid>.json.gz
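A minimal Python sketch of how a worker might build an object key for the layout above. `new_ulid` is a hypothetical hand-rolled ULID encoder (48-bit millisecond timestamp followed by 80 random bits, Crockford base32, 26 characters) so the example needs no third-party package; a real worker would more likely use an existing ULID library. `raw_object_key` and its parameters are illustrative names, not an existing API.

```python
import os
import time
from datetime import date
from typing import Optional

# Crockford base32 alphabet used by ULIDs (no I, L, O, U); characters are in ascending ASCII order
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def new_ulid(ts_ms: Optional[int] = None, rand: Optional[int] = None) -> str:
    """Encode a 48-bit ms timestamp plus 80 random bits as a 26-char ULID (hypothetical helper)."""
    if ts_ms is None:
        ts_ms = int(time.time() * 1000)
    if rand is None:
        rand = int.from_bytes(os.urandom(10), "big")  # 10 bytes = 80 bits of randomness
    value = (ts_ms << 80) | rand  # timestamp in the high bits, so keys sort by time
    chars = []
    for _ in range(26):  # 26 chars * 5 bits = 130 bits, enough for the 128-bit value
        chars.append(CROCKFORD[value & 0x1F])
        value >>= 5
    return "".join(reversed(chars))


def raw_object_key(domain: str, source_provider: str, resource: str,
                   ingest_date: date, entity_id: str, ulid: str) -> str:
    """Build <domain>/<provider>/<resource>/ingest_date=YYYY-MM-DD/<entity_id>-<ulid>.json.gz."""
    return (f"{domain}/{source_provider}/{resource}/"
            f"ingest_date={ingest_date.isoformat()}/{entity_id}-{ulid}.json.gz")
```

For example, `raw_object_key("accounting", "xero", "invoices", date.today(), entity_id, new_ulid())` gives a key to write under `s3://tofu-datalake-<env>/`. Because the encoding is fixed-width and the timestamp occupies the high bits, files from later milliseconds sort after earlier ones; within the same millisecond this sketch orders randomly, so strict monotonicity would need a monotonic ULID generator.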

Trigger data

Under construction.

Control plane (watermarks)

control/
  watermarks/{source_provider}/{resource_type}/{entity_id}.json
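A minimal sketch of building and parsing watermark keys, assuming the first path segment holds the source provider name (e.g. `xero`). `WatermarkId`, `watermark_key` and `parse_watermark_key` are hypothetical names. The sketch follows the path exactly as written, so it assumes one watermark file per source provider, resource type and entity.

```python
from typing import NamedTuple

PREFIX = "control/watermarks/"
SUFFIX = ".json"


class WatermarkId(NamedTuple):
    source_provider: str  # e.g. "xero"
    resource_type: str    # e.g. "invoice"
    entity_id: str


def watermark_key(wid: WatermarkId) -> str:
    """control/watermarks/<source_provider>/<resource_type>/<entity_id>.json"""
    return f"{PREFIX}{wid.source_provider}/{wid.resource_type}/{wid.entity_id}{SUFFIX}"


def parse_watermark_key(key: str) -> WatermarkId:
    """Inverse of watermark_key, e.g. when listing the control/ prefix."""
    if not key.startswith(PREFIX) or not key.endswith(SUFFIX):
        raise ValueError(f"not a watermark key: {key}")
    parts = key[len(PREFIX):-len(SUFFIX)].split("/")
    if len(parts) != 3:
        raise ValueError(f"unexpected watermark key layout: {key}")
    return WatermarkId(*parts)
```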

File Format

  • NDJSON (newline-delimited JSON)
  • gzip compression
  • one file per tenant per batch/run (split if very large)
  • Target compressed size: 5-100 MB

Each line is one JSON object, e.g. one invoice.
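The rules above leave the split threshold open. A minimal sketch of one way to split a tenant's batch, assuming the worker has already grouped records per tenant and uses a byte budget on the uncompressed NDJSON as a cheap stand-in for compressed size. `split_batches` and `max_uncompressed_bytes` are hypothetical names, and the budget would need tuning so that compressed files land in the 5-100 MB target.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def split_batches(records: Iterable[Dict[str, Any]],
                  max_uncompressed_bytes: int) -> Iterator[List[Dict[str, Any]]]:
    """Group records into batches whose NDJSON size stays within a byte budget.

    A single record larger than the budget still gets a batch of its own.
    """
    batch: List[Dict[str, Any]] = []
    size = 0
    for record in records:
        # Size of the serialized line plus its trailing newline
        line_len = len(json.dumps(record, separators=(",", ":")).encode("utf-8")) + 1
        if batch and size + line_len > max_uncompressed_bytes:
            yield batch  # close the current file and start a new one
            batch, size = [], 0
        batch.append(record)
        size += line_len
    if batch:
        yield batch
```

Each yielded batch would become one `.json.gz` file with its own ULID.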

Example

{
  "source_provider": "xero",
  "entity_id": "00000000-0000-0000-0000-000000000001",
  "organization_id": "00000000-0000-0000-0000-000000000002",
  "external_resource_id": "00000000-0000-0000-0000-000000000000",
  "external_organization_id": "abc",
  "updated_at": "2025-10-23T05:59:12Z",
  "payload": { "... full Xero invoice object ..." }
}

NDJSON

Each .json.gz file contains many lines, and each line is one resource (there is no outer array). The top-level fields make deduplication and upserts straightforward; payload holds the untouched resource object, e.g. an invoice.

Example file contents after decompressing

{"source_provider": "xero","entity_id": "00000000-0000-0000-0000-000000000001","organization_id": "00000000-0000-0000-0000-000000000002","external_resource_id": "00000000-0000-0000-0000-000000000000","external_organization_id": "abc","updated_at": "2025-10-23T05:59:12Z","payload": { "... full Xero invoice object ..." }}
{"source_provider": "xero","entity_id": "00000000-0000-0000-0000-000000000001","organization_id": "00000000-0000-0000-0000-000000000002","external_resource_id": "00000000-0000-0000-0000-000000000003","external_organization_id": "abc","updated_at": "2025-10-23T06:00:32Z","payload": { "... full Xero invoice object ..." }}
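A minimal sketch of writing and reading such a file with only the Python standard library (`gzip`, `json`, `io`). `encode_ndjson_gz` and `decode_ndjson_gz` are hypothetical helper names; uploading the bytes to S3 is left out.

```python
import gzip
import io
import json
from typing import Any, Dict, Iterable, Iterator


def encode_ndjson_gz(records: Iterable[Dict[str, Any]]) -> bytes:
    """Serialize records as one compact JSON object per line, gzip-compressed."""
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for record in records:
            gz.write(json.dumps(record, separators=(",", ":")).encode("utf-8"))
            gz.write(b"\n")  # newline-delimited: no outer array
    return buf.getvalue()


def decode_ndjson_gz(data: bytes) -> Iterator[Dict[str, Any]]:
    """Stream records back out of a .json.gz file, one line at a time."""
    with gzip.GzipFile(fileobj=io.BytesIO(data), mode="rb") as gz:
        for line in gz:
            line = line.strip()
            if line:  # tolerate a trailing blank line
                yield json.loads(line)
```

Reading line by line means a consumer never has to hold a whole file in memory, which an outer JSON array would require.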

Field mappings

We upsert and deduplicate records using the following key fields.

Accounting

  • entity_id: The ID of the entity that we manage in our database.
  • organization_id: The ID of the organization that we manage in our database.
  • external_resource_id: The ID of the resource (e.g. an invoice) in the accounting software.
  • external_organization_id: The ID of the organization (e.g. a Xero tenant) in the accounting software.
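A minimal sketch of deduplication on these fields, assuming the key is `source_provider` plus the four fields above and that the record with the newest `updated_at` wins. Both are assumptions, and `dedupe_latest` and `KEY_FIELDS` are hypothetical names.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List, Tuple

# Assumed dedupe key: provider plus the four mapped ID fields
KEY_FIELDS = ("source_provider", "entity_id", "organization_id",
              "external_organization_id", "external_resource_id")


def parse_ts(value: str) -> datetime:
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11, so normalize it
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def dedupe_latest(records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep one record per key: the one with the newest updated_at."""
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for record in records:
        key = tuple(record[f] for f in KEY_FIELDS)
        current = latest.get(key)
        # On equal timestamps the record seen later (e.g. from a later file) wins
        if current is None or parse_ts(record["updated_at"]) >= parse_ts(current["updated_at"]):
            latest[key] = record
    return list(latest.values())
```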

Watermarks

Watermarks record the last successful ingestion state per (entity_id, organization_id, resource_type, source_provider). We use the watermark to avoid re-fetching resources that are already in the data lake, and to prevent concurrent workers from uploading files with the same name at the same time.

Example

{
  "source_provider": "xero",
  "entity_id": "00000000-0000-0000-0000-000000000001",
  "organization_id": "00000000-0000-0000-0000-000000000002",
  "resource_type": "invoice"
}
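The example above shows only the identifying fields. A minimal sketch of incremental fetching, assuming a hypothetical `last_updated_at` field that stores the newest `updated_at` already written to the lake; `select_new` is likewise a hypothetical name.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple


def parse_ts(value: str) -> datetime:
    # Normalize a trailing "Z" so fromisoformat works before Python 3.11
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def select_new(records: Iterable[Dict[str, Any]],
               watermark: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return the records at or after the watermark, plus an advanced copy of it."""
    last: Optional[str] = watermark.get("last_updated_at")  # hypothetical field
    cutoff = parse_ts(last) if last else None
    # Inclusive cutoff: boundary records may be re-written, and dedupe removes them,
    # whereas a strict ">" could skip records that share the boundary timestamp.
    fresh = [r for r in records if cutoff is None or parse_ts(r["updated_at"]) >= cutoff]
    new_watermark = dict(watermark)  # the caller's watermark is left unchanged
    if fresh:
        newest = max(fresh, key=lambda r: parse_ts(r["updated_at"]))
        new_watermark["last_updated_at"] = newest["updated_at"]
    return fresh, new_watermark
```

A worker would write the new watermark back only after the data file upload has succeeded, so a crash between the two steps leads to a harmless replay rather than a gap.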

Naming and Conventions

Field              Description
source_provider    Name of the external integration, e.g. Xero
ulid               Unique file ID (a ULID, for monotonic sortability)
entity_id          ID of the entity that we manage in our database
organization_id    ID of the organization that we manage in our database
resource           Object from the external integration, e.g. an invoice