
Pipeline

minikafka.Pipeline

Pipeline(source_topic: Topic[ModelT], fn: Callable[[ModelT], OutputT])

Bases: Generic[ModelT, OutputT]

Single-source transformation pipeline between two topics.

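Build with topic.pipe(fn) and optionally .to(target). When run, the pipeline iterates over the source topic's new rows and applies fn to each decoded payload. If a target is set, each result is validated against the target topic's model and written to the target, and the source row is marked as handled. The write and the status update for a row share a single SQLite transaction. Without a target, the pipeline only collects fn's return values and leaves the source rows unhandled.

If a target is set and fn returns None for a row, the source record is still acked (marked as handled), but nothing is written to the target.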
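The per-row atomicity described above can be illustrated with the standard-library sqlite3 module. This is a minimal sketch under stated assumptions, not minikafka's implementation. The messages table and its id, topic, status and handled_at columns appear in Pipeline.run, but the payload column, the UNIQUE (topic, payload) constraint standing in for the target topic's dedup rule, and the process_row and clean helpers are hypothetical. The sketch shows the pattern run follows: the target insert and the source row's status update share one transaction, so a failed insert leaves the source row new.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical schema: id/topic/status/handled_at mirror the columns used in
# Pipeline.run; the payload column and UNIQUE constraint are assumptions.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'new',
        handled_at TEXT,
        UNIQUE (topic, payload)
    )
    """
)
conn.executemany(
    "INSERT INTO messages (topic, payload) VALUES ('raw', ?)",
    [(" Hello ",), ("HELLO",)],
)
conn.commit()


def clean(text: str) -> str:
    return text.strip().lower()


def process_row(conn: sqlite3.Connection, row_id: int, payload: str) -> None:
    """Hypothetical helper: write clean(payload) to 'clean' and ack the row."""
    result = clean(payload)
    # `with conn:` commits if the block succeeds and rolls back if it raises,
    # so the insert and the status update land together or not at all.
    with conn:
        conn.execute(
            "INSERT INTO messages (topic, payload) VALUES ('clean', ?)", (result,)
        )
        conn.execute(
            "UPDATE messages SET handled_at = ?, status = 'handled' "
            "WHERE id = ? AND topic = 'raw'",
            (datetime.now(timezone.utc).isoformat(), row_id),
        )


new_rows = conn.execute(
    "SELECT id, payload FROM messages WHERE topic = 'raw' AND status = 'new' "
    "ORDER BY id"
).fetchall()
for row_id, payload in new_rows:
    try:
        process_row(conn, row_id, payload)
    except sqlite3.IntegrityError:
        pass  # duplicate output: transaction rolled back, row stays 'new'
```

Here the second raw row cleans to the same value as the first, so its insert violates the constraint, the transaction rolls back, and that row stays new. The sketch catches sqlite3.IntegrityError only so the loop can continue; per the Raises section of run, the real pipeline surfaces such conflicts as DuplicateMessageError.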

Attributes:

source_topic (Topic[ModelT]): The topic rows are read from.
fn (Callable[[ModelT], OutputT]): The transformation callable.
target_topic (Topic[Any] | None): The target topic, or None until .to(...) is called.

Examples:

pipeline = raw.pipe(clean).to(clean_topic)
results = pipeline.run()
print(pipeline.plot())  # Mermaid graph

Source code in src/minikafka/core.py
def __init__(self, source_topic: Topic[ModelT], fn: Callable[[ModelT], OutputT]):
    self.source_topic = source_topic
    self.fn = fn
    self.target_topic: Topic[Any] | None = None

source_name property

source_name: str

Name of the source topic.

target_name property

target_name: str | None

Name of the target topic, or None if no target was set.

plot

plot() -> str

Return a one-edge Mermaid graph TD diagram for this pipeline.

Equivalent to wrapping self in a FullPipeline and calling plot on it. Drop the returned string into a fenced ```mermaid block in markdown to render the diagram.

Source code in src/minikafka/core.py
def plot(self) -> str:
    """Return a one-edge Mermaid ``graph TD`` diagram for this pipeline.

    Equivalent to wrapping ``self`` in a ``FullPipeline`` and calling
    ``plot`` on it. Drop the returned string into a fenced
    ` ```mermaid ` block in markdown to render the diagram.
    """
    return FullPipeline([self]).plot()

run

run(*, dry_run: bool = False) -> list[Any]

Execute the pipeline over every new row in the source topic.

For each row: decode the payload and call fn. If a target is set, validate the result against the target's model, then, within a single transaction, insert the result into the target and mark the source row as handled. Returns the list of transformed outputs in source-row order.

Parameters:

dry_run (bool, default False): If True, run fn and validate the results but do not write anything to the target topic or transition source rows. Useful for previewing.

Returns:

list[Any]: The list of outputs in source-row order. If the pipeline has no target, each element is the raw return value of fn; with a target, it is the validated target-model instance (or None for rows where fn returned None).

Raises:

DuplicateMessageError: An output row would violate the target topic's dedup constraint.
pydantic.ValidationError: fn returned a value that does not validate against the target topic's model.

Emits:

pipeline_start before processing and pipeline_end after (with count, the number of processed rows, and dry_run). Each handled source row also emits message_handled.
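For tests that record emitted events, the ordering described above can be written down as an expected sequence. The expected_events helper below is a hypothetical sketch: it assumes a target is set and that every processed row is acked and therefore emits message_handled. How listeners subscribe to these events is not covered on this page.

```python
from typing import Any

Event = tuple[str, dict[str, Any]]


def expected_events(
    source: str, target: str, row_ids: list[int], *, dry_run: bool = False
) -> list[Event]:
    """Hypothetical helper: events one run emits, assuming a target is set
    and every processed row is acked (so each one emits message_handled)."""
    events: list[Event] = [("pipeline_start", {"source": source, "target": target})]
    if not dry_run:
        # One message_handled per acked source row, in source-row order.
        events += [("message_handled", {"topic": source, "id": i}) for i in row_ids]
    # count is the number of processed rows, i.e. len(results), even in a dry run.
    events.append(
        (
            "pipeline_end",
            {"source": source, "target": target, "count": len(row_ids), "dry_run": dry_run},
        )
    )
    return events


names = [name for name, _ in expected_events("raw", "clean", [1, 2])]
```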

Source code in src/minikafka/core.py
def run(self, *, dry_run: bool = False) -> list[Any]:
    """Execute the pipeline over every ``new`` row in the source topic.

    For each row: decode the payload, call ``fn``, validate the result
    against the target's model (if any), then within a single
    transaction insert the result and mark the source row as
    ``handled``. Returns the list of transformed outputs in
    insertion order.

    Args:
        dry_run: If ``True``, run ``fn`` and validate the results but
            do not write anything to the target topic or transition
            source rows. Useful for previewing.

    Returns:
        The list of outputs in source-row order. If the pipeline has
        no target, this is the raw return value of ``fn``; with a
        target, it is the validated target-model instance (or ``None``
        for rows where ``fn`` returned ``None``).

    Raises:
        DuplicateMessageError: An output row would violate the target
            topic's dedup constraint.
        pydantic.ValidationError: ``fn`` returned a value that does not
            validate against the target topic's model.

    Emits:
        ``pipeline_start`` before processing and ``pipeline_end`` after
        (with ``count`` and ``dry_run``). Each handled source row also
        emits ``message_handled``.
    """
    source = self.source_topic.source
    source._emit(
        "pipeline_start",
        source=self.source_topic.name,
        target=self.target_name,
    )
    rows = list(self.source_topic.iter_new(records=True))
    results: list[Any] = []
    for record in rows:
        assert isinstance(record, Record)
        result = self.fn(record.data)
        if self.target_topic is None:
            results.append(result)
            continue
        if result is None:
            results.append(None)
            if not dry_run:
                self.source_topic.set_handled(record=record)
            continue
        target_model = self.target_topic._validate(result)
        results.append(target_model)
        if dry_run:
            continue
        target_payload = _model_to_payload(target_model)
        with source._conn:
            self.target_topic._insert_payload_in_current_transaction(target_payload)
            source._conn.execute(
                """
                UPDATE messages
                SET handled_at = ?, status = 'handled'
                WHERE id = ? AND topic = ?
                """,
                (_utc_now(), record.id, self.source_topic.name),
            )
        source._emit(
            "message_handled", topic=self.source_topic.name, id=record.id
        )
    source._emit(
        "pipeline_end",
        source=self.source_topic.name,
        target=self.target_name,
        count=len(results),
        dry_run=dry_run,
    )
    return results

to

to(target: Topic[OutputT] | str) -> Pipeline[ModelT, OutputT]

Designate the target topic for this pipeline.

Parameters:

target (Topic[OutputT] | str, required): Either a Topic instance or the string name of a topic already registered on the source's Source.

Returns:

Pipeline[ModelT, OutputT]: self, so calls can be chained (topic.pipe(fn).to(target).run()).

Raises:

KeyError: target is a string and no topic by that name is registered on the underlying Source (call Source.topic(name, model) first to register it).

Source code in src/minikafka/core.py
def to(self, target: Topic[OutputT] | str) -> Pipeline[ModelT, OutputT]:
    """Designate the target topic for this pipeline.

    Args:
        target: Either a ``Topic`` instance or the string name of a
            topic already registered on the source's ``Source``.

    Returns:
        ``self`` — so calls can be chained
        (``topic.pipe(fn).to(target).run()``).

    Raises:
        KeyError: ``target`` is a string and no topic by that name is
            registered on the underlying ``Source`` (call
            ``Source.topic(name, model)`` first to register it).
    """
    if isinstance(target, str):
        target = self.source_topic.source._registered_topic(target)
    self.target_topic = target
    return self