Pipeline
minikafka.Pipeline
Bases: Generic[ModelT, OutputT]
Single-source transformation pipeline between two topics.
Build with topic.pipe(fn) and optionally .to(target). When run,
the pipeline iterates the source topic's new rows, applies fn
to each decoded payload, writes the result to the target (if any), and
marks the source row as handled. Each row is processed inside a
single SQLite transaction.
If fn returns None for a row, the source record is still
acked but nothing is written to the target.
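The per-row behaviour described above can be sketched with the standard-library sqlite3 module. This is an illustration under assumptions, not minikafka's implementation: the source and target table layout, the handled column, the JSON payload encoding, and the names run_pipeline and double_odd are all hypothetical.

```python
import json
import sqlite3
from typing import Any, Callable, Optional


def run_pipeline(
    conn: sqlite3.Connection,
    fn: Callable[[Any], Optional[Any]],
) -> list:
    """Apply fn to every unhandled source row, one transaction per row."""
    rows = conn.execute(
        "SELECT id, payload FROM source WHERE handled = 0 ORDER BY id"
    ).fetchall()
    outputs = []
    for row_id, payload in rows:
        result = fn(json.loads(payload))
        # `with conn` commits on success and rolls back if anything raises,
        # so the target write and the ack succeed or fail together.
        with conn:
            if result is not None:
                conn.execute(
                    "INSERT INTO target (payload) VALUES (?)",
                    (json.dumps(result),),
                )
            # The source row is acked even when fn returned None.
            conn.execute("UPDATE source SET handled = 1 WHERE id = ?", (row_id,))
        outputs.append(result)
    return outputs


# Hypothetical schema: one table per topic, plus a handled flag on the source.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE source (id INTEGER PRIMARY KEY, payload TEXT, handled INTEGER DEFAULT 0)"
)
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO source (payload) VALUES (?)",
    [(json.dumps({"n": n}),) for n in (1, 2, 3)],
)
conn.commit()


def double_odd(msg: dict) -> Optional[dict]:
    """Double odd values; return None to skip even ones."""
    return {"n": msg["n"] * 2} if msg["n"] % 2 else None


outputs = run_pipeline(conn, double_odd)
```

In this sketch a second call to run_pipeline returns an empty list, because every source row is already marked as handled, including the one for which double_odd returned None.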
Attributes:
| Name | Type | Description |
|---|---|---|
| source_topic | | The topic rows are read from. |
| fn | | The transformation callable. |
| target_topic | Topic[Any] \| None | The target topic, or … |
Examples:
pipeline = raw.pipe(clean).to(clean_topic)
results = pipeline.run()
print(pipeline.plot()) # Mermaid graph
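To make the Mermaid output in the example above more concrete, a one-edge graph TD diagram can be built as in the sketch below. The helper one_edge_mermaid, its parameters, and the choice to label the edge with the function name are assumptions for illustration; the exact text that plot() returns is not shown here.

```python
def one_edge_mermaid(source_name: str, target_name: str, fn_name: str) -> str:
    """Return a Mermaid `graph TD` diagram with a single labelled edge."""
    return "\n".join(
        [
            "graph TD",
            # `A -->|label| B` is Mermaid's syntax for an edge with text on it.
            f"    {source_name} -->|{fn_name}| {target_name}",
        ]
    )


# Hypothetical topic and function names.
diagram = one_edge_mermaid("raw", "clean_topic", "clean")
print(diagram)
```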
Source code in src/minikafka/core.py
target_name (property)
Name of the target topic, or None if no target was set.
plot
Return a one-edge Mermaid graph TD diagram for this pipeline.
Equivalent to wrapping self in a FullPipeline and calling
plot on it. Drop the returned string into a fenced ```mermaid block in
Markdown to render the diagram.
run
Execute the pipeline over every new row in the source topic.
For each row: decode the payload, call fn, validate the result
against the target's model (if any), then within a single
transaction insert the result and mark the source row as
handled. Returns the list of transformed outputs in
insertion order.
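Because the insert and the ack share one transaction, a failed insert should leave the source row unhandled. The sketch below shows that property with plain sqlite3, under assumptions: a UNIQUE column stands in for the target topic's dedup constraint, sqlite3.IntegrityError stands in for DuplicateMessageError, and the schema and the handle_row helper are hypothetical. The ack is deliberately issued before the insert here so that the rollback is observable.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE source (id INTEGER PRIMARY KEY, payload TEXT, handled INTEGER DEFAULT 0)"
)
# UNIQUE stands in for the target topic's dedup constraint (assumed schema).
conn.execute("CREATE TABLE target (payload TEXT UNIQUE)")
conn.execute("INSERT INTO target (payload) VALUES ('hello')")
conn.execute("INSERT INTO source (payload) VALUES ('hello')")
conn.commit()


def handle_row(conn: sqlite3.Connection, row_id: int, result: str) -> None:
    """Ack the source row and insert the result in one transaction."""
    with conn:  # rolls back both statements if either one raises
        conn.execute("UPDATE source SET handled = 1 WHERE id = ?", (row_id,))
        conn.execute("INSERT INTO target (payload) VALUES (?)", (result,))


try:
    handle_row(conn, 1, "hello")  # duplicate payload violates UNIQUE
    failed = False
except sqlite3.IntegrityError:
    failed = True

handled = conn.execute("SELECT handled FROM source WHERE id = 1").fetchone()[0]
target_count = conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]
```

After the failed call the source row still has handled = 0, so a later run would pick it up again rather than silently dropping it.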
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dry_run | bool | If … | False |
Returns:
| Type | Description |
|---|---|
| list[Any] | The list of outputs in source-row order. If the pipeline has no target, this is the raw return value of … target, it is the validated target-model instance (or … for rows where … |
Raises:
| Type | Description |
|---|---|
| DuplicateMessageError | An output row would violate the target topic's dedup constraint. |
| ValidationError | |
Emits:
pipeline_start before processing and pipeline_end after
(with count and dry_run). Each handled source row also
emits message_handled.
to
Designate the target topic for this pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target | Topic[OutputT] \| str | Either a … | required |
Returns:
| Type | Description |
|---|---|
| Pipeline[ModelT, OutputT] | … (… |
Raises:
| Type | Description |
|---|---|
| KeyError | |