【Python 平行運算 #7】自動化平行任務程式模板 v2,讓系統發揮最大效能,自動分配資源平行運算

前言

此為 multiprocessing 的程式模板v2,(相比 v2 簡化為一支程式)
將任務封裝好後,直接丟入 list_tasks 即可自動發揮系統最大效能跑平行運算。

封裝的概念,請務必先自行理解。

我們通常在設計程式時,都會是預設以單一 process 去執行程式,
但有時候會要重複執行多種任務。

例:跑影片 x 100、分析結果 x 100、雜項任務 x 100…

會需要寫程式的任務,大部分都是需要被反覆執行的,我們正好利用這個特性

multiprocessing.Pool() 會依據系統現在的能力,自動分配任務,
也就是說會讓你的電腦在“接近極限卻不當掉的”情況下,
發揮最大多工的能力。

範例程式碼

  • start_multiprocess.py

請在這邊將所有封裝好的任務存入 list_tasks,(用 list_tasks.append())

例:list_tasks.append(CLASS_NAME(your_args, …))

封裝的任務必須至少包含以下兩個函數:

  • init(): 初始化變數
  • start_progress(): 讓自動化流程分配任務時,開始執行的入口

注意:請勿將 start_progress() 放在 init() 中執行,這樣在宣告階段程式就會先開始跑起來了! 失去了自動分配資源的效果!

import os
import cv2
import glob
import multiprocessing as mp
from multiprocessing import RLock
from tqdm import tqdm
from termcolor import colored

list_tasks = []

# ------------- [YOU ONLY NEED TO SET HERE] ------------- #
# * append your task in to list_tasks 
#   -> list_tasks.append(CLASS_NAME(your_args, ...))
#
# * your CLASS must have at least two functions:
#   -> __init__() and start_progress()
#---------------------------------------------------------#

# put your task here

#---------------------------------------------------------#
#---------------------------------------------------------#
#---------------------------------------------------------#
#---------------------------------------------------------#
#---------------------------------------------------------#

class howard_print(object):
    @staticmethod
    def info(str):
        print(colored(f"[Info] ", 'green') + str)

    @staticmethod
    def warn(str):
        print(colored(f"[Warning] ", 'yellow') + str)

    @staticmethod
    def error(str):
        print(colored(f"[Error] ", 'red') + str)

    @staticmethod
    def finish(str):
        print(colored(f"[Finished] ", 'cyan') + str)

    @staticmethod
    def undefined(str):
        print(colored(f"[Undefined] ", 'magenta') + str)


class multiprocess_task(object):
    def __init__(self, idx, total_job, each_task):
        os.system('cls' if os.name == 'nt' else 'clear')
        howard_print.info(f'Current working on job {idx}/{len(list_tasks)}.')
        each_task.start_progress(total_job)
        howard_print.info(f"job {idx}/{len(list_tasks)} finished !!!")


if __name__ == '__main__':
    tqdm.set_lock(RLock()) # for managing output contention

    cpu_count = mp.cpu_count()
    print(f" --------------------- [start_multiprocess.py] --------------------- ")
    howard_print.info(f"Your CPU count (worker count): {cpu_count}")
    pool = mp.Pool()

    res_list = []
    howard_print.info(f"Total task count = {len(list_tasks)}")
    howard_print.info(f"Task list = {list_tasks}")


    for idx in range(len(list_tasks)):
        res_list.append(pool.apply_async(multiprocess_task, (idx, len(list_tasks), list_tasks[idx])))

    for idx in range(len(res_list)):
        res_list[idx].get()