Source

minikafka.Source

Source(path: str | Path, *, on_event: Callable[..., None] | None = None)

SQLite-backed queue source — entry point to minikafka.

A Source owns a single SQLite connection and tracks the Pydantic model classes registered to its topics. Use Source.topic to create or attach to a topic, and Source.full_pipeline to compose multi-topic pipelines.

Pass on_event to observe activity. The callable is invoked as on_event(event_name, **kwargs) for these events:

  • topic_created — kwargs: name, model, dedup
  • message_appended — kwargs: topic, payload
  • message_handled — kwargs: topic, id
  • pipeline_start — kwargs: source, target
  • pipeline_end — kwargs: source, target, count, dry_run

Exceptions raised inside on_event are swallowed so logging cannot break the pipeline.
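
The observer contract above can be sketched without the library. In the snippet below, `record` and `broken` are example observers with the documented signature, and `safe_emit` is a hypothetical stand-in for the internal emit step. It mirrors the documented behaviour (call the observer, swallow whatever it raises) and is not taken from minikafka's source.

```python
from __future__ import annotations

from typing import Any, Callable

events: list[tuple[str, dict[str, Any]]] = []


def record(event_name: str, **kwargs: Any) -> None:
    """Observer with the documented signature: on_event(event_name, **kwargs)."""
    events.append((event_name, kwargs))


def broken(event_name: str, **kwargs: Any) -> None:
    """Observer that always fails, to show that the failure is contained."""
    raise RuntimeError("observer bug")


def safe_emit(on_event: Callable[..., None] | None, event_name: str, **kwargs: Any) -> None:
    """Hypothetical stand-in for the library's internal emit step (an assumption)."""
    if on_event is None:
        return
    try:
        on_event(event_name, **kwargs)
    except Exception:
        # Swallowed on purpose: observing must never break the pipeline.
        pass


safe_emit(record, "topic_created", name="videos", model="Video", dedup=("url",))
safe_emit(broken, "message_handled", topic="videos", id=1)  # does not raise
safe_emit(None, "pipeline_start", source="raw", target="clean")  # no observer set
```

Because failures are swallowed, an observer that needs to surface its own errors would have to log them itself.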

Examples:

from pydantic import BaseModel
from minikafka import Source

class Video(BaseModel):
    url: str
    title: str

with Source(":memory:") as source:
    videos = source.topic("videos", Video, dedup=("url",))
    videos.append({"url": "https://example.com", "title": "hello"})

Open or create a SQLite-backed source.

Parameters:

  • path (str | Path, required): Path to the SQLite database file. Use ":memory:" for an in-process database that is discarded on close().
  • on_event (Callable[..., None] | None, default None): Optional observer callback invoked as on_event(event_name, **kwargs). See the class docstring for the event names and their kwargs.

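
The ":memory:" behaviour comes from SQLite itself, which the constructor reaches through `sqlite3.connect`. The sketch below uses only the standard library (the `demo` table is made up for illustration) to show that data in an in-memory database does not survive `close()`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO demo DEFAULT VALUES")
rows_before_close = conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]
conn.close()

# A second connection to ":memory:" is a brand-new, empty database.
fresh = sqlite3.connect(":memory:")
tables_after_reopen = fresh.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
fresh.close()
```

For anything that must persist between runs, pass a file path instead.
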
Source code in src/minikafka/core.py
def __init__(
    self,
    path: str | Path,
    *,
    on_event: Callable[..., None] | None = None,
):
    """Open or create a SQLite-backed source.

    Args:
        path: Path to the SQLite database file. Use ``":memory:"`` for
            an in-process database that is discarded on ``close()``.
        on_event: Optional observer callback invoked as
            ``on_event(event_name, **kwargs)``. See the class docstring
            for the event names and their kwargs.
    """
    self.path = str(path)
    self._on_event = on_event
    self._conn = sqlite3.connect(self.path)
    self._conn.row_factory = sqlite3.Row
    self._models: dict[str, type[BaseModel]] = {}
    self._init_db()

close

close() -> None

Close the underlying SQLite connection.

After calling close the Source (and any Topic / Pipeline referencing it) can no longer be used.
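
Since `close()` simply closes the SQLite connection, later use presumably fails the way any closed `sqlite3` connection does. The standard-library sketch below shows that underlying behaviour. Whether minikafka wraps or re-raises this error is not stated here, so treat the exception type as an assumption about the library.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.close()

try:
    # Any statement on a closed connection is rejected by sqlite3.
    conn.execute("SELECT 1")
except sqlite3.ProgrammingError as exc:
    error_message = str(exc)
else:
    error_message = None
```

Using the source as a context manager, as in the class example, keeps the close at a well-defined point.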

Source code in src/minikafka/core.py
def close(self) -> None:
    """Close the underlying SQLite connection.

    After calling ``close`` the ``Source`` (and any ``Topic`` /
    ``Pipeline`` referencing it) can no longer be used.
    """
    self._conn.close()

full_pipeline

full_pipeline(*pipelines: Pipeline[Any, Any]) -> FullPipeline

Compose multiple pipelines into a single executable DAG.

Pipelines that share a source topic become siblings (fan-out); pipelines whose target is another pipeline's source form a chain (fan-in). The resulting FullPipeline runs source topics in topological order so that downstream pipelines see the rows produced upstream within the same run() call.
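
The ordering described above can be illustrated with the standard library's `graphlib`. This is a sketch of the idea only: each pipeline is reduced to a hypothetical `(source_topic, target_topic)` pair, and nothing here reflects how `FullPipeline` is actually implemented.

```python
from graphlib import TopologicalSorter

# Each pipeline reduced to a hypothetical (source_topic, target_topic) pair.
pipelines = [
    ("raw", "clean"),    # raw -> clean
    ("raw", "archive"),  # sibling of the pipeline above (same source topic)
    ("clean", "feed"),   # chained: consumes what the first pipeline produces
]

# TopologicalSorter expects a mapping of node -> nodes that must come first.
graph: dict[str, set[str]] = {}
for source_topic, target_topic in pipelines:
    graph.setdefault(source_topic, set())
    graph.setdefault(target_topic, set()).add(source_topic)

# Upstream topics always appear before the topics that depend on them.
order = list(TopologicalSorter(graph).static_order())
```

Processing topics in this order is what lets a downstream pipeline see rows written earlier in the same pass.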

Parameters:

  • *pipelines (Pipeline[Any, Any], default ()): Any number of Pipeline instances built via topic.pipe(fn).to(target).

Returns:

  • FullPipeline: A FullPipeline ready to run() or plot().

Examples:

source.full_pipeline(
    raw.pipe(clean).to(clean_topic),
    clean_topic.pipe(score).to(feed),
).run()

Source code in src/minikafka/core.py
def full_pipeline(self, *pipelines: Pipeline[Any, Any]) -> FullPipeline:
    """Compose multiple pipelines into a single executable DAG.

    Pipelines that share a source topic become **siblings** (fan-out);
    pipelines whose target is another pipeline's source form a chain
    (fan-in). The resulting ``FullPipeline`` runs source topics in
    topological order so that downstream pipelines see the rows
    produced upstream within the same ``run()`` call.

    Args:
        *pipelines: Any number of ``Pipeline`` instances built via
            ``topic.pipe(fn).to(target)``.

    Returns:
        A ``FullPipeline`` ready to ``run()`` or ``plot()``.

    Examples:
        ```python
        source.full_pipeline(
            raw.pipe(clean).to(clean_topic),
            clean_topic.pipe(score).to(feed),
        ).run()
        ```
    """
    return FullPipeline(list(pipelines))

topic

topic(name: str, model: type[ModelT], *, dedup: Sequence[str] | None) -> Topic[ModelT]

Create or attach to a typed topic.

If the topic does not exist in the database it is created with the given model's JSON schema and dedup fields. If it already exists, both the schema hash and the dedup-field tuple must match — if either differs, SchemaMismatchError is raised.
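
The compatibility check can be pictured as comparing a hash of the canonical JSON schema plus the stored dedup tuple. The sketch below is an assumption-laden illustration: `canonical_json` and `schema_hash` are stand-ins for the library's private helpers, SHA-256 is a guess at the hash function, and the schemas are hand-written dicts rather than real Pydantic output.

```python
import hashlib
import json


def canonical_json(value: object) -> str:
    """Serialize with sorted keys and no extra whitespace so equal values give equal text."""
    return json.dumps(value, sort_keys=True, separators=(",", ":"))


def schema_hash(schema: dict) -> str:
    """SHA-256 over the canonical JSON text (the hash function is an assumption)."""
    return hashlib.sha256(canonical_json(schema).encode("utf-8")).hexdigest()


# Hand-written stand-ins for a model's JSON schema.
stored = {"properties": {"url": {"type": "string"}, "title": {"type": "string"}}}
reordered = {"properties": {"title": {"type": "string"}, "url": {"type": "string"}}}
changed = {"properties": {"url": {"type": "string"}, "title": {"type": "integer"}}}

same_schema = schema_hash(stored) == schema_hash(reordered)   # key order is irrelevant
schema_differs = schema_hash(stored) != schema_hash(changed)  # a type change is detected

# The stored dedup list is decoded back into a tuple before comparison,
# so in this sketch ("url",) matches a stored ["url"].
dedup_matches = ("url",) == tuple(json.loads(canonical_json(["url"])))
```

Canonical serialization matters because two dicts with the same content but different key order would otherwise hash differently.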

The model class is also remembered on this Source instance so that pipelines can resolve topics by name and return decoded Pydantic instances.

Parameters:

  • name (str, required): Topic name. Must be stable across reopens.
  • model (type[ModelT], required): Pydantic BaseModel subclass describing the payload.
  • dedup (Sequence[str] | None, required): Tuple of field names (must all exist on model) used to enforce uniqueness, or None to disable dedup.

Returns:

  • Topic[ModelT]: A Topic[ModelT] bound to this source.

Raises:

  • SchemaMismatchError: The topic exists with a different schema or different dedup configuration.
  • TypeError: model is not a Pydantic BaseModel subclass.
  • ValueError: dedup references fields that are not on model, or contains duplicates.
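
The two `ValueError` conditions can be sketched as a small standalone check. `validate_dedup_fields` below is hypothetical: it takes a plain set of field names instead of a Pydantic model, and returning an empty tuple for `None` is an assumption rather than documented behaviour.

```python
from __future__ import annotations

from collections.abc import Sequence


def validate_dedup_fields(model_fields: set[str], dedup: Sequence[str] | None) -> tuple[str, ...]:
    """Hypothetical check: return dedup as a tuple or raise ValueError."""
    if dedup is None:
        return ()  # dedup disabled (assumed representation)
    fields = tuple(dedup)
    if len(set(fields)) != len(fields):
        raise ValueError(f"dedup contains duplicates: {fields!r}")
    missing = [name for name in fields if name not in model_fields]
    if missing:
        raise ValueError(f"dedup references unknown fields: {missing!r}")
    return fields


video_fields = {"url", "title"}  # field names of the Video model from the class example
ok = validate_dedup_fields(video_fields, ("url",))

# Collect the messages produced by a duplicated field and by an unknown field.
errors = []
for bad in (("url", "url"), ("slug",)):
    try:
        validate_dedup_fields(video_fields, bad)
    except ValueError as exc:
        errors.append(str(exc))
```

Validating before touching the database means a bad dedup tuple never reaches the topics table.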

Emits

topic_created only when a new row is inserted in the topics table (not when re-attaching to an existing topic).

Source code in src/minikafka/core.py
def topic(
    self,
    name: str,
    model: type[ModelT],
    *,
    dedup: Sequence[str] | None,
) -> Topic[ModelT]:
    """Create or attach to a typed topic.

    If the topic does not exist in the database it is created with the
    given model's JSON schema and dedup fields. If it already exists,
    both the schema hash and the dedup-field tuple must match — if
    either differs, ``SchemaMismatchError`` is raised.

    The model class is also remembered on this ``Source`` instance so
    that pipelines can resolve topics by name and return decoded
    Pydantic instances.

    Args:
        name: Topic name. Must be stable across reopens.
        model: Pydantic ``BaseModel`` subclass describing the payload.
        dedup: Tuple of field names (must all exist on ``model``) used
            to enforce uniqueness, or ``None`` to disable dedup.

    Returns:
        A ``Topic[ModelT]`` bound to this source.

    Raises:
        SchemaMismatchError: The topic exists with a different schema
            or different dedup configuration.
        TypeError: ``model`` is not a Pydantic ``BaseModel`` subclass.
        ValueError: ``dedup`` references fields that are not on
            ``model``, or contains duplicates.

    Emits:
        ``topic_created`` only when a new row is inserted in the
        ``topics`` table (not when re-attaching to an existing topic).
    """
    model = _validate_model_class(model)  # type: ignore[assignment]
    dedup_fields = _validate_dedup_fields(model, dedup)
    schema = _schema_for(model)
    schema_hash = _schema_hash(model)
    existing = self._topic_row(name)

    if existing is None:
        now = _utc_now()
        self._conn.execute(
            """
            INSERT INTO topics (name, schema_json, schema_hash, dedup_fields, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                name,
                _canonical_json(schema),
                schema_hash,
                _canonical_json(list(dedup_fields)),
                now,
            ),
        )
        self._conn.commit()
        self._emit(
            "topic_created",
            name=name,
            model=model.__name__,
            dedup=dedup_fields,
        )
    else:
        existing_dedup = tuple(json.loads(existing["dedup_fields"]))
        if existing["schema_hash"] != schema_hash:
            raise SchemaMismatchError(
                f"topic {name!r} exists with a different schema"
            )
        if existing_dedup != dedup_fields:
            raise SchemaMismatchError(
                f"topic {name!r} exists with a different dedup config"
            )

    self._models[name] = model
    return Topic(self, name, model)