Parallelization
This tutorial covers more advanced setups for parallelizing TPOT with Dask. If you just want to parallelize TPOT within a single computer with multiple processes, simply set the n_jobs parameter to the number of processes you want to use and skip this tutorial.
TPOT uses Dask for parallelization and defaults to using a dask.distributed.LocalCluster for local parallelization. A user can pass in a custom Dask client or cluster for advanced usage. For example, multi-node parallelization is possible using the dask-jobqueue package.
TPOT can be easily parallelized on a local computer by setting the n_jobs and memory_limit parameters. n_jobs dictates how many Dask workers to launch; in TPOT this corresponds to the number of pipelines evaluated in parallel. memory_limit is the amount of RAM to use per worker.
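For reference, this is all the simplest local setup requires. The sketch below reuses the TPOTClassifier call that appears as a comment later in this tutorial; the population_size and generations values are illustrative, not recommendations.

import tpot
import sklearn.datasets
import sklearn.model_selection

X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)

# n_jobs launches 4 dask workers; memory_limit caps each worker's RAM
est = tpot.TPOTClassifier(population_size=8, generations=5, n_jobs=4, memory_limit="4GB", verbose=1)
est.fit(X_train, y_train)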
In [ ]
import tpot
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot.config.get_search_space("selectors"),
inner_search_space = tpot.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot.TPOTEstimator(
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
verbose = 2,
n_jobs=16,
memory_limit="4GB"
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
Generation: : 8it [01:00, 7.57s/it]
/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/decomposition/_fastica.py:595: UserWarning: n_components is too large: it will be set to 4
  warnings.warn(
/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:349: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
0.997905982905983
Initialize a basic Dask local cluster
In [2]
from dask.distributed import Client, LocalCluster
n_jobs = 4
memory_limit = "4GB"
cluster = LocalCluster(n_workers=n_jobs, #if no client is passed in and no global client exists, create our own
threads_per_worker=1,
memory_limit=memory_limit)
client = Client(cluster)
Get the link to view the Dask dashboard.
In [3]
client.dashboard_link
Out[3]
'http://127.0.0.1:8787/status'
In [ ]
graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot.config.get_search_space("selectors"),
inner_search_space = tpot.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot.TPOTEstimator(
client = client,
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
# with this LocalCluster, passing in the client is equivalent to setting
# n_jobs=4 and memory_limit="4GB" on the estimator directly
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
Generation: : 8it [01:01, 7.69s/it]
/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:349: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
2025-02-21 16:37:55,843 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name="execute('eval_objective_list-8bdcf8a1c1f54374fc47664011238a6d')" coro=<Worker.execute() done, defined at /opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/distributed/worker_state_machine.py:3607>> ended with CancelledError
0.997905982905983
Option 2
You can initialize the cluster and client with a context manager, which will automatically close them when done.
In [ ]
from dask.distributed import Client, LocalCluster
import tpot
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
n_jobs = 4
memory_limit = "4GB"
with LocalCluster(
n_workers=n_jobs,
threads_per_worker=1,
memory_limit=memory_limit,
) as cluster, Client(cluster) as client:
graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot.config.get_search_space("selectors"),
inner_search_space = tpot.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot.TPOTEstimator(
client = client,
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 5,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
Generation: : 10it [01:00, 6.07s/it]
/opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:349: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
2025-02-21 16:38:57,976 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name="execute('eval_objective_list-87c6eded7038f6c8291a3ee9879aef3f')" coro=<Worker.execute() done, defined at /opt/anaconda3/envs/tpotenv/lib/python3.10/site-packages/distributed/worker_state_machine.py:3607>> ended with CancelledError
1.0
2025-02-21 16:39:01,975 - distributed.nanny - WARNING - Worker process still alive after 4.0 seconds, killing
Multi-node parallelization with Dask on HPC
Dask can parallelize across multiple nodes via job queueing systems. This is done using the Dask-Jobqueue package. More information can be found in the official documentation here.
To parallelize TPOT with Dask-Jobqueue, simply pass in a client based on a Jobqueue cluster with the desired settings into the client parameter. Each job will evaluate a single pipeline.
Note that TPOT will ignore n_jobs and memory_limit as these should be set inside the Dask cluster.
The following example is specific to Sun Grid Engine. Other supported clusters can be found in the Dask-Jobqueue documentation here.
In [ ]
from dask.distributed import Client
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import tpot
from dask_jobqueue import SGECluster # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.
import os
if os.system("which qsub") != 0:
print("Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.")
else:
print("Sun Grid Engine is installed.")
cluster = SGECluster(
queue='all.q',
cores=2,
memory="50 GB"
)
cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs
client = Client(cluster)
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot.config.get_search_space("selectors"),
inner_search_space = tpot.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot.TPOTEstimator(
client = client,
scorers = ["roc_auc"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.
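For other schedulers, only the cluster class and its scheduler-specific options change. Below is a minimal sketch of the same setup on SLURM, assuming Dask-Jobqueue's SLURMCluster (whose queue, cores, and memory parameters mirror SGECluster's); the queue name is a placeholder to replace with your site's partition.

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue='general',  # hypothetical partition name; set this to your site's queue
    cores=2,
    memory="50 GB"
)
cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs
client = Client(cluster)

# pass client into tpot.TPOTEstimator(client=client, ...) exactly as in the SGE example above

client.close()
cluster.close()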