Build an ingest pipeline

Every signal source feeds a four-stage pipeline that ingests, normalizes, enriches, and publishes knowledge atoms on a schedule.

On Fridays I used to manually check three Slack channels, a Confluence label, GoodLinks, and whatever YouTube link someone called mandatory: a scavenger hunt dressed up as “catch-up.” The pipeline replaced it with one script chain that ingests allowed sources, normalizes them into atoms with hashes and provenance, enriches them, and publishes into Hugo (or any destination that honors the same contract). Now the coffee is still brewing and Monday’s log already shows what landed, what was skipped, and what failed auth.

When I reach for this

I need a single loop that captures signals from multiple sources, deduplicates them, enriches them, and publishes to a place the team already reads. Manual curation is drifting, or I’m repeating the same “did we already summarize that?” question every week. I want Claude Code skills to do the scraping and summarizing while I review, and I want a log I can scan Monday morning to see exactly what was captured, skipped, or failed.

What I need before starting

  • Claude Code installed with the relevant ingest skills: Slack channels, Confluence CQL search, GoodLinks bookmarks, YouTube transcript fetcher, generic web page fetcher, Outlook connector if email matters
  • Explicit channel allowlists and app credentials (Slack tokens scoped to the opted-in channels, Confluence PAT, GoodLinks export key, YouTube/Google API key if needed)
  • A working staging directory (I use pipelines/ingest/.stage/) plus a knowledge-atoms/ directory where normalized JSONL or Markdown lands
  • A cache file (SQLite or JSON) that tracks the dedupe hashes and last-success timestamps for each source so re-runs know where to resume
  • An orchestration script (mine is scripts/pipeline-a-run.sh) that can run ingest → stage → process → publish without babysitting
  • Destination target — usually this Hugo site, so I keep a content/atoms/ folder and a short publish script that rsyncs or copies outputs into Hugo frontmatter-ready files

What I do

Phases: ingest raw → stage/normalize → process/enrich → publish.

Idempotency: every phase tolerates a re-run after I add a source or fix a bug—dedupe and markers keep duplicates from flooding downstream.

1. Map each source to an ingest skill

For each signal type I pick the Claude Code skill that already authenticates and emits structured output:

  • Slack → slack-ingest with channel allowlists (never a workspace firehose)
  • Confluence → confluence-ingest with CQL such as type = page AND label = decision
  • GoodLinks → goodlinks-pull; web → web-ingest; Outlook → outlook-ingest; YouTube → youtube-transcript

Every skill drops a raw artifact (JSON or Markdown) into pipelines/ingest/raw/<source>/ with filenames that include ISO timestamps and source IDs. I keep adapters thin—auth and capture only—so later phases own structure and I can swap a connector without rewriting normalization.
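A minimal sketch of that naming convention, assuming Python adapters. `raw_artifact_path` and `RAW_ROOT` are hypothetical names, and the compact UTC timestamp plus the ID-sanitizing rule are my choices rather than anything the skills prescribe. It expects a timezone-aware `datetime`.

```python
from datetime import datetime, timezone
from pathlib import Path

# Hypothetical root matching the raw/<source>/ layout.
RAW_ROOT = Path("pipelines/ingest/raw")


def raw_artifact_path(source: str, source_id: str, captured_at: datetime, ext: str = "json") -> Path:
    """Return RAW_ROOT/<source>/<UTC timestamp>_<source_id>.<ext>."""
    # ISO 8601 basic format (no colons) sorts lexically and is safe on every filesystem.
    stamp = captured_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # Replace anything but letters, digits, '-', '_' and '.' so IDs such as
    # "C123/1700000000.000100" become a single safe path segment.
    safe_id = "".join(c if c.isalnum() or c in "-_." else "-" for c in source_id)
    return RAW_ROOT / source / f"{stamp}_{safe_id}.{ext}"
```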

2. Normalize into knowledge atoms during staging

A staging script walks pipelines/ingest/raw/ and wraps each artifact in a common envelope. I keep the logic in pipelines/ingest/stage.py. Each atom at minimum carries:

  • Identity — title, canonical URL, captured_at, source metadata
  • Body — raw text or structured fields from the connector
  • Tags — optional; heavy tagging often waits for the process phase
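One way to pin the envelope down is a small dataclass. This is a hypothetical shape: `fields` and `raw_path` are my assumptions, and serializing one atom per JSON line fits the JSONL option for knowledge-atoms/.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Atom:
    # Identity
    title: str
    canonical_url: str
    captured_at: str              # ISO 8601 timestamp string
    source: str                   # e.g. "slack", "confluence", "youtube"
    source_id: str                # connector-specific ID (thread ts, page ID, ...)
    # Body: raw text, plus any structured fields the connector emitted
    body: str = ""
    fields: dict = field(default_factory=dict)
    # Tags: optional at staging time; the process phase adds most of them
    tags: list = field(default_factory=list)
    # Provenance: path of the raw artifact this atom was staged from
    raw_path: str = ""

    def to_jsonl(self) -> str:
        """Serialize as one JSON line for a knowledge-atoms/*.jsonl file."""
        return json.dumps(asdict(self), ensure_ascii=False, sort_keys=True)
```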

Dedupe uses source_id + canonical_url hashed into pipelines/ingest/.cache/seen.json. Known hash → skip quietly.
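A sketch of the dedupe gate, assuming seen.json holds a JSON array of SHA-256 hex digests. `atom_key`, `load_seen`, `save_seen`, and `dedupe` are hypothetical helpers; the stage script would load the set once, run every source through `dedupe`, and save it only after staging writes succeed.

```python
import hashlib
import json
from pathlib import Path

SEEN_PATH = Path("pipelines/ingest/.cache/seen.json")


def atom_key(source_id: str, canonical_url: str) -> str:
    # A newline separator keeps ("ab", "c") and ("a", "bc") from hashing the same.
    return hashlib.sha256(f"{source_id}\n{canonical_url}".encode("utf-8")).hexdigest()


def load_seen(path: Path = SEEN_PATH) -> set:
    return set(json.loads(path.read_text(encoding="utf-8"))) if path.exists() else set()


def save_seen(seen: set, path: Path = SEEN_PATH) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(sorted(seen)), encoding="utf-8")


def dedupe(atoms: list, seen: set) -> tuple:
    """Return (new_atoms, skipped_count), recording new keys in `seen`."""
    fresh, skipped = [], 0
    for atom in atoms:
        key = atom_key(atom["source_id"], atom["canonical_url"])
        if key in seen:
            skipped += 1  # known hash: skip quietly
            continue
        seen.add(key)
        fresh.append(atom)
    return fresh, skipped
```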

Shape by source: Slack → one atom per thread; Confluence → one per page; GoodLinks → one per bookmark, with its note embedded; YouTube → transcript chunks split at section markers.
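For the Slack case, grouping by thread could look like this. `ts` and `thread_ts` are the fields Slack message objects use for threading; the function name, the atom keys, and the 80-character title are assumptions.

```python
from collections import defaultdict
from decimal import Decimal


def slack_thread_atoms(channel_id: str, messages: list) -> list:
    """Group raw Slack messages into one atom dict per thread (hypothetical helper)."""
    threads = defaultdict(list)
    for msg in messages:
        # Replies carry thread_ts (the parent's ts); a message without it starts its own thread.
        threads[msg.get("thread_ts", msg["ts"])].append(msg)
    atoms = []
    # Decimal compares Slack's "seconds.micros" ts strings exactly.
    for root_ts in sorted(threads, key=Decimal):
        msgs = sorted(threads[root_ts], key=lambda m: Decimal(m["ts"]))
        atoms.append({
            "source": "slack",
            "source_id": f"{channel_id}:{root_ts}",
            "title": msgs[0].get("text", "")[:80],
            "body": "\n".join(m.get("text", "") for m in msgs),
        })
    return atoms
```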

Provenance: every atom points at the raw file path so I can replay staging when the schema changes.

3. Enrich and tag in the process phase

Processing reads staged atoms and adds new fields rather than replacing existing ones:

  • Summaries — 2–3 sentence abstracts (Claude or a fixed prompt)
  • Topics — heuristics or a small classifier; normalized tags like automation, research, team names
  • Cross-links — when an atom references a known project or prior atom
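A deliberately small keyword heuristic for topics, as a stand-in for whatever classifier ends up in the process phase. The keyword map and `infer_topics` are hypothetical; `normalize_tag` shows one way to turn free-form team names into normalized tags. Cross-linking is not covered here.

```python
import re

# Hypothetical keyword -> tag map; a real one would come from team vocabulary.
TOPIC_KEYWORDS = {
    "automation": ["cron", "pipeline", "script", "workflow"],
    "research": ["paper", "study", "benchmark"],
}


def normalize_tag(tag: str) -> str:
    # "Platform Team" -> "platform-team"
    return re.sub(r"[^a-z0-9]+", "-", tag.lower()).strip("-")


def infer_topics(text: str, existing: list) -> list:
    """Return sorted normalized tags: existing ones plus any keyword matches."""
    words = set(re.findall(r"[a-z0-9]+", text.lower()))
    tags = {normalize_tag(t) for t in existing}
    for tag, keywords in TOPIC_KEYWORDS.items():
        if words.intersection(keywords):
            tags.add(tag)
    return sorted(tags)
```

The result would land in a new field on the atom, leaving the captured body untouched.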

Summaries and takeaways go in new fields. Original capture stays intact for audit.

Idempotency: set processed_at when an atom is done; skip on re-run unless I force. New schema version → backfill only missing fields so old summaries stay stable.
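A sketch of those idempotency rules, assuming atoms are plain dicts and `summarize` stands in for the real Claude call. `SCHEMA_VERSION`, `ENRICHERS`, and `process_atom` are hypothetical; the point is the gating: skip atoms already processed under the current schema, otherwise fill only the fields that are missing.

```python
from datetime import datetime, timezone

SCHEMA_VERSION = 2  # hypothetical: bumped when enriched_summary was added


def summarize(text: str, limit: int) -> str:
    # Placeholder for the Claude call or fixed prompt; it only truncates here.
    return text[:limit]


# Hypothetical enrichment fields, each computed from the untouched capture.
ENRICHERS = {
    "summary": lambda atom: summarize(atom["body"], 200),
    "enriched_summary": lambda atom: summarize(atom["body"], 400),
}


def process_atom(atom: dict, force: bool = False) -> bool:
    """Enrich one atom in place; return True if any enrichment field was written."""
    current = bool(atom.get("processed_at")) and atom.get("schema_version", 0) >= SCHEMA_VERSION
    if current and not force:
        return False  # already processed under this schema: skip on re-run
    wrote = False
    for name, enrich in ENRICHERS.items():
        # Backfill only missing fields unless forced, so older summaries stay stable.
        if force or name not in atom:
            atom[name] = enrich(atom)
            wrote = True
    atom["schema_version"] = SCHEMA_VERSION
    atom["processed_at"] = datetime.now(timezone.utc).isoformat()
    return wrote
```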

4. Publish to the knowledge base

scripts/pipeline-a-run.sh publish turns processed atoms into deliverables. For Hugo here:

  • Template each atom to content/atoms/<slug>.md (frontmatter + body)
  • Drop a CSV of new rows into data/feeds/ if the home page lists “latest captures”
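A minimal renderer for the first bullet, assuming YAML frontmatter. `title`, `date`, and `tags` are fields Hugo understands out of the box; the `source_url` key, `slugify`, and `publish_atom` are my assumptions. Values go through `json.dumps` because a JSON string or array is also valid YAML, which sidesteps quoting bugs with titles that contain colons.

```python
import json
import re
from pathlib import Path


def slugify(title: str) -> str:
    return re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-") or "untitled"


def render_hugo_page(atom: dict) -> str:
    """Frontmatter block followed by the atom body."""
    front = {
        "title": atom["title"],
        "date": atom["captured_at"],
        "tags": atom.get("tags", []),
        "source_url": atom["canonical_url"],   # hypothetical custom param
        "summary": atom.get("summary", ""),
    }
    lines = ["---"] + [f"{k}: {json.dumps(v, ensure_ascii=False)}" for k, v in front.items()] + ["---", ""]
    return "\n".join(lines) + atom["body"] + "\n"


def publish_atom(atom: dict, content_dir: Path = Path("content/atoms")) -> Path:
    content_dir.mkdir(parents=True, exist_ok=True)
    path = content_dir / f"{slugify(atom['title'])}.md"
    path.write_text(render_hugo_page(atom), encoding="utf-8")
    return path
```

Two atoms with the same title would overwrite each other here; appending a short hash of `source_id` to the slug is one way to avoid that.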

Other sinks (Notion, API, digest email) hang off the same processed dataset—one enrich pass, many publishers.

Publish touches git last, only after stage + process succeed.

5. Wire the phases together with one command

Example chain:

./scripts/pipeline-a-run.sh ingest slack confluence links youtube \
  && ./scripts/pipeline-a-run.sh stage \
  && ./scripts/pipeline-a-run.sh process \
  && ./scripts/pipeline-a-run.sh publish

Each step prints a receipt: added vs skipped vs failed. Non-zero exit on failure so hooks or CI can page me.
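A sketch of the receipt and exit-code contract, in Python to match stage.py. `Receipt` and `finish` are hypothetical names; a stage script would end with `sys.exit(finish(receipt))` so the `&&` chain stops at the first failing phase.

```python
from dataclasses import dataclass


@dataclass
class Receipt:
    stage: str
    added: int = 0
    skipped: int = 0
    failed: int = 0

    def line(self) -> str:
        return f"[{self.stage}] added={self.added} skipped={self.skipped} failed={self.failed}"


def finish(receipt: Receipt) -> int:
    """Print the receipt and return the process exit code."""
    print(receipt.line())
    # Any failure yields a non-zero exit so `&&` stops and hooks or CI can alert.
    return 1 if receipt.failed else 0
```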

Keys are deterministic per source ID, so a full re-run after adding a connector only processes new rows.

6. Schedule and observe

  • Cadence — cron or just ingest-weekly: I like Friday EOD plus Monday AM so the week’s decisions land before standups.
  • Logs live in pipelines/ingest/logs/ with timestamps; that's the first place I look when auth flakes.
  • Volume signals — Grafana or a simple stats.json per source; silent Confluence for a week is a smell.
  • Hooks — on critical Slack sources, a session hook can warn if a channel returns zero rows twice in a row (often a revoked scope).
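A sketch of the zero-rows check backed by a per-source stats.json. The file location, the ten-run window, and the function names are assumptions; a hook would call `record_rows` after each ingest and warn when `zero_streak` returns True.

```python
import json
from pathlib import Path

STATS_PATH = Path("pipelines/ingest/logs/stats.json")  # hypothetical location


def load_stats(path: Path = STATS_PATH) -> dict:
    return json.loads(path.read_text(encoding="utf-8")) if path.exists() else {}


def save_stats(stats: dict, path: Path = STATS_PATH) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(stats, indent=2, sort_keys=True), encoding="utf-8")


def record_rows(stats: dict, source: str, rows: int, window: int = 10) -> list:
    """Append this run's row count for `source`, keeping only the last `window` runs."""
    history = stats.setdefault(source, [])
    history.append(rows)
    del history[:-window]
    return history


def zero_streak(history: list, runs: int = 2) -> bool:
    """True when the last `runs` runs all returned zero rows (often a revoked scope)."""
    recent = history[-runs:]
    return len(recent) == runs and all(n == 0 for n in recent)
```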

What goes wrong

  • Firehose instead of allowlists — pointing the Slack skill at the whole workspace floods staging with noise. Fix: require an explicit channels_allowlist array and have the script refuse to run if it’s empty.
  • Non-idempotent runs — missing dedupe hashes or processed markers means every run republishes the same atoms. Fix: store source IDs plus timestamps in a cache and gate every stage on those keys.
  • Stale credentials — Confluence PATs expire, Slack tokens lose scopes, Outlook refresh tokens get revoked. Fix: centralize secrets in one .env file and add a ./scripts/pipeline-a-run.sh health subcommand that runs lightweight API calls and fails loudly before a full ingest.
  • Skipping staging — publishing raw Slack JSON directly makes downstream consumers do the normalization work. Fix: treat staging as mandatory and make the publish script read only staged files.
  • Run order drift — someone runs publish without process and we ship untagged atoms. Fix: orchestration script enforces the sequence and checks for the processed_at field before allowing publish.
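Two of those fixes as guard functions, sketched under the assumption that the orchestrator calls them before ingest and before publish. `PipelineError`, `require_allowlist`, and `check_publishable` are hypothetical; `channels_allowlist` and `processed_at` are the names used in the list.

```python
class PipelineError(RuntimeError):
    """Raised to abort a phase with a non-zero exit."""


def require_allowlist(config: dict) -> list:
    # Refuse to run rather than fall back to the whole workspace.
    channels = config.get("channels_allowlist") or []
    if not channels:
        raise PipelineError("channels_allowlist is empty; refusing to ingest the whole workspace")
    return channels


def check_publishable(atoms: list) -> None:
    # Publish only reads atoms the process phase has stamped.
    unprocessed = [a.get("source_id", "?") for a in atoms if not a.get("processed_at")]
    if unprocessed:
        raise PipelineError(f"{len(unprocessed)} atom(s) lack processed_at; run process first")
```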

Notes

Re-runs — keep every stage re-runnable. The best bug fix is often ./scripts/pipeline-a-run.sh … with --since last_success and trust in the dedupe cache.
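If the cache in question is SQLite, the last-success cursor behind `--since last_success` can be a single table. This is a sketch with hypothetical names (`open_state`, `get_since`, `mark_success`, the `last_success` table). Overlapping windows are harmless because the dedupe cache absorbs anything already seen, so advancing the cursor only after a clean run is the safe default.

```python
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

EPOCH = "1970-01-01T00:00:00+00:00"


def open_state(path: str) -> sqlite3.Connection:
    """Open (or create) the per-source cursor table; ':memory:' works for tests."""
    if path != ":memory:":
        Path(path).parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS last_success (source TEXT PRIMARY KEY, ts TEXT NOT NULL)")
    return conn


def get_since(conn: sqlite3.Connection, source: str) -> str:
    """Return the last successful run for `source`, or the epoch on a first run."""
    row = conn.execute("SELECT ts FROM last_success WHERE source = ?", (source,)).fetchone()
    return row[0] if row else EPOCH


def mark_success(conn: sqlite3.Connection, source: str, ts: str = "") -> None:
    """Advance the cursor; call only after the source's ingest fully succeeded."""
    ts = ts or datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT OR REPLACE INTO last_success (source, ts) VALUES (?, ?)", (source, ts))
```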

Schema versions — when I add a field (say enriched_summary), I bump a schema version constant so stages know whether to backfill or skip.

Raw retention — store raw captures for auditing. If someone questions a summary, I reopen the original Slack message or Confluence diff.

Pipeline as code — tests for dedupe logic, lint on stage scripts, and a deploy hook that runs an ingest dry-run before merge.

Runtime — when ingest or process suddenly doubles in duration, I look for a rogue source or a prompt change before it blocks publishing.