Build an ingest pipeline
Fridays I used to hit three Slack channels, a Confluence label, GoodLinks, and whatever YouTube link someone called mandatory by hand: a scavenger hunt dressed up as “catch-up.” The pipeline replaced it with one script chain: ingest allowed sources, normalize to atoms with hashes and provenance, enrich, and publish into Hugo (or whatever destination shares the same contract). Coffee still brewing, Monday’s log already shows what landed, what was skipped, and what failed auth.
When I reach for this
I need a single loop that captures signals from multiple sources, deduplicates them, enriches them, and publishes to a place the team already reads. Manual curation is drifting, or I’m repeating the same “did we already summarize that?” question every week. I want Claude Code skills to do the scraping and summarizing while I review, and I want a log I can scan Monday morning to see exactly what was captured, skipped, or failed.
What I need before starting
- Claude Code installed with the relevant ingest skills: Slack channels, Confluence CQL search, GoodLinks bookmarks, YouTube transcript fetcher, generic web page fetcher, Outlook connector if email matters
- Explicit channel allowlists and app credentials (Slack tokens scoped to the opted-in channels, Confluence PAT, GoodLinks export key, YouTube/Google API key if needed)
- A working staging directory (I use pipelines/ingest/.stage/) plus a knowledge-atoms/ directory where normalized JSONL or Markdown lands
- A cache file (SQLite or JSON) that tracks the dedupe hashes and last-success timestamps for each source so re-runs know where to resume
- An orchestration script (mine is scripts/pipeline-a-run.sh) that can run ingest → stage → process → publish without babysitting
- Destination target: usually this Hugo site, so I keep a content/atoms/ folder and a short publish script that rsyncs or copies outputs into Hugo frontmatter-ready files
What I do
Phases: ingest raw → stage/normalize → process/enrich → publish.
Idempotency: every phase tolerates a re-run after I add a source or fix a bug—dedupe and markers keep duplicates from flooding downstream.
1. Map each source to an ingest skill
For each signal type I pick the Claude Code skill that already authenticates and emits structured output:
- Slack: /slack-ingest with channel allowlists (never a workspace firehose)
- Confluence: /confluence-ingest with CQL such as type = page AND label = decision
- GoodLinks: /goodlinks-pull; web: /web-ingest; Outlook: /outlook-ingest; YouTube: /youtube-transcript
Every skill drops a raw artifact (JSON or Markdown) into pipelines/ingest/raw/<source>/ with filenames that include ISO timestamps and source IDs. I keep adapters thin—auth and capture only—so later phases own structure and I can swap a connector without rewriting normalization.
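As a rough sketch of the naming convention above: the text only says filenames include an ISO timestamp and a source ID, so the exact pattern, the helper name raw_artifact_path, and the character sanitizing below are my assumptions, not what any skill actually emits.

```python
from datetime import datetime, timezone
from pathlib import Path
import re

RAW_ROOT = Path("pipelines/ingest/raw")  # directory named in the text


def raw_artifact_path(source: str, source_id: str, captured_at: datetime,
                      ext: str = "json") -> Path:
    """Build raw/<source>/<timestamp>__<source_id>.<ext> (pattern is an assumption)."""
    # Compact UTC timestamp; colons are awkward in filenames on some systems.
    stamp = captured_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # Keep only filename-safe characters from the source ID.
    safe_id = re.sub(r"[^A-Za-z0-9._-]+", "-", source_id).strip("-")
    return RAW_ROOT / source / f"{stamp}__{safe_id}.{ext}"
```

Sorting a source directory by filename then gives capture order for free, which helps when replaying staging.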
2. Normalize into knowledge atoms during staging
A staging script walks pipelines/ingest/raw/ and wraps each artifact in a common envelope. I keep the logic in pipelines/ingest/stage.py. Each atom at minimum carries:
- Identity: title, canonical URL, captured_at, source metadata
- Body: raw text or structured fields from the connector
- Tags: optional; heavy tagging often waits for the process phase
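One way the envelope could be modeled is a small dataclass that serializes to a JSONL row. This is only a sketch: apart from captured_at, the field names here (source, source_id, raw_path, and so on) are illustrative guesses, and the real stage.py may carry more.

```python
from dataclasses import dataclass, field, asdict
import json


@dataclass
class Atom:
    # Identity
    title: str
    canonical_url: str
    captured_at: str  # ISO 8601 string
    source: str       # e.g. "slack", "confluence" (hypothetical field name)
    source_id: str    # hypothetical field name
    # Body: raw text or structured fields from the connector
    body: str
    # Path of the raw artifact this atom was staged from (hypothetical field name)
    raw_path: str
    # Tags are optional at staging time
    tags: list[str] = field(default_factory=list)

    def to_jsonl(self) -> str:
        """Serialize the atom as a single JSON line."""
        return json.dumps(asdict(self), sort_keys=True)
```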
Dedupe uses source_id + canonical_url hashed into pipelines/ingest/.cache/seen.json. Known hash → skip quietly.
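A minimal sketch of that dedupe check, assuming SHA-256 as the hash and a plain JSON list of hex digests as the layout of seen.json. Both are my assumptions, as are the helper names.

```python
import hashlib
import json
from pathlib import Path

SEEN_PATH = Path("pipelines/ingest/.cache/seen.json")  # cache file named in the text


def dedupe_key(source_id: str, canonical_url: str) -> str:
    """Hash source_id + canonical_url into a stable hex digest."""
    # The NUL separator keeps ("ab", "c") and ("a", "bc") from colliding.
    payload = f"{source_id}\x00{canonical_url}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def load_seen(path: Path = SEEN_PATH) -> set[str]:
    """Return the set of known hashes, or an empty set on the first run."""
    if not path.exists():
        return set()
    return set(json.loads(path.read_text(encoding="utf-8")))


def save_seen(seen: set[str], path: Path = SEEN_PATH) -> None:
    """Persist the hashes as a sorted JSON list so diffs stay readable."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(sorted(seen)), encoding="utf-8")


def is_new(source_id: str, canonical_url: str, seen: set[str]) -> bool:
    """Record the key and return True if unseen; return False to skip quietly."""
    key = dedupe_key(source_id, canonical_url)
    if key in seen:
        return False
    seen.add(key)
    return True
```

Staging would call load_seen() once, run is_new() per artifact, and call save_seen() only after the whole pass succeeds, so a crash mid-run does not mark unstaged items as seen.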
Shape by source: Slack → one atom per thread; Confluence → one per page; GoodLinks → bookmark + note embedded; YouTube → chunks by section markers.
Provenance: every atom points at the raw file path so I can replay staging when the schema changes.
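For the Slack shape (one atom per thread), the grouping could look like the sketch below. It assumes the raw artifact is a list of message dicts carrying the Slack API's ts and thread_ts fields; what /slack-ingest really emits may differ, and group_by_thread is a hypothetical helper.

```python
from collections import defaultdict


def group_by_thread(messages: list[dict]) -> dict[str, list[dict]]:
    """Group Slack-style messages so each thread becomes one atom candidate."""
    threads: dict[str, list[dict]] = defaultdict(list)
    for msg in messages:
        # Threaded messages carry thread_ts; a standalone message is its own root.
        root = msg.get("thread_ts") or msg["ts"]
        threads[root].append(msg)
    # Order each thread chronologically; Slack timestamps are numeric strings.
    for replies in threads.values():
        replies.sort(key=lambda m: float(m["ts"]))
    return dict(threads)
```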
3. Enrich and tag in the process phase
Processing reads staged atoms and adds fields, not replacements:
- Summaries: 2–3 sentence abstracts (Claude or a fixed prompt)
- Topics: heuristics or a small classifier; normalized tags like automation, research, team names
- Cross-links: when an atom references a known project or prior atom
Summaries and takeaways go in new fields. Original capture stays intact for audit.
Idempotency: set processed_at when an atom is done; skip on re-run unless I force. New schema version → backfill only missing fields so old summaries stay stable.
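The skip-or-backfill rule can be sketched as one function over a dict-shaped atom. SCHEMA_VERSION, the schema_version field, and the enrichers mapping are hypothetical names; the real enrichers would call Claude or a classifier rather than the toy callables a test would pass in.

```python
from datetime import datetime, timezone
from typing import Callable

SCHEMA_VERSION = 2  # hypothetical constant; bump when a field is added


def process_atom(atom: dict,
                 enrichers: dict[str, Callable[[dict], object]],
                 force: bool = False) -> bool:
    """Add missing enrichment fields in place; return True if any field was written."""
    up_to_date = atom.get("processed_at") and atom.get("schema_version") == SCHEMA_VERSION
    if up_to_date and not force:
        return False  # already processed at this schema version: skip
    changed = False
    for field_name, enrich in enrichers.items():
        # Backfill only what is missing so existing summaries stay stable,
        # unless a forced run explicitly asks for regeneration.
        if force or field_name not in atom:
            atom[field_name] = enrich(atom)
            changed = True
    atom["schema_version"] = SCHEMA_VERSION
    atom["processed_at"] = datetime.now(timezone.utc).isoformat()
    return changed
```

Because enrichers only ever add keys, the captured body is never overwritten, which matches the "fields, not replacements" rule.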
4. Publish to the knowledge base
scripts/pipeline-a-run.sh publish turns processed atoms into deliverables. For Hugo here:
- Template each atom to content/atoms/<slug>.md (frontmatter + body)
- Drop a CSV of new rows into data/feeds/ if the home page lists “latest captures”
Other sinks (Notion, API, digest email) hang off the same processed dataset—one enrich pass, many publishers.
Publish touches git last, only after stage + process succeed.
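A sketch of the Hugo templating step, assuming YAML frontmatter and dict-shaped atoms. The helper names, the slug rule, and the source_url parameter are hypothetical; the actual publish script may template differently.

```python
import json
import re
from pathlib import Path


def slugify(title: str) -> str:
    """Lowercase, hyphen-separated slug for content/atoms/<slug>.md."""
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")
    return slug or "untitled"


def render_atom(atom: dict) -> str:
    """Render YAML frontmatter followed by the atom body."""
    front = {
        "title": atom["title"],
        "date": atom["captured_at"],
        "tags": atom.get("tags", []),
        "source_url": atom["canonical_url"],  # custom param; name is an assumption
    }
    # JSON-encoded strings and lists are valid YAML, so titles containing
    # colons or quotes cannot break the frontmatter.
    lines = [f"{key}: {json.dumps(value)}" for key, value in front.items()]
    return "---\n" + "\n".join(lines) + "\n---\n\n" + atom["body"] + "\n"


def publish_atom(atom: dict, content_dir: Path = Path("content/atoms")) -> Path:
    """Write one atom to <content_dir>/<slug>.md and return the path."""
    content_dir.mkdir(parents=True, exist_ok=True)
    out = content_dir / f"{slugify(atom['title'])}.md"
    out.write_text(render_atom(atom), encoding="utf-8")
    return out
```

Writing the same atom twice produces the same file, so publish stays safe to re-run; two atoms with identical titles would collide, which a real script would need to handle (for example by appending part of the dedupe hash to the slug).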
5. Wire the phases together with one command
Example chain:
./scripts/pipeline-a-run.sh ingest slack confluence links youtube \
&& ./scripts/pipeline-a-run.sh stage \
&& ./scripts/pipeline-a-run.sh process \
&& ./scripts/pipeline-a-run.sh publish
Each step prints a receipt: added vs skipped vs failed. Non-zero exit on failure so hooks or CI can page me.
Keys are deterministic per source ID, so a full re-run after adding a connector only processes new rows.
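The receipt-plus-exit-code convention might be modeled like this inside whichever Python entry point a phase uses. The Receipt class and the output format are my own illustration, not the script's actual log line.

```python
import sys
from dataclasses import dataclass


@dataclass
class Receipt:
    phase: str
    added: int = 0
    skipped: int = 0
    failed: int = 0

    def line(self) -> str:
        """One-line summary suitable for the run log."""
        return (f"[{self.phase}] added={self.added} "
                f"skipped={self.skipped} failed={self.failed}")

    def exit_code(self) -> int:
        # Non-zero on any failure so `&&` chains, hooks, or CI stop here.
        return 1 if self.failed else 0


def finish(receipt: Receipt) -> None:
    """Print the receipt and exit with the matching status code."""
    print(receipt.line())
    sys.exit(receipt.exit_code())
```

Since the chain is joined with &&, a single failed item in stage stops process and publish from running at all.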
6. Schedule and observe
- Cadence: cron or just ingest-weekly. I like Friday EOD plus Monday AM so the week’s decisions land before standups.
- Logs: pipelines/ingest/logs/ with timestamps; first place I look when auth flakes.
- Volume signals: Grafana or a simple stats.json per source; silent Confluence for a week is a smell.
- Hooks: on critical Slack sources, a session hook can warn if a channel returns zero rows twice in a row (often a revoked scope).
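The zero-rows-twice check is easy to sketch on top of a per-source stats.json. The file layout (source name mapped to a list of recent row counts) and both helper names are assumptions.

```python
import json
from pathlib import Path


def record_run(stats_path: Path, source: str, rows: int, keep: int = 10) -> list[int]:
    """Append this run's row count for a source and return its recent history."""
    if stats_path.exists():
        stats = json.loads(stats_path.read_text(encoding="utf-8"))
    else:
        stats = {}
    # Keep only the most recent `keep` counts per source.
    history = (stats.get(source, []) + [rows])[-keep:]
    stats[source] = history
    stats_path.parent.mkdir(parents=True, exist_ok=True)
    stats_path.write_text(json.dumps(stats, indent=2), encoding="utf-8")
    return history


def looks_silent(history: list[int], runs: int = 2) -> bool:
    """True when the last `runs` runs all returned zero rows."""
    return len(history) >= runs and all(count == 0 for count in history[-runs:])
```

A hook would call record_run() after each ingest and warn when looks_silent() turns true, before assuming the channel was simply quiet.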
What goes wrong
- Firehose instead of allowlists: pointing the Slack skill at the whole workspace floods staging with noise. Fix: require an explicit channels_allowlist array and have the script refuse to run if it’s empty.
- Non-idempotent runs: missing dedupe hashes or processed markers means every run republishes the same atoms. Fix: store source IDs plus timestamps in a cache and gate every stage on those keys.
- Stale credentials: Confluence PATs expire, Slack tokens lose scopes, Outlook refresh tokens get revoked. Fix: centralize secrets in one .env file and add a ./scripts/pipeline-a-run.sh health subcommand that runs lightweight API calls and fails loudly before a full ingest.
- Skipping staging: publishing raw Slack JSON directly makes downstream consumers do the normalization work. Fix: treat staging as mandatory and make the publish script read only staged files.
- Run order drift: someone runs publish without process and we ship untagged atoms. Fix: orchestration script enforces the sequence and checks for the processed_at field before allowing publish.
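Two of these fixes, the empty-allowlist refusal and the processed_at gate, reduce to small precondition checks. The sketch below assumes the config and atoms are plain dicts; PipelineGuardError and both function names are hypothetical.

```python
class PipelineGuardError(RuntimeError):
    """Raised when a precondition for a phase is not met."""


def require_allowlist(config: dict) -> list[str]:
    """Refuse to ingest Slack unless channels_allowlist is a non-empty list."""
    channels = config.get("channels_allowlist") or []
    if not channels:
        raise PipelineGuardError("channels_allowlist is empty; refusing to ingest Slack")
    return list(channels)


def require_processed(atoms: list[dict]) -> None:
    """Block publish while any atom is missing its processed_at marker."""
    missing = [a.get("source_id", "<unknown>") for a in atoms if not a.get("processed_at")]
    if missing:
        raise PipelineGuardError(
            f"{len(missing)} atom(s) lack processed_at, e.g. {missing[:5]}"
        )
```

Raising instead of logging means the orchestration script exits non-zero, so a skipped process phase cannot quietly reach publish.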
Notes
Re-runs — keep every stage re-runnable. The best bug fix is often ./scripts/pipeline-a-run.sh … with --since last_success and trust in the dedupe cache.
Schema versions — when I add a field (say enriched_summary), I bump a schema version constant so stages know whether to backfill or skip.
Raw retention — store raw captures for auditing. If someone questions a summary, I reopen the original Slack message or Confluence diff.
Pipeline as code — tests for dedupe logic, lint on stage scripts, and a deploy hook that runs an ingest dry-run before merge.
Runtime — when ingest or process suddenly doubles in duration, I look for a rogue source or a prompt change before it blocks publishing.