Multithreaded programming in Python

Using a thread pool

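Python's concurrent.futures module provides a high-level interface for running callables asynchronously. Its concurrent.futures.ThreadPoolExecutor class manages a pool of worker threads on which the submitted jobs run. Threads suit I/O-bound work; in the standard CPython build, the global interpreter lock (GIL) keeps pure-Python CPU-bound code from running on several cores at once.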

import concurrent.futures
import multiprocessing
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create thread pool object (at least 1 worker: max_workers=0 raises ValueError on single-core machines)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
 
            # Submit jobs
            ftrCurrentWorker = polWorkers.submit(DoWork,
                Param1Value,
                Param2Value,
                Param3Value)
 
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
 
        # Register completion list
        # (as_completed() yields futures in completion order, not in submission order)
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post-processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
 
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post-processing
        print(CurrentResult)
    #Next
#End If
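As a variant of the loop above, Executor.map() accepts one iterable per parameter of the target function and yields the results in input order (whereas as_completed() yields futures in completion order). The sketch below assumes the same DoWork worker and unzips the parameter tuples with zip(*...); the helper name RunWithMap is hypothetical.

```python
import concurrent.futures

# Worker function (same as in the example above)
def DoWork(Param1, Param2, Param3):
    return f"Pokemon#{Param1}{Param2}{Param3}"

# Hypothetical helper: run DoWork once per parameter tuple, keeping input order
def RunWithMap(arrWorkerParams, nWorkers=None):
    with concurrent.futures.ThreadPoolExecutor(max_workers=nWorkers) as polWorkers:
        # zip(*[(2, 4, 5), (2, 5, 0)]) gives (2, 2), (4, 5), (5, 0):
        # one iterable per parameter, which is what Executor.map() expects
        itrResults = polWorkers.map(DoWork, *zip(*arrWorkerParams))
        # map() yields results in the order of the inputs;
        # an exception raised in a worker is re-raised here
        return list(itrResults)

if __name__ == "__main__":
    print(RunWithMap([(2, 4, 5), (2, 5, 0), (3, 8, 0), (3, 8, 1)]))
    # prints ['Pokemon#245', 'Pokemon#250', 'Pokemon#380', 'Pokemon#381']
```

The same call works with ProcessPoolExecutor, as long as the worker is defined at module level.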

References:

https://blog.csdn.net/huapingqi/article/details/132521391

https://deepinout.com/python/python-qa/18_python_pass_multiple_parameters_to_concurrentfuturesexecutormap.html

https://blog.csdn.net/waitan2018/article/details/108386898

Using a process pool

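The concurrent.futures module also provides concurrent.futures.ProcessPoolExecutor, which offers the same interface but runs the submitted jobs in a pool of worker processes instead of threads. Arguments and return values are pickled to pass them between processes, so the worker function must be defined at module level.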

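Running jobs in a process pool instead of threads can, to some extent, avoid problems such as the warnings Matplotlib issues when plotting from multiple threads.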

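Note that when a script calls a function that creates a process pool or child processes, the script's entry code must be wrapped in an 'if __name__ == "__main__" :' block. Otherwise, execution fails with the error "An attempt has been made to start a new process before the current process has finished its bootstrapping phase." This applies to the spawn and forkserver start methods (spawn is the default on Windows and macOS), under which every child process re-imports the main module.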

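Also, if a child process calls a function or method that may create concurrent.futures.Future objects (for example, some parallel computation tools), the process pool may wait for a long time, or even indefinitely, when it shuts down (on leaving the with block, or on an explicit call to shutdown()). See: https://github.com/python/cpython/issues/94440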

import concurrent.futures
import multiprocessing
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create process pool object (at least 1 worker: max_workers=0 raises ValueError on single-core machines)
    with concurrent.futures.ProcessPoolExecutor(max_workers=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Assign parameter values
            Param1Value = Params[0]
            Param2Value = Params[1]
            Param3Value = Params[2]
 
            # Submit jobs
            ftrCurrentWorker = polWorkers.submit(DoWork,
                Param1Value,
                Param2Value,
                Param3Value)
 
            # Cache current worker
            arrWorkers.append(ftrCurrentWorker)
        #Next
 
        # Register completion list
        # (as_completed() yields futures in completion order, not in submission order)
        arrCompletedTasks = concurrent.futures.as_completed(arrWorkers)
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrCompletedTasks :
            # Do post-processing
            arrWorkerResults.append(WorkerResult.result())
        #Next
    #End With
 
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post-processing
        print(CurrentResult)
    #Next
#End If
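Because as_completed() yields futures in completion order, the result lists in the two examples above are not necessarily in the order of arrWorkerParams. One way to keep handling results as they finish while still restoring the input order is to remember each future's index in a dictionary. The sketch below is written against the generic Executor interface, so it accepts either a ThreadPoolExecutor or a ProcessPoolExecutor; the helper name CollectResultsInOrder is hypothetical, and the usage passes a ThreadPoolExecutor only so that it runs without child processes.

```python
import concurrent.futures

# Worker function (same as in the example above)
def DoWork(Param1, Param2, Param3):
    return f"Pokemon#{Param1}{Param2}{Param3}"

# Hypothetical helper: submit every parameter tuple, handle results as they complete,
# and return them in the order of arrWorkerParams
def CollectResultsInOrder(polWorkers, fncWork, arrWorkerParams):
    # Remember which input index each future belongs to
    dctFutureIndex = {
        polWorkers.submit(fncWork, *Params): i
        for i, Params in enumerate(arrWorkerParams)
    }
    arrResults = [None] * len(arrWorkerParams)
    for ftrDone in concurrent.futures.as_completed(dctFutureIndex):
        # result() re-raises any exception raised inside the worker
        arrResults[dctFutureIndex[ftrDone]] = ftrDone.result()
    return arrResults

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as polWorkers:
        print(CollectResultsInOrder(polWorkers, DoWork, [(2, 4, 5), (2, 5, 0), (3, 8, 0), (3, 8, 1)]))
```

With a ProcessPoolExecutor, create the executor inside the __main__ guard and pass it in the same way.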

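On Windows, concurrent.futures.ProcessPoolExecutor may have performance issues and limits the number of worker processes (max_workers may not exceed 61), so multiprocessing.Pool can be considered instead.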

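A multiprocessing.Pool process pool submits asynchronous jobs with multiprocessing.Pool.starmap_async(). Its first argument is the target function, and its second argument is a list of tuples, each tuple holding the arguments of one call. Note that the arguments must be placed in a tuple even if the target function takes only one argument (for example (x,)), and the tuple must be placed in a list even if the function is called only once. The call returns a multiprocessing.pool.AsyncResult object. Calling its get() method waits until the corresponding jobs have finished and returns a list of results (note that get() wraps the result in a list even if the target function was called only once).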

import multiprocessing
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Script entry
if __name__ == "__main__" :
    # Get processor count
    nProcessorCount = multiprocessing.cpu_count()
 
    # Preprocessing
    # Do some pre processing here
    # arrWorkerParams is an example of worker's parameters
    arrWorkerParams = [
        (2,4,5),
        (2,5,0),
        (3,8,0),
        (3,8,1)
    ]
 
    # Array of workers
    arrWorkers = []
 
    # Create process pool object (at least 1 process: processes=0 raises ValueError on single-core machines)
    with multiprocessing.Pool(processes=max(1, nProcessorCount // 2)) as polWorkers :
        for Params in arrWorkerParams :
            # Submit jobs
            resCurrentWorker = polWorkers.starmap_async(DoWork, [Params])
 
            # Keep the AsyncResult of the current job
            arrWorkers.append(resCurrentWorker)
        #Next
 
        # Collect results
        arrWorkerResults = []
        for WorkerResult in arrWorkers :
            # Do post-processing
            arrWorkerResults.append(WorkerResult.get()[0])
        #Next
    #End With
 
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post-processing
        print(CurrentResult)
    #Next
#End If
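The loop above submits one starmap_async() call per parameter tuple. The sketch below shows the two result shapes described earlier: a single call with the whole parameter list returns all results in input order, and a call for one job still returns a one-element list. It uses multiprocessing.pool.ThreadPool, which exposes the multiprocessing.Pool interface backed by threads, only so that the sketch runs without picklable workers or a __main__ guard; replacing it with multiprocessing.Pool gives real processes. The helper name RunWithStarmapAsync is hypothetical.

```python
from multiprocessing.pool import ThreadPool

# Worker function (same as in the example above)
def DoWork(Param1, Param2, Param3):
    return f"Pokemon#{Param1}{Param2}{Param3}"

# Hypothetical helper showing the shape of starmap_async() results
def RunWithStarmapAsync(arrWorkerParams, nWorkers=2):
    with ThreadPool(processes=nWorkers) as polWorkers:
        # One call submits all parameter tuples; get() returns results in input order
        resAll = polWorkers.starmap_async(DoWork, arrWorkerParams)
        # A single job still needs a list holding one tuple, and get() returns a one-element list
        resSingle = polWorkers.starmap_async(DoWork, [(1, 2, 3)])
        # Call get() before leaving the with block: leaving it calls terminate() on the pool
        return resAll.get(), resSingle.get()

if __name__ == "__main__":
    print(RunWithStarmapAsync([(2, 4, 5), (2, 5, 0), (3, 8, 0), (3, 8, 1)]))
```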

Optimizing the process pool

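concurrent.futures.ProcessPoolExecutor creates child processes even when max_workers is 1, and so does multiprocessing.Pool with processes=1. This can cause a significant slowdown (every job pays for process startup and for pickling its arguments and results), or cause problems when functions that create child processes are nested (for example, worker processes of multiprocessing.Pool are daemonic and are not allowed to start child processes of their own).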

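Therefore, consider writing a wrapper function that runs the jobs in the current process, without creating any child processes, when the requested number of workers is at most 1. The same function can also take care of choosing the parallel backend: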

import concurrent.futures
import multiprocessing
import sys
 
# Constants
# Largest max_workers accepted by concurrent.futures.ProcessPoolExecutor on Windows
PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS = 61
 
# Checks if current platform is Windows
def IsRunningOnWindows() -> bool :
    return sys.platform.lower().startswith("win")
#End Function
 
# Run jobs in parallel with process pool
# This is a wrapper of concurrent.futures.ProcessPoolExecutor and multiprocessing.Pool
# It also avoids creating child processes when nWorkers <= 1 (jobs then run in the current process)
#     arrFunctions is list of functions
#     arrArgs is list of tuple, with arguments (*args)
#     arrKwArgs is list of dict, with keyed arguments (**kwargs)
#     iPoolBackend selects which backend is used:
#         0 - Uses concurrent.futures.ProcessPoolExecutor
#         1 - Uses multiprocessing.Pool (warning: in this case, arrKwArgs will be ignored)
#     If IsBackendAutoSelectingEnabled, the backend is selected as follows:
#         On Windows, if nWorkers is None or nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS, and no keyword arguments are given in arrKwArgs: use multiprocessing.Pool
#         Otherwise, use concurrent.futures.ProcessPoolExecutor
# Returns list of results
def RunJobsWithProcessPool(arrFunctions : list, arrArgs : list = None, arrKwArgs : list = None,
    nWorkers : int = None, iPoolBackend : int = 0, IsBackendAutoSelectingEnabled : bool = False) -> list :
 
    # Handling null-references
    if arrArgs is None :
        arrArgs = []
    #End If
    if arrKwArgs is None :
        arrKwArgs = []
    #End If
    if iPoolBackend < 0 :
        iPoolBackend = 0
    elif iPoolBackend > 1 :
        iPoolBackend = 1
    #End If
 
    # Initialize result buffer
    arrResults = []
 
    # Check if we really need to create child workers
    IsChildWorkersNeeded = False
    if nWorkers is None :
        IsChildWorkersNeeded = True
    elif nWorkers > 1 :
        IsChildWorkersNeeded = True
    #End If
 
    # Backend auto selecting
    if IsBackendAutoSelectingEnabled :
        # multiprocessing.Pool ignores keyword arguments, so it is only selected when none are given
        IsKwArgsGiven = any(arrKwArgs)
        # Check if we are using Windows
        if IsRunningOnWindows() and not IsKwArgsGiven :
            # Check if we need a lot of processor cores
            IsProcessorCoreCountLarge = False
            if nWorkers is None :
                IsProcessorCoreCountLarge = True
            elif nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
                IsProcessorCoreCountLarge = True
            #End If
            # Select backend
            if IsProcessorCoreCountLarge :
                iPoolBackend = 1
            else :
                iPoolBackend = 0
            #End If
        else :
            iPoolBackend = 0
        #End If
    #End If
    # Avoid the ValueError raised by concurrent.futures.ProcessPoolExecutor on Windows
    # when max_workers exceeds PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
    if iPoolBackend == 0 and IsRunningOnWindows() :
        if nWorkers is None :
            nWorkers = min(PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS, multiprocessing.cpu_count())
        elif nWorkers > PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS :
            nWorkers = PROCESS_POOL_EXECUTOR_MAX_WORKERS_LIMIT_WINDOWS
        #End If
    #End If
 
    # Run jobs
    if IsChildWorkersNeeded :
        # Use backend selected by iPoolBackend to run works
        if iPoolBackend == 0 :
            # Using concurrent.futures.ProcessPoolExecutor
            # Futures of submitted jobs
            arrWorkers = []
 
            # Create process pool object
            with concurrent.futures.ProcessPoolExecutor(max_workers=nWorkers) as polWorkers :
                # Create and submit job package
                for i in range(0, len(arrFunctions)) :
                    # Get function object and arguments
                    fncFunction = arrFunctions[i]
                    tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
                    dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
 
                    # Submit a job
                    ftrCurrentWorker = polWorkers.submit(fncFunction, *tplArgs, **dctKwArgs)
 
                    # Cache current worker
                    arrWorkers.append(ftrCurrentWorker)
                #Next
 
                # Collect results in submission order, so that arrResults[i] belongs to arrFunctions[i]
                # (the multiprocessing.Pool backend and the in-process path keep this order as well)
                for ftrWorker in arrWorkers :
                    arrResults.append(ftrWorker.result())
                #Next
            #End With
        elif iPoolBackend == 1 :
            # Using multiprocessing.Pool
            # AsyncResult objects of submitted jobs
            arrWorkers = []
 
            # Create process pool object
            with multiprocessing.Pool(processes=nWorkers) as polWorkers :
                # Create and submit job package
                for i in range(0, len(arrFunctions)) :
                    # Get function object and arguments
                    fncFunction = arrFunctions[i]
                    tplArgs = arrArgs[i] if (i < len(arrArgs)) else tuple()
 
                    # Submit a job
                    resCurrentWorker = polWorkers.starmap_async(fncFunction, [tplArgs])
 
                    # Keep the AsyncResult of the current job
                    arrWorkers.append(resCurrentWorker)
                #Next
 
                # Collect results
                for WorkerResult in arrWorkers :
                    arrResults.append(WorkerResult.get()[0])
                #Next
            #End With
        #End If
    else :
        # Run jobs in current process directly
        for i in range(0, len(arrFunctions)) :
            # Get function object and arguments
            fncFunction = arrFunctions[i]
            tplArgs = arrArgs[i] if (i < len(arrArgs)) else ()
            dctKwArgs = arrKwArgs[i] if (i < len(arrKwArgs)) else dict()
 
            # Run job
            resCurrentWorker = fncFunction(*tplArgs, **dctKwArgs)
 
            # Collect results
            arrResults.append(resCurrentWorker)
        #Next
    #End If
 
    return arrResults
#End Function
 
# Worker function
def DoWork(Param1, Param2, Param3) :
    # Do some work
    WorkResult = f"Pokemon#{Param1}{Param2}{Param3}"
 
    return WorkResult
#End Function
 
# Main entry point
if __name__ == "__main__" :
    # Run jobs in parallel
    # Create functions & arguments list
    arrWorkerFuncs = []
    arrWorkerArgs = []
    # Result = DoWork(2,4,5)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,4,5))
    # Result = DoWork(2,5,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((2,5,0))
    # Result = DoWork(3,8,0)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,0))
    # Result = DoWork(3,8,1)
    arrWorkerFuncs.append(DoWork)
    arrWorkerArgs.append((3,8,1))
    # Run jobs
    nParallelCPUJobCount = int(multiprocessing.cpu_count() / 2)
    arrWorkerResults = RunJobsWithProcessPool(arrWorkerFuncs, arrWorkerArgs,
        nWorkers=nParallelCPUJobCount)
    # Collect worker results
    for CurrentResult in arrWorkerResults :
        # Do post-processing
        print(CurrentResult)
    #Next
#End If
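An alternative design to the wrapper above, sketched under the assumption that callers can be written against the concurrent.futures.Executor interface: a small executor that runs each submitted call immediately in the current process and returns an already completed Future, plus a factory that picks it when at most one worker is requested or when the current process is daemonic (as multiprocessing.Pool workers are). The names InlineExecutor and MakeProcessExecutor are hypothetical and not part of the standard library; unlike the wrapper, this sketch does not handle the Windows max_workers limit or the multiprocessing.Pool backend.

```python
import concurrent.futures
import multiprocessing

# Hypothetical executor: runs every submitted call immediately in the current process.
# It implements the concurrent.futures.Executor interface, so submit(), map(),
# as_completed() and the with statement work the same way as with a real pool.
class InlineExecutor(concurrent.futures.Executor):
    def submit(self, fn, /, *args, **kwargs):
        ftrResult = concurrent.futures.Future()
        try:
            ftrResult.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the error so that result() re-raises it, as a pool would
            ftrResult.set_exception(exc)
        return ftrResult

# Hypothetical factory: run in-process when at most one worker is requested, or when the
# current process is daemonic, since daemonic processes cannot start child processes
def MakeProcessExecutor(nWorkers=None):
    if (nWorkers is not None and nWorkers <= 1) or multiprocessing.current_process().daemon:
        return InlineExecutor()
    return concurrent.futures.ProcessPoolExecutor(max_workers=nWorkers)

# Worker function (same as in the example above)
def DoWork(Param1, Param2, Param3):
    return f"Pokemon#{Param1}{Param2}{Param3}"

if __name__ == "__main__":
    with MakeProcessExecutor(1) as polWorkers:
        print(list(polWorkers.map(DoWork, [2, 3], [4, 8], [5, 1])))
        # prints ['Pokemon#245', 'Pokemon#381']
```

Because the inline executor still returns Future objects, calling code needs no separate branch for nWorkers <= 1, and keyword arguments keep working on both paths.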

References:

https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

https://gairuo.com/p/python-multiprocessing-pool

https://www.cnblogs.com/piperliu/articles/18615898

https://superfastpython.com/multiprocessing-pool-starmap_async/

https://martinlwx.github.io/en/data-parallel-with-two-different-api/

https://www.cnblogs.com/midworld/p/14614634.html

Unless otherwise stated, the content of this page is licensed under: Creative Commons Attribution-ShareAlike 3.0 License