
Estimator Utilities

This file is part of the TPOT library.

The current version of TPOT was developed at Cedars-Sinai by:
- Pedro Henrique Ribeiro (https://github.com/perib, https://www.linkedin.com/in/pedro-ribeiro/)
- Anil Saini (anil.saini@cshs.org)
- Jose Hernandez (jgh9094@gmail.com)
- Jay Moran (jay.moran@cshs.org)
- Nicholas Matsumoto (nicholas.matsumoto@cshs.org)
- Hyunjun Choi (hyunjun.choi@cshs.org)
- Gabriel Ketron (gabriel.ketron@cshs.org)
- Miguel E. Hernandez (miguel.e.hernandez@cshs.org)
- Jason Moore (moorejh28@gmail.com)

The original version of TPOT was primarily developed at the University of Pennsylvania by:
- Randal S. Olson (rso@randalolson.com)
- Weixuan Fu (weixuanf@upenn.edu)
- Daniel Angell (dpa34@drexel.edu)
- Jason Moore (moorejh28@gmail.com)
- and many more generous open-source contributors

TPOT is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

TPOT is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along with TPOT. If not, see https://gnu.ac.cn/licenses/

apply_make_pipeline(ind, preprocessing_pipeline=None, export_graphpipeline=False, **pipeline_kwargs)

Helper function to create a column of sklearn pipelines from the TPOT individual class.

Parameters

ind : tpot.SklearnIndividual (required)
    The individual to convert to a pipeline.
preprocessing_pipeline : sklearn.pipeline.Pipeline (default: None)
    The preprocessing pipeline to include before the individual's pipeline.
export_graphpipeline : bool (default: False)
    Force the pipeline to be exported as a graph pipeline. Flattens all nested pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
pipeline_kwargs : dict (default: {})
    Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

Returns

sklearn estimator

Source code in tpot/tpot_estimator/estimator_utils.py
def apply_make_pipeline(ind, preprocessing_pipeline=None, export_graphpipeline=False, **pipeline_kwargs):
    """
    Helper function to create a column of sklearn pipelines from the tpot individual class.

    Parameters
    ----------
    ind: tpot.SklearnIndividual
        The individual to convert to a pipeline.
    preprocessing_pipeline: sklearn.pipeline.Pipeline, optional
        The preprocessing pipeline to include before the individual's pipeline.
    export_graphpipeline: bool, default=False
        Force the pipeline to be exported as a graph pipeline. Flattens all nested pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
    pipeline_kwargs: dict
        Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

    Returns
    -------
    sklearn estimator
    """

    try:

        if export_graphpipeline:
            est = ind.export_flattened_graphpipeline(**pipeline_kwargs)
        else:
            est = ind.export_pipeline(**pipeline_kwargs)


        if preprocessing_pipeline is None:
            return est
        else:
            return sklearn.pipeline.make_pipeline(sklearn.base.clone(preprocessing_pipeline), est)
    except Exception:
        # If the individual cannot be exported or composed, return None for this entry.
        return None
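When a preprocessing pipeline is supplied, the exported estimator is appended after a clone of it. A minimal sketch of that composition step, using a plain sklearn estimator as a stand-in for the individual's exported pipeline:

```python
import sklearn.base
import sklearn.pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# Stand-ins: `est` plays the role of ind.export_pipeline(...)
preprocessing_pipeline = sklearn.pipeline.make_pipeline(StandardScaler())
est = LogisticRegression()

# Same composition as in apply_make_pipeline: clone the preprocessing
# pipeline so the caller's copy is never mutated, then append the estimator.
combined = sklearn.pipeline.make_pipeline(
    sklearn.base.clone(preprocessing_pipeline), est
)
print([name for name, _ in combined.steps])  # ['pipeline', 'logisticregression']
```

Cloning matters here because the same `preprocessing_pipeline` object may be reused across many individuals in the population.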

check_if_y_is_encoded(y)

Checks whether the target y is composed of sequential integers from 0 to N. XGBoost requires the target to be encoded this way.

Parameters

y : np.ndarray (required)
    The target vector.

Returns

bool
    True if the target is encoded as sequential integers from 0 to N, False otherwise.

Source code in tpot/tpot_estimator/estimator_utils.py
def check_if_y_is_encoded(y):
    '''
    Checks if the target y is composed of sequential ints from 0 to N.
    XGBoost requires the target to be encoded in this way.

    Parameters
    ----------
    y: np.ndarray
        The target vector.

    Returns
    -------
    bool
        True if the target is encoded as sequential ints from 0 to N, False otherwise
    '''
    y = sorted(set(y))
    return all(i == j for i, j in enumerate(y))
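The check is easy to exercise on its own; restating the two-line body, assuming plain Python sequences as input:

```python
def check_if_y_is_encoded(y):
    # True only when the sorted unique labels are exactly 0, 1, ..., N
    y = sorted(set(y))
    return all(i == j for i, j in enumerate(y))

print(check_if_y_is_encoded([0, 1, 2, 2, 1]))  # True: labels are {0, 1, 2}
print(check_if_y_is_encoded([1, 2, 3]))        # False: 0 is missing
print(check_if_y_is_encoded([0, 2]))           # False: gap at 1
```

Targets like [1, 2, 3] would need re-encoding, e.g. with sklearn.preprocessing.LabelEncoder, before being passed to XGBoost.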

convert_parents_tuples_to_integers(row, object_to_int)

Helper function to convert the parent rows into integers representing each parent's index in the population.

The original pandas dataframe uses a custom index for the parents. This function converts the custom index to an integer index for easier manipulation by end users.

Parameters

row : list, np.ndarray, or tuple (required)
    The row to convert.
object_to_int : dict (required)
    A dictionary mapping each object to an integer index.

Returns

tuple
    The row with the custom index converted to an integer index.

Source code in tpot/tpot_estimator/estimator_utils.py
def convert_parents_tuples_to_integers(row, object_to_int):
    """
    Helper function to convert the parent rows into integers representing the index of the parent in the population.

    Original pandas dataframe using a custom index for the parents. This function converts the custom index to an integer index for easier manipulation by end users.

    Parameters
    ----------
    row: list, np.ndarray, tuple
        The row to convert.
    object_to_int: dict
        A dictionary mapping the object to an integer index.

    Returns 
    -------
    tuple
        The row with the custom index converted to an integer index.
    """
    if isinstance(row, (list, np.ndarray, tuple)):
        return tuple(object_to_int[obj] for obj in row)
    else:
        return np.nan
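For illustration, a self-contained run with a hypothetical mapping from parent identifiers to population indices:

```python
import numpy as np

def convert_parents_tuples_to_integers(row, object_to_int):
    # Tuples/lists/arrays of parent ids become tuples of integer indices;
    # anything else (e.g. NaN for individuals without parents) maps to NaN.
    if isinstance(row, (list, np.ndarray, tuple)):
        return tuple(object_to_int[obj] for obj in row)
    return np.nan

object_to_int = {"id-a": 0, "id-b": 1, "id-c": 2}  # hypothetical custom index
print(convert_parents_tuples_to_integers(("id-b", "id-c"), object_to_int))  # (1, 2)
print(convert_parents_tuples_to_integers(None, object_to_int))              # nan
```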

objective_function_generator(pipeline, x, y, scorers, cv, other_objective_functions, step=None, budget=None, is_classification=True, export_graphpipeline=False, **pipeline_kwargs)

Uses cross validation to evaluate the pipeline with the scorers, and concatenates the results with the scores from the standalone other objective functions.

Parameters

pipeline : tpot.SklearnIndividual (required)
    The individual to evaluate.
x : np.ndarray (required)
    The feature matrix.
y : np.ndarray (required)
    The target vector.
scorers : list (required)
    The scorers to use for cross validation.
cv : int, float, or sklearn cross-validator (required)
    The cross-validator to use. For example, sklearn.model_selection.KFold or sklearn.model_selection.StratifiedKFold. If an int, sklearn.model_selection.KFold with n_splits=cv will be used.
other_objective_functions : list (required)
    A list of standalone objective functions to evaluate the pipeline, with signature obj(pipeline) -> float or obj(pipeline) -> np.ndarray. These functions take in the unfitted estimator.
step : int, optional (default: None)
    The fold to return the scores for. If None, the mean of all scores (per scorer) is returned. Default is None.
budget : float, optional (default: None)
    The budget for subsampling the data. If None, the full dataset is used. Will subsample budget*len(x) samples. Default is None.
is_classification : bool (default: True)
    If True, the subsampling is stratified. Default is True.
export_graphpipeline : bool (default: False)
    Force the pipeline to be exported as a graph pipeline. Flattens all nested sklearn pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
pipeline_kwargs : dict (default: {})
    Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

Returns

np.ndarray
    The concatenated scores for the pipeline. The first len(scorers) elements are the cross validation scores, and the remaining elements come from the standalone objective functions.

Source code in tpot/tpot_estimator/estimator_utils.py
def objective_function_generator(pipeline, x,y, scorers, cv, other_objective_functions, step=None, budget=None, is_classification=True, export_graphpipeline=False, **pipeline_kwargs):
    """
    Uses cross validation to evaluate the pipeline using the scorers, and concatenates results with scores from standalone other objective functions.

    Parameters
    ----------
    pipeline: tpot.SklearnIndividual
        The individual to evaluate.
    x: np.ndarray
        The feature matrix.
    y: np.ndarray
        The target vector.
    scorers: list
        The scorers to use for cross validation. 
    cv: int, float, or sklearn cross-validator
        The cross-validator to use. For example, sklearn.model_selection.KFold or sklearn.model_selection.StratifiedKFold.
        If an int, will use sklearn.model_selection.KFold with n_splits=cv.
    other_objective_functions: list
        A list of standalone objective functions to evaluate the pipeline. With signature obj(pipeline) -> float. or obj(pipeline) -> np.ndarray
        These functions take in the unfitted estimator.
    step: int, optional
        The fold to return the scores for. If None, will return the mean of all the scores (per scorer). Default is None.
    budget: float, optional
        The budget to subsample the data. If None, will use the full dataset. Default is None.
        Will subsample budget*len(x) samples.
    is_classification: bool, default=True
        If True, will stratify the subsampling. Default is True.
    export_graphpipeline: bool, default=False
        Force the pipeline to be exported as a graph pipeline. Flattens all nested sklearn pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
    pipeline_kwargs: dict
        Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

    Returns
    -------
    np.ndarray
        The concatenated scores for the pipeline. The first len(scorers) elements are the cross validation scores, and the remaining elements are the standalone objective functions.

    """

    if export_graphpipeline:
        pipeline = pipeline.export_flattened_graphpipeline(**pipeline_kwargs)
    else:
        pipeline = pipeline.export_pipeline(**pipeline_kwargs)

    if budget is not None and budget < 1:
        if is_classification:
            x,y = sklearn.utils.resample(x,y, stratify=y, n_samples=int(budget*len(x)), replace=False, random_state=1)
        else:
            x,y = sklearn.utils.resample(x,y, n_samples=int(budget*len(x)), replace=False, random_state=1)

        if isinstance(cv, int) or isinstance(cv, float):
            n_splits = cv
        else:
            n_splits = cv.n_splits

    if len(scorers) > 0:
        cv_obj_scores = cross_val_score_objective(sklearn.base.clone(pipeline),x,y,scorers=scorers, cv=cv , fold=step)
    else:
        cv_obj_scores = []

    if other_objective_functions is not None and len(other_objective_functions) >0:
        other_scores = [obj(sklearn.base.clone(pipeline)) for obj in other_objective_functions]
        #flatten
        other_scores = np.array(other_scores).flatten().tolist()
    else:
        other_scores = []

    return np.concatenate([cv_obj_scores,other_scores])
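The budget mechanics are simple arithmetic: with budget < 1, the function evaluates on int(budget * len(x)) rows drawn without replacement (stratified by y for classification, via sklearn.utils.resample). A numpy-only sketch of the sample count, with made-up shapes:

```python
import numpy as np

x = np.arange(100).reshape(50, 2)      # 50 samples, 2 features
y = np.array([0, 1] * 25)

budget = 0.4
n_samples = int(budget * len(x))       # 0.4 * 50 -> 20 rows evaluated

# Unstratified stand-in for sklearn.utils.resample(..., replace=False)
rng = np.random.default_rng(1)
idx = rng.choice(len(x), size=n_samples, replace=False)
x_sub, y_sub = x[idx], y[idx]
print(x_sub.shape, y_sub.shape)        # (20, 2) (20,)
```

Successive generations can raise the budget toward 1.0, so cheap low-budget evaluations filter the population before full-data evaluation.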

remove_underrepresented_classes(x, y, min_count)

Helper function to remove classes with fewer than min_count samples from the dataset.

Parameters

x : np.ndarray or pd.DataFrame (required)
    The feature matrix.
y : np.ndarray or pd.Series (required)
    The target vector.
min_count : int (required)
    The minimum number of samples required to keep a class.

Returns

np.ndarray, np.ndarray
    The feature matrix and target vector with rows from classes with fewer than min_count samples removed.

Source code in tpot/tpot_estimator/estimator_utils.py
def remove_underrepresented_classes(x, y, min_count):
    """
    Helper function to remove classes with less than min_count samples from the dataset.

    Parameters
    ----------
    x: np.ndarray or pd.DataFrame
        The feature matrix.
    y: np.ndarray or pd.Series
        The target vector.
    min_count: int
        The minimum number of samples to keep a class.

    Returns
    -------
    np.ndarray, np.ndarray
        The feature matrix and target vector with rows from classes with less than min_count samples removed.
    """
    if isinstance(y, (np.ndarray, pd.Series)):
        unique, counts = np.unique(y, return_counts=True)
        if min(counts) >= min_count:
            return x, y
        keep_classes = unique[counts >= min_count]
        mask = np.isin(y, keep_classes)
        x = x[mask]
        y = y[mask]
    elif isinstance(y, pd.DataFrame):
        counts = y.apply(pd.Series.value_counts).fillna(0)
        if counts.min().min() >= min_count:
            return x, y
        # Keep only class labels that meet min_count in every target column
        keep_classes = counts.index[(counts >= min_count).all(axis=1)].tolist()
        mask = y.isin(keep_classes).all(axis=1)
        x = x[mask]
        y = y[mask]
    else:
        raise TypeError("y must be a numpy array or a pandas Series/DataFrame")
    return x, y
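The numpy branch can be demonstrated standalone; the arrays below are made up, with class 2 deliberately underrepresented:

```python
import numpy as np

def filter_rare_classes(x, y, min_count):
    # Same logic as the numpy branch above: drop rows whose class
    # appears fewer than min_count times in y.
    unique, counts = np.unique(y, return_counts=True)
    if counts.min() >= min_count:
        return x, y
    keep_classes = unique[counts >= min_count]
    mask = np.isin(y, keep_classes)
    return x[mask], y[mask]

x = np.arange(12).reshape(6, 2)
y = np.array([0, 0, 0, 1, 1, 2])       # class 2 has a single sample
x2, y2 = filter_rare_classes(x, y, min_count=2)
print(y2)                              # [0 0 0 1 1]
```

Filtering like this is what allows stratified cross-validation to proceed: StratifiedKFold raises when any class has fewer members than the number of splits.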

val_objective_function_generator(pipeline, X_train, y_train, X_test, y_test, scorers, other_objective_functions, export_graphpipeline=False, **pipeline_kwargs)

Trains a pipeline on a training set and evaluates it on a test set using the scorers and the other objective functions.

Parameters

pipeline : tpot.SklearnIndividual (required)
    The individual to evaluate.
X_train : np.ndarray (required)
    The feature matrix of the training set.
y_train : np.ndarray (required)
    The target vector of the training set.
X_test : np.ndarray (required)
    The feature matrix of the test set.
y_test : np.ndarray (required)
    The target vector of the test set.
scorers : list (required)
    The scorers to use for evaluation.
other_objective_functions : list (required)
    A list of standalone objective functions to evaluate the pipeline, with signature obj(pipeline) -> float or obj(pipeline) -> np.ndarray. These functions take in the unfitted estimator.
export_graphpipeline : bool (default: False)
    Force the pipeline to be exported as a graph pipeline. Flattens all nested sklearn pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
pipeline_kwargs : dict (default: {})
    Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

Returns

np.ndarray
    The concatenated scores for the pipeline. The first len(scorers) elements are the test-set scores from the scorers, and the remaining elements come from the standalone objective functions.

Source code in tpot/tpot_estimator/estimator_utils.py
def val_objective_function_generator(pipeline, X_train, y_train, X_test, y_test, scorers, other_objective_functions, export_graphpipeline=False, **pipeline_kwargs):
    """
    Trains a pipeline on a training set and evaluates it on a test set using the scorers and other objective functions.

    Parameters
    ----------

    pipeline: tpot.SklearnIndividual
        The individual to evaluate.
    X_train: np.ndarray
        The feature matrix of the training set.
    y_train: np.ndarray
        The target vector of the training set.
    X_test: np.ndarray
        The feature matrix of the test set.
    y_test: np.ndarray
        The target vector of the test set.
    scorers: list
        The scorers to use for cross validation.
    other_objective_functions: list
        A list of standalone objective functions to evaluate the pipeline. With signature obj(pipeline) -> float. or obj(pipeline) -> np.ndarray
        These functions take in the unfitted estimator.
    export_graphpipeline: bool, default=False
        Force the pipeline to be exported as a graph pipeline. Flattens all nested sklearn pipelines, FeatureUnions, and GraphPipelines into a single GraphPipeline.
    pipeline_kwargs: dict
        Keyword arguments to pass to the export_pipeline or export_flattened_graphpipeline method.

    Returns
    -------
    np.ndarray
        The concatenated scores for the pipeline. The first len(scorers) elements are the cross validation scores, and the remaining elements are the standalone objective functions.


    """

    if export_graphpipeline:
        pipeline = pipeline.export_flattened_graphpipeline(**pipeline_kwargs)
    else:
        pipeline = pipeline.export_pipeline(**pipeline_kwargs)

    fitted_pipeline = sklearn.base.clone(pipeline)
    fitted_pipeline.fit(X_train, y_train)

    # Initialize so the final concatenation works even with no scorers
    scores = []
    if len(scorers) > 0:
        scores = [sklearn.metrics.get_scorer(scorer)(fitted_pipeline, X_test, y_test) for scorer in scorers]

    other_scores = []
    if other_objective_functions is not None and len(other_objective_functions) >0:
        other_scores = [obj(sklearn.base.clone(pipeline)) for obj in other_objective_functions]

    return np.concatenate([scores,other_scores])
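The train/score/concatenate flow can be sketched with a plain sklearn estimator standing in for a TPOT individual's exported pipeline; the coefficient-count "complexity" objective below is a made-up example of a standalone objective function:

```python
import numpy as np
import sklearn.base
import sklearn.metrics
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=200, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

pipeline = LogisticRegression(max_iter=1000)  # stand-in for export_pipeline()

# Fit a clone on the training set, as val_objective_function_generator does.
fitted_pipeline = sklearn.base.clone(pipeline)
fitted_pipeline.fit(X_train, y_train)

# Scorer strings are resolved through sklearn.metrics.get_scorer.
scorers = ["accuracy", "roc_auc"]
scores = [sklearn.metrics.get_scorer(s)(fitted_pipeline, X_test, y_test)
          for s in scorers]

# Hypothetical standalone objective: number of model coefficients.
other_scores = [float(fitted_pipeline.coef_.size)]

result = np.concatenate([scores, other_scores])
print(result.shape)  # (3,): two scorer values plus one standalone objective
```

The same result layout is what TPOT reports per individual: scorer values first, standalone objective values after.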