
MonoPipeline

The MonoPipeline class is part of the framework3.plugins.pipelines.parallel module. It applies multiple filters (which may themselves be pipelines) to the same input data and combines their outputs to create new features. This makes it well suited to scenarios where different models or transformations should all be applied to the same dataset.

Module Contents

framework3.plugins.pipelines.parallel.parallel_mono_pipeline

MonoPipeline

Bases: ParallelPipeline

A pipeline that combines multiple filters in parallel and constructs new features from their outputs.

This pipeline allows for simultaneous execution of multiple filters on the same input data, and then combines their outputs to create new features. It's particularly useful for feature engineering and ensemble methods.

Key Features
  • Parallel execution of multiple filters
  • Combination of filter outputs for feature construction
  • Support for evaluation metrics
Usage
from framework3.plugins.pipelines.parallel import MonoPipeline
from framework3.plugins.filters.transformation import PCAPlugin
from framework3.plugins.filters.classification import KnnFilter
from framework3.plugins.metrics import F1Score
from framework3.base import XYData

pipeline = MonoPipeline(
    filters=[
        PCAPlugin(n_components=2),
        KnnFilter(n_neighbors=3)
    ],
    metrics=[F1Score()]
)

x_train = XYData(...)
y_train = XYData(...)
x_test = XYData(...)
y_test = XYData(...)

pipeline.fit(x_train, y_train)
predictions = pipeline.predict(x_test)
evaluation = pipeline.evaluate(x_test, y_test, predictions)
print(evaluation)
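The example above relies on framework3 classes and elided data. The following framework-free sketch shows the same fit-then-predict flow so it can be run on its own. It is an illustration under stated assumptions, not the library's code: NumPy arrays stand in for XYData, and CenterFilter and RowSumFilter are hypothetical toy filters that expose only fit and predict.

```python
from __future__ import annotations

from copy import deepcopy

import numpy as np


class CenterFilter:
    """Hypothetical trainable filter: learns column means in fit, subtracts them in predict."""

    def fit(self, x: np.ndarray, y: np.ndarray | None) -> float | None:
        self.mean_ = x.mean(axis=0)
        return None  # no loss to report

    def predict(self, x: np.ndarray) -> np.ndarray:
        return x - self.mean_


class RowSumFilter:
    """Hypothetical stateless filter: one output value per row (a 1-D array)."""

    def fit(self, x: np.ndarray, y: np.ndarray | None) -> float | None:
        return None

    def predict(self, x: np.ndarray) -> np.ndarray:
        return x.sum(axis=1)


filters = [CenterFilter(), RowSumFilter()]
x_train = np.array([[1.0, 2.0], [3.0, 4.0]])
x_test = np.array([[2.0, 2.0]])

# Fit: every filter is trained on its own copy of the same data.
for f in filters:
    f.fit(deepcopy(x_train), None)

# Predict: apply every filter to the same input, then place the outputs side by side.
outputs = [f.predict(deepcopy(x_test)) for f in filters]
outputs = [o.reshape(-1, 1) if o.ndim == 1 else o for o in outputs]
features = np.concatenate(outputs, axis=-1)
print(features.tolist())  # [[0.0, -1.0, 4.0]]
```

The combined output has one column per output column of each filter (two from CenterFilter plus one from RowSumFilter). Filters with different output widths can therefore be combined, as long as they return the same number of rows.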

Attributes:

  • filters (Sequence[BaseFilter]): A sequence of filters to be applied in parallel.

Methods:

  • fit(x: XYData, y: Optional[XYData], evaluator: BaseMetric | None = None) -> Optional[float]: Fit all filters in parallel.
  • predict(x: XYData) -> XYData: Make predictions using all filters in parallel.
  • evaluate(x_data: XYData, y_true: XYData | None, y_pred: XYData) -> Dict[str, Any]: Evaluate the pipeline using the provided metrics.
  • combine_features(pipeline_outputs: list[XYData]) -> XYData: Combine features from all filter outputs.
  • start(x: XYData, y: XYData | None, X_: XYData | None) -> XYData | None: Start the pipeline execution.

Source code in framework3/plugins/pipelines/parallel/parallel_mono_pipeline.py
@Container.bind()
class MonoPipeline(ParallelPipeline):
    """
    A pipeline that combines multiple filters in parallel and constructs new features from their outputs.

    This pipeline allows for simultaneous execution of multiple filters on the same input data,
    and then combines their outputs to create new features. It's particularly useful for
    feature engineering and ensemble methods.

    Key Features:
        - Parallel execution of multiple filters
        - Combination of filter outputs for feature construction
        - Support for evaluation metrics

    Usage:
        ```python
        from framework3.plugins.pipelines.parallel import MonoPipeline
        from framework3.plugins.filters.transformation import PCAPlugin
        from framework3.plugins.filters.classification import KnnFilter
        from framework3.plugins.metrics import F1Score
        from framework3.base import XYData

        pipeline = MonoPipeline(
            filters=[
                PCAPlugin(n_components=2),
                KnnFilter(n_neighbors=3)
            ],
            metrics=[F1Score()]
        )

        x_train = XYData(...)
        y_train = XYData(...)
        x_test = XYData(...)
        y_test = XYData(...)

        pipeline.fit(x_train, y_train)
        predictions = pipeline.predict(x_test)
        evaluation = pipeline.evaluate(x_test, y_test, predictions)
        print(evaluation)
        ```

    Attributes:
        filters (Sequence[BaseFilter]): A sequence of filters to be applied in parallel.

    Methods:
        fit(x: XYData, y: Optional[XYData], evaluator: BaseMetric | None = None) -> Optional[float]:
            Fit all filters in parallel.
        predict(x: XYData) -> XYData: Make predictions using all filters in parallel.
        evaluate(x_data: XYData, y_true: XYData | None, y_pred: XYData) -> Dict[str, Any]:
            Evaluate the pipeline using provided metrics.
        combine_features(pipeline_outputs: list[XYData]) -> XYData:
            Combine features from all filter outputs.
        start(x: XYData, y: XYData | None, X_: XYData | None) -> XYData | None:
            Start the pipeline execution.
    """

    def __init__(self, filters: Sequence[BaseFilter]):
        """
        Initialize the MonoPipeline.

        Args:
            filters (Sequence[BaseFilter]): A sequence of filters to be applied in parallel.
        """
        super().__init__(filters=filters)
        self.filters = filters

    def fit(
        self, x: XYData, y: Optional[XYData], evaluator: BaseMetric | None = None
    ) -> Optional[float]:
        """
        Fit all filters in the pipeline to the input data in parallel.

        This method applies the fit operation to each filter in the pipeline
        using the provided input data.

        Args:
            x (XYData): The input data to fit the filters on.
            y (XYData | None): The target data, if available.
            evaluator (BaseMetric | None, optional): An evaluator metric, if needed. Defaults to None.

        Returns:
            Optional[float]: The mean of the losses returned by the filters, if any.

        Note:
            Filters that raise NotTrainableFilterError will be initialized instead of fitted.
        """
        losses = []
        for f in self.filters:
            try:
                losses.append(f.fit(deepcopy(x), y))
            except NotTrainableFilterError:
                f.init()

        # Drop None values (filters that did not report a loss)

        match list(filter(lambda x: x is not None, losses)):
            case []:
                return None
            case lss:
                return float(np.mean(lss))

    def predict(self, x: XYData) -> XYData:
        """
        Make predictions using all filters in parallel and combine their outputs.

        This method applies the predict operation to each filter in the pipeline
        and then combines the outputs using the combine_features method.

        Args:
            x (XYData): The input data to make predictions on.

        Returns:
            XYData: The combined predictions from all filters.
        """
        outputs: List[XYData] = [filter.predict(deepcopy(x)) for filter in self.filters]
        return self.combine_features(outputs)

    def evaluate(
        self, x_data: XYData, y_true: XYData | None, y_pred: XYData
    ) -> Dict[str, Any]:
        """
        Evaluate the pipeline using the provided metrics.

        This method applies each metric in the pipeline to the predicted and true values,
        returning a dictionary of evaluation results.

        Args:
            x_data (XYData): Input data used for evaluation.
            y_true (XYData | None): True target data, if available.
            y_pred (XYData): Predicted target data.

        Returns:
            Dict[str, Any]: A dictionary containing the evaluation results for each metric.

        Example:
            ```python
            >>> evaluation = pipeline.evaluate(x_test, y_test, predictions)
            >>> print(evaluation)
            {'F1Score': 0.85, 'Accuracy': 0.90}
            ```
        """
        results = {}
        for metric in self.metrics:
            results[metric.__class__.__name__] = metric.evaluate(x_data, y_true, y_pred)
        return results

    @staticmethod
    def combine_features(pipeline_outputs: list[XYData]) -> XYData:
        """
        Combine features from all filter outputs.

        This method concatenates the features from all filter outputs along the last axis.

        Args:
            pipeline_outputs (List[XYData]): List of outputs from each filter.

        Returns:
            XYData: Combined output with concatenated features.

        Note:
            This method assumes that all filter outputs can be concatenated along the last axis.
            Ensure that your filters produce compatible outputs.
        """
        return XYData.concat(
            [XYData.ensure_dim(output.value) for output in pipeline_outputs], axis=-1
        )

    def start(self, x: XYData, y: XYData | None, X_: XYData | None) -> XYData | None:
        """
        Start the pipeline execution by fitting the model and making predictions.

        This method initiates the pipeline process by fitting the model to the input data
        and then making predictions.

        Args:
            x (XYData): The input data for fitting and prediction.
            y (XYData | None): The target data for fitting, if available.
            X_ (XYData | None): Additional input data for prediction, if different from x.

        Returns:
            XYData | None: The predictions made by the pipeline. Errors are re-raised, not returned as None.

        Raises:
            Exception: If an error occurs during the fitting or prediction process.
        """
        try:
            self.fit(x, y)
            if X_ is not None:
                return self.predict(X_)
            else:
                return self.predict(x)
        except Exception as e:
            print(f"Error during pipeline execution: {e}")
            raise e

__init__(filters)

Initialize the MonoPipeline.

Parameters:

  • filters (Sequence[BaseFilter], required): A sequence of filters to be applied in parallel.
combine_features(pipeline_outputs) staticmethod

Combine features from all filter outputs.

This method concatenates the features from all filter outputs along the last axis.

Parameters:

  • pipeline_outputs (List[XYData], required): List of outputs from each filter.

Returns:

  • XYData: Combined output with concatenated features.

Note

This method assumes that all filter outputs can be concatenated along the last axis. Ensure that your filters produce compatible outputs.
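The concatenation rule can be sketched in NumPy, assuming that XYData.ensure_dim promotes a 1-D output to a single column. ensure_2d below is a hypothetical stand-in for that helper, not framework3's implementation. The second call shows what "compatible outputs" means in practice: every output must have the same number of rows.

```python
import numpy as np


def ensure_2d(values: np.ndarray) -> np.ndarray:
    """Hypothetical stand-in for XYData.ensure_dim: turn a 1-D output into a column vector."""
    values = np.asarray(values)
    return values.reshape(-1, 1) if values.ndim == 1 else values


def combine_features(outputs: list[np.ndarray]) -> np.ndarray:
    # Concatenate along the last axis, i.e. append each output's columns.
    return np.concatenate([ensure_2d(o) for o in outputs], axis=-1)


pca_like = np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])  # 3 rows, 2 columns
labels = np.array([0, 1, 1])                                 # 3 rows, 1-D

combined = combine_features([pca_like, labels])
print(combined.shape)  # (3, 3)

# Outputs with different row counts cannot be concatenated column-wise.
mismatch_rejected = False
try:
    combine_features([pca_like, np.array([0, 1])])
except ValueError as err:
    mismatch_rejected = True
    print("incompatible outputs:", err)
```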

evaluate(x_data, y_true, y_pred)

Evaluate the pipeline using the provided metrics.

This method applies each metric in the pipeline to the predicted and true values, returning a dictionary of evaluation results.

Parameters:

  • x_data (XYData, required): Input data used for evaluation.
  • y_true (XYData | None, required): True target data, if available.
  • y_pred (XYData, required): Predicted target data.

Returns:

  • Dict[str, Any]: A dictionary containing the evaluation results for each metric.

Example
>>> evaluation = pipeline.evaluate(x_test, y_test, predictions)
>>> print(evaluation)
{'F1Score': 0.85, 'Accuracy': 0.90}
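The keying scheme used by evaluate can be sketched with two hypothetical metrics. Accuracy and ErrorCount below are toy classes, not framework3's metrics, and plain lists stand in for XYData. Because the key is metric.__class__.__name__, two instances of the same metric class (for example, with different settings) share one key, and the later result overwrites the earlier one.

```python
from typing import Any


class Accuracy:
    """Hypothetical metric with the evaluate(x_data, y_true, y_pred) shape used above."""

    def evaluate(self, x_data, y_true, y_pred) -> float:
        hits = sum(int(t == p) for t, p in zip(y_true, y_pred))
        return hits / len(y_true)


class ErrorCount:
    """Hypothetical metric: number of mismatched labels."""

    def evaluate(self, x_data, y_true, y_pred) -> int:
        return sum(int(t != p) for t, p in zip(y_true, y_pred))


def evaluate(metrics, x_data, y_true, y_pred) -> dict[str, Any]:
    # Same keying scheme as MonoPipeline.evaluate: one entry per metric class name.
    return {m.__class__.__name__: m.evaluate(x_data, y_true, y_pred) for m in metrics}


y_true = [0, 1, 1, 0]
y_pred = [0, 1, 0, 0]

scores = evaluate([Accuracy(), ErrorCount()], None, y_true, y_pred)
print(scores)  # {'Accuracy': 0.75, 'ErrorCount': 1}

# Two metrics of the same class collapse into a single entry.
dup = evaluate([Accuracy(), Accuracy()], None, y_true, y_pred)
print(dup)  # {'Accuracy': 0.75}
```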
fit(x, y, evaluator=None)

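Fit all filters in the pipeline to the same input data.

The filters are fitted one after another in a plain for loop (they are arranged in parallel, not executed concurrently), and each one receives a deep copy of x, so no filter can alter the data seen by the next. The evaluator argument is accepted but not used by this implementation.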

Parameters:

  • x (XYData, required): The input data to fit the filters on.
  • y (XYData | None, required): The target data, if available.
  • evaluator (BaseMetric | None, default None): An evaluator metric, if needed.

Returns:

  • Optional[float]: The mean of the losses returned by the filters, or None if no filter reported a loss.

Note

Filters that raise NotTrainableFilterError will be initialized instead of fitted.
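The loss-handling rules of fit can be sketched without framework3. NotTrainableFilterError, LossFilter and FixedFilter below are hypothetical stand-ins, and statistics.mean is used in place of np.mean. The sketch shows three behaviors from the source: non-trainable filters are initialized instead of fitted, None losses are ignored, and the result is None when no filter reports a loss.

```python
from __future__ import annotations

from copy import deepcopy
from statistics import mean


class NotTrainableFilterError(Exception):
    """Hypothetical stand-in for framework3's exception of the same name."""


class LossFilter:
    """Hypothetical trainable filter that reports a fixed training loss (or None)."""

    def __init__(self, loss: float | None) -> None:
        self.loss = loss

    def fit(self, x, y) -> float | None:
        return self.loss


class FixedFilter:
    """Hypothetical filter with nothing to train."""

    def __init__(self) -> None:
        self.initialized = False

    def fit(self, x, y):
        raise NotTrainableFilterError("nothing to fit")

    def init(self) -> None:
        self.initialized = True


def fit_all(filters, x, y) -> float | None:
    losses = []
    for f in filters:
        try:
            losses.append(f.fit(deepcopy(x), y))
        except NotTrainableFilterError:
            f.init()  # not trainable: initialize instead of fitting
    # Filters that returned None (no loss) are ignored when averaging.
    reported = [loss for loss in losses if loss is not None]
    return mean(reported) if reported else None


fixed = FixedFilter()
avg = fit_all([LossFilter(0.25), LossFilter(None), LossFilter(0.75), fixed], [[1, 2]], [0])
print(avg)                # 0.5
print(fixed.initialized)  # True

nothing = fit_all([FixedFilter()], [[1, 2]], [0])
print(nothing)            # None
```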

predict(x)

Make predictions with every filter and combine their outputs.

Each filter predicts on its own deep copy of x, one after another, and the resulting list of outputs is passed to combine_features, which concatenates them along the last axis.

Parameters:

  • x (XYData, required): The input data to make predictions on.

Returns:

  • XYData: The combined predictions from all filters.
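The reason predict passes deepcopy(x) to each filter, rather than x itself, is easiest to see with a filter that mutates its input. InPlaceDoubler and Identity are hypothetical filters, plain lists stand in for XYData, and the copy_input flag exists only in this sketch to show what would happen without the copy.

```python
from copy import deepcopy


class InPlaceDoubler:
    """Hypothetical filter that (badly) modifies its input list in place."""

    def predict(self, x: list[float]) -> list[float]:
        for i in range(len(x)):
            x[i] *= 2
        return x


class Identity:
    """Hypothetical filter that returns a copy of its input unchanged."""

    def predict(self, x: list[float]) -> list[float]:
        return list(x)


def predict_all(filters, x, copy_input: bool = True):
    # MonoPipeline.predict always copies; copy_input=False is only here for contrast.
    return [f.predict(deepcopy(x) if copy_input else x) for f in filters]


data = [1.0, 2.0]
copied = predict_all([InPlaceDoubler(), Identity()], data)
print(copied)  # [[2.0, 4.0], [1.0, 2.0]]
print(data)    # [1.0, 2.0]: the caller's data is untouched

shared = [1.0, 2.0]
uncopied = predict_all([InPlaceDoubler(), Identity()], shared, copy_input=False)
print(uncopied)  # [[2.0, 4.0], [2.0, 4.0]]: Identity saw the mutated data
```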

start(x, y, X_)

Start the pipeline execution: fit all filters on x and y, then return predictions for X_, or for x when X_ is None.

Parameters:

  • x (XYData, required): The input data for fitting, and for prediction when X_ is None.
  • y (XYData | None, required): The target data for fitting, if available.
  • X_ (XYData | None, required): Data to predict on; if None, predictions are made on x.

Returns:

  • XYData | None: The predictions made by the pipeline. Errors are not turned into a None return value.

Raises:

  • Exception: Any error raised while fitting or predicting. The error message is printed and the exception is re-raised.
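The control flow of start can be traced with a hypothetical recording pipeline whose start method copies the body of MonoPipeline.start. RecordingPipeline and FailingPipeline are illustrative classes, and strings stand in for XYData.

```python
class RecordingPipeline:
    """Hypothetical pipeline that records what it was fitted and predicted on."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, object]] = []

    def fit(self, x, y):
        self.calls.append(("fit", x))

    def predict(self, x):
        self.calls.append(("predict", x))
        return x

    def start(self, x, y, X_):
        # Same control flow as MonoPipeline.start.
        try:
            self.fit(x, y)
            if X_ is not None:
                return self.predict(X_)
            else:
                return self.predict(x)
        except Exception as e:
            print(f"Error during pipeline execution: {e}")
            raise  # the error reaches the caller; start does not return None


p = RecordingPipeline()
p.start("x_train", "y_train", "x_test")
print(p.calls)  # [('fit', 'x_train'), ('predict', 'x_test')]

q = RecordingPipeline()
q.start("x_train", "y_train", None)
print(q.calls)  # [('fit', 'x_train'), ('predict', 'x_train')]


class FailingPipeline(RecordingPipeline):
    def fit(self, x, y):
        raise RuntimeError("fit failed")


try:
    FailingPipeline().start("x_train", None, None)
    propagated = False
except RuntimeError:
    propagated = True
print(propagated)  # True
```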


Class Hierarchy

  • ParallelPipeline
      • MonoPipeline

MonoPipeline

MonoPipeline extends ParallelPipeline. It applies multiple filters (or sub-pipelines) to the same input and concatenates their outputs to extend the feature set.

Key Methods:

  • __init__(filters: Sequence[BaseFilter]): Initializes the pipeline with a sequence of filters to be applied in parallel.
  • start(x: XYData, y: XYData|None, X_: XYData|None) -> XYData|None: Fits all filters on x and y, then returns predictions for X_ (or for x when X_ is None).
  • fit(x: XYData, y: XYData|None, evaluator: BaseMetric|None = None) -> float|None: Fits every filter on a copy of the input data and returns the mean of the reported losses, if any.
  • predict(x: XYData) -> XYData: Runs every filter's predict on a copy of x and combines the outputs.
  • evaluate(x_data: XYData, y_true: XYData|None, y_pred: XYData) -> Dict[str, Any]: Evaluates the pipeline using the provided metrics.
  • combine_features(pipeline_outputs: list[XYData]) -> XYData: Concatenates the outputs of all filters along the last axis.

Usage Examples

Creating and Using MonoPipeline

from framework3.plugins.pipelines.parallel import MonoPipeline
from framework3.base import XYData

# Define filters (assuming filters are defined elsewhere)
filters = [Filter1(), Filter2()]

# Initialize the MonoPipeline
mono_pipeline = MonoPipeline(filters=filters)

# Example data
x_data = XYData(_hash='x_data', _path='/tmp', _value=[[1, 2], [3, 4]])
y_data = XYData(_hash='y_data', _path='/tmp', _value=[0, 1])

# Start the pipeline
results = mono_pipeline.start(x_data, y_data, None)

# Evaluate the pipeline
evaluation_results = mono_pipeline.evaluate(x_data, y_data, results)
print(evaluation_results)

Best Practices

  1. Ensure that the filters used in the pipeline are compatible with the input data.
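  2. If concatenation along the last axis does not suit your data, subclass MonoPipeline and override the combine_features static method. Because predict calls self.combine_features, the override is picked up automatically.
  3. Watch memory use with large inputs: fit and predict pass a deep copy of the input to every filter, so memory grows with the number of filters.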
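The override pattern from the second practice can be sketched without framework3. MiniMonoPipeline is a hypothetical stand-in that only mirrors the predict and combine_features structure of MonoPipeline, and Const is a hypothetical filter. AveragingPipeline swaps concatenation for an element-wise mean, which requires every filter to return outputs of identical shape.

```python
from copy import deepcopy

import numpy as np


class MiniMonoPipeline:
    """Hypothetical, framework-free stand-in mirroring MonoPipeline.predict/combine_features."""

    def __init__(self, filters):
        self.filters = filters

    def predict(self, x: np.ndarray) -> np.ndarray:
        outputs = [f.predict(deepcopy(x)) for f in self.filters]
        return self.combine_features(outputs)  # resolved on the subclass, if overridden

    @staticmethod
    def combine_features(outputs: list[np.ndarray]) -> np.ndarray:
        cols = [o.reshape(-1, 1) if o.ndim == 1 else o for o in outputs]
        return np.concatenate(cols, axis=-1)


class AveragingPipeline(MiniMonoPipeline):
    """Replace concatenation with an element-wise average (e.g. for simple ensembling)."""

    @staticmethod
    def combine_features(outputs: list[np.ndarray]) -> np.ndarray:
        return np.mean(np.stack(outputs), axis=0)


class Const:
    """Hypothetical filter that predicts the same score for every row."""

    def __init__(self, value: float) -> None:
        self.value = value

    def predict(self, x: np.ndarray) -> np.ndarray:
        return np.full(len(x), self.value)


x = np.zeros((3, 2))
concat = MiniMonoPipeline([Const(0.25), Const(0.75)]).predict(x)
averaged = AveragingPipeline([Const(0.25), Const(0.75)]).predict(x)
print(concat.shape)        # (3, 2)
print(averaged.tolist())   # [0.5, 0.5, 0.5]
```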

Conclusion

MonoPipeline offers a simple way to apply several filters to the same data and merge their outputs into a richer feature set. By following the best practices and examples above, you can integrate it into your machine learning workflows.