Source
minikafka.Source
SQLite-backed queue source — entry point to minikafka.
A Source owns a single SQLite connection and tracks the Pydantic
model classes registered to its topics. Use Source.topic to
create or attach to a topic, and Source.full_pipeline to compose
multi-topic pipelines.
Pass on_event to observe activity. The callable is invoked as
on_event(event_name, **kwargs) for these events:
- topic_created (kwargs: name, model, dedup)
- message_appended (kwargs: topic, payload)
- message_handled (kwargs: topic, id)
- pipeline_start (kwargs: source, target)
- pipeline_end (kwargs: source, target, count, dry_run)
Exceptions raised inside on_event are swallowed so logging cannot
break the pipeline.
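The callback contract described above can be sketched with a small recorder. This is an illustration only, not minikafka's implementation: `EventRecorder`, `broken_observer`, and `emit_safely` are hypothetical names, and `emit_safely` merely mimics the documented behavior of swallowing observer exceptions.

```python
from __future__ import annotations

from typing import Any, Callable


class EventRecorder:
    """Hypothetical observer: stores every event it receives."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any]]] = []

    def __call__(self, event_name: str, **kwargs: Any) -> None:
        self.events.append((event_name, kwargs))


def emit_safely(
    on_event: Callable[..., None] | None, event_name: str, **kwargs: Any
) -> None:
    """Hypothetical helper: call the observer and swallow anything it raises."""
    if on_event is None:
        return
    try:
        on_event(event_name, **kwargs)
    except Exception:
        pass  # an observer failure must not break the pipeline


def broken_observer(event_name: str, **kwargs: Any) -> None:
    raise RuntimeError("logging backend is down")


recorder = EventRecorder()
emit_safely(recorder, "topic_created", name="videos", model="Video", dedup=("url",))
emit_safely(broken_observer, "message_handled", topic="videos", id=1)  # does not raise
```

Any callable that accepts `(event_name, **kwargs)` fits the documented shape, so an object like `recorder` could presumably be passed as `on_event` when constructing a Source.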
Examples:

    from pydantic import BaseModel
    from minikafka import Source

    class Video(BaseModel):
        url: str
        title: str

    with Source(":memory:") as source:
        videos = source.topic("videos", Video, dedup=("url",))
        videos.append({"url": "https://example.com", "title": "hello"})

Open or create a SQLite-backed source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path to the SQLite database file. Use … | required |
| on_event | Callable[..., None] \| None | Optional observer callback invoked as on_event(event_name, **kwargs). | None |
Source code in src/minikafka/core.py
close
Close the underlying SQLite connection.
After calling close, the Source (and any Topic or
Pipeline referencing it) can no longer be used.
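Since a Source owns a single SQLite connection, use-after-close presumably resembles what happens with a raw `sqlite3` connection. The sketch below uses only the standard library, not minikafka, to show that behavior at the connection level. Whether Source surfaces the same exception type is an assumption.

```python
import sqlite3
from contextlib import closing

# closing() guarantees conn.close() runs on exit; a bare
# `with sqlite3.connect(...)` only manages transactions.
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (1)")
    rows = conn.execute("SELECT x FROM t").fetchall()

try:
    conn.execute("SELECT 1")
    used_after_close = True
except sqlite3.ProgrammingError:
    # sqlite3 refuses to operate on a closed connection
    used_after_close = False
```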
full_pipeline
Compose multiple pipelines into a single executable DAG.
Pipelines that share a source topic become siblings (fan-out);
pipelines whose target is another pipeline's source form a chain
(fan-in). The resulting FullPipeline runs source topics in
topological order so that downstream pipelines see the rows
produced upstream within the same run() call.
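The ordering idea can be illustrated with the standard library's `graphlib`. This is a sketch under stated assumptions, not how FullPipeline is implemented: each pipeline is reduced to a hypothetical `(source_topic, target_topic)` pair, and the topic names are made up.

```python
from graphlib import TopologicalSorter

# Hypothetical stand-in: each pipeline is just a (source_topic, target_topic) pair.
pipelines = [
    ("videos", "transcripts"),     # two pipelines read "videos" (siblings)
    ("videos", "thumbnails"),
    ("transcripts", "summaries"),  # consumes what the first pipeline produces
]

# Map each topic to the set of topics that must be processed before it.
graph: dict[str, set[str]] = {}
for source, target in pipelines:
    graph.setdefault(source, set())
    graph.setdefault(target, set()).add(source)

# static_order() yields every topic after all of its predecessors.
order = list(TopologicalSorter(graph).static_order())
```

Processing source topics in such an order is what lets a downstream pipeline see rows written by an upstream one during the same run. The exact order among independent topics is not unique.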
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| \*pipelines | Pipeline[Any, Any] | Any number of … | () |
Returns:
| Type | Description |
|---|---|
| FullPipeline | A … |
topic
Create or attach to a typed topic.
If the topic does not exist in the database it is created with the
given model's JSON schema and dedup fields. If it already exists,
both the schema hash and the dedup-field tuple must match — if
either differs, SchemaMismatchError is raised.
The model class is also remembered on this Source instance so
that pipelines can resolve topics by name and return decoded
Pydantic instances.
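The create-or-attach check can be sketched with `sqlite3` and `hashlib`. Everything here is an assumption made for illustration: the `topics` table layout, the use of SHA-256 over canonical JSON, and the names `schema_hash`, `create_or_attach`, and `SchemaMismatch` are hypothetical and not taken from minikafka. With Pydantic v2 the schema dict would come from `Model.model_json_schema()`; a literal dict is used so the sketch needs only the standard library.

```python
import hashlib
import json
import sqlite3


class SchemaMismatch(Exception):
    """Hypothetical stand-in for minikafka's SchemaMismatchError."""


def schema_hash(schema: dict) -> str:
    # Canonical JSON (sorted keys) so equal schemas hash equally.
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def create_or_attach(conn, name, schema, dedup):
    """Return True if the topic row was created, False if re-attached."""
    digest = schema_hash(schema)
    dedup_json = json.dumps(list(dedup))
    row = conn.execute(
        "SELECT schema_hash, dedup FROM topics WHERE name = ?", (name,)
    ).fetchone()
    if row is None:
        conn.execute(
            "INSERT INTO topics (name, schema_hash, dedup) VALUES (?, ?, ?)",
            (name, digest, dedup_json),
        )
        return True
    if row != (digest, dedup_json):
        # Either the schema hash or the dedup fields differ.
        raise SchemaMismatch(f"topic {name!r} exists with a different definition")
    return False


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE topics (name TEXT PRIMARY KEY, schema_hash TEXT, dedup TEXT)"
)

video_schema = {
    "type": "object",
    "properties": {"url": {"type": "string"}, "title": {"type": "string"}},
}
created = create_or_attach(conn, "videos", video_schema, ("url",))
reattached = create_or_attach(conn, "videos", video_schema, ("url",))
```

In this sketch a `True` return corresponds to the case where a new row is inserted, which is the only case in which the documented topic_created event fires.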
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Topic name. Must be stable across reopens. | required |
| model | type[ModelT] | Pydantic … | required |
| dedup | Sequence[str] \| None | Tuple of field names (must all exist on … | required |
Returns:
| Type | Description |
|---|---|
| Topic[ModelT] | A … |
Raises:
| Type | Description |
|---|---|
| SchemaMismatchError | The topic exists with a different schema or different dedup configuration. |
| TypeError | |
| ValueError | |
Emits:
topic_created, only when a new row is inserted into the
topics table (not when re-attaching to an existing topic).