FullPipeline
minikafka.FullPipeline
Multi-pipeline DAG runner with fan-out and fan-in support.
Build instances with `Source.full_pipeline(p1, p2, ...)`. Pipelines that share a source topic become siblings (fan-out): every source row is fed to each sibling's `fn`. Pipelines whose source topic is another pipeline's target form a chain (fan-in): source topics are run in topological order, so a single call also processes the new rows that upstream pipelines produced during that same call.
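
A minimal sketch of how sibling grouping and topological ordering could be computed, assuming each pipeline is reduced to a hypothetical `(name, source_topic, target_topic)` tuple. This is not minikafka's internal representation; it only illustrates the ordering rule. It uses the standard library's `graphlib.TopologicalSorter`, which raises `graphlib.CycleError` if the topics form a cycle.

```python
from collections import defaultdict
from graphlib import TopologicalSorter
from typing import Optional

# Hypothetical stand-in for a pipeline: (name, source_topic, target_topic).
# target_topic is None for a pipeline that only consumes.
Pipeline = tuple[str, str, Optional[str]]


def plan(pipelines: list[Pipeline]) -> list[tuple[str, list[str]]]:
    """Return (source_topic, sibling_names) groups in topological order."""
    siblings: dict[str, list[str]] = defaultdict(list)
    graph: dict[str, set[str]] = defaultdict(set)  # topic -> topics it depends on
    for name, source, target in pipelines:
        # Pipelines sharing a source topic are siblings (fan-out).
        siblings[source].append(name)
        graph.setdefault(source, set())
        if target is not None:
            # The target topic is filled from the source topic,
            # so the source must be processed first (fan-in chain).
            graph[target].add(source)
    order = TopologicalSorter(graph).static_order()
    # Keep only topics that at least one pipeline reads from.
    return [(topic, siblings[topic]) for topic in order if topic in siblings]


pipes = [
    ("clean", "raw", "clean_topic"),
    ("score", "clean_topic", "feed"),
    ("audit", "raw", None),
]
groups = plan(pipes)
```

Here `groups` is `[("raw", ["clean", "audit"]), ("clean_topic", ["score"])]`: `clean` and `audit` are siblings on `raw`, and `clean_topic` is processed only after `raw`, so rows written by `clean` are picked up by `score` in the same pass.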
Two execution strategies are available:

- `"strict"` (default): sibling transforms are computed for a row, then the inserts and the source-row ack happen inside one SQLite transaction. Any exception aborts the whole run with no partial state.
- `"best_effort"`: each sibling runs in isolation; failures are collected into `FanOutFailure` records. The parent row is marked `handled` only when every sibling succeeded. After the run finishes, any failures raise `FanOutError` so they cannot be ignored, but the successful sibling writes are preserved.
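
The per-row atomicity of the `"strict"` strategy can be sketched with Python's built-in `sqlite3` module. This is a hypothetical illustration, not minikafka's code: the schema (`source`, one sink table per sibling, a `handled` flag) and the function name `process_row_strict` are assumptions. Each row's outputs are computed first, then written together with the ack inside `with conn:`, which commits on success and rolls back if a statement fails. In this sketch each row commits separately, so rows finished before a failure stay committed.

```python
import sqlite3
from typing import Any, Callable


def process_row_strict(
    conn: sqlite3.Connection,
    row_id: int,
    payload: Any,
    siblings: dict[str, Callable[[Any], Any]],
) -> None:
    """Fan one source row out to every sibling table atomically (sketch)."""
    # Phase 1: compute every sibling's output. If any transform raises,
    # the exception propagates here, before anything is written.
    outputs = {table: fn(payload) for table, fn in siblings.items()}
    # Phase 2: all inserts plus the source-row ack in one transaction.
    with conn:
        for table, result in outputs.items():
            # Table names come from our own trusted dict, never from user input.
            conn.execute(
                f"INSERT INTO {table} (src_id, result) VALUES (?, ?)", (row_id, result)
            )
        conn.execute("UPDATE source SET handled = 1 WHERE id = ?", (row_id,))


# Hypothetical schema: one source table plus one sink table per sibling.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE source (id INTEGER PRIMARY KEY, payload INTEGER, handled INTEGER DEFAULT 0);
    CREATE TABLE doubled (src_id INTEGER, result INTEGER);
    CREATE TABLE inverse (src_id INTEGER, result INTEGER);
    INSERT INTO source (id, payload) VALUES (1, 10), (2, 0);
    """
)
siblings = {"doubled": lambda x: x * 2, "inverse": lambda x: 100 // x}

pending = conn.execute(
    "SELECT id, payload FROM source WHERE handled = 0 ORDER BY id"
).fetchall()
try:
    for row_id, payload in pending:
        process_row_strict(conn, row_id, payload, siblings)
except ZeroDivisionError:
    pass  # row 2 fails in phase 1, so none of its writes reach the database
```

After the loop, row 1 appears in both sink tables and is marked handled, while row 2 has no sink rows at all (not even in `doubled`, whose transform would have succeeded) and stays unacked.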
Attributes:

| Name | Type | Description |
|---|---|---|
| `pipelines` | | The list of pipelines, in the order they were passed in. |

Source code in src/minikafka/core.py
plot
Return a Mermaid `graph TD` (top-down) representation of the DAG.
Each pipeline contributes one edge `source --> target`; pipelines without a target produce a bare node. Embed the returned string in a fenced code block tagged `mermaid` to render it.
Returns:

| Type | Description |
|---|---|
| `str` | The Mermaid graph as a single string. |

Source code in src/minikafka/core.py
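
A sketch of the one-edge-per-pipeline rule that `plot` describes, assuming pipelines are reduced to hypothetical `(source, target)` pairs and that topic names are already valid Mermaid node ids. The helper name `to_mermaid` is illustrative; the real method's indentation and node naming may differ.

```python
from typing import Optional


def to_mermaid(edges: list[tuple[str, Optional[str]]]) -> str:
    """Render (source, target) pairs as a Mermaid top-down graph (sketch)."""
    lines = ["graph TD"]
    for source, target in edges:
        if target is None:
            lines.append(f"    {source}")  # pipeline without a target: bare node
        else:
            lines.append(f"    {source} --> {target}")  # one edge per pipeline
    return "\n".join(lines)


diagram = to_mermaid([("raw", "clean_topic"), ("clean_topic", "feed"), ("raw", None)])
```

`diagram` holds four lines: `graph TD`, then `raw --> clean_topic`, `clean_topic --> feed`, and the bare node `raw`, each indented by four spaces.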
run
run(*, dry_run: bool = False, strategy: Literal['strict', 'best_effort'] = 'strict') -> list[list[Any]]
Execute the DAG.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dry_run` | `bool` | If | `False` |
| `strategy` | `Literal['strict', 'best_effort']` | Either `'strict'` or `'best_effort'`. | `'strict'` |

Returns:

| Type | Description |
|---|---|
| `list[list[Any]]` | A list of result lists, one inner list per input pipeline, in the same order as `pipelines`. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | |
| `FanOutError` | One or more siblings failed during a `'best_effort'` run. |

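
The `"best_effort"` bookkeeping described in the class docstring (isolated siblings, collected failures, ack only when every sibling succeeds, raise after the run) can be sketched in plain Python. `Failure` and `FanOutErrorSketch` are hypothetical stand-ins for `FanOutFailure` and `FanOutError`, and in-memory lists stand in for the real topic writes; this is an illustration, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Failure:
    """Hypothetical stand-in for a FanOutFailure record."""
    row_id: int
    sibling: str
    error: Exception


class FanOutErrorSketch(Exception):
    """Hypothetical stand-in for FanOutError: carries every collected failure."""

    def __init__(self, failures: list[Failure]) -> None:
        super().__init__(f"{len(failures)} sibling failure(s)")
        self.failures = failures


def run_best_effort(
    rows: list[tuple[int, Any]],
    siblings: dict[str, Callable[[Any], Any]],
    sinks: dict[str, list[Any]],
    handled: list[int],
) -> None:
    failures: list[Failure] = []
    for row_id, value in rows:
        all_ok = True
        for name, fn in siblings.items():
            try:
                sinks[name].append(fn(value))  # kept even if another sibling fails
            except Exception as exc:
                failures.append(Failure(row_id, name, exc))
                all_ok = False
        if all_ok:
            handled.append(row_id)  # ack the parent only when every sibling succeeded
    # Raise only after every row was attempted, so failures cannot be ignored.
    if failures:
        raise FanOutErrorSketch(failures)


sinks: dict[str, list[Any]] = {"doubled": [], "inverse": []}
handled: list[int] = []
caught: Optional[FanOutErrorSketch] = None
try:
    run_best_effort(
        [(1, 10), (2, 0)],
        {"doubled": lambda x: x * 2, "inverse": lambda x: 100 // x},
        sinks,
        handled,
    )
except FanOutErrorSketch as err:
    caught = err
```

Row 2's `doubled` output survives even though its `inverse` sibling failed, but row 2 is not added to `handled`, and the single failure is reported only after both rows were attempted.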
Examples:

```python
# Two chained pipelines: the second consumes the first one's target topic.
results = source.full_pipeline(
    raw.pipe(clean).to(clean_topic),
    clean_topic.pipe(score).to(feed),
).run(strategy="best_effort")
```