Skip to main content

Kafka Integration

Add Kafka helpers to an existing PolePosition project:

polepos add integration kafka

The command creates:

src/<package>/integrations/kafka/
__init__.py
consumer.py
factory.py
producer.py
schemas.py
testing.py

It also updates:

  • src/<package>/settings.py
  • .env.example
  • pyproject.toml

Dependency

The command adds:

aiokafka>=0.12.0

Sync dependencies after adding the integration:

uv sync --extra dev

Settings

Review the Kafka values in .env:

KAFKA_ENABLED=false
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CLIENT_ID=<package>
KAFKA_DEFAULT_TOPIC=<package>.events
KAFKA_GROUP_ID=<package>
KAFKA_AUTO_OFFSET_RESET=earliest
KAFKA_ACKS=all
# KAFKA_COMPRESSION_TYPE=
KAFKA_REQUEST_TIMEOUT_MS=40000

Required Kafka values should remain active in .env.example. A commented required value such as # KAFKA_BOOTSTRAP_SERVERS=localhost:9092 is treated as missing by polepos check. KAFKA_COMPRESSION_TYPE is optional and may remain commented until needed.

Use the Producer

The generated factory builds a producer from settings. Keep producer lifetime management explicit in your route, service, or application wiring.

Use the generated schemas to keep event payloads predictable.

For a complete first-message walkthrough, see the Kafka Quick Start example.

Consumers and Workers

The generated consumer helper is scaffolding for a worker surface. It is not started automatically by the FastAPI app.

For production, run consumers as explicit worker processes or jobs so API startup remains fast and predictable.

Testing

Use InMemoryKafkaEventProducer from testing.py when unit tests should assert published events without connecting to Kafka.

Validate

polepos check

The check command validates Kafka files, settings, env values, and dependency signals without connecting to Kafka. It distinguishes required active env keys from optional commented examples.