Source code for mlshell.producers.workflow

"""
The :mod:`mlshell.producers.workflow` contains the :class:`Workflow` class to
produce results for typical machine learning tasks.

:class:`mlshell.Workflow` uses a unified interface to work with underlying
pipelines/datasets/metrics. The current implementation provides methods to
fit/predict/optimize/validate/dump a pipeline and plot results.

Workflow supports multi-stage optimization for a pipeline-dataset pair. Each
stage combines its results with the previous ones to find the best hp
combination and applies it in the next stage.

"""

import inspect
import os
import pathlib
import platform
import threading
import time

import jsbeautifier
import mlshell
import numpy as np
import pandas as pd
import pycnfg
import sklearn

__all__ = ['Workflow']


class Workflow(pycnfg.Producer):
    """Interface to ML task.

    Interface: fit, predict, optimize, validate, dump, plot.

    Parameters
    ----------
    objects : dict
        Dictionary with resulted objects from previously executed producers:
        {'section_id__config__id': object}.
    oid : str
        Unique identifier of produced object.
    path_id : str, optional (default='path__default')
        Project path identifier in `objects`.
    logger_id : str, optional (default='logger__default')
        Logger identifier in `objects`.

    Attributes
    ----------
    objects : dict
        Dictionary with resulted objects from previously executed producers:
        {'section_id__config__id': object}.
    oid : str
        Unique identifier of produced object.
    logger : :class:`logging.Logger`
        Logger.
    project_path : str
        Absolute path to project dir.

    See Also
    --------
    :class:`mlshell.Dataset` : Dataset interface.
    :class:`mlshell.Metric` : Metric interface.
    :class:`mlshell.Pipeline` : Pipeline interface.

    """
    _required_parameters = ['objects', 'oid', 'path_id', 'logger_id']
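
    # A sketch of `objects` (hypothetical ids, for illustration only): keys
    # follow the pycnfg 'section_id__config__id' convention, values are the
    # results of previously executed producers, e.g.:
    #
    #     objects = {
    #         'path__default': '/abs/path/to/project',
    #         'logger__default': logging.getLogger('default'),
    #         'pipeline__sgd': <mlshell.Pipeline>,
    #         'dataset__default': <mlshell.Dataset>,
    #         'metric__r2': <mlshell.Metric>,
    #     }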

    def __init__(self, objects, oid, path_id='path__default',
                 logger_id='logger__default'):
        pycnfg.Producer.__init__(self, objects, oid, path_id=path_id,
                                 logger_id=logger_id)
        self._optional()

    def fit(self, res, pipeline_id, dataset_id, subset_id='train',
            hp=None, resolver=None, resolve_params=None, fit_params=None):
        """Fit pipeline.

        Parameters
        ----------
        res : dict
            For compliance with producer logic.
        pipeline_id : str
            Pipeline identifier in ``objects``. Will be fitted on
            `dataset_id__subset_id`:
            ``pipeline.fit(subset.x, subset.y, **fit_params)``.
        dataset_id : str
            Dataset identifier in ``objects``.
        subset_id : str, optional (default='train')
            Data subset identifier to fit on. If '', use full dataset.
        hp : dict, optional (default=None)
            Hyper-parameters to use in pipeline: {`hp_name`: val/container}.
            If a range is provided for any hp, the zero position will be
            used. If None, {}.
        resolver : :class:`mlshell.model_selection.Resolver`, optional
                (default=None)
            If hp value = 'auto', hp will be resolved:
            ``resolver.resolve()``. Auto initialized if class provided.
            If None, :class:`mlshell.model_selection.Resolver`.
        resolve_params : dict, optional (default=None)
            Additional kwargs to pass in
            ``resolver.resolve(*args, **resolve_params[hp_name])``.
            If None, {}.
        fit_params : dict, optional (default=None)
            Additional kwargs to pass in
            ``pipeline.fit(*args, **fit_params)``. If None, {}.

        Returns
        -------
        res : dict
            Unchanged input, for compliance with producer logic.

        Notes
        -----
        Pipeline updated in ``objects`` attribute.

        See Also
        --------
        :class:`mlshell.model_selection.Resolver` : Hp resolver.

        """
        if hp is None:
            hp = {}
        if resolver is None:
            resolver = mlshell.model_selection.Resolver
        if inspect.isclass(resolver):
            resolver = resolver()
        if resolve_params is None:
            resolve_params = {}
        if fit_params is None:
            fit_params = {}

        pipeline = self.objects[pipeline_id]
        dataset = self.objects[dataset_id]
        pipeline = self._set_hp(
            hp, pipeline, resolver, dataset, resolve_params)
        train = dataset.subset(subset_id)
        pipeline.fit(train.x, train.y, **fit_params)
        pipeline.dataset_id = train.oid
        self.objects[pipeline_id] = pipeline
        return res
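
    # A minimal usage sketch for `fit` (hypothetical ids and hp names,
    # assuming the corresponding producers were executed earlier): the
    # pipeline is fitted in place on the 'train' subset and stored back
    # in `objects`.
    #
    #     wf = Workflow(objects, oid='workflow__default')
    #     res = wf.fit({}, pipeline_id='pipeline__sgd',
    #                  dataset_id='dataset__default', subset_id='train',
    #                  hp={'estimate__alpha': 0.01})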

    def optimize(self, res, pipeline_id, dataset_id, subset_id='train',
                 metric_id=None, hp_grid=None, resolver=None, optimizer=None,
                 dirpath=None, resolve_params=None, fit_params=None,
                 gs_params=None, dump_params=None):
        """Optimize pipeline.

        Parameters
        ----------
        res : dict
            For compliance with producer logic.
        pipeline_id : str
            Pipeline identifier in ``objects``. Will be cross-validated on
            `dataset_id__subset_id`:
            ``optimizer.fit(subset.x, subset.y, **fit_params)``.
        dataset_id : str
            Dataset identifier in `objects`.
        subset_id : str, optional (default='train')
            Data subset identifier to CV on. If '', use full dataset.
        metric_id : str, list/tuple of str, optional (default=None)
            Metric identifier(s) to use in optimizer scoring. Known
            'metric_id' will be resolved via `objects` or sklearn built-in,
            otherwise ``KeyError`` is raised. If None, 'accuracy' or 'r2'
            depending on pipeline estimator type.
        hp_grid : dict, optional (default=None)
            Hyper-parameters to grid search: {`hp_name`: optimizer format}.
            If None, {}.
        resolver : :class:`mlshell.model_selection.Resolver`, optional
                (default=None)
            If hp value = ['auto'] in ``hp_grid``, hp will be resolved via
            ``resolver.resolve()``. Auto initialized if class provided.
            If None, :class:`mlshell.model_selection.Resolver` used.
        optimizer : :class:`mlshell.model_selection.Optimizer`, optional
                (default=None)
            Class to optimize ``hp_grid``. Will be called
            ``optimizer(pipeline, hp_grid, scoring, **gs_params)
            .fit(x, y, **fit_params)``. If None,
            :class:`mlshell.model_selection.RandomizedSearchOptimizer`.
        dirpath : str, optional (default=None)
            Absolute path to the dump 'runs' dir or relative to
            'project__path' starting with './'. If None,
            "project__path/results/runs" is used. See Notes for runs
            description.
        resolve_params : dict, optional (default=None)
            Additional kwargs to pass in
            ``resolver.resolve(*args, **resolve_params[hp_name])``.
            If None, {}.
        fit_params : dict, optional (default=None)
            Additional kwargs to pass in
            ``optimizer.fit(*args, **fit_params)``. If None, {}.
        gs_params : dict, optional (default=None)
            Additional kwargs to pass in
            ``optimizer(pipeline, hp_grid, scoring, **gs_params)``
            initialization. If None, {}.
        dump_params : dict, optional (default=None)
            Additional kwargs to pass in
            ``optimizer.dump_runs(**dump_params)``. If None, {}.

        Returns
        -------
        res : dict
            Input keys added/updated:

            {
                'runs': dict
                    Storage of optimization results for pipeline-data pair.
                    {'pipeline_id|dataset_id__subset_id':
                        optimizer.update_best output}
            }

        Notes
        -----
        Optimization flow:

        * Call grid search.
          ``optimizer(pipeline.pipeline, hp_grid, scoring, **gs_params)
          .fit(x, y, **fit_params)``.
        * Call dump runs.
          ``optimizer.dump_runs(logger, dirpath, **dump_params)``,
          where each run = probing one hp combination.
        * Combine optimization results with previous for pipeline-data pair:
          ``optimizer.update_best(prev_runs)``.
        * Update pipeline object in ``objects``.
          Only if 'best_estimator_' in 'runs'.

        See Also
        --------
        :class:`mlshell.model_selection.Resolver` : Hp resolver.
        :class:`mlshell.model_selection.Optimizer` : Hp optimizer.

        """
        if hp_grid is None:
            hp_grid = {}
        if resolver is None:
            resolver = mlshell.model_selection.Resolver
        if inspect.isclass(resolver):
            resolver = resolver()
        if resolve_params is None:
            resolve_params = {}
        if fit_params is None:
            fit_params = {}
        if gs_params is None:
            gs_params = {}
        if optimizer is None:
            optimizer = mlshell.model_selection.RandomizedSearchOptimizer
        if dirpath is None:
            dirpath = f"{self.project_path}/results/runs"
        elif dirpath.startswith('./'):
            dirpath = f"{self.project_path}/{dirpath[2:]}"
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)
        if dump_params is None:
            dump_params = {}

        pipeline = self.objects[pipeline_id]
        dataset = self.objects[dataset_id]
        # Resolve and set hp. Otherwise there could be a problem if
        # hp_grid={}, as pipeline initially could be unresolved.
        pipeline = self._set_hp(
            {}, pipeline, resolver, dataset, resolve_params)
        # Resolve hp_grid.
        hp_grid = self._resolve_hp(
            hp_grid, pipeline, resolver, dataset, resolve_params)
        msg = jsbeautifier.beautify(str(hp_grid))
        self.logger.info(f"hp_grid:\n    {msg}")
        # Resolve scoring.
        scoring = self._resolve_scoring(metric_id, pipeline)

        train = dataset.subset(subset_id)
        optimizer = optimizer(pipeline.pipeline, hp_grid, scoring,
                              **gs_params)
        optimizer.fit(train.x, train.y, **fit_params)
        optimizer.dump_runs(self.logger, dirpath, pipeline, dataset,
                            **dump_params)

        if 'runs' not in res:
            res['runs'] = {}
        runs = res['runs']
        key = f"{pipeline_id}|{train.oid}"
        runs[key] = optimizer.update_best(runs.get(key, {}))
        if 'best_estimator_' in runs[key]:
            self.objects[pipeline_id].pipeline = runs[key].get(
                'best_estimator_')
            self.objects[pipeline_id].dataset_id = train.oid
        return res

    def validate(self, res, pipeline_id, dataset_id, metric_id,
                 subset_id=('train', 'test'), validator=None):
        """Make and score prediction.

        Parameters
        ----------
        res : dict
            For compliance with producer logic.
        pipeline_id : str
            Pipeline identifier in ``objects``. Will be validated on
            `dataset_id__subset_id`.
        dataset_id : str
            Dataset identifier in `objects`.
        subset_id : str, list/tuple of str, optional
                (default=('train', 'test'))
            Data subset(s) identifier(s) to validate on. '' for full
            dataset.
        metric_id : str, list/tuple of str
            Metric(s) identifier in `objects`.
        validator : :class:`mlshell.model_selection.Validator`, optional
                (default=None)
            Auto initialized if class provided. If None,
            :class:`mlshell.model_selection.Validator`.

        Returns
        -------
        res : dict
            Unchanged input, for compliance with producer logic.

        """
        if validator is None:
            validator = mlshell.model_selection.Validator
        if inspect.isclass(validator):
            validator = validator()
        if not isinstance(metric_id, (list, tuple)):
            metric_id = [metric_id]
        if not isinstance(subset_id, (list, tuple)):
            subset_id = [subset_id]

        dataset = self.objects[dataset_id]
        pipeline = self.objects[pipeline_id]
        metrics = [self.objects[i] for i in metric_id]
        subsets = [dataset.subset(id_) for id_ in subset_id]
        validator.validate(pipeline, metrics, subsets, self.logger)
        return res
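
    # A usage sketch for `validate` (hypothetical ids): scores the fitted
    # pipeline with every listed metric on both default subsets and logs
    # the results.
    #
    #     res = wf.validate(res, 'pipeline__sgd', 'dataset__default',
    #                       metric_id=['metric__r2', 'metric__mae'])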

    def dump(self, res, pipeline_id, dirpath=None, **kwargs):
        """Dump pipeline.

        Parameters
        ----------
        res : dict
            For compliance with producer logic.
        pipeline_id : str
            Pipeline identifier in ``objects``. Will be dumped via
            ``pipeline.dump(**kwargs)``.
        dirpath : str, optional (default=None)
            Absolute path to dump dir or relative to 'project__path'
            starting with './'. If None, "project__path/results/models"
            is used.
        **kwargs : dict
            Additional kwargs to pass in ``pipeline.dump(**kwargs)``.

        Returns
        -------
        res : dict
            Unchanged input, for compliance with producer logic.

        Notes
        -----
        The resulting filename includes the prefix:
        ``workflow_id|pipeline_id|fit_dataset_id|best_score|pipeline_hash|
        fit_dataset_hash|os_type|timestamp``.
        fit_dataset_id = None if pipeline is not fitted or has no such
        attribute. The 'best_score' is available after optimize step(s)
        only if the optimizer supports it.

        """
        if dirpath is None:
            dirpath = f"{self.project_path}/results/models"
        elif dirpath.startswith('./'):
            dirpath = f"{self.project_path}/{dirpath[2:]}"
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)

        pipeline = self.objects[pipeline_id]
        filepath = self._prefix(res, dirpath, pipeline, pipeline_id)
        fullpath = pipeline.dump(filepath, **kwargs)
        self.logger.log(25, f"Save fitted model to file:\n"
                            f"    {fullpath}")
        return res
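
    # A usage sketch for `dump` (hypothetical id): persists the fitted
    # pipeline under "<project_path>/results/models" with the informative
    # filename prefix described in Notes.
    #
    #     res = wf.dump(res, 'pipeline__sgd')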

    def predict(self, res, pipeline_id, dataset_id, subset_id='',
                dirpath=None, **kwargs):
        """Make and dump prediction.

        Parameters
        ----------
        res : dict
            For compliance with producer logic.
        pipeline_id : str
            Pipeline identifier in `objects` to make prediction on
            `dataset_id__subset_id`: ``pipeline.predict(subset.x)``.
        dataset_id : str
            Dataset identifier in `objects`.
        subset_id : str, optional (default='')
            Data subset identifier to predict on. If '', use full dataset.
        dirpath : str, optional (default=None)
            Absolute path to dump dir or relative to 'project__path'
            starting with './'. If None, "project__path/results/models"
            is used.
        **kwargs : dict
            Additional kwargs to pass in ``dataset.dump_pred(**kwargs)``.

        Returns
        -------
        res : dict
            Unchanged input, for compliance with producer logic.

        Notes
        -----
        The resulting filename includes the prefix:
        ``workflow_id|pipeline_id|fit_dataset_id|best_score|pipeline_hash|
        fit_dataset_hash|predict_dataset_id|predict_dataset_hash|os_type|
        timestamp``.
        fit_dataset_id = None if pipeline is not fitted or has no such
        attribute. The 'best_score' is available after optimize step(s)
        only if the optimizer supports it.

        """
        if dirpath is None:
            dirpath = f"{self.project_path}/results/models"
        elif dirpath.startswith('./'):
            dirpath = f"{self.project_path}/{dirpath[2:]}"
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)

        pipeline = self.objects[pipeline_id]
        dataset = self.objects[dataset_id]
        test = dataset.subset(subset_id)
        y_pred = pipeline.predict(test.x)
        filepath = self._prefix(res, dirpath, pipeline, pipeline_id,
                                test, test.oid)
        fullpath = test.dump_pred(filepath, y_pred, **kwargs)
        self.logger.log(25, f"Save predictions for {dataset_id} to file:\n"
                            f"    {fullpath}")
        return res
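
    # A usage sketch for `predict` (hypothetical ids): predicts on the full
    # dataset ('' subset by default) and dumps predictions to file.
    #
    #     res = wf.predict(res, 'pipeline__sgd', 'dataset__new')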

    def plot(self, res, pipeline_id, dataset_id, metric_id, validator=None,
             subset_id=('train', 'test'), plotter=None, **kwargs):
        """Plot metrics.

        Parameters
        ----------
        res : dict
            For compliance with producer logic.
        pipeline_id : str
            Pipeline identifier in ``objects``.
        dataset_id : str
            Dataset identifier in ``objects``.
        subset_id : str, list/tuple of str, optional
                (default=('train', 'test'))
            Data subset(s) identifier(s) to plot on. Set '' for full
            dataset.
        metric_id : str, list/tuple of str
            Metric(s) identifier in `objects`.
        validator : :class:`mlshell.model_selection.Validator`, optional
                (default=None)
            Auto initialized if class provided. If None,
            :class:`mlshell.model_selection.Validator`.
        plotter : :class:`mlshell.plot.Plotter`, optional (default=None)
            Auto initialized if class provided. If None,
            :class:`mlshell.plot.Plotter`.
        **kwargs : dict
            Additional kwargs to pass in ``plotter.plot(**kwargs)``.

        Returns
        -------
        res : dict
            Unchanged input, for compliance with producer logic.

        See Also
        --------
        :class:`mlshell.plot.Plotter` : Metric plotter.

        """
        if validator is None:
            validator = mlshell.model_selection.Validator
        if inspect.isclass(validator):
            validator = validator()
        if plotter is None:
            plotter = mlshell.plot.Plotter
        if inspect.isclass(plotter):
            plotter = plotter()
        if not isinstance(metric_id, (list, tuple)):
            metric_id = [metric_id]
        if not isinstance(subset_id, (list, tuple)):
            subset_id = [subset_id]

        dataset = self.objects[dataset_id]
        pipeline = self.objects[pipeline_id]
        metrics = [self.objects[i] for i in metric_id]
        subsets = {id_: dataset.subset(id_) for id_ in subset_id}
        runs = res.get('runs', {})
        subruns = {id_: runs.get(f"{pipeline_id}|{dataset_id}|{id_}", {})
                   for id_ in subset_id}
        args = (subruns, pipeline, metrics, subsets, validator, self.logger)
        thread = threading.Thread(target=plotter.plot, args=args,
                                  kwargs=kwargs, daemon=False)
        thread.start()
        return res
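
    # A usage sketch for `plot` (hypothetical ids): plotting runs in a
    # separate thread, so the producer can continue while the plot is open.
    #
    #     res = wf.plot(res, 'pipeline__sgd', 'dataset__default',
    #                   metric_id='metric__r2')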

    # ========================== init =====================================
    def _optional(self):
        # Turn on: inf as NaN.
        pd.options.mode.use_inf_as_na = True
        # Handle numpy errors.
        np.seterr(all='call')
        self._check_results_size(self.project_path)
        self._np_error_stat = {}
        np.seterrcall(self._np_error_callback)

    def _check_results_size(self, project_path):
        root_directory = pathlib.Path(f"{project_path}/results")
        size = sum(f.stat().st_size for f in root_directory.glob('**/*')
                   if f.is_file())
        size_gb = size / (2 ** 30)
        n = 5  # Check if dir > n Gb.
        if size_gb > n:
            self.logger.warning(f"Warning: results/ directory size "
                                f"{size_gb:.2f}Gb more than {n}Gb")

    def _np_error_callback(self, *args):
        """Numpy errors handler, count errors by type."""
        if args[0] in self._np_error_stat:
            self._np_error_stat[args[0]] += 1
        else:
            self._np_error_stat[args[0]] = 1

    # ========================== fit/optimize =============================
    def _set_hp(self, hp, pipeline, resolver, dataset, resolve_params):
        """Get => update => resolve => set pipeline hp."""
        _hp_full = pipeline.get_params()
        _hp_full.update(self._get_zero_position(hp))
        hps = self._resolve_hp(_hp_full, pipeline, resolver, dataset,
                               resolve_params)
        pipeline.set_params(**hps)
        return pipeline

    def _get_zero_position(self, hp):
        """Get zero position if hp_grid provided.

        Notes
        -----
        In case of generator/iterator in hp value, hp_grid changes will be
        irreversible.

        """
        # Get zero position params from hp.
        zero_hp = {}
        for name, vals in hp.items():
            # Check if not a distribution in hp.
            if hasattr(type(vals), '__iter__'):
                # Container type.
                iterator = iter(vals)
                zero_hp.update(**{name: next(iterator)})
            else:
                zero_hp[name] = vals
        return zero_hp

    def _resolve_hp(self, hp, pipeline, resolver, dataset, resolve_params):
        """Resolve hyper-parameters based on dataset.

        For example, categorical feature indices are dataset dependent.
        Resolving lets them be set before the fit/optimize step.

        Parameters
        ----------
        hp : dict
            {hp_name: val/container}. If val is 'auto'/['auto'], hp will be
            resolved.
        pipeline : :class:`mlshell.Pipeline`
            Pipeline, passed to `resolver`.
        resolver : :class:`mlshell.model_selection.Resolver`
            Interface to resolve hp.
        dataset : :class:`mlshell.Dataset`
            Dataset, passed to `resolver`.
        resolve_params : dict
            Additional parameters to pass in
            ``resolver.resolve(*args, **resolve_params[hp_name])`` for
            specific hp: {hp_name: kwargs}.

        Returns
        -------
        hp : dict
            Resolved input hyper-parameters.

        """
        for hp_name, val in hp.items():
            if val == 'auto' or val == ['auto']:
                kwargs = resolve_params.get(hp_name, {})
                value = resolver.resolve(hp_name, val, pipeline, dataset,
                                         **kwargs)
                hp[hp_name] = [value] if val == ['auto'] else value
        return hp

    def _resolve_scoring(self, metric_id, pipeline):
        """Resolve scoring for grid search.

        Notes
        -----
        If None, 'accuracy' or 'r2' depending on estimator type. If list,
        resolve known metric ids via `objects` or sklearn built-in,
        otherwise ``KeyError`` is raised.

        """
        if metric_id is None:
            # Hard-code (default estimator could not exist).
            if pipeline.is_classifier():
                scoring = 'accuracy'
            elif pipeline.is_regressor():
                scoring = 'r2'
            else:
                assert False, "Unknown pipeline type."
        else:
            if not isinstance(metric_id, (list, tuple)):
                metric_id = [metric_id]
            # Resolve if exists, else use sklearn built-in.
            scoring = {i: self.objects[i] if i in self.objects
                       else sklearn.metrics.SCORERS[i]
                       for i in metric_id}
        return scoring

    # ========================== dump/predict =============================
    def _prefix(self, res, dirpath, pipeline, pipeline_id,
                pred_dataset=None, pred_dataset_id='', add_hash=False):
        """Generate informative file prefix.

        Parameters
        ----------
        add_hash : bool, optional (default=False)
            Insert pipeline/dataset(s) hashes into prefix.

        """
        pred_dataset_hash = (None if pred_dataset is None
                             else hash(pred_dataset))
        fit_dataset_id = getattr(pipeline, 'dataset_id', None)
        # Could be dataset a__b or subset a__b__c.
        tmp = f"{fit_dataset_id}__".split('__')
        fit_dataset = self.objects.get('__'.join(tmp[0:2]), None)
        if fit_dataset is None:
            fit_dataset_hash = None
        else:
            fit_dataset_hash = hash(fit_dataset.subset(tmp[2]))
        best_score = str(res.get('runs', {})
                         .get(f"{pipeline_id}|{fit_dataset_id}", {})
                         .get('best_score_', '')
                         ).lower()
        if add_hash:
            filepath = (f"{dirpath}/{self.oid}|{pipeline_id}|"
                        f"{fit_dataset_id}|{best_score}|{hash(pipeline)}|"
                        f"{fit_dataset_hash}|{pred_dataset_id}|"
                        f"{pred_dataset_hash}|{platform.system()}|"
                        f"{int(time.time())}")
        else:
            filepath = (f"{dirpath}/{self.oid}|{pipeline_id}|"
                        f"{fit_dataset_id}|{best_score}|{pred_dataset_id}|"
                        f"{platform.system()}|{int(time.time())}")
        return filepath
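

# An hp handling sketch (hypothetical hp names): `_get_zero_position` takes
# the first point of every iterable hp range, while `_resolve_hp` replaces
# 'auto'/['auto'] values with dataset-dependent ones via
# ``resolver.resolve``.
#
#     _get_zero_position({'estimate__alpha': [0.1, 0.01]})
#     # => {'estimate__alpha': 0.1}
#     _resolve_hp({'process__categoric_ids': 'auto'}, pipeline, resolver,
#                 dataset, {})
#     # => {'process__categoric_ids': <indices resolved from dataset>}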

if __name__ == '__main__':
    pass