haileyok/at-kafka
A small service for putting AT firehose events onto Kafka
{ "createdAt": "2025-11-26T01:34:07Z", "defaultBranch": "main", "description": "A small service for putting AT firehose events onto Kafka", "fullName": "haileyok/at-kafka", "homepage": null, "language": "Go", "name": "at-kafka", "pushedAt": "2026-01-19T23:24:33Z", "stargazersCount": 4, "topics": [], "updatedAt": "2026-01-19T23:24:36Z", "url": "https://github.com/haileyok/at-kafka"}
at-kafka
A small service that receives events from ATProto and produces them to Kafka. It supports:
- Firehose events from a relay or from Tap (for network backfill)
- Ozone moderation events
- Standard JSON and Osprey-compatible event formats
Docker Compose
Three compose files are provided:
```shell
# Firehose relay mode (default)
docker compose up -d

# Firehose tap mode (for backfill)
docker compose -f docker-compose.tap.yml up -d

# Ozone moderation events
docker compose -f docker-compose.ozone.yml up -d
```

Firehose Configuration
```yaml
environment:
  # Relay/Tap connection
  ATKAFKA_RELAY_HOST: "wss://bsky.network"
  ATKAFKA_TAP_HOST: "ws://localhost:2480"
  ATKAFKA_DISABLE_ACKS: false

  # Kafka
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
  ATKAFKA_OUTPUT_TOPIC: "atproto-events"
  ATKAFKA_OSPREY_COMPATIBLE: false

  # Filtering
  ATKAFKA_WATCHED_SERVICES: "*.bsky.network"
  ATKAFKA_IGNORED_SERVICES: "blacksky.app"
  ATKAFKA_WATCHED_COLLECTIONS: "app.bsky.*"
  ATKAFKA_IGNORED_COLLECTIONS: "fm.teal.*"
```
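The filtering variables take wildcard patterns. The Go sketch below shows one plausible way watched and ignored lists could combine. It assumes that an ignored match always drops a value, that an empty watched list means "watch everything", and that patterns follow Go's `path.Match` glob syntax (where `*` matches any run of characters other than `/`). The `shouldKeep` function is hypothetical; at-kafka's actual matching rules may differ.

```go
package main

import (
	"fmt"
	"path"
)

// shouldKeep is a hypothetical filter, not at-kafka's implementation.
// Ignored patterns take precedence; an empty watched list keeps everything.
// Patterns use path.Match glob syntax (an assumption).
func shouldKeep(value string, watched, ignored []string) bool {
	for _, pat := range ignored {
		if ok, _ := path.Match(pat, value); ok {
			return false
		}
	}
	if len(watched) == 0 {
		return true
	}
	for _, pat := range watched {
		if ok, _ := path.Match(pat, value); ok {
			return true
		}
	}
	return false
}

func main() {
	watched := []string{"app.bsky.*"}
	ignored := []string{"fm.teal.*"}
	for _, c := range []string{"app.bsky.feed.post", "fm.teal.alpha.feed.play", "com.example.thing"} {
		fmt.Println(c, shouldKeep(c, watched, ignored))
	}
}
```

Under these assumptions, the same function would serve both service hostnames (for example `*.bsky.network`) and collection NSIDs (for example `app.bsky.*`).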
Ozone Configuration
```yaml
environment:
  # Ozone connection
  ATKAFKA_OZONE_PDS_HOST: "https://pds.example.com"
  ATKAFKA_OZONE_IDENTIFIER: "your.handle"
  ATKAFKA_OZONE_PASSWORD: "password"
  ATKAFKA_OZONE_LABELER_DID: "did:plc:..."

  # Kafka
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
  ATKAFKA_OUTPUT_TOPIC: "ozone-events"
```

The same modes can also be run directly with the atkafka CLI:

```shell
# Firehose modes
atkafka firehose relay --bootstrap-servers localhost:9092 --output-topic events
atkafka firehose tap --tap-host ws://localhost:2480 --bootstrap-servers localhost:9092 --output-topic events

# Ozone mode
atkafka ozone-events \
  --pds-host https://pds.example.com \
  --identifier admin@example.com \
  --password password \
  --labeler-did did:plc:... \
  --bootstrap-servers localhost:9092 \
  --output-topic ozone-events
```

Event Structure
Firehose Events
Events are structured similarly to the raw AT Protocol firehose, with one key difference: each commit event is split into individual operation events, one per record operation in the commit.
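To illustrate the split, here is a Go sketch that turns one commit carrying several operations into one event per operation, copying the repo DID and timestamp onto each. The `Commit`, `Op`, `OpEvent`, and `OperationEvent` types and the `splitCommit` function are hypothetical simplifications modeled on the Operation Event JSON below (they omit the CID, path, and record); they are not at-kafka's or any AT Protocol library's actual types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Op is a hypothetical, simplified firehose operation.
type Op struct {
	Action     string `json:"action"`
	Collection string `json:"collection"`
	Rkey       string `json:"rkey"`
}

// Commit is a hypothetical, simplified commit that carries several ops.
type Commit struct {
	Repo string
	Time string
	Ops  []Op
}

// OpEvent is the "operation" object: the op fields plus a derived URI.
type OpEvent struct {
	Op
	URI string `json:"uri"`
}

// OperationEvent mirrors the top-level shape of an Operation Event.
type OperationEvent struct {
	Did       string  `json:"did"`
	Timestamp string  `json:"timestamp"`
	Operation OpEvent `json:"operation"`
}

// splitCommit emits one OperationEvent per op, in commit order.
func splitCommit(c Commit) []OperationEvent {
	events := make([]OperationEvent, 0, len(c.Ops))
	for _, op := range c.Ops {
		events = append(events, OperationEvent{
			Did:       c.Repo,
			Timestamp: c.Time,
			Operation: OpEvent{
				Op:  op,
				URI: fmt.Sprintf("at://%s/%s/%s", c.Repo, op.Collection, op.Rkey),
			},
		})
	}
	return events
}

func main() {
	c := Commit{
		Repo: "did:plc:123",
		Time: "2024-01-01T12:00:00.000Z",
		Ops: []Op{
			{Action: "create", Collection: "app.bsky.feed.post", Rkey: "a"},
			{Action: "delete", Collection: "app.bsky.feed.like", Rkey: "b"},
		},
	}
	for _, ev := range splitCommit(c) {
		b, _ := json.Marshal(ev)
		fmt.Println(string(b))
	}
}
```

One consequence of this design is that consumers never need to iterate over a commit's operations themselves: every Kafka message describes exactly one record change.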
Operation Event
```json
{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "operation": {
    "action": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "some-rkey",
    "uri": "at://did:plc:123/app.bsky.feed.post/some-rkey",
    "cid": "bafyrei...",
    "path": "app.bsky.feed.post/...",
    "record": {
      "text": "Hello world!",
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-01-01T12:00:00.000Z"
    }
  }
}
```

Account Event
```json
{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "account": {
    "active": true,
    "seq": 12345,
    "status": "active"
  }
}
```

Identity Event
```json
{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "identity": {
    "seq": 12345,
    "handle": "user.bsky.social"
  }
}
```
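On the consumer side, a message in the standard (non-Osprey) format can be decoded into a single Go struct whose `operation`, `account`, and `identity` fields are pointers, then dispatched on whichever one is present. This sketch assumes each message carries exactly one of those three objects, as in the examples above, and covers only the fields shown there. The `Event` type and `describe` function are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical consumer-side type for the three shapes above.
// Pointer fields stay nil when the corresponding key is absent.
type Event struct {
	Did       string `json:"did"`
	Timestamp string `json:"timestamp"`
	Operation *struct {
		Action     string          `json:"action"`
		Collection string          `json:"collection"`
		URI        string          `json:"uri"`
		Record     json.RawMessage `json:"record"` // kept raw: shape depends on collection
	} `json:"operation"`
	Account *struct {
		Active bool   `json:"active"`
		Seq    int64  `json:"seq"`
		Status string `json:"status"`
	} `json:"account"`
	Identity *struct {
		Seq    int64  `json:"seq"`
		Handle string `json:"handle"`
	} `json:"identity"`
}

// describe decodes one Kafka message value and returns a short summary.
func describe(value []byte) (string, error) {
	var ev Event
	if err := json.Unmarshal(value, &ev); err != nil {
		return "", err
	}
	switch {
	case ev.Operation != nil:
		return fmt.Sprintf("%s %s %s", ev.Did, ev.Operation.Action, ev.Operation.URI), nil
	case ev.Account != nil:
		return fmt.Sprintf("%s account status=%s", ev.Did, ev.Account.Status), nil
	case ev.Identity != nil:
		return fmt.Sprintf("%s identity handle=%s", ev.Did, ev.Identity.Handle), nil
	default:
		return "", fmt.Errorf("unrecognized event for %s", ev.Did)
	}
}

func main() {
	msg := []byte(`{"did":"did:plc:abc","timestamp":"2024-01-01T12:00:00.000Z","identity":{"seq":12345,"handle":"user.bsky.social"}}`)
	s, err := describe(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```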
Osprey-Compatible Mode
When --osprey-compatible is enabled, events are wrapped in the Osprey event format:
```json
{
  "data": {
    "action_name": "operation#create",
    "action_id": 1234567890,
    "data": {
      "did": "did:plc:...",
      "timestamp": "2024-01-01T12:00:00.000Z",
      "operation": { ... }
    },
    "timestamp": "2024-01-01T12:00:00.000Z",
    "secret_data": {},
    "encoding": "UTF8"
  },
  "send_time": "2024-01-01T12:00:00Z"
}
```

Action names in Osprey mode:
- operation#create - Record creation
- operation#update - Record update
- operation#delete - Record deletion
- account - Account status changes
- identity - Identity/handle changes
- info - Informational messages
Ozone Events
Ozone events are produced as-is from the tools.ozone.moderation.queryEvents API response. They include moderation actions, reports, and other moderation activity. The query cursor is persisted to disk, and the service automatically resumes from it on restart.
Monitoring
The service exposes the following Prometheus metrics on the default metrics port:
- atkafka_handled_events - Total events received on the firehose and handled
- atkafka_produced_events - Total messages produced to Kafka
- atkafka_plc_requests - Total PLC requests made, if applicable, and whether they were cached
- atkafka_api_requests - Total API requests made, if applicable, and whether they were cached
- atkafka_cache_size - Size of the PLC and API caches
- atkafka_acks_sent - Total acks sent to Tap, if applicable
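To spot-check these counters, for example comparing atkafka_handled_events with atkafka_produced_events, you can read the metrics endpoint's Prometheus text output. The Go sketch below sums samples per metric name (ignoring labels) for names with the atkafka_ prefix. It parses a literal string rather than making an HTTP request because the metrics port and any label names are not stated here; the sample text, its label names, and its values are made up for illustration. The `sumByMetric` function is hypothetical and handles only the simple cases of the text format.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// sumByMetric sums every sample per metric name (labels ignored) for names
// starting with prefix. Not a full Prometheus text-format parser.
func sumByMetric(text, prefix string) (map[string]float64, error) {
	sums := map[string]float64{}
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		name, rest := line, ""
		if i := strings.IndexByte(line, '{'); i >= 0 {
			j := strings.LastIndexByte(line, '}')
			if j < i {
				return nil, fmt.Errorf("malformed line: %q", line)
			}
			name, rest = line[:i], line[j+1:]
		} else if sp := strings.IndexByte(line, ' '); sp >= 0 {
			name, rest = line[:sp], line[sp:]
		}
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		fields := strings.Fields(rest) // value, then an optional timestamp
		if len(fields) == 0 {
			return nil, fmt.Errorf("missing value: %q", line)
		}
		v, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			return nil, err
		}
		sums[name] += v
	}
	return sums, sc.Err()
}

func main() {
	// Illustrative scrape output; labels and values are invented.
	sample := `# HELP atkafka_handled_events example
# TYPE atkafka_handled_events counter
atkafka_handled_events{kind="operation"} 90
atkafka_handled_events{kind="identity"} 10
atkafka_produced_events 95
go_goroutines 12
`
	sums, err := sumByMetric(sample, "atkafka_")
	if err != nil {
		panic(err)
	}
	fmt.Println(sums["atkafka_handled_events"], sums["atkafka_produced_events"])
}
```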