
Record

minikafka.Record dataclass

Record(id: int, topic: str, created_at: datetime, handled_at: datetime | None, status: str, payload_hash: str, dedup_hash: str | None, data: ModelT, _source: Any = None)

Bases: Generic[ModelT]

A stored message returned by Topic.iter_new / Topic.iter_handled when called with records=True.

Records are frozen snapshots of the decoded Pydantic payload plus the storage metadata kept in SQLite. Call record.ack() or Topic.set_handled to transition the stored row from new to handled.

Attributes:

Name Type Description
id int

Auto-incrementing primary key, unique within the database.

topic str

Name of the topic the record belongs to.

created_at datetime

UTC timestamp of when the record was inserted.

handled_at datetime | None

UTC timestamp of when the record was acknowledged, or None if it is still in the new state.

status str

Either "new" or "handled".

payload_hash str

SHA-256 over the canonical JSON of the full payload.

dedup_hash str | None

SHA-256 over the dedup-field subset, or None when the topic has no dedup configuration.

data ModelT

The decoded Pydantic model instance.
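
The two hash attributes above can be illustrated with a minimal sketch. It assumes that "canonical JSON" means sorted keys with compact separators, and that the dedup configuration is a plain tuple of field names; neither detail is confirmed by this page, and the helper names `canonical_json` and `sha256_hex` are hypothetical, not part of minikafka.

```python
import hashlib
import json


def canonical_json(payload: dict) -> str:
    # Assumption: canonical form = sorted keys, no insignificant whitespace.
    return json.dumps(
        payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )


def sha256_hex(payload: dict) -> str:
    # Hex digest of SHA-256 over the UTF-8 encoded canonical JSON.
    return hashlib.sha256(canonical_json(payload).encode("utf-8")).hexdigest()


payload = {"user_id": 7, "event": "signup", "ts": "2024-01-01T00:00:00Z"}
dedup_fields = ("user_id", "event")  # hypothetical dedup configuration

# Hash over the full payload (compare payload_hash).
payload_hash = sha256_hex(payload)
# Hash over the dedup-field subset only (compare dedup_hash).
dedup_hash = sha256_hex({name: payload[name] for name in dedup_fields})
```

Under these assumptions, two payloads that differ only in key order hash identically, and two payloads that differ only outside the dedup fields share a dedup_hash while having different payload_hash values.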

Examples:

for record in topic.iter_new(records=True):
    print(record.id, record.created_at, record.data)
    record.ack()

ack

ack() -> None

Mark this record as handled.

Shorthand for topic.set_handled(record=self). Repeating the call is safe: ack() on an already-handled record keeps status at "handled" and only refreshes handled_at.

Raises:

Type Description
RuntimeError

The record was not produced by a topic iterator (missing source reference).

Source code in src/minikafka/core.py
def ack(self) -> None:
    """Mark this record as ``handled``.

    Shorthand for ``topic.set_handled(record=self)``. Repeating the
    call is safe: ``ack()`` on an already-handled record keeps
    ``status`` at ``handled`` and only refreshes ``handled_at``.

    Raises:
        RuntimeError: The record was not produced by a topic iterator
            (missing source reference).
    """
    if self._source is None:
        raise RuntimeError(
            "ack() requires a source reference; use records from "
            "Topic.iter_new(records=True) or Topic.iter_handled(records=True)"
        )
    self._source._conn.execute(
        """
        UPDATE messages
        SET handled_at = ?, status = 'handled'
        WHERE id = ? AND topic = ?
        """,
        (_utc_now(), self.id, self.topic),
    )
    self._source._conn.commit()
    self._source._emit("message_handled", topic=self.topic, id=self.id)
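
The effect of running the UPDATE statement above twice can be reproduced with the standard sqlite3 module alone. This is a sketch only: the table layout is an assumption reduced to the columns the statement touches, and `mark_handled` and `fetch` are hypothetical helpers, not minikafka functions.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
# Assumed minimal schema; the real messages table has more columns.
conn.execute(
    """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'new',
        handled_at TEXT
    )
    """
)
conn.execute("INSERT INTO messages (topic) VALUES ('orders')")
conn.commit()


def mark_handled(conn: sqlite3.Connection, record_id: int, topic: str, now: str) -> None:
    # Same statement shape as in ack(): set the timestamp and the status.
    conn.execute(
        "UPDATE messages SET handled_at = ?, status = 'handled' "
        "WHERE id = ? AND topic = ?",
        (now, record_id, topic),
    )
    conn.commit()


def fetch(conn: sqlite3.Connection, record_id: int) -> tuple:
    return conn.execute(
        "SELECT status, handled_at FROM messages WHERE id = ?", (record_id,)
    ).fetchone()


first = datetime(2024, 1, 1, tzinfo=timezone.utc).isoformat()
second = datetime(2024, 1, 2, tzinfo=timezone.utc).isoformat()

mark_handled(conn, 1, "orders", first)
after_first = fetch(conn, 1)
mark_handled(conn, 1, "orders", second)  # repeat on an already-handled row
after_second = fetch(conn, 1)
```

In this sketch the status is "handled" after both calls, while handled_at moves from the first timestamp to the second, which matches the behaviour described in the docstring.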

FanOutFailure

minikafka.FanOutFailure dataclass

FanOutFailure(record_id: int, source: str, target: str | None, exception: BaseException)

A single sibling failure recorded during a best_effort fan-out run.

Instances are collected in the failures list of FanOutError when FullPipeline.run(strategy="best_effort") is used. The parent record that triggered the failure is left in the new state so that a later run can retry it once the problem is fixed.

Attributes:

Name Type Description
record_id int

Primary key of the parent record in its source topic.

source str

Name of the topic that produced the record.

target str | None

Name of the target topic the sibling pipeline was writing to, or None if the pipeline had no target.

exception BaseException

The original exception raised by the sibling pipeline.
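
The best_effort behaviour described above (keep running the remaining siblings, collect one failure entry per failing sibling, raise once at the end) can be sketched with stand-in types. Everything here is hypothetical: `FailureSketch`, `FanOutErrorSketch`, and `run_best_effort` only mirror the documented fields and are not the minikafka implementation.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass(frozen=True)
class FailureSketch:
    # Stand-in with the same four fields as FanOutFailure.
    record_id: int
    source: str
    target: Optional[str]
    exception: BaseException


class FanOutErrorSketch(Exception):
    # Stand-in for FanOutError: carries the collected failures.
    def __init__(self, failures: List[FailureSketch]) -> None:
        super().__init__(f"{len(failures)} sibling pipeline(s) failed")
        self.failures = failures


def run_best_effort(
    record_id: int,
    source: str,
    siblings: List[Tuple[Optional[str], Callable[[int], None]]],
) -> None:
    failures: List[FailureSketch] = []
    for target, handler in siblings:
        try:
            handler(record_id)
        except Exception as exc:
            # Record the failure and continue with the next sibling.
            failures.append(FailureSketch(record_id, source, target, exc))
    if failures:
        raise FanOutErrorSketch(failures)


def ok(record_id: int) -> None:
    pass


def broken(record_id: int) -> None:
    raise ValueError("bad payload")


collected: List[FailureSketch] = []
try:
    run_best_effort(
        42, "orders", [("invoices", ok), ("emails", broken), (None, broken)]
    )
except FanOutErrorSketch as err:
    collected = err.failures
```

Here `collected` ends up with two entries, one for the "emails" sibling and one for the sibling without a target; the successful "invoices" sibling leaves no entry.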