F3Pipeline

Bases: SequentialPipeline

A flexible sequential pipeline implementation for machine learning workflows.

F3Pipeline allows chaining multiple filters together and applying metrics for evaluation. It supports fitting, predicting, and evaluating data through the pipeline.

Key Features
  • Sequential application of multiple filters
  • Support for various metrics for evaluation
  • Configurable data storage and logging options
Usage
from framework3.plugins.pipelines.sequential import F3Pipeline
from framework3.plugins.filters.transformation import PCAPlugin
from framework3.plugins.filters.classification import ClassifierSVMPlugin
from framework3.plugins.metrics.classification import F1, Precision, Recall
from framework3.base import XYData
import numpy as np

# Create a pipeline with PCA and SVM
pipeline = F3Pipeline(
    filters=[
        PCAPlugin(n_components=2),
        ClassifierSVMPlugin(kernel='rbf', C=1.0)
    ],
    metrics=[F1(), Precision(), Recall()]
)

# Prepare some dummy data
X = XYData(value=np.random.rand(100, 10))
y = XYData(value=np.random.randint(0, 2, 100))

# Fit the pipeline
pipeline.fit(X, y)

# Make predictions
y_pred = pipeline.predict(X)

# Evaluate the pipeline
results = pipeline.evaluate(X, y, y_pred)
print(results)

Attributes:

Name Type Description
filters List[BaseFilter]

List of filters to be applied in the pipeline.

metrics List[BaseMetric]

List of metrics for evaluation.

overwrite bool

Whether to overwrite existing data in storage.

store bool

Whether to store intermediate results.

log bool

Whether to log pipeline operations.

Methods:

Name Description
fit

(x: XYData, y: Optional[XYData]) -> None | float: Fit the pipeline to the input data.

predict

(x: XYData) -> XYData: Make predictions using the fitted pipeline.

evaluate

(x_data: XYData, y_true: XYData | None, y_pred: XYData) -> Dict[str, float]: Evaluate the pipeline using specified metrics.

start

(x: XYData, y: Optional[XYData], X_: Optional[XYData]) -> Optional[XYData]: Start the pipeline execution by fitting and optionally predicting.

Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
@Container.bind()
class F3Pipeline(SequentialPipeline):
    """
    A flexible sequential pipeline implementation for machine learning workflows.

    F3Pipeline allows chaining multiple filters together and applying metrics
    for evaluation. It supports fitting, predicting, and evaluating data through the pipeline.

    Key Features:
        - Sequential application of multiple filters
        - Support for various metrics for evaluation
        - Configurable data storage and logging options

    Usage:
        ```python
        from framework3.plugins.pipelines.sequential import F3Pipeline
        from framework3.plugins.filters.transformation import PCAPlugin
        from framework3.plugins.filters.classification import ClassifierSVMPlugin
        from framework3.plugins.metrics.classification import F1, Precision, Recall
        from framework3.base import XYData
        import numpy as np

        # Create a pipeline with PCA and SVM
        pipeline = F3Pipeline(
            filters=[
                PCAPlugin(n_components=2),
                ClassifierSVMPlugin(kernel='rbf', C=1.0)
            ],
            metrics=[F1(), Precision(), Recall()]
        )

        # Prepare some dummy data
        X = XYData(value=np.random.rand(100, 10))
        y = XYData(value=np.random.randint(0, 2, 100))

        # Fit the pipeline
        pipeline.fit(X, y)

        # Make predictions
        y_pred = pipeline.predict(X)

        # Evaluate the pipeline
        results = pipeline.evaluate(X, y, y_pred)
        print(results)
        ```

    Attributes:
        filters (List[BaseFilter]): List of filters to be applied in the pipeline.
        metrics (List[BaseMetric]): List of metrics for evaluation.
        overwrite (bool): Whether to overwrite existing data in storage.
        store (bool): Whether to store intermediate results.
        log (bool): Whether to log pipeline operations.

    Methods:
        fit(x: XYData, y: Optional[XYData]) -> None | float: Fit the pipeline to the input data.
        predict(x: XYData) -> XYData: Make predictions using the fitted pipeline.
        evaluate(x_data: XYData, y_true: XYData | None, y_pred: XYData) -> Dict[str, float]:
            Evaluate the pipeline using specified metrics.
        start(x: XYData, y: Optional[XYData], X_: Optional[XYData]) -> Optional[XYData]:
            Start the pipeline execution by fitting and optionally predicting.
    """

    model_config = ConfigDict(extra="allow")

    def __init__(
        self,
        filters: List[BaseFilter],
        metrics: List[BaseMetric] = [],
        overwrite: bool = False,
        store: bool = False,
        log: bool = False,
    ) -> None:
        """
        Initialize the F3Pipeline.

        Args:
            filters (List[BaseFilter]): List of filters to be applied in the pipeline.
            metrics (List[BaseMetric], optional): List of metrics for evaluation. Defaults to [].
            overwrite (bool, optional): Whether to overwrite existing data. Defaults to False.
            store (bool, optional): Whether to store intermediate results. Defaults to False.
            log (bool, optional): Whether to log pipeline operations. Defaults to False.
        """
        super().__init__(
            filters=filters, metrics=metrics, overwrite=overwrite, store=store, log=log
        )
        self.filters: List[BaseFilter] = filters
        self.metrics: List[BaseMetric] = metrics
        self.overwrite = overwrite
        self.store = store
        self.log = log
        # self._filters: List[BaseFilter] = []

    def start(
        self, x: XYData, y: Optional[XYData], X_: Optional[XYData]
    ) -> Optional[XYData]:
        """
        Start the pipeline execution by fitting the model and making predictions.

        This method initiates the pipeline process by fitting the model to the input data
        and then making predictions.

        Args:
            x (XYData): The input data for fitting and prediction.
            y (Optional[XYData]): The target data for fitting, if available.
            X_ (Optional[XYData]): Additional input data for prediction, if different from x.

        Returns:
            Optional[XYData]: The predictions made by the pipeline. Errors are re-raised rather than returned as None.

        Raises:
            Exception: If an error occurs during the fitting or prediction process.
        """
        try:
            self.fit(x, y)
            if X_ is not None:
                return self.predict(X_)
            else:
                return self.predict(x)
        except Exception as e:
            print(f"Error during pipeline execution: {e}")
            raise e

    def fit(self, x: XYData, y: Optional[XYData]) -> None | float:
        """
        Fit the pipeline to the input data.

        This method applies each filter in the pipeline sequentially to the input data,
        fitting each filter that requires training.

        Args:
            x (XYData): The input data to fit the pipeline on.
            y (Optional[XYData]): The target data, if available.

        Returns:
            None | float: The loss value from the last fitted filter, if any.

        Note:
            Filters that raise NotTrainableFilterError will be initialized instead of fitted.
        """
        self._print_acction("Fitting pipeline")
        loss = None
        for filter in self.filters:
            if self._verbose:
                rprint(filter)
            try:
                loss = filter.fit(x, y)
            except NotTrainableFilterError:
                filter.init()  # Initialize filter

            x = filter.predict(x)

        return loss

    def predict(self, x: XYData) -> XYData:
        """
        Make predictions using the fitted pipeline.

        This method applies each filter in the pipeline sequentially to the input data
        to generate predictions.

        Args:
            x (XYData): The input data to make predictions on.

        Returns:
            XYData: The predictions made by the pipeline.
        """

        self._print_acction("Predicting pipeline")

        for filter_ in self.filters:
            if self._verbose:
                rprint(filter_)
            x = filter_.predict(x)

        return x

    def evaluate(
        self, x_data: XYData, y_true: XYData | None, y_pred: XYData
    ) -> Dict[str, float]:
        """
        Evaluate the pipeline using the specified metrics.

        This method applies each metric in the pipeline to the predicted and true values,
        returning a dictionary of evaluation results.

        Args:
            x_data (XYData): The input data used for evaluation.
            y_true (XYData | None): The true target values, if available.
            y_pred (XYData): The predicted values.

        Returns:
            Dict[str, float]: A dictionary containing the evaluation results for each metric.

        Example:
            ```python
            >>> results = pipeline.evaluate(x_test, y_test, y_pred)
            >>> print(results)
            {'F1': 0.85, 'Precision': 0.80, 'Recall': 0.90}
            ```
        """

        self._print_acction("Evaluating pipeline...")

        evaluations = {}
        for metric in self.metrics:
            evaluations[metric.__class__.__name__] = metric.evaluate(
                x_data, y_true, y_pred
            )
        return evaluations

    def inner(self) -> List[BaseFilter]:
        """
        Get the list of filters in the pipeline.

        Returns:
            List[BaseFilter]: The list of filters in the pipeline.
        """
        return self.filters

__init__(filters, metrics=[], overwrite=False, store=False, log=False)

Initialize the F3Pipeline.

Parameters:

Name Type Description Default
filters List[BaseFilter]

List of filters to be applied in the pipeline.

required
metrics List[BaseMetric]

List of metrics for evaluation. Defaults to [].

[]
overwrite bool

Whether to overwrite existing data. Defaults to False.

False
store bool

Whether to store intermediate results. Defaults to False.

False
log bool

Whether to log pipeline operations. Defaults to False.

False
Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
def __init__(
    self,
    filters: List[BaseFilter],
    metrics: List[BaseMetric] = [],
    overwrite: bool = False,
    store: bool = False,
    log: bool = False,
) -> None:
    """
    Initialize the F3Pipeline.

    Args:
        filters (List[BaseFilter]): List of filters to be applied in the pipeline.
        metrics (List[BaseMetric], optional): List of metrics for evaluation. Defaults to [].
        overwrite (bool, optional): Whether to overwrite existing data. Defaults to False.
        store (bool, optional): Whether to store intermediate results. Defaults to False.
        log (bool, optional): Whether to log pipeline operations. Defaults to False.
    """
    super().__init__(
        filters=filters, metrics=metrics, overwrite=overwrite, store=store, log=log
    )
    self.filters: List[BaseFilter] = filters
    self.metrics: List[BaseMetric] = metrics
    self.overwrite = overwrite
    self.store = store
    self.log = log
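
One detail of the signature above is worth knowing: `metrics=[]` is a mutable default. In plain Python, a default list is created once, when the function is defined, and is shared by every call that omits the argument. The sketch below shows only that general language behavior, using two hypothetical functions (`shared_default`, `fresh_default`) that are not part of framework3. Whether F3Pipeline is affected depends on how its base class stores the argument, which is not shown here.

```python
from typing import List, Optional


def shared_default(metrics: List[str] = []) -> List[str]:
    # The same list object is reused on every call that omits `metrics`.
    metrics.append("F1")
    return metrics


def fresh_default(metrics: Optional[List[str]] = None) -> List[str]:
    # A new list is created on each call, so nothing leaks between calls.
    metrics = [] if metrics is None else list(metrics)
    metrics.append("F1")
    return metrics


first = shared_default()
second = shared_default()
print(first is second, second)   # both names refer to one growing list

third = fresh_default()
fourth = fresh_default()
print(third is fourth, fourth)   # independent lists
```

Passing `metrics=[...]` explicitly at every call site sidesteps the question entirely.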

evaluate(x_data, y_true, y_pred)

Evaluate the pipeline using the specified metrics.

This method applies each metric in the pipeline to the predicted and true values, returning a dictionary of evaluation results.

Parameters:

Name Type Description Default
x_data XYData

The input data used for evaluation.

required
y_true XYData | None

The true target values, if available.

required
y_pred XYData

The predicted values.

required

Returns:

Type Description
Dict[str, float]

A dictionary containing the evaluation results for each metric.

Example
>>> results = pipeline.evaluate(x_test, y_test, y_pred)
>>> print(results)
{'F1': 0.85, 'Precision': 0.80, 'Recall': 0.90}
Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
def evaluate(
    self, x_data: XYData, y_true: XYData | None, y_pred: XYData
) -> Dict[str, float]:
    """
    Evaluate the pipeline using the specified metrics.

    This method applies each metric in the pipeline to the predicted and true values,
    returning a dictionary of evaluation results.

    Args:
        x_data (XYData): The input data used for evaluation.
        y_true (XYData | None): The true target values, if available.
        y_pred (XYData): The predicted values.

    Returns:
        Dict[str, float]: A dictionary containing the evaluation results for each metric.

    Example:
        ```python
        >>> results = pipeline.evaluate(x_test, y_test, y_pred)
        >>> print(results)
        {'F1': 0.85, 'Precision': 0.80, 'Recall': 0.90}
        ```
    """

    self._print_acction("Evaluating pipeline...")

    evaluations = {}
    for metric in self.metrics:
        evaluations[metric.__class__.__name__] = metric.evaluate(
            x_data, y_true, y_pred
        )
    return evaluations
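
Because results are keyed by `metric.__class__.__name__`, two metrics of the same class write to the same key and the later one overwrites the earlier. A minimal sketch of that consequence, assuming a hypothetical stand-in metric `TopKHit` (not a framework3 class) and a free function that copies the loop above:

```python
from typing import Dict, List, Sequence


class TopKHit:
    """Hypothetical stand-in metric, not part of framework3."""

    def __init__(self, k: int) -> None:
        self.k = k

    def evaluate(
        self, x_data, y_true: Sequence[int], y_pred: Sequence[Sequence[int]]
    ) -> float:
        # Fraction of samples whose true label is among the first k predictions.
        hits = sum(t in p[: self.k] for t, p in zip(y_true, y_pred))
        return hits / len(y_true)


def evaluate(metrics: List[TopKHit], x_data, y_true, y_pred) -> Dict[str, float]:
    # Same keying rule as the method above: the metric's class name.
    evaluations = {}
    for metric in metrics:
        evaluations[metric.__class__.__name__] = metric.evaluate(
            x_data, y_true, y_pred
        )
    return evaluations


y_true = [0, 1, 2, 1]
y_pred = [[0, 1], [2, 1], [1, 0], [1, 2]]
results = evaluate([TopKHit(k=1), TopKHit(k=2)], None, y_true, y_pred)
print(results)  # only one entry survives: the k=2 score
```

If two configurations of one metric are needed side by side, giving each its own subclass name is one way to keep both results.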

fit(x, y)

Fit the pipeline to the input data.

This method applies each filter in the pipeline sequentially to the input data, fitting each filter that requires training.

Parameters:

Name Type Description Default
x XYData

The input data to fit the pipeline on.

required
y Optional[XYData]

The target data, if available.

required

Returns:

Type Description
None | float

The loss value from the last fitted filter, if any.

Note

Filters that raise NotTrainableFilterError will be initialized instead of fitted.

Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
def fit(self, x: XYData, y: Optional[XYData]) -> None | float:
    """
    Fit the pipeline to the input data.

    This method applies each filter in the pipeline sequentially to the input data,
    fitting each filter that requires training.

    Args:
        x (XYData): The input data to fit the pipeline on.
        y (Optional[XYData]): The target data, if available.

    Returns:
        None | float: The loss value from the last fitted filter, if any.

    Note:
        Filters that raise NotTrainableFilterError will be initialized instead of fitted.
    """
    self._print_acction("Fitting pipeline")
    loss = None
    for filter in self.filters:
        if self._verbose:
            rprint(filter)
        try:
            loss = filter.fit(x, y)
        except NotTrainableFilterError:
            filter.init()  # Initialize filter

        x = filter.predict(x)

    return loss
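
Two properties of this loop are easy to miss: each filter is fitted on the output of the filters before it, not on the raw input, and a filter that cannot be trained is initialized and then still used to transform the data. The sketch below reproduces the control flow with stand-in classes (`NotTrainable`, `Square`, `Center` are hypothetical and only mimic the interface used above).

```python
from typing import List, Optional


class NotTrainable(Exception):
    """Stand-in for NotTrainableFilterError."""


class Square:
    """Non-trainable stand-in filter."""

    def __init__(self) -> None:
        self.ready = False

    def init(self) -> None:
        self.ready = True

    def fit(self, x: List[float], y=None) -> Optional[float]:
        raise NotTrainable()

    def predict(self, x: List[float]) -> List[float]:
        return [v * v for v in x]


class Center:
    """Trainable stand-in filter: subtracts the mean seen during fit."""

    def __init__(self) -> None:
        self.mean = 0.0

    def fit(self, x: List[float], y=None) -> Optional[float]:
        self.mean = sum(x) / len(x)
        return None  # this stand-in has no loss to report

    def predict(self, x: List[float]) -> List[float]:
        return [v - self.mean for v in x]


def fit(filters, x: List[float], y=None) -> Optional[float]:
    loss = None
    for f in filters:
        try:
            loss = f.fit(x, y)
        except NotTrainable:
            f.init()
        x = f.predict(x)  # the next filter is fitted on transformed data
    return loss


square, center = Square(), Center()
fit([square, center], [1.0, 2.0, 3.0])
print(square.ready, center.mean)  # mean of [1, 4, 9], not of [1, 2, 3]
```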

inner()

Get the list of filters in the pipeline.

Returns:

Type Description
List[BaseFilter]

The list of filters in the pipeline.

Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
def inner(self) -> List[BaseFilter]:
    """
    Get the list of filters in the pipeline.

    Returns:
        List[BaseFilter]: The list of filters in the pipeline.
    """
    return self.filters

predict(x)

Make predictions using the fitted pipeline.

This method applies each filter in the pipeline sequentially to the input data to generate predictions.

Parameters:

Name Type Description Default
x XYData

The input data to make predictions on.

required

Returns:

Type Description
XYData

The predictions made by the pipeline.

Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
def predict(self, x: XYData) -> XYData:
    """
    Make predictions using the fitted pipeline.

    This method applies each filter in the pipeline sequentially to the input data
    to generate predictions.

    Args:
        x (XYData): The input data to make predictions on.

    Returns:
        XYData: The predictions made by the pipeline.
    """

    self._print_acction("Predicting pipeline")

    for filter_ in self.filters:
        if self._verbose:
            rprint(filter_)
        x = filter_.predict(x)

    return x
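
The prediction loop is a left fold: the output of each step becomes the input of the next. A minimal sketch of that idea with plain functions standing in for filters (`double` and `add_one` are illustrative names, not framework3 plugins):

```python
from functools import reduce
from typing import Callable, List

Step = Callable[[List[float]], List[float]]


def predict(steps: List[Step], x: List[float]) -> List[float]:
    # Equivalent to the for-loop above: x = step(x) for each step, in order.
    return reduce(lambda data, step: step(data), steps, x)


def double(x: List[float]) -> List[float]:
    return [2 * v for v in x]


def add_one(x: List[float]) -> List[float]:
    return [v + 1 for v in x]


a = predict([double, add_one], [1.0, 2.0])
b = predict([add_one, double], [1.0, 2.0])
print(a, b)  # the same steps in a different order give different results
```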

start(x, y, X_)

Start the pipeline execution by fitting the model and making predictions.

This method initiates the pipeline process by fitting the model to the input data and then making predictions.

Parameters:

Name Type Description Default
x XYData

The input data for fitting and prediction.

required
y Optional[XYData]

The target data for fitting, if available.

required
X_ Optional[XYData]

Additional input data for prediction, if different from x.

required

Returns:

Type Description
Optional[XYData]

The predictions made by the pipeline. Errors are re-raised rather than returned as None.

Raises:

Type Description
Exception

If an error occurs during the fitting or prediction process.

Source code in framework3/plugins/pipelines/sequential/f3_pipeline.py
def start(
    self, x: XYData, y: Optional[XYData], X_: Optional[XYData]
) -> Optional[XYData]:
    """
    Start the pipeline execution by fitting the model and making predictions.

    This method initiates the pipeline process by fitting the model to the input data
    and then making predictions.

    Args:
        x (XYData): The input data for fitting and prediction.
        y (Optional[XYData]): The target data for fitting, if available.
        X_ (Optional[XYData]): Additional input data for prediction, if different from x.

    Returns:
        Optional[XYData]: The predictions made by the pipeline. Errors are re-raised rather than returned as None.

    Raises:
        Exception: If an error occurs during the fitting or prediction process.
    """
    try:
        self.fit(x, y)
        if X_ is not None:
            return self.predict(X_)
        else:
            return self.predict(x)
    except Exception as e:
        print(f"Error during pipeline execution: {e}")
        raise e
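
In practice, `start` covers two cases: predict on the training input (`X_` is None) or predict on separate data. A failure in either phase propagates to the caller. The sketch below mirrors that control flow with a hypothetical `MajorityPipeline` stand-in; it is not framework3 code and uses plain lists instead of XYData.

```python
from typing import List, Optional


class MajorityPipeline:
    """Hypothetical stand-in exposing fit/predict/start."""

    def __init__(self) -> None:
        self.label: Optional[int] = None

    def fit(self, x: List[float], y: Optional[List[int]]) -> None:
        if y is None:
            raise ValueError("this stand-in needs targets")
        # Remember the most frequent label.
        self.label = max(set(y), key=y.count)

    def predict(self, x: List[float]) -> List[Optional[int]]:
        return [self.label] * len(x)

    def start(self, x, y, X_):
        # Same branching as the method above, without the print.
        self.fit(x, y)
        return self.predict(X_ if X_ is not None else x)


pipe = MajorityPipeline()
on_train = pipe.start([0.1, 0.2, 0.3], [1, 1, 0], None)
on_test = pipe.start([0.1, 0.2, 0.3], [1, 1, 0], [0.9, 0.8])

try:
    pipe.start([0.1], None, None)
    failed = False
except ValueError:
    failed = True  # the error reaches the caller; no None is returned

print(on_train, on_test, failed)
```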

Overview

F3Pipeline is the sequential pipeline implementation in framework3. It chains data processing steps and machine learning models (its filters) together with evaluation metrics into a single workflow.

Key Features

  • Seamless integration of multiple plugins (filters, transformers, models)
  • Built-in support for various metrics
  • Caching capabilities for improved performance
  • Nested pipeline support for complex workflows

Basic Usage

Here's a simple example of how to create and use an F3Pipeline:

from framework3.plugins.pipelines.sequential import F3Pipeline
from framework3.plugins.filters.transformation import PCAPlugin
from framework3.plugins.filters.classification import ClassifierSVMPlugin
from framework3.plugins.metrics.classification import F1, Precision
from framework3.base import XYData
import numpy as np

# Create a pipeline; the constructor keyword is `filters`
pipeline = F3Pipeline(
    filters=[
        PCAPlugin(n_components=2),
        ClassifierSVMPlugin(kernel='rbf')
    ],
    metrics=[F1(), Precision()]
)

# Generate some dummy data
X = XYData(value=np.random.rand(100, 10))
y = XYData(value=np.random.randint(0, 2, 100))

# Fit the pipeline
pipeline.fit(X, y)

# Make predictions
y_pred = pipeline.predict(X)

# Evaluate the pipeline
results = pipeline.evaluate(X, y, y_pred)
print(results)

Advanced Usage

Nested Pipelines

F3Pipeline supports nesting, allowing you to create more complex workflows:

from framework3.plugins.pipelines.sequential import F3Pipeline
from framework3.plugins.filters.transformation import NormalizationPlugin
from framework3.plugins.filters.feature_selection import VarianceThresholdPlugin
from framework3.plugins.filters.classification import ClassifierSVMPlugin
from framework3.plugins.metrics.classification import F1, Precision

# Create a sub-pipeline
feature_engineering = F3Pipeline(
    filters=[
        NormalizationPlugin(),
        VarianceThresholdPlugin(threshold=0.1)
    ],
    metrics=[]
)

# Create the main pipeline; the sub-pipeline is used like any other filter
main_pipeline = F3Pipeline(
    filters=[
        feature_engineering,
        ClassifierSVMPlugin(kernel='linear')
    ],
    metrics=[F1(), Precision()]
)

# Use the main pipeline as before
main_pipeline.fit(X, y)
y_pred = main_pipeline.predict(X)
results = main_pipeline.evaluate(X, y, y_pred)
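
Nesting works because a pipeline exposes the same `fit`/`predict` interface as the filters it contains, so it can take a filter's place in a list. The sketch below illustrates that composite idea with stand-in classes (`Scale`, `Shift`, `Pipeline` are hypothetical, operate on plain lists, and are not framework3 code) and checks that a nested pipeline behaves like the equivalent flat one.

```python
from typing import List


class Scale:
    """Stand-in filter: divides by the largest absolute value seen in fit."""

    def __init__(self) -> None:
        self.factor = 1.0

    def fit(self, x: List[float], y=None) -> None:
        self.factor = max(abs(v) for v in x) or 1.0

    def predict(self, x: List[float]) -> List[float]:
        return [v / self.factor for v in x]


class Shift:
    """Stand-in filter with nothing to learn: adds a constant."""

    def __init__(self, offset: float) -> None:
        self.offset = offset

    def fit(self, x: List[float], y=None) -> None:
        pass

    def predict(self, x: List[float]) -> List[float]:
        return [v + self.offset for v in x]


class Pipeline:
    """Stand-in pipeline that has the same interface as a filter."""

    def __init__(self, filters) -> None:
        self.filters = filters

    def fit(self, x: List[float], y=None) -> None:
        for f in self.filters:
            f.fit(x, y)
            x = f.predict(x)

    def predict(self, x: List[float]) -> List[float]:
        for f in self.filters:
            x = f.predict(x)
        return x


inner = Pipeline([Scale(), Shift(1.0)])
nested = Pipeline([inner, Shift(-1.0)])
flat = Pipeline([Scale(), Shift(1.0), Shift(-1.0)])

data = [2.0, -4.0, 1.0]
nested.fit(data)
flat.fit(data)
print(nested.predict(data), flat.predict(data))
```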

Caching

F3Pipeline supports caching of intermediate results and fitted models for improved performance:

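from framework3.plugins.pipelines.sequential import F3Pipeline
from framework3.plugins.filters.cached_filter import Cached
from framework3.plugins.filters.transformation import PCAPlugin
from framework3.plugins.filters.classification import ClassifierSVMPlugin
from framework3.plugins.metrics.classification import F1

pipeline = F3Pipeline(
    filters=[
        Cached(
            filter=PCAPlugin(n_components=2),
            cache_data=True,
            cache_filter=True,
            overwrite=False
        ),
        ClassifierSVMPlugin()
    ],
    metrics=[F1()]
)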

# The PCA transformation will be cached after the first run
pipeline.fit(X, y)
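
The general idea behind caching a step can be sketched as memoization keyed by the input's content. The example below is an assumption-laden illustration, not how `Cached` is implemented: `SlowSquare` and `MemoizedFilter` are hypothetical, the store is an in-memory dict, and the key is a SHA-256 hash of the pickled input.

```python
import hashlib
import pickle
from typing import Any, Dict, List


class SlowSquare:
    """Stand-in for an expensive filter; counts how often it really runs."""

    def __init__(self) -> None:
        self.calls = 0

    def predict(self, x: List[float]) -> List[float]:
        self.calls += 1
        return [v * v for v in x]


class MemoizedFilter:
    """Hypothetical wrapper: caches predict() results by input content."""

    def __init__(self, inner, overwrite: bool = False) -> None:
        self.inner = inner
        self.overwrite = overwrite
        self._store: Dict[str, Any] = {}

    def _key(self, x) -> str:
        return hashlib.sha256(pickle.dumps(x)).hexdigest()

    def predict(self, x):
        key = self._key(x)
        # Recompute when the entry is missing or overwriting is requested.
        if self.overwrite or key not in self._store:
            self._store[key] = self.inner.predict(x)
        return self._store[key]


slow = SlowSquare()
cached = MemoizedFilter(slow)
first = cached.predict([1.0, 2.0])
second = cached.predict([1.0, 2.0])  # same input: served from the store
third = cached.predict([3.0])        # new input: computed
print(slow.calls)
```

The `overwrite` flag in this sketch forces recomputation, which is one plausible reading of the `overwrite` option shown above.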

API Reference

F3Pipeline

class F3Pipeline(SequentialPipeline):
    def __init__(self, filters: List[BaseFilter], metrics: List[BaseMetric] = [], overwrite: bool = False, store: bool = False, log: bool = False) -> None:
        """
        Initialize the F3Pipeline.

        Args:
            filters (List[BaseFilter]): List of filters to be applied in the pipeline.
            metrics (List[BaseMetric], optional): List of metrics for evaluation. Defaults to [].
            overwrite (bool, optional): Whether to overwrite existing data. Defaults to False.
            store (bool, optional): Whether to store intermediate results. Defaults to False.
            log (bool, optional): Whether to log pipeline operations. Defaults to False.
        """

Methods

  • fit(self, x: XYData, y: Optional[XYData]) -> None | float: Fit the pipeline to the input data.
  • predict(self, x: XYData) -> XYData: Make predictions using the fitted pipeline.
  • evaluate(self, x_data: XYData, y_true: XYData|None, y_pred: XYData) -> Dict[str, float]: Evaluate the pipeline using the specified metrics.

Best Practices

  1. Order Matters: Filters run in list order, and each filter is fitted on the output of the filters before it. Place data preprocessing steps before your model.

  2. Caching: Use caching for computationally expensive steps, especially when you're iterating on your pipeline design.

  3. Nested Pipelines: Use nested pipelines to organize complex workflows into logical sub-components.

  4. Metrics: Include multiple relevant metrics to get a comprehensive view of your pipeline's performance.

  5. Cross-Validation: Consider using cross-validation techniques in conjunction with F3Pipeline for more robust model evaluation.

  6. Logging: Enable logging to get insights into the pipeline's operation and to help with debugging.

  7. Parameter Tuning: Use F3Pipeline in conjunction with hyperparameter tuning techniques to optimize your entire workflow.
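
As an illustration of the cross-validation advice in item 5, here is a minimal k-fold sketch in plain Python. Everything in it is an assumption for demonstration: `k_fold_indices`, `cross_validate`, and the `MeanRegressor` stand-in are hypothetical helpers, not framework3 APIs, and the score is a simple mean absolute error. Any object with `fit` and `predict` could be substituted for the stand-in.

```python
from typing import Callable, List, Tuple


def k_fold_indices(n: int, k: int) -> List[Tuple[List[int], List[int]]]:
    """Split range(n) into k contiguous (train, test) index pairs."""
    sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in sizes:
        test = list(range(start, start + size))
        train = [i for i in range(n) if i < start or i >= start + size]
        folds.append((train, test))
        start += size
    return folds


class MeanRegressor:
    """Stand-in for a pipeline: predicts the mean of the training targets."""

    def fit(self, x: List[float], y: List[float]) -> None:
        self.mean = sum(y) / len(y)

    def predict(self, x: List[float]) -> List[float]:
        return [self.mean] * len(x)


def cross_validate(make_model: Callable, x, y, k: int) -> List[float]:
    scores = []
    for train, test in k_fold_indices(len(x), k):
        model = make_model()  # fresh model per fold, so no state leaks
        model.fit([x[i] for i in train], [y[i] for i in train])
        pred = model.predict([x[i] for i in test])
        # Mean absolute error on the held-out fold.
        mae = sum(abs(p - y[i]) for p, i in zip(pred, test)) / len(test)
        scores.append(mae)
    return scores


x = [0, 1, 2, 3, 4, 5]
y = [1.0, 1.0, 1.0, 3.0, 3.0, 3.0]
scores = cross_validate(MeanRegressor, x, y, k=3)
print(scores)
```

Building a new model inside the loop matters: reusing one fitted pipeline across folds would let information from earlier folds influence later scores.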

Conclusion

F3Pipeline provides a powerful and flexible way to build complex data processing and machine learning workflows in framework3. By combining multiple plugins, nested pipelines, and caching capabilities, you can create efficient and maintainable pipelines for a wide range of tasks.