流れに従ひて己を失はず.

日々の研究について書きます

pythonのmultiprocessingで複数引数複数戻値を並列処理

毎回忘れるのでメモ.

pythonで複数引数をとって,複数戻値しかも配列を返す処理を並列で行う.

参考: https://qiita.com/wikipediia/items/2919362de582a7d8de9e

import time
import numpy as np
import os
from multiprocessing import Pool

#総スレッド数の取得
cpu_count = os.cpu_count()

#最終的に作りたい出力結果の行数
df_size = 1234

#作りたいデータその1
#df_size*2の2次元データ
#n行目のデータは[n,2*n+1]にしたい
data_frame = np.ones([df_size,2],dtype=int)

#作りたいデータその2
#df_size*2の2次元データ
#n行目のデータは[n,n^2+1]にしたい
data_frame_s = np.ones([df_size,2],dtype=int)

#それぞれのスレッドに割振る行数の開始番号と終了番号の組
sta_gol = np.ones([cpu_count,2],dtype=int)
for i in range(0,cpu_count):
    sta = (df_size//cpu_count)*i
    gol = (df_size//cpu_count)*(i+1)-1
    if i == cpu_count-1:
        gol = df_size-1
    sta_gol[i,:] = [sta,gol]
print(sta_gol)

# ここを並列処理する
def func(i,arg1, arg2):
    sta = int(arg1)
    gol = int(arg2)
    print("Thread for sta:",sta," gol:", gol)
    
    #作りたいデータの割り当てられた各部分を作る処理
    data_frame_sg = np.ones([gol+1-sta,2],dtype=int)
    data_frame_sg_s = np.ones([gol+1-sta,2],dtype=int)
    for r in range(sta,gol+1):
        data_frame_sg[r-sta,:] = [r,2*r+1]
        data_frame_sg_s[r-sta,:] = [r,r*r+1]
    
    #ダミーの処理
    time.sleep(5/(i+1))
    
    return i,data_frame_sg,data_frame_sg_s
    
def wrapper(args):
    # argsは(i,sta, gol)となっている
    return func(*args)

def multi_process(sampleList):
    # プロセス数:cpu_count
    p = Pool(cpu_count)
    #20220718追記
    # M1Macだとp = get_context("fork").Pool(cpu_count)
    # 参考:https://stackoverflow.com/questions/67999589/
    
    #ラッパにわたす
    output = p.map(wrapper, sampleList)
    
    # プロセスの終了
    p.close()
    return output

def mproc():
    # (i,sta, gol)が引数になる
    sampleList = [(i,sta_gol[i,0],sta_gol[i,1]) for i in range(cpu_count)]

    return multi_process(sampleList)

start = time.time()

#実行するところ
mproc_output = mproc()

elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(round(elapsed_time,2)) + "[sec]")

#並列処理の結果をもとのスキーマに直すところ
for i in range(0,len(mproc_output)):
    ind = mproc_output[i][0]
    data_frame[sta_gol[ind,0]:sta_gol[ind,1]+1,:] = mproc_output[i][1]
    data_frame_s[sta_gol[ind,0]:sta_gol[ind,1]+1,:] = mproc_output[i][2]
print(data_frame)
print(data_frame_s)