並列で記録とか
Pythonのmultiprocessingでメモリ使用量を記録してみる。 memory_profilerくらいに使いやすくできれば結構だが、今回はちょっとした試みなのでそこまで真面目には作らない(作れない)。
方針としては、計測対象の処理を定義した関数を実行するプロセスと、メモリを記録するプロセスを立てて、独立に動かす。 せっかくなので、プロセス間通信だとか共有メモリだとかの機能も試してみる。
ライブラリはこう。
import functools import memory_profiler import multiprocessing import numpy as np import os import psutil import time
まず、記録する側。
class Reporter: def __init__(self, pipe, delta_seconds=0.1): self.pipe = pipe self.delta_seconds = delta_seconds self.memory_stock = [] def step(self, state): if state.value == 1: return False time.sleep(self.delta_seconds) if state.value == 1: return False self.memory_stock.append(self.target_proc.memory_info().rss) return True def run(self, state, memory_queue): pid = self.pipe.recv() self.target_proc = psutil.Process(pid) try: while self.step(state): ... except psutil.NoSuchProcess: ... memory_queue.put(self.memory_stock)
上のコードで、pipe
、state
、memory_queue
の3つは通信に使用する。pipe
は実行側プロセスとのパイプ用オブジェクトで、今回はpidを受け取るのに使用する。state
は共有メモリで、実行側プロセスの終了を検知するために使用する。memory_queue
は実行結果を吐き出すために用意しているキューである。
次は実行側を作ってみる。
class Runner: def __init__(self, pipe, worker, worker_args=None, worker_kwargs=None): self.pipe = pipe self.worker = worker self.worker_args = worker_args self.worker_kwargs = worker_kwargs def run(self, state, result_queue): pid = os.getpid() self.pipe.send(pid) args = [] if self.worker_args is None else self.worker_args kwargs = dict() if self.worker_kwargs is None else self.worker_kwargs result = self.worker(*args, **kwargs) result_queue.put(result) state.value = 1
可もなく不可もなくという感じである。多少ジェネリックにしようということで関数workerとargsとkwargsを引数に入れる形式にしている。
ここまでで作ったものを組み立ててメモリを計測するフローを作る。多少使い勝手を意識して、デコレータにしてみる。
def my_memory_profiler(func): @functools.wraps(func) def converted_func(*args, **kwargs): state = multiprocessing.Value('i', 0) pipe1, pipe2 = multiprocessing.Pipe() result_queue = multiprocessing.Queue() memory_queue = multiprocessing.Queue() reporter = Reporter(pipe1) runner = Runner(pipe2, func, args, kwargs) p_rep = multiprocessing.Process(target=reporter.run, args=(state, memory_queue)) p_run = multiprocessing.Process(target=runner.run, args=(state, result_queue)) p_rep.start() p_run.start() memory = memory_queue.get() result = result_queue.get() p_run.join() p_rep.join() print('memory usage:') print(np.array(memory)) return result return converted_func
これを適当な関数で使うことができる。例えば
@my_memory_profiler def measure_memory_usage_test(data): positive_mask = (data > 0) time.sleep(1) resulting_data = np.where(positive_mask, np.sqrt(data), 2 * data) time.sleep(1) return resulting_data def measure_memory_usage_interface(): print('preparing data') data = np.random.random(1000000) print('running operation') measure_memory_usage_test(data)
計測結果の妥当性などは検証していないが、とりあえず通信は想定通りに動いていそうである。 まあまあ面白い。graphlibあたりと組み合わせて処理フローを構築するのも面白そうである。