此文件是 TPOT 库的一部分。
TPOT 当前版本由 Cedars-Sinai 的以下人员开发: - Pedro Henrique Ribeiro (https://github.com/perib, https://www.linkedin.com/in/pedro-ribeiro/) - Anil Saini (anil.saini@cshs.org) - Jose Hernandez (jgh9094@gmail.com) - Jay Moran (jay.moran@cshs.org) - Nicholas Matsumoto (nicholas.matsumoto@cshs.org) - Hyunjun Choi (hyunjun.choi@cshs.org) - Gabriel Ketron (gabriel.ketron@cshs.org) - Miguel E. Hernandez (miguel.e.hernandez@cshs.org) - Jason Moore (moorejh28@gmail.com)
TPOT 原始版本主要由宾夕法尼亚大学的以下人员开发: - Randal S. Olson (rso@randalolson.com) - Weixuan Fu (weixuanf@upenn.edu) - Daniel Angell (dpa34@drexel.edu) - Jason Moore (moorejh28@gmail.com) - 以及许多慷慨的开源贡献者
TPOT 是自由软件:您可以根据自由软件基金会发布的 GNU 宽通用公共许可证(或您选择的任何更高版本)的条款重新分发和/或修改它。
分发 TPOT 是希望它会有用,但没有任何保证;甚至没有适销性或特定用途适用性的默示保证。有关更多详细信息,请参阅 GNU 宽通用公共许可证。
您应该已经随 TPOT 收到了一份 GNU 宽通用公共许可证的副本。如果没有,请参阅 https://www.gnu.org/licenses/。
DynamicUnionPipeline
基类: SearchSpace
源代码位于 tpot/search_spaces/pipelines/dynamicunion.py
class DynamicUnionPipeline(SearchSpace):
    """Search space that samples a FeatureUnion with a variable number of steps.

    Every step of the union is drawn from the same underlying ``search_space``.
    Individuals sampled from this space are ``DynamicUnionPipelineIndividual``
    instances holding between 1 and ``max_estimators`` steps.
    """

    def __init__(self, search_space: "SearchSpace", max_estimators=None, allow_repeats=False) -> None:
        """
        Parameters
        ----------
        search_space : SearchSpace
            The search space each step of the FeatureUnion is drawn from.
        max_estimators : int, optional
            Maximum number of steps in the FeatureUnion. ``None`` means unbounded.
        allow_repeats : bool, default False
            Whether the same step may appear more than once in the union.
        """
        self.search_space = search_space
        self.max_estimators = max_estimators
        self.allow_repeats = allow_repeats

    def generate(self, rng=None):
        """Sample a new individual (a concrete FeatureUnion) from this space.

        Parameters
        ----------
        rng : int, Generator, or None
            Seed or generator forwarded to ``np.random.default_rng``.
        """
        rng = np.random.default_rng(rng)
        return DynamicUnionPipelineIndividual(
            self.search_space,
            max_estimators=self.max_estimators,
            allow_repeats=self.allow_repeats,
            rng=rng,
        )
__init__(search_space, max_estimators=None, allow_repeats=False)
接收一个搜索空间。将生成一个 FeatureUnion,其中每个步骤均从该搜索空间中采样,步骤数量最多为 max_estimators。
源代码位于 tpot/search_spaces/pipelines/dynamicunion.py
| def __init__(self, search_space : SearchSpace, max_estimators=None, allow_repeats=False ) -> None:
"""
Takes in a list of search spaces. will produce a pipeline of Sequential length. Each step in the pipeline will correspond to the the search space provided in the same index.
"""
self.search_space = search_space
self.max_estimators = max_estimators
self.allow_repeats = allow_repeats
|
DynamicUnionPipelineIndividual
基类: SklearnIndividual
接收一个搜索空间。将生成一个最多包含 max_estimators 个步骤的 FeatureUnion。FeatureUnion 的输出将是所有步骤的串联结果。
源代码位于 tpot/search_spaces/pipelines/dynamicunion.py
class DynamicUnionPipelineIndividual(SklearnIndividual):
    """
    Takes in one search space.
    Will produce a FeatureUnion of up to max_estimators number of steps.
    The output of the FeatureUnion will be all of the steps concatenated together.
    """

    def __init__(self, search_space: "SearchSpace", max_estimators=None, allow_repeats=False, rng=None) -> None:
        """Randomly initialize a FeatureUnion individual with 1+ steps.

        Parameters
        ----------
        search_space : SearchSpace
            Search space each union step is generated from.
        max_estimators : int, optional
            Upper bound on the number of steps; ``None`` means unbounded.
        allow_repeats : bool, default False
            Stored flag for allowing duplicate steps.
        rng : int, Generator, or None
            Seed or generator forwarded to ``np.random.default_rng``.
        """
        super().__init__()
        self.search_space = search_space
        # np.inf sentinel keeps the `len(...) < self.max_estimators`
        # comparison in _mutate_add_step always true when unbounded.
        if max_estimators is None:
            self.max_estimators = np.inf
        else:
            self.max_estimators = max_estimators
        # NOTE(review): allow_repeats is stored but never consulted below;
        # keying union_dict on unique_id() structurally forbids repeats.
        # Confirm intended semantics before relying on this flag.
        self.allow_repeats = allow_repeats

        # Maps step.unique_id() -> step; deduplicates steps within the union.
        self.union_dict = {}

        if self.max_estimators == np.inf:
            init_max = 3
        else:
            init_max = self.max_estimators

        rng = np.random.default_rng(rng)
        # Fix: rng.integers(1, 1) raises ValueError (low >= high) when
        # max_estimators == 1, so fall back to exactly one initial step.
        n_initial = 1 if init_max <= 1 else rng.integers(1, init_max)
        for _ in range(n_initial):
            self._mutate_add_step(rng)

    def mutate(self, rng=None):
        """Try mutations in random order; return True iff one succeeded."""
        rng = np.random.default_rng(rng)
        mutation_funcs = [self._mutate_add_step, self._mutate_remove_step, self._mutate_replace_step, self._mutate_note]
        rng.shuffle(mutation_funcs)
        for mutation_func in mutation_funcs:
            if mutation_func(rng):
                return True
        # Fix: previously fell through returning None; make the
        # "no mutation applied" result an explicit False.
        return False

    def _mutate_add_step(self, rng):
        """Add one new unique step if under the size limit; True on success.

        Retries generation up to 10 times to find a step whose unique_id
        is not already present in the union.
        """
        rng = np.random.default_rng(rng)
        max_attempts = 10
        if len(self.union_dict) < self.max_estimators:
            for _ in range(max_attempts):
                new_step = self.search_space.generate(rng)
                if new_step.unique_id() not in self.union_dict:
                    self.union_dict[new_step.unique_id()] = new_step
                    return True
        return False

    def _mutate_remove_step(self, rng):
        """Remove one random step, always keeping at least one in the union."""
        rng = np.random.default_rng(rng)
        if len(self.union_dict) > 1:
            self.union_dict.pop(rng.choice(list(self.union_dict.keys())))
            return True
        return False

    def _mutate_replace_step(self, rng):
        # NOTE(review): `or` short-circuits, so when the removal succeeds the
        # add never runs -- this behaves as remove-OR-add rather than a true
        # replace. Preserved as-is; confirm intent before changing.
        rng = np.random.default_rng(rng)
        changed = self._mutate_remove_step(rng) or self._mutate_add_step(rng)
        return changed

    # TODO mutate one step or multiple?
    def _mutate_note(self, rng):
        """Mutate each contained step independently with probability 0.5."""
        rng = np.random.default_rng(rng)
        changed = False
        values = list(self.union_dict.values())
        for step in values:
            if rng.random() < 0.5:
                changed = step.mutate(rng) or changed
        # Rebuild the dict because mutating a step may change its unique_id.
        self.union_dict = {step.unique_id(): step for step in values}
        return changed

    def crossover(self, other, rng=None):
        """Try crossover operators in random order; True iff one succeeded."""
        rng = np.random.default_rng(rng)
        cx_funcs = [self._crossover_swap_multiple_nodes, self._crossover_node]
        rng.shuffle(cx_funcs)
        for cx_func in cx_funcs:
            if cx_func(other, rng):
                return True
        return False

    def _crossover_swap_multiple_nodes(self, other, rng):
        """Swap a random prefix of shuffled steps between self and other."""
        rng = np.random.default_rng(rng)
        self_values = list(self.union_dict.values())
        other_values = list(other.union_dict.values())
        rng.shuffle(self_values)
        rng.shuffle(other_values)

        self_idx = rng.integers(0, len(self_values))
        other_idx = rng.integers(0, len(other_values))

        # Note that this is not one-point-crossover since the sequence doesn't
        # matter. This is just a quick way to swap multiple random items.
        self_values[:self_idx], other_values[:other_idx] = other_values[:other_idx], self_values[:self_idx]

        self.union_dict = {step.unique_id(): step for step in self_values}
        other.union_dict = {step.unique_id(): step for step in other_values}
        return True

    def _crossover_node(self, other, rng):
        """Cross over paired steps (after shuffling) with probability 0.5 each."""
        rng = np.random.default_rng(rng)
        changed = False
        self_values = list(self.union_dict.values())
        other_values = list(other.union_dict.values())
        rng.shuffle(self_values)
        rng.shuffle(other_values)
        for self_step, other_step in zip(self_values, other_values):
            if rng.random() < 0.5:
                changed = self_step.crossover(other_step, rng) or changed

        # Re-key both dicts because crossover may change a step's unique_id.
        self.union_dict = {step.unique_id(): step for step in self_values}
        other.union_dict = {step.unique_id(): step for step in other_values}
        return changed

    def export_pipeline(self, **kwargs):
        """Export as an sklearn FeatureUnion of the contained steps."""
        values = list(self.union_dict.values())
        return sklearn.pipeline.make_union(*[step.export_pipeline(**kwargs) for step in values])

    def unique_id(self):
        """Return an order-insensitive identifier for this union of steps."""
        values = list(self.union_dict.values())
        l = [step.unique_id() for step in values]
        # If all items are strings, sort them so equal unions hash equally.
        if all([isinstance(x, str) for x in l]):
            l.sort()
        l = ["FeatureUnion"] + l
        return TupleIndex(frozenset(l))