並列で記録とか

Pythonのmultiprocessingでメモリ使用量を記録してみる。 memory_profilerくらいに使いやすくできれば結構だが、今回はちょっとした試みなのでそこまで真面目には作らない(作れない)。

方針としては、計測対象の処理を定義した関数を実行するプロセスと、メモリを記録するプロセスを立てて、独立に動かす。 せっかくなので、プロセス間通信だとか共有メモリだとかの機能も試してみる。

ライブラリはこう。

import functools
import memory_profiler
import multiprocessing
import numpy as np
import os
import psutil
import time

まず、記録する側。

class Reporter:
    def __init__(self, pipe, delta_seconds=0.1):
        self.pipe = pipe
        self.delta_seconds = delta_seconds
        self.memory_stock = []

    def step(self, state):
        if state.value == 1:
            return False
        time.sleep(self.delta_seconds)
        if state.value == 1:
            return False
        self.memory_stock.append(self.target_proc.memory_info().rss)
        return True

    def run(self, state, memory_queue):
        pid = self.pipe.recv()
        self.target_proc = psutil.Process(pid)
        try:
            while self.step(state):
                ...
        except psutil.NoSuchProcess:
            ...
        memory_queue.put(self.memory_stock)

上のコードで、pipestatememory_queueの3つは通信に使用する。pipeは実行側プロセスとのパイプ用オブジェクトで、今回はpidを受け取るのに使用する。stateは共有メモリで、実行側プロセスの終了を検知するために使用する。memory_queueは実行結果を吐き出すために用意しているキューである。

次は実行側を作ってみる。

class Runner:
    def __init__(self, pipe, worker, worker_args=None, worker_kwargs=None):
        self.pipe = pipe
        self.worker = worker
        self.worker_args = worker_args
        self.worker_kwargs = worker_kwargs

    def run(self, state, result_queue):
        pid = os.getpid()
        self.pipe.send(pid)
        args = [] if self.worker_args is None else self.worker_args
        kwargs = dict() if self.worker_kwargs is None else self.worker_kwargs
        result = self.worker(*args, **kwargs)
        result_queue.put(result)
        state.value = 1

可もなく不可もなくという感じである。多少ジェネリックにしようということで関数workerとargsとkwargsを引数に入れる形式にしている。

ここまでで作ったものを組み立ててメモリを計測するフローを作る。多少使い勝手を意識して、デコレータにしてみる。

def my_memory_profiler(func):

    @functools.wraps(func)
    def converted_func(*args, **kwargs):
        state = multiprocessing.Value('i', 0)
        pipe1, pipe2 = multiprocessing.Pipe()

        result_queue = multiprocessing.Queue()
        memory_queue = multiprocessing.Queue()

        reporter = Reporter(pipe1)
        runner = Runner(pipe2, func, args, kwargs)

        p_rep = multiprocessing.Process(target=reporter.run,
                                        args=(state, memory_queue))
        p_run = multiprocessing.Process(target=runner.run,
                                        args=(state, result_queue))

        p_rep.start()
        p_run.start()
        memory = memory_queue.get()
        result = result_queue.get()
        p_run.join()
        p_rep.join()

        print('memory usage:')
        print(np.array(memory))

        return result

    return converted_func

これを適当な関数で使うことができる。例えば

@my_memory_profiler
def measure_memory_usage_test(data):
    positive_mask = (data > 0)
    time.sleep(1)
    resulting_data = np.where(positive_mask, np.sqrt(data), 2 * data)
    time.sleep(1)
    return resulting_data


def measure_memory_usage_interface():
    print('preparing data')
    data = np.random.random(1000000)
    print('running operation')
    measure_memory_usage_test(data)

計測結果の妥当性などは検証していないが、とりあえず通信は想定通りに動いていそうである。 まあまあ面白い。graphlibあたりと組み合わせて処理フローを構築するのも面白そうである。