Debug pipeline

This document demonstrates how to use a DebugPipeline. It behaves like a normal scikit-learn Pipeline, but it can additionally log what happens between its steps.

We'll first set up libraries and config.

Setup
import logging
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

from sklego.pipeline import DebugPipeline

logging.basicConfig(
    format=("[%(funcName)s:%(lineno)d] - %(message)s"),
    level=logging.INFO
)
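
To see what this format string produces, here is a minimal sketch using only the standard library. The logger name `debug_pipeline_format_demo` and the function `my_step_logger` are hypothetical names chosen for illustration; the sketch writes to an in-memory stream so it does not interfere with the configuration above.

```python
import io
import logging

# Capture log output in memory instead of printing it to stderr.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("[%(funcName)s:%(lineno)d] - %(message)s")
)

# Hypothetical logger name, used only for this demonstration.
demo_logger = logging.getLogger("debug_pipeline_format_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the record away from the root logger


def my_step_logger():
    # %(funcName)s resolves to the function that issues the log call,
    # %(lineno)d to the line number of that call.
    demo_logger.info("hello")


my_step_logger()
line = stream.getvalue().strip()
print(line)
```

The printed line starts with `[my_step_logger:` followed by the line number of the `info` call. This is why the log lines later in this document start with the name of the callback function that emitted them.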

Next up, let's make a simple transformer.

Simple transformer
n_samples, n_features = 3, 5
X = np.zeros((n_samples, n_features))
y = np.arange(n_samples)


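class Adder(TransformerMixin, BaseEstimator):
    """Transformer that adds a constant value to every element of X."""

    def __init__(self, value):
        # Stored under the parameter's own name so that scikit-learn's
        # get_params/clone machinery can find it.
        self.value = value

    def fit(self, X, y=None):
        # Nothing to learn; the transformer is stateless.
        return self

    def transform(self, X):
        return X + self.value

    def __repr__(self):
        return f"Adder(value={self.value})"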


steps = [
    ("add_1", Adder(value=1)),
    ("add_10", Adder(value=10)),
    ("add_100", Adder(value=100)),
    ("add_1000", Adder(value=1000)),
]

Without a log_callback, a DebugPipeline behaves exactly the same as a normal pipeline. So let's use it.

Simple pipeline
pipe = DebugPipeline(steps)
_ = pipe.fit(X, y=y)

X_out = pipe.transform(X)
print("Transformed X:\n", X_out)
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]

Log statements

You can set a log_callback that logs between the steps of the pipeline.

Note

There are three log statements for four steps because logging happens at the three hand-offs between the steps. The output of the final step is not logged, but it can be checked outside of the pipeline.

'default' log_callback
pipe = DebugPipeline(steps, log_callback="default")
_ = pipe.fit(X, y=y)

X_out = pipe.transform(X)
print("Transformed X:\n", X_out)
[default_log_callback:38] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:38] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:38] - [Adder(value=100)] shape=(3, 5) time=0s
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
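
The three-logs-for-four-steps behaviour can be illustrated with a toy pipeline. This is a simplified sketch and not sklego's implementation: `MiniPipeline` is a hypothetical class, and it assumes that, as in a scikit-learn Pipeline, every step except the last is applied during fit so that its output can be passed on, while the last step is only fitted.

```python
import time


class MiniPipeline:
    """Toy stand-in for a pipeline; NOT sklego's implementation."""

    def __init__(self, steps, log_callback=None):
        self.steps = steps
        self.log_callback = log_callback

    def fit(self, X):
        # Every step except the last is applied during fit, because the
        # next step needs its output. Only these hand-offs are logged.
        for name, func in self.steps[:-1]:
            start = time.perf_counter()
            X = func(X)
            elapsed = time.perf_counter() - start
            if self.log_callback is not None:
                self.log_callback((X, name), elapsed)
        # The final step would only be fitted here; nothing is handed on.
        return self

    def transform(self, X):
        # Transforming applies all steps and does not log.
        for _, func in self.steps:
            X = func(X)
        return X


calls = []
steps = [
    ("add_1", lambda x: x + 1),
    ("add_10", lambda x: x + 10),
    ("add_100", lambda x: x + 100),
    ("add_1000", lambda x: x + 1000),
]
pipe = MiniPipeline(
    steps,
    log_callback=lambda output, execution_time: calls.append(output),
)
pipe.fit(0)
result = pipe.transform(0)
print(len(calls), result)  # 3 1111
```

With four steps there are three hand-offs, so the callback fires three times, and the final result is only visible in the return value of `transform`.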

Set the log_callback function later

It is possible to set the log_callback after initialisation.

log_callback after initialisation
pipe = DebugPipeline(steps)
pipe.log_callback = "default"

_ = pipe.fit(X, y=y)

X_out = pipe.transform(X)
print("Transformed X:\n", X_out)
[default_log_callback:38] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:38] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:38] - [Adder(value=100)] shape=(3, 5) time=0s
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]

Custom log_callback

A custom log callback function receives two arguments: the output of each step, which is a tuple containing the result of the step and the step itself, and the execution time of the step.

Custom log_callback
def log_callback(output, execution_time, **kwargs):
    """My custom `log_callback` function

    Parameters
    ----------
    output : tuple(
            numpy.ndarray or pandas.DataFrame
            :class:estimator or :class:transformer
        )
        The result of the step and the step that produced it.
    execution_time : float
        The execution time of the step.
    """
    logger = logging.getLogger(__name__)
    step_result, step = output
    logger.info(f"[{step}] shape={step_result.shape} "
                f"nbytes={step_result.nbytes} time={execution_time}")


pipe.log_callback = log_callback
_ = pipe.fit(X, y=y)

X_out = pipe.transform(X)
print("Transformed X:\n", X_out)
[log_callback:16] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=5.340576171875e-05
[log_callback:16] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=6.651878356933594e-05
[log_callback:16] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=6.723403930664062e-05
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]
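
A callback does not have to write to a logger. As a sketch of an alternative, the callback below collects one record per step in a list, which can be handy when you want to assert on intermediate shapes in a test. The names `records` and `collecting_log_callback` are hypothetical, and the callback is called by hand here with a stand-in object instead of a real NumPy array, so the sketch runs without fitting a pipeline.

```python
from types import SimpleNamespace

# One dictionary per logged step ends up in this list.
records = []


def collecting_log_callback(output, execution_time, **kwargs):
    """Store step information instead of logging it."""
    # Same signature as the custom callback above: `output` is a tuple
    # of (result of the step, the step itself).
    step_result, step = output
    records.append(
        {
            "step": str(step),
            # Fall back to None for results that have no shape attribute.
            "shape": getattr(step_result, "shape", None),
            "execution_time": execution_time,
        }
    )


# Stand-in for a (3, 5) array; a real pipeline would pass the step result.
fake_result = SimpleNamespace(shape=(3, 5))
collecting_log_callback((fake_result, "Adder(value=1)"), 1e-05)
print(records)
```

To use it with a pipeline, assign it in the same way as the logging callback: `pipe.log_callback = collecting_log_callback`.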

Feature union

A DebugPipeline can also be used inside a FeatureUnion.

Feature union
from sklearn.pipeline import FeatureUnion

pipe_w_default_log_callback = DebugPipeline(steps, log_callback='default')
pipe_w_custom_log_callback = DebugPipeline(steps, log_callback=log_callback)

pipe_union = FeatureUnion([
    ('pipe_w_default_log_callback', pipe_w_default_log_callback),
    ('pipe_w_custom_log_callback', pipe_w_custom_log_callback),
])

_ = pipe_union.fit(X, y=y)

X_out = pipe_union.transform(X)
print('Transformed X:\n', X_out)
[default_log_callback:38] - [Adder(value=1)] shape=(3, 5) time=0s
[default_log_callback:38] - [Adder(value=10)] shape=(3, 5) time=0s
[default_log_callback:38] - [Adder(value=100)] shape=(3, 5) time=0s
[log_callback:16] - [Adder(value=1)] shape=(3, 5) nbytes=120 time=4.482269287109375e-05
[log_callback:16] - [Adder(value=10)] shape=(3, 5) nbytes=120 time=5.1021575927734375e-05
[log_callback:16] - [Adder(value=100)] shape=(3, 5) nbytes=120 time=6.365776062011719e-05
Transformed X:
 [[1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111. 1111.]]
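
The transformed output has ten columns because a FeatureUnion places the outputs of its transformers side by side. A minimal NumPy sketch of that concatenation, with `left` and `right` as hypothetical stand-ins for the outputs of the two pipelines:

```python
import numpy as np

# Stand-ins for the (3, 5) outputs of the two pipelines in the union.
left = np.full((3, 5), 1111.0)
right = np.full((3, 5), 1111.0)

# Column-wise concatenation: rows stay the same, columns add up.
combined = np.hstack([left, right])
print(combined.shape)  # (3, 10)
```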

Enough logging

Set log_callback to None to stop logging when it is no longer needed.

Remove log_callback
pipe.log_callback = None
_ = pipe.fit(X, y=y)

X_out = pipe.transform(X)
print('Transformed X:\n', X_out)
Transformed X:
 [[1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]
 [1111. 1111. 1111. 1111. 1111.]]