How to write a source plugin
Add a new source type to fathomdx by extending SourceProducer. Implement poll() to fetch items and digest() to shape them into deltas.
A source plugin is a Python class that knows how to fetch items from somewhere and shape them into deltas. The source-runner handles polling, deduplication, and writing to the lake. You implement the parts specific to your data source.
This page walks through the four-step workflow the runner expects.
Prerequisites
- A running Fathom stack.
- A clone of the fathomdx repo.
- Python 3.11+ familiarity. The runner uses async/await for I/O.
Step 1: Copy the template
The runner ships with a working starting point at source-runner/sources/template.py. Copy it under a new name:
cp source-runner/sources/template.py source-runner/sources/my_source.py
The template has every required method, default values for every metadata field, and inline comments explaining each one. Read through it once before editing.
Step 2: Set the metadata
At the top of the new class, fill in the identifying fields:
class MySourceProducer(SourceProducer):
source_type = "my-source" # unique ID, lowercase, kebab-case
display_name = "My Source" # what shows in the dashboard
description = "What this source pulls in"
version = "0.1.0"
author = "you"
auth_type = "none" # none | oauth2 | api_key | file
schedule_type = "poll" # poll | watch | import
default_interval = "30m" # 1m, 5m, 15m, 30m, 1h, 6h, daily
digestion = "raw" # raw | llm
default_expiry_days = 30 # None to keep forever
expiry_configurable = True
source_type is the stable ID. Once a source has been added by users, never change it. Renames are migrations.
Step 3: Implement poll()
poll() is where you fetch items. It receives the user's config (from the dashboard form) and the timestamp of the last successful poll. Return a list of RawItem. Always return everything the source currently exposes; the runner deduplicates against previously-seen IDs.
async def poll(self, config: dict, since: float | None = None) -> list[RawItem]:
url = config["url"]
items: list[RawItem] = []
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(url)
resp.raise_for_status()
for entry in resp.json():
items.append(RawItem(
id=entry["id"], # unique within this source
content=entry["text"], # raw payload
timestamp=entry.get("date"),
title=entry.get("title", ""),
url=entry.get("url"),
image_urls=entry.get("images", []),
))
return items
Return an empty list when there's nothing new (or when the fetch fails non-fatally). Raise an exception only when something is genuinely wrong; the runner catches it and surfaces it in the dashboard.
Step 4: Implement digest()
digest() shapes a RawItem into a ProducedDelta. This is where you decide tags, format the content, and pass through any image URLs.
def digest(self, item: RawItem, config: dict | None = None) -> ProducedDelta:
return ProducedDelta(
content=f"{item.title}\n\n{item.content}" if item.title else item.content,
tags=[self.source_type, "feed", f"feed:{config.get('url', '')}"],
source=self.source_type,
timestamp=item.timestamp,
image_urls=item.image_urls,
)
The default tag [self.source_type] is fine for simple cases. Add tags that make later filtering easy: a feed-specific tag, a topic tag, a contact tag if relevant.
If digestion is set to llm, the runner offers users an opt-in summarization step. Most plugins should leave this raw and let users compose summarization separately if they want it.
Step 5: Register the plugin
Open source-runner/source_runner.py and find _register_builtins(). Add an import and a registry entry:
def _register_builtins(self):
from sources.rss import RSSProducer
from sources.mastodon import MastodonProducer
from sources.vault import VaultProducer
from sources.my_source import MySourceProducer # add this
self._registry["rss"] = RSSProducer
self._registry["mastodon"] = MastodonProducer
self._registry["vault"] = VaultProducer
self._registry["my-source"] = MySourceProducer # and this
The key in _registry must match source_type exactly.
Step 6: Rebuild and test
docker compose build source-runner
docker compose up -d source-runner
The new source type appears in the dashboard's "Add source" dialog. Configure one with test values and watch it poll:
docker compose logs source-runner -f
You'll see one log line per poll cycle, and one entry per item digested. Once items have landed:
curl 'http://localhost:8201/v1/deltas?source=my-source&limit=5'
If they show up with the right shape, the plugin is working.
Optional: validate_config()
To reject bad config before it's saved, override validate_config():
def validate_config(self, config: dict) -> list[str]:
errors = []
if not config.get("url", "").startswith("https://"):
errors.append("URL must use https.")
return errors
The dashboard surfaces these errors in the form.
Optional: extract images
If your source has image URLs, the helpers in sources/base.py will fetch and store them as image moments. See RSSProducer._entry_content() for the pattern (around extract_images()).
Things to know
poll()returns everything; the runner deduplicates. Don't try to track "last seen" yourself unless the source's API is fundamentally pull-since. The runner does it for you.source_typeis a permanent identifier. Renames are user-data migrations. Pick a good name on day one.- Async all the way down. Use
httpx.AsyncClient,asyncio.to_threadfor sync libs, neverrequests. The runner is a single event loop. - Errors during poll are recoverable by default. Log and return
[]. The runner retries on the next interval. Raise only for genuinely unrecoverable conditions. - Tags shape recall, not just display. Be deliberate. The reserved tag conventions in reserved-tags-spec.md apply here.
- Authentication.
auth_typeis metadata only. The actual auth flow lives in yourpoll()body usingconfigvalues the user set up (API key, OAuth token, file path).
Sharing a plugin
If your source is generally useful (a popular service, a common log format, a frequently-imported file type), open a PR against fathomdx-io/fathomdx adding it under source-runner/sources/. Plugins that ship with the build are available to every Fathom user without extra setup.
For private sources, keep the file in your fork or in a separate sidecar repo and rebuild the source-runner image when you want to update it.