Topic

minikafka.Topic

Topic(source: Source, name: str, model: type[ModelT])

Bases: Generic[ModelT]

Typed, append-only queue backed by a Pydantic model.

A Topic is a logical message log inside a Source. Every record is validated against the topic's model before insert and may be deduplicated on a chosen tuple of fields. Records start in the new state and transition to handled once acknowledged via set_handled or by being consumed by a pipeline.

Use iter_new to stream pending work and set_handled to ack a record. To transform a topic into another, use topic.pipe(fn).to(target) and call run() on the resulting Pipeline.

Create topics with Source.topic; do not construct Topic directly.

Attributes:

Name Type Description
source Source

The owning Source.

name str

Topic name (matches the row in the topics SQLite table).

model type[ModelT]

The Pydantic BaseModel subclass for this topic.

Examples:

videos = source.topic("videos", Video, dedup=("url",))
videos.append({"url": "https://x", "title": "hi"})

for record in videos.iter_new(records=True):
    print(record.id, record.data.title)
    videos.set_handled(record=record)

Source code in src/minikafka/core.py
def __init__(self, source: Source, name: str, model: type[ModelT]):
    self.source = source
    self.name = name
    self.model = model

append

append(payload: ModelT | dict[str, Any]) -> ModelT

Validate and insert a new record into the topic.

The payload is validated against the topic's model (raising pydantic.ValidationError on mismatch) and then inserted as a new status='new' row. If the topic has dedup fields configured and another row already exists with the same dedup values, DuplicateMessageError is raised.

Parameters:

Name Type Description Default
payload ModelT | dict[str, Any]

Either a Pydantic model instance of the topic's type or a dict that can be coerced into one.

required

Returns:

Type Description
ModelT

The validated Pydantic model instance that was stored.

Raises:

Type Description
DuplicateMessageError

A row with the same dedup fields already exists in this topic.

pydantic.ValidationError

The payload does not match the topic's model.

Emits

message_appended with kwargs topic and payload (the canonical dict, not the model instance).
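
The dedup behaviour described for append can be approximated outside the library. The following is a minimal sketch using only the standard library, assuming deduplication works by hashing the chosen fields and enforcing uniqueness per topic. The table layout and the helpers dedup_key and append_row are hypothetical stand-ins, not minikafka's actual implementation.

```python
import hashlib
import json
import sqlite3


def dedup_key(payload: dict, fields: tuple[str, ...]) -> str:
    """Hash only the dedup fields, with sorted keys so the hash is stable."""
    subset = {name: payload[name] for name in fields}
    canonical = json.dumps(subset, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical schema: one row per message, unique per (topic, dedup_hash).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages ("
    " id INTEGER PRIMARY KEY, topic TEXT, payload_json TEXT,"
    " dedup_hash TEXT, status TEXT DEFAULT 'new',"
    " UNIQUE (topic, dedup_hash))"
)


def append_row(topic: str, payload: dict, fields: tuple[str, ...]) -> bool:
    """Insert the payload; return False when the dedup key already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO messages (topic, payload_json, dedup_hash)"
                " VALUES (?, ?, ?)",
                (topic, json.dumps(payload, sort_keys=True), dedup_key(payload, fields)),
            )
    except sqlite3.IntegrityError:
        return False
    return True


first = append_row("videos", {"url": "https://x", "title": "hi"}, ("url",))
second = append_row("videos", {"url": "https://x", "title": "other"}, ("url",))
```

Here the second insert is rejected because it shares the same url, even though the title differs. In the library itself the equivalent situation is reported by raising DuplicateMessageError rather than by returning False.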

Source code in src/minikafka/core.py
def append(self, payload: ModelT | dict[str, Any]) -> ModelT:
    """Validate and insert a new record into the topic.

    The payload is validated against the topic's model (raising
    ``pydantic.ValidationError`` on mismatch) and then inserted as a
    new ``status='new'`` row. If the topic has dedup fields configured
    and another row already exists with the same dedup values,
    ``DuplicateMessageError`` is raised.

    Args:
        payload: Either a Pydantic model instance of the topic's type
            or a ``dict`` that can be coerced into one.

    Returns:
        The validated Pydantic model instance that was stored.

    Raises:
        DuplicateMessageError: A row with the same dedup fields already
            exists in this topic.
        pydantic.ValidationError: The payload does not match the topic's
            model.

    Emits:
        ``message_appended`` with kwargs ``topic`` and ``payload``
        (the canonical dict, not the model instance).
    """
    model = self._validate(payload)
    payload_dict = _model_to_payload(model)
    self._insert_payload(payload_dict)
    return model

count

count(status: str | None = None) -> int

Return the number of records in this topic.

Parameters:

Name Type Description Default
status str | None

Optional filter — "new", "handled", or None (default) to count all records regardless of status.

None

Returns:

Type Description
int

The record count as an integer.

Examples:

topic.count()            # all records
topic.count("new")       # only unhandled
topic.count("handled")   # only acknowledged

Source code in src/minikafka/core.py
def count(self, status: str | None = None) -> int:
    """Return the number of records in this topic.

    Args:
        status: Optional filter — ``"new"``, ``"handled"``, or ``None``
            (default) to count all records regardless of status.

    Returns:
        The record count as an integer.

    Examples:
        ```python
        topic.count()            # all records
        topic.count("new")       # only unhandled
        topic.count("handled")   # only acknowledged
        ```
    """
    if status is not None:
        row = self.source._conn.execute(
            "SELECT COUNT(*) FROM messages WHERE topic = ? AND status = ?",
            (self.name, status),
        ).fetchone()
    else:
        row = self.source._conn.execute(
            "SELECT COUNT(*) FROM messages WHERE topic = ?",
            (self.name,),
        ).fetchone()
    return row[0]

iter_handled

iter_handled(*, records: bool = False, as_dict: bool = False) -> Iterator[ModelT | Record[ModelT] | dict[str, Any]]

Iterate the topic's handled rows in insertion order.

Identical to iter_new but yields rows whose status has been transitioned to handled. Useful for auditing or replays.

Parameters:

Name Type Description Default
records bool

If True, yield Record instances.

False
as_dict bool

If True, yield raw payload dicts.

False

Yields:

Type Description
ModelT | Record[ModelT] | dict[str, Any]

Decoded ModelT instances, Record[ModelT] wrappers, or plain dict payloads.

Source code in src/minikafka/core.py
def iter_handled(
    self, *, records: bool = False, as_dict: bool = False
) -> Iterator[ModelT | Record[ModelT] | dict[str, Any]]:
    """Iterate the topic's ``handled`` rows in insertion order.

    Identical to ``iter_new`` but yields rows whose status has been
    transitioned to ``handled``. Useful for auditing or replays.

    Args:
        records: If ``True``, yield ``Record`` instances.
        as_dict: If ``True``, yield raw payload dicts.

    Yields:
        Decoded ``ModelT`` instances, ``Record[ModelT]`` wrappers, or
        plain ``dict`` payloads.
    """
    yield from self._iter(status="handled", records=records, as_dict=as_dict)

iter_new

iter_new(*, records: bool = False, as_dict: bool = False) -> Iterator[ModelT | Record[ModelT] | dict[str, Any]]

Iterate the topic's new (unhandled) rows in insertion order.

Parameters:

Name Type Description Default
records bool

If True, yield Record instances with storage metadata. Defaults to False (yield decoded models).

False
as_dict bool

If True, yield raw payload dicts and skip Pydantic validation. Useful for to_polars. Defaults to False.

False

Yields:

Type Description
ModelT | Record[ModelT] | dict[str, Any]

Decoded ModelT instances, Record[ModelT] wrappers, or plain dict payloads depending on the flags above.

Raises:

Type Description
ValueError

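records and as_dict are both True. Because iter_new is a generator, the error surfaces on the first iteration, not when iter_new is called.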
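
Since the body of iter_new uses yield from, it is a generator function, so nothing in it runs until the first item is requested. The sketch below illustrates that timing with a stand-in generator that follows the same flag contract. The name iter_rows and the tuple used in place of a Record wrapper are assumptions made for illustration only.

```python
from typing import Any, Iterator


def iter_rows(
    rows: list[dict[str, Any]], *, records: bool = False, as_dict: bool = False
) -> Iterator[Any]:
    """Stand-in generator with the same flag contract as iter_new."""
    if records and as_dict:
        raise ValueError("records and as_dict are mutually exclusive")
    for index, payload in enumerate(rows, start=1):
        if records:
            yield (index, payload)  # stand-in for a Record wrapper
        else:
            yield payload


rows = [{"title": "hi"}]

# Calling the generator function does not run its body, so no error yet.
pending = iter_rows(rows, records=True, as_dict=True)

try:
    next(pending)  # the flag check runs here
    raised = False
except ValueError:
    raised = True
```

A practical consequence is that a try/except meant to catch the invalid flag combination has to wrap the loop, not just the call.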

Examples:

for video in topic.iter_new():
    print(video.title)

for record in topic.iter_new(records=True):
    print(record.id, record.created_at, record.data)

Source code in src/minikafka/core.py
def iter_new(
    self, *, records: bool = False, as_dict: bool = False
) -> Iterator[ModelT | Record[ModelT] | dict[str, Any]]:
    """Iterate the topic's ``new`` (unhandled) rows in insertion order.

    Args:
        records: If ``True``, yield ``Record`` instances with storage
            metadata. Defaults to ``False`` (yield decoded models).
        as_dict: If ``True``, yield raw payload dicts and skip Pydantic
            validation. Useful for ``to_polars``. Defaults to ``False``.

    Yields:
        Decoded ``ModelT`` instances, ``Record[ModelT]`` wrappers, or
        plain ``dict`` payloads depending on the flags above.

    Raises:
        ValueError: ``records`` and ``as_dict`` are both ``True``.

    Examples:
        ```python
        for video in topic.iter_new():
            print(video.title)

        for record in topic.iter_new(records=True):
            print(record.id, record.created_at, record.data)
        ```
    """
    yield from self._iter(status="new", records=records, as_dict=as_dict)

migrate

migrate(new_model: type[OutputT], migration_function: Callable[[ModelT], OutputT | dict[str, Any]]) -> Topic[OutputT]

Rewrite every stored payload using a new model.

Runs migration_function on each row's decoded payload, validates the result against new_model, and rewrites the row in place. The topic's recorded schema and schema hash are updated atomically with the rows. The dedup-field tuple is preserved and must still be present on new_model.

Parameters:

Name Type Description Default
new_model type[OutputT]

Target Pydantic BaseModel subclass.

required
migration_function Callable[[ModelT], OutputT | dict[str, Any]]

Callable that converts an old model instance into a new model instance (or a dict that validates as one).

required

Returns:

Type Description
Topic[OutputT]

A new Topic[OutputT] bound to the same name and source.

Raises:

Type Description
ValueError

The previous dedup fields are not present on new_model.

TypeError

new_model is not a Pydantic BaseModel subclass.

pydantic.ValidationError

A migrated payload does not match new_model.
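
The source listing for migrate works in two phases: it first transforms and validates every row in memory, then applies all updates inside a single transaction. The sketch below reproduces that pattern with plain sqlite3 and dicts so it runs without minikafka or Pydantic. The table layout, migrate_rows, and add_length are hypothetical names used only for illustration.

```python
import json
import sqlite3
from typing import Callable

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY, topic TEXT, payload_json TEXT)"
)
with conn:
    conn.executemany(
        "INSERT INTO messages (topic, payload_json) VALUES (?, ?)",
        [
            ("videos", json.dumps({"url": "https://x", "title": "hi"})),
            ("videos", json.dumps({"url": "https://y", "title": "yo"})),
        ],
    )


def migrate_rows(topic: str, fn: Callable[[dict], dict]) -> int:
    """Rewrite every payload of a topic; return the number of rows updated."""
    rows = conn.execute(
        "SELECT id, payload_json FROM messages WHERE topic = ? ORDER BY id",
        (topic,),
    ).fetchall()
    # Phase 1: transform everything first. An exception raised by fn here
    # happens before any write, so the table is left untouched.
    migrated = [
        (json.dumps(fn(json.loads(payload)), sort_keys=True), row_id)
        for row_id, payload in rows
    ]
    # Phase 2: apply all updates in one transaction (commit or roll back).
    with conn:
        conn.executemany(
            "UPDATE messages SET payload_json = ? WHERE id = ?", migrated
        )
    return len(migrated)


def add_length(old: dict) -> dict:
    """Example migration function: add a derived field."""
    return {**old, "title_length": len(old["title"])}


updated = migrate_rows("videos", add_length)
payloads = [
    json.loads(row[0])
    for row in conn.execute("SELECT payload_json FROM messages ORDER BY id")
]
```

Doing the validation before opening the transaction keeps the write phase short and means a bad migration function fails without partially rewriting the topic.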

Source code in src/minikafka/core.py
def migrate(
    self,
    new_model: type[OutputT],
    migration_function: Callable[[ModelT], OutputT | dict[str, Any]],
) -> Topic[OutputT]:
    """Rewrite every stored payload using a new model.

    Runs ``migration_function`` on each row's decoded payload, validates
    the result against ``new_model``, and rewrites the row in place.
    The topic's recorded schema and schema hash are updated atomically
    with the rows. The dedup-field tuple is preserved and must still
    be present on ``new_model``.

    Args:
        new_model: Target Pydantic ``BaseModel`` subclass.
        migration_function: Callable that converts an old model instance
            into a new model instance (or a dict that validates as one).

    Returns:
        A new ``Topic[OutputT]`` bound to the same name and source.

    Raises:
        ValueError: The previous dedup fields are not present on
            ``new_model``.
        TypeError: ``new_model`` is not a Pydantic ``BaseModel`` subclass.
        pydantic.ValidationError: A migrated payload does not match
            ``new_model``.
    """
    new_model = _validate_model_class(new_model, argument="new_model")  # type: ignore[assignment]
    rows = self.source._conn.execute(
        "SELECT * FROM messages WHERE topic = ? ORDER BY id", (self.name,)
    ).fetchall()
    new_payloads: list[tuple[int, dict[str, Any], str, str | None]] = []
    old_dedup_fields = tuple(json.loads(self.source._topic_row(self.name)["dedup_fields"]))
    _validate_dedup_fields(new_model, old_dedup_fields)
    new_schema = _schema_for(new_model)
    new_schema_hash = _schema_hash(new_model)

    for row in rows:
        old_payload = json.loads(row["payload_json"])
        old_instance = self.model.model_validate(old_payload)
        migrated = new_model.model_validate(migration_function(old_instance))
        payload_dict = _model_to_payload(migrated)
        payload_hash = _stable_hash(payload_dict)
        dedup_hash = self._dedup_hash_for_payload(payload_dict, old_dedup_fields)
        new_payloads.append((row["id"], payload_dict, payload_hash, dedup_hash))

    with self.source._conn:
        self.source._conn.execute(
            """
            UPDATE topics
            SET schema_json = ?, schema_hash = ?
            WHERE name = ?
            """,
            (_canonical_json(new_schema), new_schema_hash, self.name),
        )
        for message_id, payload_dict, payload_hash, dedup_hash in new_payloads:
            self.source._conn.execute(
                """
                UPDATE messages
                SET payload_json = ?, schema_hash = ?, payload_hash = ?, dedup_hash = ?
                WHERE id = ? AND topic = ?
                """,
                (
                    _canonical_json(payload_dict),
                    new_schema_hash,
                    payload_hash,
                    dedup_hash,
                    message_id,
                    self.name,
                ),
            )

    self.source._models[self.name] = new_model
    return Topic(self.source, self.name, new_model)

pipe

pipe(fn: Callable[[ModelT], Any]) -> Pipeline[ModelT, Any]

Start building a pipeline that transforms this topic.

Returns a Pipeline with self as the source and fn as the transformation. Chain .to(target) to designate a target topic, then call .run() to execute. Without .to(...) the pipeline simply collects the return values of fn.

Parameters:

Name Type Description Default
fn Callable[[ModelT], Any]

Callable that takes a model instance of this topic's type and returns either a model instance for the target topic, a dict that validates against the target's model, or None (which acks the source record without writing).

required

Returns:

Type Description
Pipeline[ModelT, Any]

A Pipeline[ModelT, Any] ready for .to(target).
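
To make the contract for fn concrete, here is a sketch of a transformation function that either returns a dict for the target topic or returns None to drop the record. RawVideo and clean are hypothetical, and a dataclass stands in for the Pydantic model so the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RawVideo:  # stand-in for the source topic's Pydantic model
    url: str
    title: str


def clean(video: RawVideo) -> Optional[dict]:
    """Normalise the title; skip records that have no usable title."""
    title = video.title.strip()
    if not title:
        # Per the parameter description: returning None acks the source
        # record without writing anything to the target.
        return None
    return {"url": video.url, "title": title}


kept = clean(RawVideo(url="https://x", title="  hi  "))
dropped = clean(RawVideo(url="https://y", title="   "))
```

A function of this shape is what would be passed as fn; the returned dict is then expected to validate against the target topic's model.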

Examples:

raw.pipe(clean).to(clean_topic).run()

Source code in src/minikafka/core.py
def pipe(self, fn: Callable[[ModelT], Any]) -> Pipeline[ModelT, Any]:
    """Start building a pipeline that transforms this topic.

    Returns a ``Pipeline`` with ``self`` as the source and ``fn`` as
    the transformation. Chain ``.to(target)`` to designate a target
    topic, then call ``.run()`` to execute. Without ``.to(...)`` the
    pipeline simply collects the return values of ``fn``.

    Args:
        fn: Callable that takes a model instance of this topic's type
            and returns either a model instance for the target topic,
            a ``dict`` that validates against the target's model, or
            ``None`` (which acks the source record without writing).

    Returns:
        A ``Pipeline[ModelT, Any]`` ready for ``.to(target)``.

    Examples:
        ```python
        raw.pipe(clean).to(clean_topic).run()
        ```
    """
    return Pipeline(self, fn)

set_handled

set_handled(_id: int | None = None, *, record: Record[Any] | None = None) -> None

Mark a record as handled.

Pass either the integer record id positionally or a Record instance via the record= keyword; if both are given, _id takes precedence. The transition is idempotent: calling set_handled on an already-handled row simply refreshes handled_at.

Parameters:

Name Type Description Default
_id int | None

Numeric primary key of the record to ack.

None
record Record[Any] | None

Alternative — a Record returned by iter_new / iter_handled with records=True.

None

Raises:

Type Description
ValueError

Neither _id nor record was provided.

Emits

message_handled with kwargs topic and id.
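
The acknowledgement is a single UPDATE keyed on id and topic. The sketch below runs an equivalent statement against an in-memory SQLite table to show what repeating the call does. The table layout and the ack helper are assumptions for illustration. Unlike the listing for set_handled, the helper returns the matched row count, which makes the unknown-id case visible.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages ("
    " id INTEGER PRIMARY KEY, topic TEXT,"
    " status TEXT DEFAULT 'new', handled_at TEXT)"
)
with conn:
    conn.execute("INSERT INTO messages (topic) VALUES ('videos')")


def ack(message_id: int, topic: str) -> int:
    """Mark one row handled; return how many rows the UPDATE matched."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:
        cursor = conn.execute(
            "UPDATE messages SET handled_at = ?, status = 'handled'"
            " WHERE id = ? AND topic = ?",
            (now, message_id, topic),
        )
    return cursor.rowcount


first = ack(1, "videos")     # new -> handled
second = ack(1, "videos")    # already handled: status unchanged, timestamp refreshed
missing = ack(99, "videos")  # no such row: the UPDATE matches nothing
status = conn.execute("SELECT status FROM messages WHERE id = 1").fetchone()[0]
```

Judging by the listing for set_handled, the library does not inspect the matched row count, so acknowledging an id that does not exist in the topic appears to pass silently.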

Source code in src/minikafka/core.py
def set_handled(
    self, _id: int | None = None, *, record: Record[Any] | None = None
) -> None:
    """Mark a record as ``handled``.

    Pass either the integer record id positionally or a ``Record``
    instance via the ``record=`` keyword. The transition is idempotent:
    calling ``set_handled`` on an already-handled row simply refreshes
    ``handled_at``.

    Args:
        _id: Numeric primary key of the record to ack.
        record: Alternative — a ``Record`` returned by ``iter_new`` /
            ``iter_handled`` with ``records=True``.

    Raises:
        ValueError: Neither ``_id`` nor ``record`` was provided.

    Emits:
        ``message_handled`` with kwargs ``topic`` and ``id``.
    """
    message_id = _id if _id is not None else record.id if record is not None else None
    if message_id is None:
        raise ValueError("set_handled requires _id or record")
    self.source._conn.execute(
        """
        UPDATE messages
        SET handled_at = ?, status = 'handled'
        WHERE id = ? AND topic = ?
        """,
        (_utc_now(), message_id, self.name),
    )
    self.source._conn.commit()
    self.source._emit("message_handled", topic=self.name, id=message_id)

to_polars

to_polars() -> Any

Return all rows in this topic as a Polars DataFrame.

Includes both new and handled rows. Only flat (non-nested) payloads are supported — nested dict / list / tuple values raise ValueError.

Returns:

Type Description
Any

A polars.DataFrame with one row per record.

Raises:

Type Description
ImportError

polars is not installed. Install with pip install minikafka[polars].

ValueError

A payload contains a nested collection.
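
The flat-payload rule can be checked before calling to_polars. The sketch below mirrors the nesting test from the source listing and shows one possible way to flatten a list field beforehand. Both first_nested_key and flatten_tags are hypothetical helpers, and the tags field is an invented example.

```python
from typing import Any, Optional


def first_nested_key(rows: list[dict[str, Any]]) -> Optional[str]:
    """Return the first key holding a dict, list, or tuple, else None."""
    for row in rows:
        for key, value in row.items():
            if isinstance(value, (dict, list, tuple)):
                return key
    return None


def flatten_tags(row: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical workaround: join a list field into one string column."""
    flat = dict(row)
    if isinstance(flat.get("tags"), list):
        flat["tags"] = ",".join(flat["tags"])
    return flat


rows = [{"url": "https://x", "tags": ["a", "b"]}]
nested = first_nested_key(rows)                  # "tags" would trigger ValueError
flat_rows = [flatten_tags(row) for row in rows]  # safe to tabulate
```

Because to_polars reads stored payloads directly, a flattening step like this would have to happen before the data is appended, or through a migration, rather than at export time.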

Source code in src/minikafka/core.py
def to_polars(self) -> Any:
    """Return all rows in this topic as a Polars ``DataFrame``.

    Includes both ``new`` and ``handled`` rows. Only flat (non-nested)
    payloads are supported — nested ``dict`` / ``list`` / ``tuple``
    values raise ``ValueError``.

    Returns:
        A ``polars.DataFrame`` with one row per record.

    Raises:
        ImportError: ``polars`` is not installed. Install with
            ``pip install minikafka[polars]``.
        ValueError: A payload contains a nested collection.
    """
    try:
        import polars as pl
    except ImportError as exc:
        raise ImportError(
            "to_polars() requires the optional 'polars' dependency"
        ) from exc
    rows = [
        json.loads(row["payload_json"])
        for row in self.source._conn.execute(
            "SELECT payload_json FROM messages WHERE topic = ? ORDER BY id",
            (self.name,),
        )
    ]
    for row in rows:
        for key, value in row.items():
            if isinstance(value, (dict, list, tuple)):
                raise ValueError(f"to_polars only supports flat payloads; {key!r} is nested")
    return pl.DataFrame(rows)