Topic¶
minikafka.Topic ¶
Bases: Generic[ModelT]
Typed, append-only queue backed by a Pydantic model.
A Topic is a logical message log inside a Source. Every record
is validated against the topic's model before insert and may be
deduplicated on a chosen tuple of fields. Records start in the new
state and transition to handled once acknowledged via
set_handled or by being consumed by a pipeline.
Use iter_new to stream pending work and set_handled to ack a
record. To transform a topic into another, use
topic.pipe(fn).to(target) and call run() on the resulting
Pipeline.
Topics are created with Source.topic; do not construct them directly.
Attributes:
| Name | Type | Description |
|---|---|---|
| source | | The owning Source. |
| name | | Topic name (matches the row in the … |
| model | | The Pydantic model that records are validated against. |
Examples:
    videos = source.topic("videos", Video, dedup=("url",))
    videos.append({"url": "https://x", "title": "hi"})
    for record in videos.iter_new(records=True):
        print(record.id, record.data.title)
        videos.set_handled(record=record)
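The new-to-handled lifecycle described above can be pictured with a small in-memory stand-in. This is only a sketch: `ToyTopic` and `ToyRecord` are hypothetical names invented for illustration, and they skip validation, deduplication, and persistence, so this is not how minikafka itself is implemented.

```python
from dataclasses import dataclass, field
from typing import Any, Iterator


@dataclass
class ToyRecord:
    """Hypothetical stand-in for a stored row; not minikafka's Record class."""
    id: int
    data: dict[str, Any]
    status: str = "new"


@dataclass
class ToyTopic:
    """In-memory illustration of the new -> handled lifecycle."""
    rows: list[ToyRecord] = field(default_factory=list)

    def append(self, payload: dict[str, Any]) -> ToyRecord:
        # Every record starts in the "new" state.
        record = ToyRecord(id=len(self.rows) + 1, data=dict(payload))
        self.rows.append(record)
        return record

    def iter_new(self) -> Iterator[ToyRecord]:
        # Snapshot the pending rows so acknowledging during iteration is safe.
        yield from [r for r in self.rows if r.status == "new"]

    def set_handled(self, record: ToyRecord) -> None:
        # Acknowledging a record moves it out of the pending set.
        record.status = "handled"


topic = ToyTopic()
topic.append({"url": "https://x", "title": "hi"})
topic.append({"url": "https://y", "title": "yo"})

for rec in topic.iter_new():
    topic.set_handled(rec)

# Nothing is pending once every record has been acknowledged.
pending = list(topic.iter_new())
```

The point of the sketch is that handled records stay in the log; they simply stop appearing in the pending stream.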
Source code in src/minikafka/core.py
append ¶
Validate and insert a new record into the topic.
The payload is validated against the topic's model (raising
pydantic.ValidationError on mismatch) and then inserted as a
new status='new' row. If the topic has dedup fields configured
and another row already exists with the same dedup values,
DuplicateMessageError is raised.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | ModelT \| dict[str, Any] | Either a Pydantic model instance of the topic's type or a dict that is validated against it. | required |
Returns:
| Type | Description |
|---|---|
| ModelT | The validated Pydantic model instance that was stored. |
Raises:
| Type | Description |
|---|---|
| DuplicateMessageError | A row with the same dedup fields already exists in this topic. |
| ValidationError | The payload does not match the topic's model. |
Emits:
message_appended, with kwargs topic and payload (the canonical dict, not the model instance).
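The dedup rule for append can be illustrated by projecting each payload onto the configured fields and rejecting a repeated projection. This is a minimal sketch under assumptions: `ToyDuplicateError`, `dedup_key`, and `append_unique` are hypothetical names, and a plain `set` stands in for whatever uniqueness check the real storage layer performs.

```python
from typing import Any


class ToyDuplicateError(Exception):
    """Hypothetical stand-in for DuplicateMessageError."""


def dedup_key(payload: dict[str, Any], fields: tuple[str, ...]) -> tuple[Any, ...]:
    # Project the payload onto the configured dedup fields, in order.
    return tuple(payload[name] for name in fields)


def append_unique(seen: set, payload: dict[str, Any], fields: tuple[str, ...]) -> None:
    # Reject the payload if another one with the same dedup values was stored.
    key = dedup_key(payload, fields)
    if key in seen:
        raise ToyDuplicateError(f"duplicate dedup key: {key!r}")
    seen.add(key)


seen: set = set()
append_unique(seen, {"url": "https://x", "title": "hi"}, ("url",))

try:
    # Same url, different title: still a duplicate under dedup=("url",).
    append_unique(seen, {"url": "https://x", "title": "other title"}, ("url",))
    duplicate_rejected = False
except ToyDuplicateError:
    duplicate_rejected = True
```

Only the dedup fields take part in the comparison, so two payloads that differ elsewhere still collide.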
count ¶
Return the number of records in this topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | str \| None | Optional status filter, for example "new" or "handled"; when None, all records are counted. | None |
Returns:
| Type | Description |
|---|---|
| int | The record count as an integer. |
Examples:
    topic.count()           # all records
    topic.count("new")      # only unhandled
    topic.count("handled")  # only acknowledged
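One way to picture the optional status filter is as a counting query over a table with a status column. The sketch below uses Python's built-in `sqlite3` module; the `records` table and its schema are assumptions made up for illustration and are not taken from minikafka's actual storage layout.

```python
import sqlite3
from typing import Optional

# Hypothetical schema: one row per record, with a status column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO records (status) VALUES (?)",
    [("new",), ("new",), ("handled",)],
)


def count(status: Optional[str] = None) -> int:
    # With no filter, count every row; otherwise restrict by status.
    if status is None:
        row = conn.execute("SELECT COUNT(*) FROM records").fetchone()
    else:
        row = conn.execute(
            "SELECT COUNT(*) FROM records WHERE status = ?", (status,)
        ).fetchone()
    return int(row[0])
```

With the three sample rows, the unfiltered count covers both states while each filtered count covers one.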
iter_handled ¶
iter_handled(*, records: bool = False, as_dict: bool = False) -> Iterator[ModelT | Record[ModelT] | dict[str, Any]]
Iterate the topic's handled rows in insertion order.
Identical to iter_new but yields rows whose status has been
transitioned to handled. Useful for auditing or replays.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| records | bool | If True, yield Record[ModelT] instances. | False |
| as_dict | bool | If True, yield plain dict[str, Any] payloads. | False |
Yields:
| Type | Description |
|---|---|
| ModelT \| Record[ModelT] \| dict[str, Any] | Decoded ModelT instances, Record[ModelT] wrappers, or plain dict payloads, depending on records and as_dict. |
iter_new ¶
iter_new(*, records: bool = False, as_dict: bool = False) -> Iterator[ModelT | Record[ModelT] | dict[str, Any]]
Iterate the topic's new (unhandled) rows in insertion order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| records | bool | If True, yield Record[ModelT] instances. | False |
| as_dict | bool | If True, yield plain dict[str, Any] payloads. | False |
Yields:
| Type | Description |
|---|---|
| ModelT \| Record[ModelT] \| dict[str, Any] | Decoded ModelT instances, Record[ModelT] wrappers, or plain dict payloads, depending on records and as_dict. |
Raises:
| Type | Description |
|---|---|
| ValueError | |
Examples:
    for video in topic.iter_new():
        print(video.title)

    for record in topic.iter_new(records=True):
        print(record.id, record.created_at, record.data)
migrate ¶
migrate(new_model: type[OutputT], migration_function: Callable[[ModelT], OutputT | dict[str, Any]]) -> Topic[OutputT]
Rewrite every stored payload using a new model.
Runs migration_function on each row's decoded payload, validates
the result against new_model, and rewrites the row in place.
The topic's recorded schema and schema hash are updated atomically
with the rows. The dedup-field tuple is preserved and must still
be present on new_model.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| new_model | type[OutputT] | Target Pydantic model class. | required |
| migration_function | Callable[[ModelT], OutputT \| dict[str, Any]] | Callable that converts an old model instance into a new model instance (or a dict that validates as one). | required |
Returns:
| Type | Description |
|---|---|
| Topic[OutputT] | A new Topic typed to new_model. |
Raises:
| Type | Description |
|---|---|
| ValueError | The previous dedup fields are not present on new_model. |
| TypeError | |
| ValidationError | A migrated payload does not match new_model. |
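The migrate flow described above (check the dedup fields, convert every payload, and only then replace the rows) might be sketched as follows. This is an illustration under assumptions: `VideoV1`, `VideoV2`, and `migrate_rows` are hypothetical, and dataclass construction stands in for Pydantic validation, so it checks field names but not value types.

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Callable


@dataclass
class VideoV1:
    url: str
    title: str


@dataclass
class VideoV2:
    url: str
    title: str
    title_length: int


def migrate_rows(
    rows: list,
    new_model: type,
    migration_function: Callable[[Any], Any],
    dedup: tuple[str, ...] = (),
) -> list:
    # The dedup fields must still exist on the new model.
    new_fields = {f.name for f in fields(new_model)}
    missing = [name for name in dedup if name not in new_fields]
    if missing:
        raise ValueError(f"dedup fields missing on new model: {missing}")

    migrated = []
    for old in rows:
        result = migration_function(old)
        # Accept either a new-model instance or a dict that builds one.
        if isinstance(result, dict):
            result = new_model(**result)
        migrated.append(result)
    # Returned only after every row converted, so a caller can swap
    # the old rows for the new ones in a single all-or-nothing step.
    return migrated


old_rows = [VideoV1("https://x", "hi")]
new_rows = migrate_rows(
    old_rows,
    VideoV2,
    lambda v: {**asdict(v), "title_length": len(v.title)},
    dedup=("url",),
)
```

Converting everything before touching the stored rows is what lets a failure on any single payload leave the original data intact.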
pipe ¶
Start building a pipeline that transforms this topic.
Returns a Pipeline with self as the source and fn as
the transformation. Chain .to(target) to designate a target
topic, then call .run() to execute. Without .to(...) the
pipeline simply collects the return values of fn.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[[ModelT], Any] | Callable that takes a model instance of this topic's type and returns either a model instance for the target topic, a … | required |
Returns:
| Type | Description |
|---|---|
| Pipeline[ModelT, Any] | A Pipeline with this topic as the source and fn as the transformation. |
Examples:
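The pipe(fn).to(target).run() chain can be pictured with a toy builder. Everything here is a hypothetical stand-in: `ToyTopic` and `ToyPipeline` are invented for illustration, and the sketch assumes that run() returns the collected outputs and marks consumed source items as handled, which the real Pipeline may do differently.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class ToyTopic:
    """Hypothetical in-memory topic with separate new and handled lists."""
    name: str
    new: list = field(default_factory=list)
    handled: list = field(default_factory=list)

    def pipe(self, fn: Callable[[Any], Any]) -> "ToyPipeline":
        # Start a pipeline with this topic as the source.
        return ToyPipeline(source=self, fn=fn)


@dataclass
class ToyPipeline:
    source: ToyTopic
    fn: Callable[[Any], Any]
    target: Optional[ToyTopic] = None

    def to(self, target: ToyTopic) -> "ToyPipeline":
        # Designate the target topic and return self for chaining.
        self.target = target
        return self

    def run(self) -> list:
        results = []
        while self.source.new:
            item = self.source.new.pop(0)
            out = self.fn(item)
            if self.target is not None:
                self.target.new.append(out)
            results.append(out)
            # Consuming an item counts as handling it (assumed behaviour).
            self.source.handled.append(item)
        return results


videos = ToyTopic("videos", new=[{"title": "hi"}, {"title": "yo"}])
titles = ToyTopic("titles")
collected = videos.pipe(lambda v: v["title"].upper()).to(titles).run()
```

Leaving out the `.to(titles)` call in this toy would still return the collected values but append nothing to a target.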
set_handled ¶
Mark a record as handled.
Pass either the integer record id positionally or a Record
instance via the record= keyword. The transition is idempotent:
calling set_handled on an already-handled row simply refreshes
handled_at.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| _id | int \| None | Numeric primary key of the record to ack. | None |
| record | Record[Any] \| None | Alternative: a Record instance, passed via the record= keyword. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | Neither _id nor record was provided. |
Emits:
message_handled, with kwargs topic and id.
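The two calling styles and the repeat-ack behaviour can be sketched with a plain dictionary as the row store. The `rows` mapping and the dict-shaped record are assumptions made for illustration; the real method works on stored rows and Record instances.

```python
from datetime import datetime, timezone
from typing import Optional

# Hypothetical row store keyed by record id.
rows = {1: {"status": "new", "handled_at": None}}


def set_handled(_id: Optional[int] = None, *, record: Optional[dict] = None) -> None:
    # One of the two arguments must identify the row.
    if _id is None and record is None:
        raise ValueError("pass either an id or record=")
    row_id = _id if _id is not None else record["id"]
    row = rows[row_id]
    # Repeating the call keeps the status and only refreshes the timestamp.
    row["status"] = "handled"
    row["handled_at"] = datetime.now(timezone.utc)


set_handled(1)                 # positional id
first = rows[1]["handled_at"]
set_handled(record={"id": 1})  # record= keyword on an already-handled row
second = rows[1]["handled_at"]
```

After both calls the row is still simply handled; the second call changed nothing except the stored timestamp.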
to_polars ¶
Return all rows in this topic as a Polars DataFrame.
Includes both new and handled rows. Only flat (non-nested)
payloads are supported — nested dict / list / tuple
values raise ValueError.
Returns:
| Type | Description |
|---|---|
| Any | A Polars DataFrame containing all rows in this topic. |
Raises:
| Type | Description |
|---|---|
| ImportError | |
| ValueError | A payload contains a nested collection. |
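The flat-payload restriction can be illustrated without Polars by turning row dicts into a column mapping and rejecting nested values along the way. `to_columns` is a hypothetical helper written for this sketch; a mapping of column names to lists is one of the inputs `polars.DataFrame` accepts, but whether to_polars builds its frame this way is an assumption.

```python
from typing import Any


def to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Turn flat row dicts into a column mapping, rejecting nested values."""
    columns: dict[str, list[Any]] = {}
    for row in rows:
        for key, value in row.items():
            # Nested dict / list / tuple values are not supported.
            if isinstance(value, (dict, list, tuple)):
                raise ValueError(f"nested value in field {key!r}")
            columns.setdefault(key, []).append(value)
    return columns


columns = to_columns([
    {"url": "https://x", "title": "hi"},
    {"url": "https://y", "title": "yo"},
])
```

A payload such as `{"url": "https://x", "tags": ["a", "b"]}` would raise ValueError in this sketch, mirroring the documented behaviour for nested collections.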