
haileyok/at-kafka

A small service for putting AT firehose events onto Kafka

haileyok/at-kafka.json
{
  "createdAt": "2025-11-26T01:34:07Z",
  "defaultBranch": "main",
  "description": "A small service for putting AT firehose events onto Kafka",
  "fullName": "haileyok/at-kafka",
  "homepage": null,
  "language": "Go",
  "name": "at-kafka",
  "pushedAt": "2026-01-19T23:24:33Z",
  "stargazersCount": 4,
  "topics": [],
  "updatedAt": "2026-01-19T23:24:36Z",
  "url": "https://github.com/haileyok/at-kafka"
}

A small service that receives events from ATProto and produces them to Kafka. Supports:

  • Firehose events from a relay or from Tap (for network backfill)
  • Ozone moderation events
  • Standard JSON and Osprey-compatible event formats

Three compose files are provided:

# Firehose relay mode (default)
docker compose up -d
# Firehose tap mode (for backfill)
docker compose -f docker-compose.tap.yml up -d
# Ozone moderation events
docker compose -f docker-compose.ozone.yml up -d

Firehose modes (relay and Tap) are configured through environment variables:

environment:
  # Relay/Tap connection
  ATKAFKA_RELAY_HOST: "wss://bsky.network"
  ATKAFKA_TAP_HOST: "ws://localhost:2480"
  ATKAFKA_DISABLE_ACKS: false
  # Kafka
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
  ATKAFKA_OUTPUT_TOPIC: "atproto-events"
  ATKAFKA_OSPREY_COMPATIBLE: false
  # Filtering
  ATKAFKA_WATCHED_SERVICES: "*.bsky.network"
  ATKAFKA_IGNORED_SERVICES: "blacksky.app"
  ATKAFKA_WATCHED_COLLECTIONS: "app.bsky.*"
  ATKAFKA_IGNORED_COLLECTIONS: "fm.teal.*"
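
The watched and ignored values above look like glob patterns. How at-kafka evaluates them is not stated here, so the following is only a minimal sketch of one plausible interpretation. It assumes shell-style globbing via the standard library's `path.Match`, that ignored patterns take precedence over watched ones, and that an empty watched list allows everything. The helpers `matchesAny` and `allowed` are hypothetical names, not part of at-kafka.

```go
package main

import (
	"fmt"
	"path"
)

// matchesAny reports whether name matches at least one glob pattern.
// path.Match treats "*" as any run of characters other than '/',
// which is sufficient for dotted collection names and hostnames.
func matchesAny(patterns []string, name string) bool {
	for _, p := range patterns {
		if ok, err := path.Match(p, name); err == nil && ok {
			return true
		}
	}
	return false
}

// allowed is a hypothetical filter. Assumptions: ignored patterns win
// over watched patterns, and an empty watched list allows everything.
func allowed(watched, ignored []string, name string) bool {
	if matchesAny(ignored, name) {
		return false
	}
	return len(watched) == 0 || matchesAny(watched, name)
}

func main() {
	watched := []string{"app.bsky.*"}
	ignored := []string{"fm.teal.*"}
	collections := []string{
		"app.bsky.feed.post",
		"fm.teal.alpha.feed.play",
		"com.example.thing",
	}
	for _, c := range collections {
		fmt.Printf("%-26s allowed=%v\n", c, allowed(watched, ignored, c))
	}
}
```

The same helper would apply to service hostnames such as `*.bsky.network`, again assuming glob semantics.
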

Configuration for Ozone mode:

environment:
  # Ozone connection
  ATKAFKA_OZONE_PDS_HOST: "https://pds.example.com"
  ATKAFKA_OZONE_IDENTIFIER: "your.handle"
  ATKAFKA_OZONE_PASSWORD: "password"
  ATKAFKA_OZONE_LABELER_DID: "did:plc:..."
  # Kafka
  ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
  ATKAFKA_OUTPUT_TOPIC: "ozone-events"

The same modes can also be run directly from the command line:

# Firehose modes
atkafka firehose relay --bootstrap-servers localhost:9092 --output-topic events
atkafka firehose tap --tap-host ws://localhost:2480 --bootstrap-servers localhost:9092 --output-topic events

# Ozone mode
atkafka ozone-events \
  --pds-host https://pds.example.com \
  --identifier admin@example.com \
  --password password \
  --labeler-did did:plc:... \
  --bootstrap-servers localhost:9092 \
  --output-topic ozone-events

Events are structured similarly to the raw AT Protocol firehose, with one key difference: commit events are split into individual operation events.

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "operation": {
    "action": "create",
    "collection": "app.bsky.feed.post",
    "rkey": "some-rkey",
    "uri": "at://did:plc:123/app.bsky.feed.post/some-rkey",
    "cid": "bafyrei...",
    "path": "app.bsky.feed.post/...",
    "record": {
      "text": "Hello world!",
      "$type": "app.bsky.feed.post",
      "createdAt": "2024-01-01T12:00:00.000Z"
    }
  }
}

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "account": {
    "active": true,
    "seq": 12345,
    "status": "active"
  }
}

{
  "did": "did:plc:...",
  "timestamp": "2024-01-01T12:00:00.000Z",
  "identity": {
    "seq": 12345,
    "handle": "user.bsky.social"
  }
}
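
On the consuming side, the three shapes above can be decoded into a single Go type. The following is a minimal sketch, assuming that each message carries exactly one of `operation`, `account`, or `identity`. The type names and the `describe` helper are hypothetical and are not taken from the at-kafka source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Operation mirrors the "operation" object in the examples above.
type Operation struct {
	Action     string          `json:"action"`
	Collection string          `json:"collection"`
	Rkey       string          `json:"rkey"`
	URI        string          `json:"uri"`
	CID        string          `json:"cid"`
	Path       string          `json:"path"`
	Record     json.RawMessage `json:"record"` // kept raw: its shape depends on the collection
}

// Account mirrors the "account" object.
type Account struct {
	Active bool   `json:"active"`
	Seq    int64  `json:"seq"`
	Status string `json:"status"`
}

// Identity mirrors the "identity" object.
type Identity struct {
	Seq    int64  `json:"seq"`
	Handle string `json:"handle"`
}

// Event is the common envelope. Assumption: one payload pointer is set per message.
type Event struct {
	DID       string     `json:"did"`
	Timestamp string     `json:"timestamp"`
	Operation *Operation `json:"operation,omitempty"`
	Account   *Account   `json:"account,omitempty"`
	Identity  *Identity  `json:"identity,omitempty"`
}

// describe decodes one message and returns a short summary of its payload.
func describe(raw []byte) (string, error) {
	var ev Event
	if err := json.Unmarshal(raw, &ev); err != nil {
		return "", err
	}
	switch {
	case ev.Operation != nil:
		return "operation#" + ev.Operation.Action + " " + ev.Operation.URI, nil
	case ev.Account != nil:
		return "account status=" + ev.Account.Status, nil
	case ev.Identity != nil:
		return "identity handle=" + ev.Identity.Handle, nil
	}
	return "other", nil
}

func main() {
	raw := []byte(`{"did":"did:plc:123","timestamp":"2024-01-01T12:00:00.000Z","identity":{"seq":12345,"handle":"user.bsky.social"}}`)
	summary, err := describe(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(summary)
}
```

Because a commit is split into one message per operation, a consumer written this way never needs to iterate over an array of operations.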

When --osprey-compatible is enabled, events are wrapped in the Osprey event format:

{
  "data": {
    "action_name": "operation#create",
    "action_id": 1234567890,
    "data": {
      "did": "did:plc:...",
      "timestamp": "2024-01-01T12:00:00.000Z",
      "operation": { ... }
    },
    "timestamp": "2024-01-01T12:00:00.000Z",
    "secret_data": {},
    "encoding": "UTF8"
  },
  "send_time": "2024-01-01T12:00:00Z"
}

Action names in Osprey mode:

  • operation#create - Record creation
  • operation#update - Record update
  • operation#delete - Record deletion
  • account - Account status changes
  • identity - Identity/handle changes
  • info - Informational messages
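
A consumer of the Osprey-wrapped topic would typically peel off the envelope and branch on `action_name`. The sketch below shows one way to do that under the field layout shown above. The `ospreyEnvelope` type and the `splitAction` and `unwrap` helpers are hypothetical names introduced for illustration, and fields not needed for routing are simply left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// ospreyEnvelope covers only the wrapper fields needed for routing.
type ospreyEnvelope struct {
	Data struct {
		ActionName string          `json:"action_name"`
		ActionID   int64           `json:"action_id"`
		Data       json.RawMessage `json:"data"` // the inner at-kafka event, left undecoded
		Timestamp  string          `json:"timestamp"`
	} `json:"data"`
	SendTime string `json:"send_time"`
}

// splitAction splits "operation#create" into ("operation", "create").
// Names without "#", such as "account", return an empty action.
func splitAction(name string) (kind, action string) {
	kind, action, _ = strings.Cut(name, "#")
	return kind, action
}

// unwrap decodes the envelope and returns the routing keys plus the raw inner event.
func unwrap(raw []byte) (kind, action string, inner json.RawMessage, err error) {
	var env ospreyEnvelope
	if err = json.Unmarshal(raw, &env); err != nil {
		return "", "", nil, err
	}
	kind, action = splitAction(env.Data.ActionName)
	return kind, action, env.Data.Data, nil
}

func main() {
	msg := []byte(`{"data":{"action_name":"operation#delete","action_id":1234567890,"data":{"did":"did:plc:123"},"timestamp":"2024-01-01T12:00:00.000Z","secret_data":{},"encoding":"UTF8"},"send_time":"2024-01-01T12:00:00Z"}`)
	kind, action, inner, err := unwrap(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(kind, action, string(inner))
}
```

Keeping the inner event as `json.RawMessage` defers its decoding until the action kind is known.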

Ozone events are produced as-is from the tools.ozone.moderation.queryEvents API response. Events include moderation actions, reports, and other moderation activity. The cursor is persisted to disk and automatically resumed on restart.
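
Persisting a cursor so that polling resumes after a restart can be done in a few lines. The following is a sketch of the general technique only, not the at-kafka implementation: it assumes the cursor is an opaque string, stores it in a plain text file (the file name is hypothetical), and writes through a temporary file plus rename so that a crash mid-write does not leave a truncated cursor behind.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// loadCursor returns the stored cursor, or "" when no cursor file exists yet.
func loadCursor(file string) (string, error) {
	b, err := os.ReadFile(file)
	if errors.Is(err, fs.ErrNotExist) {
		return "", nil
	}
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(b)), nil
}

// saveCursor writes the cursor to a temp file in the same directory and
// renames it over the target, so readers see either the old or new value.
func saveCursor(file, cursor string) error {
	tmp, err := os.CreateTemp(filepath.Dir(file), ".cursor-*")
	if err != nil {
		return err
	}
	// Cleans up on failure; after a successful rename this fails harmlessly.
	defer os.Remove(tmp.Name())
	if _, err := tmp.WriteString(cursor); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), file)
}

func main() {
	dir, err := os.MkdirTemp("", "atkafka-cursor")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	file := filepath.Join(dir, "ozone-cursor.txt") // hypothetical file name

	if err := saveCursor(file, "example-cursor-value"); err != nil {
		panic(err)
	}
	cursor, err := loadCursor(file)
	if err != nil {
		panic(err)
	}
	fmt.Println("resuming from:", cursor)
}
```

On startup a poller would call `loadCursor`, pass the result as the `cursor` parameter of its first request, and call `saveCursor` after each page of events has been produced.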

The service exposes Prometheus metrics on the default metrics port.

  • atkafka_handled_events - Total events received from the firehose and handled
  • atkafka_produced_events - Total messages produced to Kafka
  • atkafka_plc_requests - Total PLC requests made, if applicable, and whether each was served from cache
  • atkafka_api_requests - Total API requests made, if applicable, and whether each was served from cache
  • atkafka_cache_size - The size of the PLC and API caches
  • atkafka_acks_sent - Total acks sent to Tap, if applicable
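
One quick health check is to compare `atkafka_handled_events` with `atkafka_produced_events` in a scrape. The sketch below sums samples per metric name from text in the Prometheus exposition format. It is a simplified parser for illustration and not a replacement for a Prometheus client. The label names in the sample input are made up, since the real label sets are not documented here, and fetching the text from the metrics endpoint is left out because the port is not stated.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// parseSample splits one exposition-format line
// ("name{labels} value [timestamp]") into a metric name and value.
// Comments, blank lines, and malformed lines yield ok == false.
func parseSample(line string) (name string, value float64, ok bool) {
	line = strings.TrimSpace(line)
	if line == "" || strings.HasPrefix(line, "#") {
		return "", 0, false
	}
	rest := ""
	if open := strings.IndexByte(line, '{'); open >= 0 {
		end := strings.LastIndexByte(line, '}')
		if end < open {
			return "", 0, false
		}
		name, rest = line[:open], line[end+1:]
	} else if sp := strings.IndexAny(line, " \t"); sp >= 0 {
		name, rest = line[:sp], line[sp:]
	}
	fields := strings.Fields(rest)
	if len(fields) == 0 {
		return "", 0, false
	}
	v, err := strconv.ParseFloat(fields[0], 64)
	if err != nil {
		return "", 0, false
	}
	return name, v, true
}

// sumByName totals every sample of each metric across its label sets.
func sumByName(r io.Reader) (map[string]float64, error) {
	totals := make(map[string]float64)
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		if name, v, ok := parseSample(sc.Text()); ok {
			totals[name] += v
		}
	}
	return totals, sc.Err()
}

func main() {
	// Hypothetical scrape output; the label names are invented for illustration.
	sample := `# TYPE atkafka_handled_events counter
atkafka_handled_events{kind="operation"} 10
atkafka_handled_events{kind="account"} 2
atkafka_produced_events{status="ok"} 12
`
	totals, err := sumByName(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println("handled:", totals["atkafka_handled_events"])
	fmt.Println("produced:", totals["atkafka_produced_events"])
}
```

Since `sumByName` takes an `io.Reader`, the body of an HTTP response from the metrics endpoint can be passed to it directly.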