# Source code for mlshell.producers.pipeline

"""
The :mod:`mlshell.producers.pipeline` contains examples of `Pipeline` class to
create empty pipeline object and `PipelineProducer` class to fill it.

:class:`mlshell.Pipeline` proposes unified interface to work with
underlying pipeline. Intended to be used in :mod:`mlshell.Workflow`.
For new pipeline formats no need to edit `Workflow` class, just adapt in
compliance to `Pipeline` interface.

:class:`mlshell.PipelineProducer` specifies methods to create/load
pipeline. Model loading currently implemented via :mod:`joblib` and model
creation via :class:`sklearn.pipeline.Pipeline` .

"""


import os

import joblib
import mlshell
import pycnfg
import sklearn.utils.estimator_checks

__all__ = ['Pipeline', 'PipelineProducer']


class Pipeline(object):
    """Unified pipeline interface.

    Implements interface to access arbitrary pipeline.
    Interface: is_classifier, is_regressor, dump, set_params and all
    underlying pipeline object methods.

    Attributes
    ----------
    pipeline : :mod:`sklearn` estimator
        Underlying pipeline.
    dataset_id : str
        If pipeline is fitted, train dataset identifier, otherwise None.

    Notes
    -----
    Access to unspecified attributes/methods is redirected to the
    underlying pipeline object.

    """
    def __init__(self, pipeline=None, oid=None, dataset_id=None):
        """
        Parameters
        ----------
        pipeline : :mod:`sklearn` estimator, optional (default=None)
            Pipeline to wrap.
        oid : str, optional (default=None)
            Instance identifier.
        dataset_id : str, optional (default=None)
            Train dataset identifier if pipeline is fitted, otherwise None.

        """
        self.pipeline = pipeline
        self.oid = oid
        self.dataset_id = dataset_id

    def __hash__(self):
        # Hash on hyper-parameters string, so pipelines with identical
        # params hash equal regardless of object identity.
        return hash(str(self.pipeline.get_params()))

    def __getattr__(self, name):
        """Redirect unknown attribute access to the underlying pipeline.

        Fix: the original always returned a call wrapper, which broke
        access to non-callable attributes (e.g. ``classes_``) and deferred
        ``AttributeError`` for missing names until call time. Returning
        the attribute directly keeps method calls working unchanged and
        makes plain attribute access behave correctly.
        """
        # Guard: during unpickling 'pipeline' may not be in __dict__ yet;
        # without this check getattr(self.pipeline, ...) would recurse.
        if name == 'pipeline':
            raise AttributeError(name)
        return getattr(self.pipeline, name)

    def __getstate__(self):
        # Allow pickle.
        return self.__dict__

    def __setstate__(self, d):
        # Allow unpickle.
        self.__dict__ = d

    def fit(self, *args, **kwargs):
        """Fit pipeline.

        Pops 'dataset_id' from kwargs and remembers it as the train
        dataset identifier before delegating to the underlying estimator.
        """
        self.dataset_id = kwargs.pop('dataset_id', None)
        self.pipeline.fit(*args, **kwargs)

    def set_params(self, *args, **kwargs):
        """Set pipeline params (resets fitted-dataset marker)."""
        self.dataset_id = None
        self.pipeline.set_params(*args, **kwargs)

    def is_classifier(self):
        """Check if pipeline is a classifier."""
        return sklearn.base.is_classifier(self.pipeline)

    def is_regressor(self):
        """Check if pipeline is a regressor."""
        return sklearn.base.is_regressor(self.pipeline)

    def dump(self, filepath, **kwargs):
        """Dump the pipeline on disk.

        Parameters
        ----------
        filepath : str
            File path without extension.
        **kwargs : dict
            Additional kwargs to pass in :func:`joblib.dump` .

        Returns
        -------
        fullpath : str
            Full file path ('<filepath>.model').

        """
        fullpath = f'{filepath}.model'
        joblib.dump(self.pipeline, fullpath, **kwargs)
        return fullpath
class PipelineProducer(pycnfg.Producer):
    """Factory to produce pipeline.

    Interface: set, make, load, info.

    Parameters
    ----------
    objects : dict
        Dictionary with objects from previous executed producers:
        {'section_id__config__id', object,}
    oid : str
        Unique identifier of produced object.
    path_id : str, optional (default='path__default')
        Project path identifier in `objects`.
    logger_id : str, optional (default='logger__default')
        Logger identifier in `objects`.

    Attributes
    ----------
    objects : dict
        Dictionary with objects from previous executed producers:
        {'section_id__config__id', object,}
    oid : str
        Unique identifier of produced object.
    logger : :class:`logging.Logger`
        Logger.
    project_path : str
        Absolute path to project dir.

    """
    _required_parameters = ['objects', 'oid', 'path_id', 'logger_id']

    def __init__(self, objects, oid, path_id='path__default',
                 logger_id='logger__default'):
        pycnfg.Producer.__init__(self, objects, oid, path_id=path_id,
                                 logger_id=logger_id)

    def set(self, pipeline, estimator):
        """Set estimator as pipeline.

        Parameters
        ----------
        pipeline : :class:`mlshell.Pipeline`
            Pipeline object, will be updated.
        estimator : :mod:`sklearn` estimator
            Estimator to set in :class:`mlshell.Pipeline` interface.

        Returns
        -------
        pipeline : :class:`mlshell.Pipeline`
            Resulted pipeline.

        """
        pipeline.pipeline = estimator
        return pipeline

    def make(self, pipeline, steps=None, estimator=None, memory=None,
             **kwargs):
        """Create pipeline from steps.

        Parameters
        ----------
        pipeline : :class:`mlshell.Pipeline`
            Pipeline object, will be updated.
        steps : list, class, optional (default=None)
            Pipeline steps to pass in :class:`sklearn.pipeline.Pipeline` .
            Could be a class with ``steps`` attribute, will be initialized
            ``steps(estimator, **kwargs)``. If None,
            :class:`mlshell.pipeline.Steps` is used. Set ``[]`` to use
            ``estimator`` directly as pipeline.
        estimator : :mod:`sklearn` estimator
            Estimator to use in the last step if ``steps`` is a class or
            directly if ``steps=[]``.
        memory : str, :class:`joblib.Memory` interface, optional
                (default=None)
            `memory` argument passed to :class:`sklearn.pipeline.Pipeline` .
            If 'auto', "project_path/.temp/pipeline" is used.
        **kwargs : dict
            Additional kwargs to initialize `steps` (if provided as class).

        Returns
        -------
        pipeline : :class:`mlshell.Pipeline`
            Resulted pipeline.

        """
        steps = self._steps_resolve(steps, estimator=estimator, **kwargs)
        if steps:
            memory = self._memory_resolve(memory)
            pipeline.pipeline = sklearn.pipeline.Pipeline(steps,
                                                          memory=memory)
        else:
            # Empty steps => use estimator as-is.
            pipeline.pipeline = estimator
        return pipeline

    def load(self, pipeline, filepath, **kwargs):
        """Load fitted model from disk.

        Parameters
        ----------
        pipeline : :class:`mlshell.Pipeline`
            Pipeline object, will be updated.
        filepath : str
            Absolute path to load file or relative to 'project__path'
            started with './'.
        **kwargs : dict
            Additional parameters to pass in :func:`joblib.load` .

        Returns
        -------
        pipeline : :class:`mlshell.Pipeline`
            Resulted pipeline.

        """
        if filepath.startswith('./'):
            filepath = f"{self.project_path}/{filepath[2:]}"
        pipeline.pipeline = joblib.load(filepath, **kwargs)
        self.logger.info('Load fitted model from file:\n'
                         '    {}'.format(filepath))
        return pipeline

    def info(self, pipeline, **kwargs):
        """Log pipeline info.

        Parameters
        ----------
        pipeline : :class:`mlshell.Pipeline`
            Pipeline to explore (if 'steps' attribute available).
        **kwargs : dict
            Additional parameters to pass in low-level functions.

        Returns
        -------
        pipeline : :class:`mlshell.Pipeline`
            For compliance with producer logic.

        """
        self._print_steps(pipeline, **kwargs)
        return pipeline

    # ================================ make ===================================
    def _steps_resolve(self, steps, **kwargs):
        """Prepare pipeline steps.

        Returns
        -------
        steps : list
            :class:`sklearn.pipeline.Pipeline` steps.

        """
        if isinstance(steps, list):
            return steps
        # None => default steps class; otherwise user-provided class.
        clss = mlshell.pipeline.Steps if steps is None else steps
        return clss(**kwargs).steps

    def _memory_resolve(self, memory):
        """Resolve 'auto' memory alias and ensure cache dir exists."""
        if memory == 'auto':
            memory = f"{self.project_path}/.temp/pipeline"
        if memory is not None and not os.path.exists(memory):
            # Create temp dir for cache if not exist.
            os.makedirs(memory)
        return memory

    # ================================ info ===================================
    def _print_steps(self, pipeline, **kwargs):
        """Nice print of pipeline steps.

        Fix: the original logged the literal text
        '{jsbeautifier.beautify(str(step_hp))}' (missing f-prefix) while
        the `step_hp` computation itself was commented out and
        `jsbeautifier` was never imported. Hyper-params are now computed
        and logged directly.
        """
        params = pipeline.get_params()
        self.logger.debug('Pipeline steps:')
        if 'steps' not in params:
            return
        for i, (step_name, step_obj) in enumerate(params['steps']):
            # Step hyper-parameters are keyed as '<step_name>__<param>'.
            step_hp = {key: params[key] for key in params
                       if key.startswith(f'{step_name}__')}
            self.logger.debug(f"    ({i}) {step_name}\n"
                              f"    {step_obj}")
            self.logger.debug(f"    hp:\n"
                              f"    {step_hp}")
        self.logger.debug('+' * 100)
        return
if __name__ == '__main__':
    # Module is intended for import only; no CLI entry point.
    pass