
FullPipeline

minikafka.FullPipeline

FullPipeline(pipelines: list[Pipeline[Any, Any]])

Multi-pipeline DAG runner with fan-out and fan-in support.

Build with Source.full_pipeline(p1, p2, ...). Pipelines that share a source topic become siblings (fan-out): every source row is fed to each sibling's fn. Pipelines whose source topic is another pipeline's target form a chain (fan-in): source topics are processed in topological order, so rows written by an upstream pipeline are consumed downstream within the same run() call.
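One way to picture the grouping and ordering described above is a topological sort over topics. The sketch below is an illustration under stated assumptions, not minikafka's internal _grouped_in_order: each pipeline is reduced to a hypothetical (source, target) pair of topic names, and the standard library's graphlib.TopologicalSorter stands in for whatever ordering the library actually uses. Its CycleError is a subclass of ValueError, which matches the documented ValueError on cycles.

```python
from collections import defaultdict
from graphlib import TopologicalSorter
from typing import Optional

# Hypothetical reduced form of a pipeline: (source topic, target topic or None).
Edge = tuple[str, Optional[str]]


def grouped_in_order(edges: list[Edge]) -> list[tuple[str, list[Edge]]]:
    """Group edges by source topic (siblings) and order groups upstream-first."""
    siblings: dict[str, list[Edge]] = defaultdict(list)
    sorter: TopologicalSorter = TopologicalSorter()
    for source, target in edges:
        siblings[source].append((source, target))  # fan-out: same source, same group
        sorter.add(source)
        if target is not None:
            # The target topic depends on its source, so the source sorts first.
            sorter.add(target, source)
    # static_order() raises graphlib.CycleError (a ValueError) if the graph has a cycle.
    order = list(sorter.static_order())
    # Sink-only topics (never a source) have no siblings to run, so drop them.
    return [(topic, siblings[topic]) for topic in order if topic in siblings]
```

For example, with [("clean", "feed"), ("raw", "clean"), ("raw", "archive")] the raw group (two siblings) comes before the clean group, even though the clean pipeline was listed first.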

Two execution strategies are available:

  • "strict" (default) — sibling transforms are computed for a row, then the inserts and the source-row ack happen inside one SQLite transaction. Any exception aborts the whole run with no partial state.
  • "best_effort" — each sibling runs in isolation; failures are collected into FanOutFailure records. The parent row is marked handled only when every sibling succeeded. After the run finishes, any failures raise FanOutError so they cannot be ignored, but the successful sibling writes are preserved.
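The two strategies differ mainly in where the transaction boundaries sit. The following sketch models them with the standard sqlite3 module; the table layout, the (name, fn) sibling pairs, and the Failure and FanOutErr classes are hypothetical stand-ins for minikafka's internals, FanOutFailure, and FanOutError. It commits one transaction per source row in strict mode, which is an assumption; how the real implementation treats rows committed earlier in the same run is not modelled here.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any, Callable

Sibling = tuple[str, Callable[[Any], Any]]  # hypothetical (name, fn) pair


@dataclass
class Failure:
    """Hypothetical stand-in for minikafka's FanOutFailure."""
    sibling: str
    row_id: int
    error: Exception


class FanOutErr(Exception):
    """Hypothetical stand-in for minikafka's FanOutError."""

    def __init__(self, failures: list[Failure]):
        super().__init__(f"{len(failures)} sibling failure(s)")
        self.failures = failures


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        "CREATE TABLE source (id INTEGER PRIMARY KEY, payload INTEGER,"
        " handled INTEGER NOT NULL DEFAULT 0);"
        "CREATE TABLE target (sibling TEXT, row_id INTEGER, output INTEGER);"
    )
    conn.executemany("INSERT INTO source (id, payload) VALUES (?, ?)", [(1, 5), (2, 0)])
    conn.commit()
    return conn


def pending(conn: sqlite3.Connection) -> list[tuple[int, int]]:
    return conn.execute(
        "SELECT id, payload FROM source WHERE handled = 0 ORDER BY id"
    ).fetchall()


def run_strict(conn: sqlite3.Connection, siblings: list[Sibling]) -> None:
    for row_id, payload in pending(conn):
        # Compute every sibling first; an exception here writes nothing.
        outputs = [(name, row_id, fn(payload)) for name, fn in siblings]
        with conn:  # inserts and ack commit together or roll back together
            conn.executemany("INSERT INTO target VALUES (?, ?, ?)", outputs)
            conn.execute("UPDATE source SET handled = 1 WHERE id = ?", (row_id,))


def run_best_effort(conn: sqlite3.Connection, siblings: list[Sibling]) -> None:
    failures: list[Failure] = []
    for row_id, payload in pending(conn):
        all_ok = True
        for name, fn in siblings:
            try:
                output = fn(payload)
            except Exception as exc:  # isolate this sibling's failure
                failures.append(Failure(name, row_id, exc))
                all_ok = False
                continue
            with conn:  # keep a successful sibling's write regardless of others
                conn.execute("INSERT INTO target VALUES (?, ?, ?)", (name, row_id, output))
        if all_ok:  # ack the parent row only when every sibling succeeded
            with conn:
                conn.execute("UPDATE source SET handled = 1 WHERE id = ?", (row_id,))
    if failures:  # surface every failure once the pass is complete
        raise FanOutErr(failures)
```

With payloads 5 and 0 and a sibling computing 100 // payload, strict mode raises ZeroDivisionError and leaves row 2 with no target writes and no ack, while best-effort mode keeps all three successful writes, acks only row 1, and raises once at the end with a single failure.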

Attributes:

pipelines (list[Pipeline[Any, Any]]): The list of pipelines, in the order they were passed in.

Source code in src/minikafka/core.py
def __init__(self, pipelines: list[Pipeline[Any, Any]]):
    self.pipelines = pipelines

plot

plot() -> str

Return a Mermaid graph TD representation of the DAG.

Each pipeline contributes one edge source --> target; pipelines without a target produce a bare node. Embed the returned string in a fenced ```mermaid block to render.

Returns:

str: The Mermaid graph as a single string.

Source code in src/minikafka/core.py
def plot(self) -> str:
    """Return a Mermaid ``graph TD`` representation of the DAG.

    Each pipeline contributes one edge ``source --> target``;
    pipelines without a target produce a bare node. Embed the
    returned string in a fenced ` ```mermaid ` block to render.

    Returns:
        The Mermaid graph as a single string.
    """
    lines = ["graph TD"]
    for pipeline in self.pipelines:
        if pipeline.target_name is None:
            lines.append(f"    {pipeline.source_name}")
        else:
            lines.append(f"    {pipeline.source_name} --> {pipeline.target_name}")
    return "\n".join(lines)
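Since plot() reads only source_name and target_name from each pipeline, its output can be previewed without any topics or database. The sketch below copies the method body into a free function, render_mermaid (a hypothetical name), and feeds it types.SimpleNamespace stand-ins rather than real minikafka pipelines:

```python
from types import SimpleNamespace


def render_mermaid(pipelines) -> str:
    # Same logic as FullPipeline.plot, applied to any objects exposing
    # source_name and target_name attributes.
    lines = ["graph TD"]
    for p in pipelines:
        if p.target_name is None:
            lines.append(f"    {p.source_name}")  # bare node: no target topic
        else:
            lines.append(f"    {p.source_name} --> {p.target_name}")
    return "\n".join(lines)


pipes = [
    SimpleNamespace(source_name="raw", target_name="clean"),    # fan-out sibling 1
    SimpleNamespace(source_name="raw", target_name="archive"),  # fan-out sibling 2
    SimpleNamespace(source_name="clean", target_name="feed"),   # chain
    SimpleNamespace(source_name="feed", target_name=None),      # no target
]
print(render_mermaid(pipes))
# graph TD
#     raw --> clean
#     raw --> archive
#     clean --> feed
#     feed
```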

run

run(*, dry_run: bool = False, strategy: Literal['strict', 'best_effort'] = 'strict') -> list[list[Any]]

Execute the DAG.

Parameters:

dry_run (bool, default False): If True, run all transforms and validate their outputs, but do not write to any target topic or transition source rows.

strategy (Literal['strict', 'best_effort'], default 'strict'): Either "strict" (transactional; abort on any error) or "best_effort" (collect failures, ack the parent row only when all siblings succeed, raise at the end).

Returns:

list[list[Any]]: One result list per input pipeline, in the same order as the pipelines passed to the constructor.

Raises:

ValueError: strategy is not one of "strict" or "best_effort", or the DAG contains a cycle.

FanOutError: One or more siblings failed during a best_effort run.

Examples:

results = source.full_pipeline(
    raw.pipe(clean).to(clean_topic),
    clean_topic.pipe(score).to(feed),
).run(strategy="best_effort")

Source code in src/minikafka/core.py
def run(
    self,
    *,
    dry_run: bool = False,
    strategy: Literal["strict", "best_effort"] = "strict",
) -> list[list[Any]]:
    """Execute the DAG.

    Args:
        dry_run: If ``True``, run all transforms and validate their
            outputs but do not write to any target topic or transition
            source rows.
        strategy: Either ``"strict"`` (transactional, abort on any
            error) or ``"best_effort"`` (collect failures, ack parent
            only when all siblings succeed, raise at the end).

    Returns:
        A list of result lists — one inner list per input pipeline,
        in the same order that ``pipelines`` was constructed in.

    Raises:
        ValueError: ``strategy`` is not one of ``"strict"`` or
            ``"best_effort"``. Also raised if the DAG contains a cycle.
        FanOutError: One or more siblings failed during a
            ``best_effort`` run.

    Examples:
        ```python
        results = source.full_pipeline(
            raw.pipe(clean).to(clean_topic),
            clean_topic.pipe(score).to(feed),
        ).run(strategy="best_effort")
        ```
    """
    if strategy not in ("strict", "best_effort"):
        raise ValueError(
            f"strategy must be 'strict' or 'best_effort', got {strategy!r}"
        )

    results: dict[int, list[Any]] = {id(p): [] for p in self.pipelines}
    failures: list[FanOutFailure] = []

    for source_topic, siblings in self._grouped_in_order():
        for record in list(source_topic.iter_new(records=True)):
            assert isinstance(record, Record)
            if strategy == "strict":
                self._run_row_strict(
                    source_topic, siblings, record, dry_run, results
                )
            else:
                self._run_row_best_effort(
                    source_topic, siblings, record, dry_run, results, failures
                )

    if failures:
        raise FanOutError(failures)

    return [results[id(p)] for p in self.pipelines]
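To see why a chained pipeline picks up rows produced upstream in the same call, and why results come back in construction order rather than processing order, here is a self-contained in-memory model of the loop above. Topic, Pipe, and run_dag are hypothetical; the topic order is passed in explicitly (in minikafka it comes from _grouped_in_order), and strategies, transactions, and dry_run are deliberately left out.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class Topic:
    """Hypothetical in-memory topic: an append-only list plus a read cursor."""
    name: str
    rows: list = field(default_factory=list)
    cursor: int = 0

    def iter_new(self) -> list:
        # Return unread rows and mark them as read (a stand-in for the ack).
        new, self.cursor = self.rows[self.cursor:], len(self.rows)
        return new


@dataclass
class Pipe:
    """Hypothetical stand-in for a minikafka Pipeline."""
    source: Topic
    fn: Callable[[Any], Any]
    target: Optional[Topic] = None


def run_dag(pipelines: list[Pipe], order: list[Topic]) -> list[list[Any]]:
    # Key results by object identity, as FullPipeline.run does.
    results: dict[int, list[Any]] = {id(p): [] for p in pipelines}
    for topic in order:  # upstream topics first
        siblings = [p for p in pipelines if p.source is topic]
        for row in topic.iter_new():
            for p in siblings:  # fan-out: each sibling sees every row
                out = p.fn(row)
                results[id(p)].append(out)
                if p.target is not None:
                    # Visible to any topic processed later in this same call.
                    p.target.rows.append(out)
    # Reassemble in construction order, not processing order.
    return [results[id(p)] for p in pipelines]
```

If the clean-to-feed pipeline is listed first and raw holds [1, 2], then run_dag([Pipe(clean, lambda x: x * 10, feed), Pipe(raw, lambda x: x + 1, clean)], [raw, clean]) returns [[20, 30], [2, 3]]: the downstream list already reflects rows the upstream pipeline wrote earlier in the same call.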