Source code for mlshell.producers.dataset

"""
The :mod:`mlshell.producers.dataset` contains examples of `Dataset` class for
empty data object creation and `DataProducer` class for filling it.

:class:`mlshell.Dataset` proposes unified interface to interact with underlying
data. Intended to be used in :class:`mlshell.Workflow`. For new data formats
no need to edit `Workflow` class, adapt `Dataset` in compliance to interface.
Current realization based on dictionary.

:class:`mlshell.DataProducer` specifies methods divided for convenience on:

* :class:`mlshell.DataIO` defining IO related methods.
Currently reading from csv-file implemented.

* :class:`mlshell.DataPreprocessor` preprocessing data to final state.
Implemented data transformation in compliance to `Dataset` class, also common
exploration techniques available.

"""


import copy
import os

import jsbeautifier
import numpy as np
import pandas as pd
import pycnfg
import sklearn
# Submodules are imported explicitly: `import sklearn` alone does not
# guarantee attribute access to them.
import sklearn.model_selection
import sklearn.preprocessing
import sklearn.utils
import tabulate

__all__ = ['Dataset', 'DataIO', 'DataPreprocessor', 'DatasetProducer']


class Dataset(dict):
    """Unified data interface.

    Implements interface to access arbitrary data.

    Interface: x, y, data, meta, subset, dump_pred and whole dict api.

    Parameters
    ----------
    *args : list
        Passed to parent class constructor.
    **kwargs : dict
        Passed to parent class constructor.

    Attributes
    ----------
    data : :class:`pandas.DataFrame`
        Underlying data.
    subsets : dict
        {'subset_id': array-like subset indices, ..}.
    meta : dict
        Extracted auxiliary information from data:

        {
            'index': list
                List of index column label(s).
            'features': list
                List of feature column label(s).
            'categoric_features': list
                List of categorical feature column label(s).
            'targets': list
                List of target column label(s).
            'indices': list
                List of rows indices.
            'classes': list of :class:`numpy.ndarray`
                List of sorted unique labels for each target(s)
                (n_outputs, n_classes).
            'pos_labels': list
                List of "positive" label(s) for target(s) (n_outputs,).
            'pos_labels_ind': list
                List of "positive" label(s) index in :func:`numpy.unique`
                for target(s) (n_outputs,).
            'categoric_ind_name': dict
                Dictionary with categorical feature indices as key and tuple
                ('feature_name', categories) as value:
                {'column_index': ('feature_name', ['cat1', 'cat2'])}.
            'numeric_ind_name': dict
                Dictionary with numeric feature indices as key and tuple
                ('feature_name',) as value:
                {'column_index': ('feature_name',)}.
        }

    Notes
    -----
    Inherited from dict class, so the attributes section describes keys.

    """
    _required_parameters = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __hash__(self):
        return hash(pd.util.hash_pandas_object(self['data']).sum())

    @property
    def oid(self):
        """str: Dataset identifier."""
        return self['_oid']

    @oid.setter
    def oid(self, value):
        self['_oid'] = value

    @property
    def x(self):
        """:class:`pandas.DataFrame` : Extracted features columns."""
        df = self['data']
        meta = self['meta']
        return df.loc[:, meta['features']]

    @property
    def y(self):
        """:class:`numpy.ndarray` : Extracted targets values."""
        df = self['data']
        meta = self['meta']
        # Single target is flattened to a 1d array.
        res = df.loc[:, meta['targets']].values.ravel() \
            if len(meta['targets']) == 1 else df.loc[:, meta['targets']].values
        return res

    @property
    def meta(self):
        """dict: Access meta."""
        return self['meta']

    @property
    def data(self):
        """:class:`pandas.DataFrame` : Access data."""
        return self['data']

    def subset(self, subset_id):
        """:class:`mlshell.Dataset` : Access subset."""
        if subset_id == '':
            return self
        df = self['data']
        index = self['subsets'][subset_id]  # Subset of meta['indices'].
        # Inherit only meta (except indices); dict(self) would inherit by ref.
        dataset = Dataset(**{
            'meta': copy.deepcopy(self.meta),
            'data': df.loc[index],
            'subsets': {},
            '_oid': f"{self['_oid']}__{subset_id}"})
        # Update indices in meta.
        dataset.meta['indices'] = index
        return dataset

    def dump_pred(self, filepath, y_pred, **kwargs):
        """Dump predictions to disk.

        Parameters
        ----------
        filepath : str
            File path without extension.
        y_pred : array-like
            pipeline.predict() result.
        **kwargs : dict
            Additional kwargs to pass in .to_csv(**kwargs).

        Returns
        -------
        fullpath : str
            Full filepath.

        """
        meta = self.meta
        # Recover original index and names.
        dic = dict(zip(
            meta['targets'],
            [y_pred] if len(meta['targets']) == 1 else np.array(y_pred).T
        ))
        obj = pd.DataFrame(index=meta['indices'],
                           data=dic).rename_axis(meta['index'], axis=0)
        fullpath = f"{filepath}_pred.csv"
        if "PYTEST_CURRENT_TEST" in os.environ:
            if 'float_format' not in kwargs:
                kwargs['float_format'] = '%.8f'
        with open(fullpath, 'w', newline='') as f:
            obj.to_csv(f, mode='w', header=True, index=True, sep=',',
                       line_terminator='\n', **kwargs)
        return fullpath
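

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): direct use of the
# `Dataset` dict interface defined above. The toy dataframe, column names and
# identifiers below are assumptions for demonstration only.
def _dataset_interface_sketch():
    df = pd.DataFrame({'target': [0, 1, 1],
                       'feature': [1.0, 2.0, 3.0]},
                      index=[10, 11, 12])
    ds = Dataset(
        data=df,
        subsets={'train': [10, 11], 'test': [12]},
        meta={'index': None,
              'indices': [10, 11, 12],
              'targets': ['target'],
              'features': ['feature'],
              'categoric_features': []})
    ds.oid = 'dataset__toy'
    x = ds.x                     # Features dataframe (meta['features']).
    y = ds.y                     # 1d target array (single target is raveled).
    train = ds.subset('train')   # New Dataset restricted to 'train' indices.
    return x, y, train
# ---------------------------------------------------------------------------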


class DataIO(object):
    """Get raw data from a data source.

    Interface: load.

    Parameters
    ----------
    project_path : str
        Absolute path to current project dir.
    logger : :class:`logging.Logger`
        Logger.

    """
    _required_parameters = ['project_path', 'logger']

    def __init__(self, project_path, logger):
        self.logger = logger
        self.project_path = project_path

    def load(self, dataset, filepath, key='data',
             random_skip=False, random_state=None, **kwargs):
        """Load data from a csv file.

        Parameters
        ----------
        dataset : :class:`mlshell.Dataset`
            Template for dataset.
        filepath : str
            Absolute path to csv file, or path relative to `project_path`
            started with './'.
        key : str, optional (default='data')
            Loaded data identifier to add in dataset dictionary. Useful when
            loading multiple files that are combined under 'data' in a
            separate step.
        random_skip : bool, optional (default=False)
            If True, randomly skip rows while reading the file, so that
            'nrows' lines remain. Rewrites the `skiprows` kwarg.
        random_state : int, optional (default=None)
            Fix random state for `random_skip`.
        **kwargs : dict
            Additional parameters passed to :func:`pandas.read_csv` .

        Returns
        -------
        dataset : :class:`mlshell.Dataset`
            Key added: {'data': :class:`pandas.DataFrame`}.

        Notes
        -----
        If `nrows` > number of lines in the file, it is auto set to None.

        """
        if filepath.startswith('./'):
            filepath = "{}/{}".format(self.project_path, filepath[2:])
        # Count lines.
        with open(filepath, 'r') as f:
            lines = sum(1 for _ in f)
        if 'skiprows' in kwargs and random_skip:
            self.logger.warning("random_skip rewrites the skiprows kwarg.")
        nrows = kwargs.get('nrows', None)
        skiprows = kwargs.get('skiprows', None)
        if nrows:
            if nrows > lines:
                nrows = None
            elif random_skip:
                # skiprows index starts from 0.
                # If no headers, returns nrows+1.
                random_state = sklearn.utils.check_random_state(random_state)
                skiprows = random_state.choice(range(1, lines),
                                               size=lines - nrows - 1,
                                               replace=False, p=None)
            kwargs['skiprows'] = skiprows
            kwargs['nrows'] = nrows
        with open(filepath, 'r') as f:
            raw = pd.read_csv(f, **kwargs)
        self.logger.info("Data loaded from:\n {}".format(filepath))
        dataset[key] = raw
        return dataset


class DataPreprocessor(object):
    """Transform raw data in compliance with the `Dataset` class.

    Interface: preprocess, info, split.

    Parameters
    ----------
    project_path : str
        Absolute path to current project dir.
    logger : :class:`logging.Logger`
        Logger.

    """
    _required_parameters = ['project_path', 'logger']

    def __init__(self, project_path, logger):
        self.logger = logger
        self.project_path = project_path

    def preprocess(self, dataset, targets_names, features_names=None,
                   categor_names=None, pos_labels=None, **kwargs):
        """Preprocess raw data.

        Parameters
        ----------
        dataset : :class:`mlshell.Dataset`
            Raw dataset: {'data': :class:`pandas.DataFrame`}.
        targets_names : list
            List of target column names in raw dataset. Even if absent, the
            names are used to label predictions in ``dataset.dump_pred`` .
        features_names : list, optional (default=None)
            List of feature column names in raw dataset. If None, all columns
            except targets.
        categor_names : list, optional (default=None)
            List of categorical feature (including binary) identifiers in raw
            dataset. If None, empty list.
        pos_labels : list, optional (default=None)
            Classification only, list of "positive" label(s) in target(s).
            Could be used in :func:`sklearn.metrics.roc_curve` for threshold
            analysis and metrics evaluation if the classifier supports
            ``predict_proba``. If None, for each target the last label in
            :func:`numpy.unique` is used. For regression set [] to prevent
            evaluation.
        **kwargs : dict
            Additional parameters to add in dataset.

        Returns
        -------
        dataset : :class:`mlshell.Dataset`
            Resulted dataset. Key updated: 'data'. Keys added:

            'subsets': dict
                Storage for data subset(s) indices (filled in split method):
                {'subset_id': indices}.
            'meta': dict
                Extracted auxiliary information from data:

                {
                    'index': list
                        List of index column label(s).
                    'features': list
                        List of feature column label(s).
                    'categoric_features': list
                        List of categorical feature column label(s).
                    'targets': list
                        List of target column label(s).
                    'indices': list
                        List of rows indices.
                    'classes': list of :class:`numpy.ndarray`
                        List of sorted unique labels for each target(s)
                        (n_outputs, n_classes).
                    'pos_labels': list
                        List of "positive" label(s) for target(s)
                        (n_outputs,).
                    'pos_labels_ind': list
                        List of "positive" label(s) index in
                        :func:`numpy.unique` for target(s) (n_outputs,).
                    'categoric_ind_name': dict
                        Dictionary with categorical feature indices as key
                        and tuple ('feature_name', categories) as value:
                        {'column_index': ('feature_name', ['cat1', 'cat2'])}.
                    'numeric_ind_name': dict
                        Dictionary with numeric feature indices as key and
                        tuple ('feature_name',) as value:
                        {'column_index': ('feature_name',)}.
                }

        Notes
        -----
        Don't change dataframe shape or index/column names after ``meta``
        is generated.

        Features columns are unified:

        * Fill gaps.

            * If gap in categorical => set 'unknown'.
            * If gap in non-categorical => set np.nan.

        * Cast categorical features to str dtype, and apply Ordinal encoder.
        * Cast values to np.float64.

        """
        raw = dataset['data']
        if categor_names is None:
            categor_names = []
        if features_names is None:
            features_names = [c for c in raw.columns
                              if c not in targets_names]
        for i in (targets_names, features_names, categor_names):
            if not isinstance(i, list):
                raise TypeError(f"{i} should be a list.")

        index = raw.index
        targets_df, raw_info_targets =\
            self._process_targets(raw, targets_names, pos_labels)
        features_df, raw_info_features =\
            self._process_features(raw, features_names, categor_names)
        data = self._combine(index, targets_df, features_df)

        meta = {
            'index': index.name,
            'indices': list(index),
            'targets': targets_names,
            'features': list(features_names),
            'categoric_features': categor_names,
            **raw_info_features,
            **raw_info_targets,
        }
        self.logger.debug(f"Dataset meta:\n {meta}")
        dataset.update({'data': data,
                        'meta': meta,
                        'subsets': {},
                        **kwargs})
        return dataset

    def info(self, dataset, **kwargs):
        """Log dataset info.

        Check:

        * duplicates.
        * gaps.

        Parameters
        ----------
        dataset : :class:`mlshell.Dataset`
            Dataset to explore.
        **kwargs : dict
            Additional parameters to pass in low-level functions.

        Returns
        -------
        dataset : :class:`mlshell.Dataset`
            For compliance with producer logic.

        """
        self._check_duplicates(dataset['data'], **kwargs)
        self._check_gaps(dataset['data'], **kwargs)
        return dataset

    def split(self, dataset, **kwargs):
        """Split dataset on train, test.

        Parameters
        ----------
        dataset : :class:`mlshell.Dataset`
            Dataset to split.
        **kwargs : dict
            Additional parameters to pass in
            :func:`sklearn.model_selection.train_test_split` .

        Returns
        -------
        dataset : :class:`mlshell.Dataset`
            Resulted dataset. 'subsets' value updated:
            {'train': array-like train rows indices,
            'test': array-like test rows indices}.

        Notes
        -----
        If split ``train_size==1.0`` or ``test_size==0``: ``test=train``,
        other kwargs are ignored. No copy takes place.
""" if 'test_size' not in kwargs: kwargs['test_size'] = None if 'train_size' not in kwargs: kwargs['train_size'] = None data = dataset['data'] if (kwargs['train_size'] == 1.0 and kwargs['test_size'] is None or kwargs['train_size'] is None and kwargs['test_size'] == 0): # train = test = data train_index = test_index = data.index else: train, test, train_index, test_index = \ sklearn.model_selection.train_test_split( data, data.index.values, **kwargs) # Add to dataset. dataset['subsets'].update({'train': train_index, 'test': test_index}) return dataset # ============================== preprocess =============================== def _process_targets(self, raw, target_names, pos_labels): """Targets preprocessing.""" try: targets_df = raw[target_names] except KeyError: self.logger.warning(f"No target column(s) found in df:\n" f" {target_names}") targets_df = pd.DataFrame() targets_df, classes, pos_labels, pos_labels_ind =\ self._unify_targets(targets_df, pos_labels) # targets = targets_df.values raw_info_targets = { 'classes': classes, 'pos_labels': pos_labels, 'pos_labels_ind': pos_labels_ind, } return targets_df, raw_info_targets def _process_features(self, raw, features_names, categor_names): """Features preprocessing.""" features_df = raw[features_names] features_df, categoric_ind_name, numeric_ind_name \ = self._unify_features(features_df, categor_names) # features = features_df.values raw_info_features = { 'categoric_ind_name': categoric_ind_name, 'numeric_ind_name': numeric_ind_name, } return features_df, raw_info_features def _combine(self, index, targets_df, features_df): """Combine preprocessed sub-data.""" # targets_df empty dataframe or None is possible return pd.concat( [targets_df, features_df], axis=1, ) def _unify_targets(self, targets, pos_labels=None): """Unify input targets. Extract classes and positive label index (classification only). Parameters ---------- targets : :class:`pandas.DataFrame` Data to unify. pos_labels: list, optional (default=None) Classification only, list of "positive" labels for targets. Could be used for threshold analysis (roc_curve) and metrics evaluation if classifiers supported predict_proba. If None, last label in :func:`numpy.unique` for each target used. For regression set [] to prevent evaluation. Returns ------- targets: :class:`pandas.DataFrame` Unchanged input. classes: list of :class:`numpy.ndarray` List of sorted unique labels for target(s) (n_outputs, n_classes). pos_labels: list List of "positive" label(s) for target(s) (n_outputs,). pos_labels_ind: list List of "positive" label(s) index in :func:`numpy.unique` for target(s) (n_outputs,). """ # Regression. if isinstance(pos_labels, list) and not pos_labels: classes = [] pos_labels_ind = [] return targets, classes, pos_labels, pos_labels_ind # Classification. # Find classes, example: [array([1]), array([2, 7])]. classes = [np.unique(j) for i, j in targets.iteritems()] if pos_labels is None: n_targets = len(classes) pos_labels_ind = [len(classes[i]) - 1 for i in range(n_targets)] pos_labels = [classes[i][pos_labels_ind[i]] for i in range(n_targets)] # [2,4] else: # Find where pos_labels in sorted labels, example: [1, 0]. pos_labels_ind = [np.where(classes[i] == pos_labels[i])[0][0] for i in range(len(classes))] # Could be no target columns in new data. 
        self.logger.debug(
            f"Labels {pos_labels} identified as positive for target(s):\n"
            "    if the classifier supports predict_proba, a sample is "
            "predicted as pos_label when P(pos_label) > classification "
            "threshold.")
        return targets, classes, pos_labels, pos_labels_ind

    def _unify_features(self, features, categor_names):
        """Unify input features.

        Parameters
        ----------
        features : :class:`pandas.DataFrame`
            Data to unify.
        categor_names : list
            List of categorical (and binary) feature column names in
            features.

        Returns
        -------
        features : :class:`pandas.DataFrame`
            Updated input:

            * fill gaps:
                if gap in categorical => fill 'unknown',
                if gap in non-categorical => np.nan.
            * cast categorical features to str dtype, and apply
              OrdinalEncoder.
            * cast the whole frame to np.float64.

        categoric_ind_name : dict
            Dictionary with categorical feature indices as key and tuple
            ('feature_name', categories) as value:
            {'column_index': ('feature_name', ['cat1', 'cat2'])}.
        numeric_ind_name : dict
            Dictionary with numeric feature indices as key and tuple
            ('feature_name',) as value: {'column_index': ('feature_name',)}.

        """
        categoric_ind_name = {}
        numeric_ind_name = {}
        # Turn off SettingWithCopy warning, excessive here.
        pd.options.mode.chained_assignment = None
        for ind, column_name in enumerate(features):
            if column_name in categor_names:
                # Fill gaps with 'unknown', inplace unreliable (copy!).
                features.loc[:, column_name] = features[column_name]\
                    .fillna(value='unknown', method=None, axis=None,
                            inplace=False, limit=None, downcast=None)
                # Cast dtype to str (copy!).
                features.loc[:, column_name] = \
                    features[column_name].astype(str)
                # Encode.
                encoder = sklearn.preprocessing.\
                    OrdinalEncoder(categories='auto')
                features.loc[:, column_name] = encoder\
                    .fit_transform(features[column_name]
                                   .values.reshape(-1, 1))
                # Generate {index: ('feature_id', ['B', 'A', 'C'])}.
                # tolist() needed for 'hr' cache dump.
                categoric_ind_name[ind] = (column_name,
                                           encoder.categories_[0].tolist())
            else:
                # Fill gaps with np.nan, inplace unreliable (copy!).
                # Could work without copy on slice/single column even inplace.
                features.loc[:, column_name] = features.loc[:, column_name]\
                    .fillna(value=np.nan, method=None, axis=None,
                            inplace=False, downcast=None)
                # Generate {'index': ('feature_id',)}.
                numeric_ind_name[ind] = (column_name,)
        # Turn on SettingWithCopy warning.
        pd.options.mode.chained_assignment = 'warn'
        # Cast to np.float64 without copy.
        # python float = C double = np.float64 = np.double (64 bit processor).
        # [alternative] sklearn.utils.as_float_array / assert_all_finite
        features = features.astype(np.float64, copy=False, errors='ignore')
        # Additional check.
        self._check_numeric_types(features, categor_names)
        return features, categoric_ind_name, numeric_ind_name

    def _check_numeric_types(self, data, categor_names):
        """Check that all non-categorical features are of numeric type."""
        dtypes = data.dtypes
        misstype = []
        for ind, column_name in enumerate(data):
            if column_name not in categor_names:
                if not np.issubdtype(dtypes[column_name], np.number):
                    misstype.append(column_name)
        if misstype:
            raise ValueError(f"Input data non-categoric columns should be "
                             f"subtype of np.number, check:\n"
                             f"    {misstype}")
        return None

    # ================================ info ===================================
    def _check_duplicates(self, data, del_duplicates=False):
        """Check duplicated rows in dataframe.

        Parameters
        ----------
        data : :class:`pandas.DataFrame`
            Dataframe to check.
        del_duplicates : bool
            If True, delete duplicated rows. If False, do nothing.

        Notes
        -----
        Use del_duplicates=True only before generating dataset `meta`.

        """
        # Duplicated rows index mask.
        mask = data.duplicated(subset=None, keep='first')
        dupl_n = np.sum(mask)
        if dupl_n:
            self.logger.warning(f"Warning: {dupl_n} duplicated rows found,\n"
                                "    see debug.log for details.")
            # Count unique duplicated rows.
            rows_count = data[mask].groupby(data.columns.tolist())\
                .size().reset_index().rename(columns={0: 'count'})
            rows_count.sort_values(by=['count'], axis=0,
                                   ascending=False, inplace=True)
            with pd.option_context('display.max_rows', None,
                                   'display.max_columns', None):
                pprint = tabulate.tabulate(rows_count, headers='keys',
                                           tablefmt='psql')
                self.logger.debug(f"Duplicates found\n{pprint}")
        if del_duplicates:
            # Delete duplicates (without index reset).
            size_before = data.size
            data.drop_duplicates(keep='first', inplace=True)
            size_after = data.size
            if size_before - size_after != 0:
                self.logger.warning(f"Warning: deleted duplicated rows "
                                    f"({size_before - size_after} values).")
        return None

    def _check_gaps(self, data, del_gaps=False, nogap_columns=None):
        """Check gaps in dataframe.

        Parameters
        ----------
        data : :class:`pandas.DataFrame`
            Dataframe to check.
        del_gaps : bool, optional (default=False)
            If True, delete rows with gaps in `nogap_columns` list.
            If False, raise Exception when `nogap_columns` contain gaps.
        nogap_columns : list, optional (default=None)
            Columns where gaps are forbidden: ['column_1', ..]. If None, [].

        Notes
        -----
        Use del_gaps=True only before generating dataset `meta` (preprocess).

        """
        if nogap_columns is None:
            nogap_columns = []
        gaps_number = data.size - data.count().sum()
        columns_with_gaps_dic = {}
        if gaps_number > 0:
            for column_name in data:
                column_gaps_number = data[column_name].size \
                    - data[column_name].count()
                if column_gaps_number > 0:
                    columns_with_gaps_dic[column_name] = column_gaps_number
            self.logger.warning('Warning: gaps found: {} {:.3f}%,\n'
                                '    see debug.log for details.'
                                .format(gaps_number,
                                        gaps_number / data.size * 100))
            pprint = jsbeautifier.beautify(str(columns_with_gaps_dic))
            self.logger.debug(f"Gaps per column:\n{pprint}")
        subset = [column_name for column_name in nogap_columns
                  if column_name in columns_with_gaps_dic]
        if del_gaps and subset:
            # Delete rows with gaps in specified columns.
            data.dropna(axis=0, how='any', thresh=None,
                        subset=subset, inplace=True)
        elif subset:
            raise ValueError(f"Gaps in {subset}.")
        return None
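

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): filling a `Dataset`
# with `DataIO` and `DataPreprocessor` directly, without `pycnfg`. The csv
# path and column names are assumptions for demonstration only.
def _preprocessing_sketch():
    import logging
    logger = logging.getLogger(__name__)
    dataset = Dataset()
    dataset.oid = 'dataset__sketch'
    # Read './data/train.csv' (relative to the project dir) under 'data' key.
    dataset = DataIO('.', logger).load(dataset, filepath='./data/train.csv')
    preprocessor = DataPreprocessor('.', logger)
    # Unify features/targets and extract 'meta'; log duplicates/gaps; split.
    dataset = preprocessor.preprocess(dataset, targets_names=['target'],
                                      categor_names=['category'])
    dataset = preprocessor.info(dataset)
    dataset = preprocessor.split(dataset, test_size=0.25, shuffle=False)
    return dataset.subset('train'), dataset.subset('test')
# ---------------------------------------------------------------------------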


class DatasetProducer(pycnfg.Producer, DataIO, DataPreprocessor):
    """Factory to produce dataset.

    Parameters
    ----------
    objects : dict
        Dictionary with objects from previous executed producers:
        {'section_id__config__id': object}.
    oid : str
        Unique identifier of produced object.
    path_id : str, optional (default='path__default')
        Project path identifier in `objects`.
    logger_id : str, optional (default='logger__default')
        Logger identifier in `objects`.

    Attributes
    ----------
    objects : dict
        Dictionary with objects from previous executed producers:
        {'section_id__config__id': object}.
    oid : str
        Unique identifier of produced object.
    logger : :class:`logging.Logger`
        Logger.
    project_path : str
        Absolute path to project dir.

    """
    _required_parameters = ['objects', 'oid', 'path_id', 'logger_id']

    def __init__(self, objects, oid, path_id='path__default',
                 logger_id='logger__default'):
        pycnfg.Producer.__init__(self, objects, oid, path_id=path_id,
                                 logger_id=logger_id)
        DataIO.__init__(self, self.project_path, self.logger)
        DataPreprocessor.__init__(self, self.project_path, self.logger)
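

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): instantiating the
# factory directly. `pycnfg.Producer` is expected to resolve `project_path`
# and `logger` from `objects` by the given identifiers, as implied by
# `__init__` above; the keys, oid and csv path are assumptions only.
def _producer_sketch():
    import logging
    objects = {'path__default': '.',
               'logger__default': logging.getLogger(__name__)}
    producer = DatasetProducer(objects, oid='dataset__train')
    # Mixed-in DataIO/DataPreprocessor steps are available on the producer.
    dataset = producer.load(Dataset(), filepath='./data/train.csv')
    dataset = producer.preprocess(dataset, targets_names=['target'])
    dataset = producer.split(dataset, test_size=0.25)
    return dataset
# ---------------------------------------------------------------------------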


if __name__ == '__main__':
    pass