Using a Thread Pool
Python's concurrent.futures library provides an interface for multithreaded programming. The concurrent.futures.ThreadPoolExecutor class supplies the infrastructure for managing a pool of worker threads.
import concurrent.futures
import multiprocessing

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
    return WorkResult
#End Function

# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of the workers' parameters
    arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ]
    # Array of workers
    arrWorkers = []
    # Create thread pool object (max(1, ...) avoids max_workers=0 on single-core machines)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
            # Submit work
            ftrCurrentWorker = polWorkers.submit(DoWork, Param1Value, Param2Value, Param3Value)
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
        # Iterate over futures as they complete
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
    # Print worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
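As a usage note (a sketch not taken from the example above): ThreadPoolExecutor.map offers a more compact way to submit the same batch of jobs, and unlike as_completed it yields results in submission order.

```python
import concurrent.futures

# Same worker function as in the example above
def DoWork(Param1, Param2, Param3):
    return f"Pokemon#{Param1}{Param2}{Param3}"

arrWorkerParams = [(2, 4, 5), (2, 5, 0), (3, 8, 0), (3, 8, 1)]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as polWorkers:
    # map() preserves submission order; the lambda unpacks each parameter tuple
    arrResults = list(polWorkers.map(lambda Params: DoWork(*Params), arrWorkerParams))

print(arrResults)  # ['Pokemon#245', 'Pokemon#250', 'Pokemon#380', 'Pokemon#381']
```

Note that a lambda works here only because threads share the interpreter; with a process pool the callable must be picklable (i.e. a module-level function).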
References:
https://blog.csdn.net/huapingqi/article/details/132521391
https://blog.csdn.net/waitan2018/article/details/108386898
Using a Process Pool
Python's concurrent.futures library provides an interface for multithreaded programming. The concurrent.futures.ProcessPoolExecutor class supplies the infrastructure for managing a pool of worker processes.
Process-pool-based parallelism can, to some extent, avoid problems such as Matplotlib's warnings about plotting from multiple threads.
Note that when another script calls a function that creates a process pool or child processes, that script's entry code must be wrapped in an "if __name__ == "__main__" :" block; otherwise execution fails with the error "An attempt has been made to start a new process before the current process has finished its bootstrapping phase."
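A minimal sketch of this rule (the function and its name RunParallel are illustrative, not from the original): the pool-creating function may live at module level, but it must only be called from guarded entry code, so that spawned child processes can re-import the module without re-triggering pool creation.

```python
import concurrent.futures

# A function that creates a process pool; defining it at module level is fine,
# but it must only be *called* from guarded entry code
def RunParallel(arrValues):
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as polWorkers:
        return list(polWorkers.map(str, arrValues))

if __name__ == "__main__":
    # Without this guard, spawn-based platforms (e.g. Windows) fail with
    # "An attempt has been made to start a new process before ..."
    print(RunParallel([1, 2, 3]))
```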
Also, if a child process calls functions or methods that may themselves create concurrent.futures.Future objects (for example, some parallel computing tools), the process pool may wait for a long time, or even hang indefinitely, on shutdown (when leaving the scope of the with statement, or on an explicit call to shutdown()). See: https://github.com/python/cpython/issues/94440 .
import concurrent.futures
import multiprocessing

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
    return WorkResult
#End Function

# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of the workers' parameters
    arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ]
    # Array of workers
    arrWorkers = []
    # Create process pool object (max(1, ...) avoids max_workers=0 on single-core machines)
    with concurrent.futures.ProcessPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
            # Submit work
            ftrCurrentWorker = polWorkers.submit(DoWork, Param1Value, Param2Value, Param3Value)
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
        # Iterate over futures as they complete
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
    # Print worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
On Windows, concurrent.futures.ProcessPoolExecutor may suffer from performance problems and a cap on the number of worker processes (max_workers is limited to 61), so multiprocessing.Pool is worth considering instead.
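The Windows cap (max_workers > 61 raises ValueError in ProcessPoolExecutor) can also be handled by clamping the requested worker count; a minimal sketch, where ClampWorkerCount is a hypothetical helper name:

```python
import sys
import multiprocessing

# Documented max_workers cap for ProcessPoolExecutor on Windows
PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS = 61

def ClampWorkerCount(nWorkers: int) -> int:
    # On Windows, ProcessPoolExecutor raises ValueError for max_workers > 61,
    # so clamp the requested count there; leave it unchanged elsewhere
    if sys.platform.lower().startswith("win"):
        return min(nWorkers, PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS)
    return nWorkers

print(ClampWorkerCount(multiprocessing.cpu_count()))
```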
A multiprocessing.Pool process pool submits asynchronous tasks with multiprocessing.Pool.starmap_async(). Its first parameter is the target function; its second is a list of tuples, each tuple being one group of arguments to submit in turn. Note that even if the target function takes only one argument, the argument must still be placed in a tuple, and even if the target function is called only once, the argument tuple must still be placed in a list. Each submission returns a multiprocessing.pool.AsyncResult object; calling its get() method waits for the corresponding task(s) to finish and returns a list of results (again, even if the target function is called only once, get() wraps the result in a list).
import multiprocessing

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
    return WorkResult
#End Function

# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of the workers' parameters
    arrWorkerParams = [ (2,4,5), (2,5,0), (3,8,0), (3,8,1) ]
    # Array of workers
    arrWorkers = []
    # Create process pool object (max(1, ...) avoids processes=0 on single-core machines)
    with multiprocessing.Pool(processes=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Submit work
            resCurrentWorker = polWorkers.starmap_async(DoWork, [Params])
            # Cache current worker's async result
            arrWorkers.append(resCurrentWorker)
        #Next
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrWorkers :
            # Do post processing
            arrWorkerResults.append(WorkerResult.get()[0])
        #Next
    #End With
    # Print worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
Optimizing the Process Pool
concurrent.futures.ProcessPoolExecutor creates child processes even when max_workers is 1, and multiprocessing.Pool behaves the same way. This can cause a serious performance hit, or cause problems when functions that may themselves spawn child processes are called in a nested fashion.
It is therefore worth writing a wrapper function that skips child-process creation when the requested worker count is 1; the same function can also take care of tasks such as selecting the multiprocessing backend:
import sys
import concurrent.futures
import multiprocessing

# Constants
PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS = 61

# Checks if the current platform is Windows
def IsRunningOnWindows() -> bool :
    return sys.platform.lower().startswith("win")
#End Function

# Run jobs in parallel with a process pool
# This is a wrapper of concurrent.futures.ProcessPoolExecutor
# It also prevents child processes from being created when nWorkers <= 1
# arrFunctions is a list of functions
# arrArgs is a list of tuples, with positional arguments (*args)
# arrKwArgs is a list of dicts, with keyword arguments (**kwargs)
# iPoolBackend selects which backend is used:
#   0 - concurrent.futures.ProcessPoolExecutor
#   1 - multiprocessing.Pool (warning: in this case, arrKwArgs will be ignored)
# If IsBackendAutoSelectingEnabled, the backend is selected as follows:
#   On Windows, when nWorkers is None or nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS,
#   and arrKwArgs is None: use multiprocessing.Pool
#   Otherwise: use concurrent.futures.ProcessPoolExecutor
# Returns a list of results
def RunJobsWithProcessPool(arrFunctions : list, arrArgs : list = None, arrKwArgs : list = None, nWorkers : int = None, iPoolBackend : int = 0, IsBackendAutoSelectingEnabled : bool = False) -> list :
    # Handle null references
    if arrArgs is None :
        arrArgs = []
    #End If
    if arrKwArgs is None :
        arrKwArgs = []
    #End If
    if iPoolBackend < 0 :
        iPoolBackend = 0
    elif iPoolBackend > 1 :
        iPoolBackend = 1
    #End If
    # Initialize result buffer
    arrResults = []
    # Check if we really need to create child workers
    IsChildWorkersNeeded = False
    if nWorkers is None :
        IsChildWorkersNeeded = True
    elif nWorkers > 1 :
        IsChildWorkersNeeded = True
    #End If
    # Backend auto selection
    if IsBackendAutoSelectingEnabled :
        # Check if we are on Windows
        if IsRunningOnWindows() :
            # Check if we need a lot of processor cores
            IsProcessorCoreCountLarge = False
            if nWorkers is None :
                IsProcessorCoreCountLarge = True
            elif nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
                IsProcessorCoreCountLarge = True
            #End If
            # Select backend
            if IsProcessorCoreCountLarge :
                iPoolBackend = 1
            else :
                iPoolBackend = 0
            #End If
        else :
            iPoolBackend = 0
        #End If
    else :
        # Avoid the ValueError raised by concurrent.futures.ProcessPoolExecutor on Windows
        if IsRunningOnWindows() :
            if nWorkers is None :
                nWorkers = PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
            elif nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
                nWorkers = PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
            #End If
        #End If
    #End If
    # Run jobs
    if IsChildWorkersNeeded :
        # Use the backend selected by iPoolBackend
        if iPoolBackend == 0 :
            # Using concurrent.futures.ProcessPoolExecutor
            # Multiprocess workers
            arrWorkers = []
            # Create process pool object
            with concurrent.futures.ProcessPoolExecutor(max_workers=nWorkers) as polWorkers :
                # Create and submit job packages
                for i in range(0, len(arrFunctions)) :
                    # Get function object and arguments
                    fncFunction = arrFunctions[i]
                    tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
                    dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
                    # Submit a job
                    ftrCurrentWorker = polWorkers.submit(fncFunction, *tplArgs, **dctKwArgs)
                    # Cache current worker
                    arrWorkers.append(ftrCurrentWorker)
                #Next
                # Iterate over futures as they complete
                arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
                # Collect results
                for WorkerResult in arrCompletedTasks :
                    arrResults.append(WorkerResult.result())
                #Next
            #End With
        elif iPoolBackend == 1 :
            # Using multiprocessing.Pool
            # Multiprocess workers
            arrWorkers = []
            # Create process pool object
            with multiprocessing.Pool(processes=nWorkers) as polWorkers :
                # Create and submit job packages
                for i in range(0, len(arrFunctions)) :
                    # Get function object and arguments
                    fncFunction = arrFunctions[i]
                    tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
                    # Submit a job
                    resCurrentWorker = polWorkers.starmap_async(fncFunction, [tplArgs])
                    # Cache current worker's async result
                    arrWorkers.append(resCurrentWorker)
                #Next
                # Collect results
                for WorkerResult in arrWorkers :
                    arrResults.append(WorkerResult.get()[0])
                #Next
            #End With
        #End If
    else :
        # Run jobs in the current process directly
        for i in range(0, len(arrFunctions)) :
            # Get function object and arguments
            fncFunction = arrFunctions[i]
            tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
            dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
            # Run job
            resCurrentWorker = fncFunction(*tplArgs, **dctKwArgs)
            # Collect result
            arrResults.append(resCurrentWorker)
        #Next
    #End If
    return arrResults
#End Function

# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
    return WorkResult
#End Function

# Main entry point
if __name__ == "__main__" :
    # Run jobs in parallel
    # Create function & argument lists
    arrWorkerFuncs = []
    arrWorkerArgs = []
    # Result=DoWork(2,4,5)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,4,5))
    # Result=DoWork(2,5,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,5,0))
    # Result=DoWork(3,8,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,0))
    # Result=DoWork(3,8,1)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,1))
    # Run jobs
    nParallelCPUJobCount = max(1, multiprocessing.cpu_count() // 2)
    arrWorkerResults = RunJobsWithProcessPool(arrWorkerFuncs, arrWorkerArgs, nWorkers=nParallelCPUJobCount)
    # Print worker results
    for CurrentResult in arrWorkerResults :
        # Do post processing
        print(CurrentResult)
    #Next
#End If
References:
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
https://gairuo.com/p/python-multiprocessing-pool
https://www.cnblogs.com/piperliu/articles/18615898
https://superfastpython.com/multiprocessing-pool-starmap_async/
https://martinlwx.github.io/en/data-parallel-with-two-different-api/





