>  > ツバキ チェーン Tsubaki Chain kit FZS1000 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER (1000) 01-06

ツバキ チェーン Tsubaki Chain kit FZS1000 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER (1000) 01-06

index

01-06 (530 Tsubaki YAMAHA FZS1000 FAZER FZS1000 (1000) FAZER 2 Chain ツバキ kind XRS)【ヨーロッパ直輸入品】 チェーン 39 SIGMA FZS1000 kit 16 FAZER

ver2.6から追加された multiprocessingモジュールを使って並列処理させる. threadingを使う方法もあるが,クラス継承しないと行けないとか ちょっと遅いとか上手くCPU使えてないとかちょっと不満があるので,multiprocessingを使って話をする. これはthreadingと似たようなAPIを提供するのだが,GILを効率的に回避する.(theadではなくsubprocessを使う)

何はともあれサンプルを示す.単純にカウントダウンするという面白みもなんともないもの. 1Gをカウントダウンする.この場合4スレッドに分散させるので250Mをカウントしていることになるが….
threading01.py

import threading
import time
def countDown(n):
 while n > 0:
 n -= 1
class TestThread(threading.Thread):
 def __init__(self, n):
 threading.Thread.__init__(self)
 self._n = n
 def run(self):
 countDown(self._n)
n = int(1e9)
n1, n2, n3, n4 = int(n/4), int(n/4), int(n/4), int(n/4)
jobs = [TestThread(n1), TestThread(n2), TestThread(n3), TestThread(n4)]
start_time = time.time()
for j in jobs:
 j.start()
for j in jobs:
 j.join()
finish_time = time.time()
print(finish_time - start_time)

やることは単純で,threading.threadを継承するクラスを作ってrunメソッドをoverwrite(上書き)するだけ. で,あとはインスタンスを作ってstartとしてjoinする.

multiprocessing

次.multiprocessingを使った場合.こちらはクラスを継承する必要がなく,簡単にスレッドを実装できる. 上と同じでカウントダウンをさせましょうか.
multiprocessing01.py

from multiprocessing import Process
import time
def countDown(n):
 while n > 0:
 n -= 1
n = int(1e9)
n1, n2, n3, n4 = int(n/4), int(n/4), int(n/4), int(n/4)
jobs = [
 Process(target=countDown, args=(n1,)),
 Process(target=countDown, args=(n2,)),
 Process(target=countDown, args=(n3,)),
 Process(target=countDown, args=(n4,)),
 ]
start_time = time.time()
for j in jobs:
 j.start()
for j in jobs:
 j.join()
finish_time = time.time()
print(finish_time - start_time)

こちらのほうが簡単だね.関数を作ってProcessに投げる.後はstartしてjoinするだけ. 実行すると気づくと思うが,multiprocessingはちゃんと子プロセスを作る. topやらで見ると確かにProcessの数だけpythonが動いている.
一方でthreadingの方はそうではない.プロセスが一つだけ動くだけ. これや実行速度やCPU占有時間にも現れている. threadingは一つプロセスの中で複数のスレッドを立てて実行しているのに対して, multiprocessingは作ったスレッド分だけ子プロセスを作ってそれを制御する. (GILが大きなネックになっている証拠?) という事で サマータイヤ 215/55R18 95V ダンロップ エナセーブ RV504 & ロジャム スプレッド チタンアッシュカット 7.0-18 タイヤホイール4本セット,今回はmultiprocessingを中心に書く.

ツバキ チェーン Tsubaki Chain kit FZS1000 TC-10 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER ピストン径:Φ71.95mm (1000) 01-06

もう少しmultiprocessingについて

一方のプロセスでデータを取って片方で処理する場合,データ取り終わって処理開始させるよりはデータは連続して取って処理は別に動かすというのが普通である. だがしかし,これが結構難しい. threadingは一つのプロセスだけで処理するから通信が楽であるが,独立したプロセスを作ってそれの通信と言うのは難しい. C++での実装の際にも問題になる(らしい)のだが,どうやって解決するか?
いくつかの方法が供給されている.

  1. shared memoryを使う
  2. server processを使う

1は一般的な共有メモリを使うという方法.型は ,arrayモジュールの書式に従う(ctypeと言うべきか). 大きく分けて2つあり,ArrayとValueである. 公式サンプル (ctypeについてはいつか…いつの日にか….cのsharedが使えるようにする,早い話)

2は上記ctypeでは記述が難しいlist,dictionary, Namespace, Lock, etcを扱える. サーバとクライアントの仕組みを使ってpythonオブジェクトプロキシでやり取りする。
multiproc_manager01.py

#!/usr/bin/env python
#coding:utf-8
from multiprocessing import Process, Manager
def func(l, d):
 l.append(1)
 d.update({2:"hello"})
 d[10] = "world"
if __name__ == "__main__":
 manager = Manager()
 l = manager.list(range(2))
 d = manager.dict()
 p = Process(target=func, args=(l,d))
 p.start()
 p.join()
 print(l)
 print(d)
$ python multiproc01.py
[0, 1, 1]
{2:"hello", 10:"world"}

と通信が出来る.これ以上は骨が折れるので,簡単なサンプルを示す.
dstat-modoki.py

#!/usr/bin/env python
#coding:utf-8
import time
from multiprocessing import Process, Manager, Array
def getCPUInfo():
 
 with open("/proc/stat") as f:
 lines = [[float(elm) for elm in v.split()[1:]] 
 for v in f.readlines() if v[:3] == "cpu"]
 return lines
 
def getCPUUsage(interval=1.):
 s1 = getCPUInfo()
 time.sleep(interval)
 s2 = getCPUInfo()
 num_of_cpu = len(s1) - 1
 cpu_total = [map(lambda x,y:(y-x)/num_of_cpu/interval/100. , s1.pop(0), s2.pop(0))]
 return cpu_total + [[(s2[i][j] - s1[i][j])/interval/100. for j,eml in enumerate(v)] \
 for i,v in enumerate(s1)]
def getMEMInfo():
 
 with open("/proc/meminfo") as f:
 lines = [ v.strip().split(":") for v in f.readlines()]
 
 mem_dict = {}
 for l in lines:
 mem_dict[l[0]] = float(l[1].strip().split()[0])
 return mem_dict
def getMEMUsage():
 memory_usage = getMEMInfo()
 return ((memory_usage["MemFree"] + memory_usage["Buffers"] + memory_usage["Cached"]), \
 memory_usage["MemTotal"])
def getLoadAvgInfo():
 with open("/proc/loadavg") as f:
 return [float(v) for v in f.read().split()[:3]]
def getNETInfo():
 """
 Inter -| Receive | Transmit
 face | bytes packets errs drop fifo frame compressed multicast| bytes ....
 lo: 0 0 0 ...
 """
 with open("/proc/net/dev") as f:
 lines = [v.split(":") for v in f.readlines()[2:]]
 net_dict = {}
 for l in lines:
 net_dict[l[0].strip()] = [int(v) for v in l[1].split()]
 return net_dict
def getNetUsage(interval=1., total=False):
 """
 dictionary of list
 receive's bytes, packets, errs, drop, fifo, compressed, multicast, 
 transmit's bytes, packets, errs, drop, fifo, compressed, multicast
 total 16 elements
 """
 net_dict1 = getNETInfo()
 time.sleep(interval)
 net_dict2 = getNETInfo()
 
 net_usage = {}
 for k,v in net_dict1.items():
 net_usage[k] = [0.]*16
 for i, elm in enumerate(v):
 net_usage[k][i] = (net_dict2[k][i] - elm)/interval
 if total:
 return (net_usage, net_dict2)
 else:
 return net_usage
def getCPU(l, interval=1):
 l += getCPUUsage(interval)
def getMEM(arr,):
 arr[0], arr[1] = getMEMUsage()
def getNET(d, interval=1):
 d.update(**getNetUsage(interval))
def getCPUStr(arr, progbar=False, bar="|"):
 if progbar:
 return "{0:6.1%} [{1:10s}]".format(arr[0][0], bar*int(arr[0][0]*10))
 else:
 return "C:{0:6.1%}".format(arr[0][0])
def getMEMStr(arr):
 return "M:{0:4.1f}/{1:4.1f}GB".format((arr[1] - arr[0])/1024/1024, arr[1]/1024/1024)
def getNETStr(dic, key):
 return "D:{0:7.3f}MB U:{1:7.3f}MB".format(dic[key][0]/1024/1024, dic[key][8]/1024/1024)
if __name__ == '__main__':
 ## ネットワークデバイス名を指定.eth0とかeth1とか
 ## intervalで更新時間を表示できる
 net_dev = "eth1"
 interval = 1.
 manager = Manager()
 cpu_usage = manager.list()
 mem_usage = Array("d", (0., 0.))
 net_usage = manager.dict()
 jobs = [Process(target=getCPU, args=(cpu_usage, interval)),
 Process(target=getMEM, args=(mem_usage, )),
 Process(target=getNET, args=(net_usage, interval))]
 for j in jobs:
 j.start()
 for j in jobs:
 j.join()
 
 message_cpu = getCPUStr(cpu_usage, False)
 message_net = getNETStr(net_usage, net_dev)
 message_mem = getMEMStr(mem_usage)
 print("{0} {1} {2}".format(message_cpu, message_net, message_mem))

dstatの代わり.tmuxのために作った.
CPU使用量(/proc/stat),物理メモリ(/proc/meminfo),ネットワーク(/proc/net/dev),ロードアベレージ(/proc/loadavg)を並列して取得する.
(実際はロードアベレージ以外を取得する)
並行でやることもないようなものだが,サンプルとしては丁度よいか.

中味はただパースして数値に変換しているだけ.難しい処理は行なっていない. できるだけワンラインで書けるように,処理を細かい処理で構成するようにした. 各関数で処理した値は,Managerクラスを使って渡している. 何が何であるのかはっきりしているのであればArrayを使っても構わない.

Queue/Pipe

次にQueue,Pipeを使っての処理.Queueはいわゆる生産者/消費者モデルの簡単な例である. Pipeも似たものであるが,双方向の通信が可能.Queueは一方通行という感じか.
例のごとく公式のサンプルを張り付けると,

from multiprocessing import Process, Queue
def f(q):
 q.put([42, None, 'hello'])
if __name__ == '__main__':
 q = Queue()
 p = Process(target=f, args=(q,))
 p.start()
 print q.get() # prints "[42, None, 'hello']"
 p.join()

別プロセスを走らせる.そのプロセス内でQueueインスタンスのputメソッドを呼び出してデータ[42, None, 'hello']を"put"する. メインのプロセスでgetメソッドを使ってそれを取り出す.
時間のかかる処理やかかる時間がまちまちな処理を走らせデータをputする. queueに置かれたデータをもう一方のプロセスで処理する.例えばファイルやDBに書き出す様な. そういった用途に使える.

Pipeであるが,Queueの発展のようなもので相互でのやり取りが可能である.

from multiprocessing import Process, Pipe
def f(conn):
 conn.send([42, None, 'hello'])
 conn.close()
if __name__ == '__main__':
 parent_conn, child_conn = Pipe()
 p = Process(target=f, args=(child_conn,))
 p.start()
 print parent_conn.recv() # prints "[42, None, 'hello']"
 p.join()

multiproc_pipe01.py

# 上のサンプルが物足りないので通信
# threadingのLockも利用して表示のロックを図る
# これをしないとスレッドが別々に非同期に動いてしまう.
from multiprocessing import Pipe, Process, Lock
import time
import sys
def func(conn, lc):
 conn.send("< hello")="" lc.acquire()="" while="" true:="" if="" conn.poll():="" break="" else:="" sys.stdout.write(".")="" sys.stdout.flush()="" time.sleep(1.)="" sys.stdout.write("\n")="" print(conn.recv())="" lc.release()="" conn.close()="" if="" __name__="=" '__main__':="" lc="Lock()" parent_conn,="" child_conn="Pipe()" p="Process(target=func," args="(child_conn," lc))="" p.start()="" ##="" 画面上に表示させるまでスレッドをロック="" lc.acquire()="" print(parent_conn.recv())="" lc.release()="" #="" ロック解除="" time.sleep(3.)="" parent_conn.send("="">> hi! what's up?")
 p.join() 

Pipeでは子プロセスを殺す時,Queueよりは簡単に組める.
multiproc_pipe02.py

from multiprocessing import Pipe, Process
import time
import datetime
import random
import math
def genRandom(conn):
 while True:
 if conn.poll():
 if conn.recv() is None:
 conn.close()
 break
 else:
 time.sleep(random.randint(1,15)*0.01)
 conn.send(random.normalvariate(10, 0.6))
 print("connection closed")
 conn.close()
def cleanUp(arr):
 return filter(lambda x: x is not None, arr)
def getMean(arr):
 return 1.*sum(arr)/len(arr)
def getStd(arr):
 m = getMean(arr)
 return math.sqrt(sum(map(lambda x: (x - m)*(x - m), arr))/len(arr))
if __name__ == '__main__':
 ## create Pipe instance and Process instance
 ## Pipe returns two instance; parent and child
 ## child will run in subprocess
 ## parent will manage child
 parent_conn, child_conn = Pipe()
 p = Process(target=genRandom, args=(child_conn, ))
 p.daemon = True
 ## check current date
 start_time = datetime.datetime.now()
 p.start() # process start
 timeinterval = 10 # sec
 timeinterval = datetime.timedelta(seconds=timeinterval)
 sample_size = 1e2
 i = 0
 data_array = [None] * int(sample_size)
 while i < int(sample_size):="" print(p.is_alive())="" current_time="datetime.datetime.now()" if="" (current_time="" -="" start_time)=""> timeinterval: # elapsed setting value, break loop
 print("time out!")
 break
 else:
 res = parent_conn.recv()
 data_array[i] = res
 if i % 10 == 0 and i != 0:
 print("{0:%x %X}: {1:.2f}".format(datetime.datetime.now(), res))
 i += 1
 
 ## at first send "Signal" to child
 parent_conn.send(None)
 p.join() # join Process
 data_array = cleanUp(data_array)
 print("total event: {0}".format(i))
 print("mean: {0}, std: {1}".format(getMean(data_array), getStd(data_array)))
 print("done")

子プロセスはランダム秒待って乱数を親プロセスに送る. 親プロセスはひたすらそれを集める.
もし子プロセスが一定時間経過した,あるいは一定数のイベント集め終わったならば 子プロセスを終了し集めたデータの平均値と偏差を出力する.
子プロセスを親は直接殺さない. 終了のシグナルとしてNoneを子プロセスに送り ミシュラン PRIMACY 4 プライマシー4 サマータイヤ 215/45R18 BLEST Bahnsport Type902 ホイールセット 4本 18インチ 18 X 7 +48 5穴 100,子プロセスは読み取るべきデータがあり(pollメソッドで判断) かつそれがNoneで有るときにプロセスを終了する.
子プロセスが終了するとProcessインスタンスが勝手にterminateするのでjoin出来る. 後は後処理で BMW 5シリーズ フロント 左右セット ブレーキディスクローター PDタイプ 81~88 BMW E28 520i (D20) PD1212210 【送料無料】【DIXCEL】ポイント5倍【ディクセル】,平均値・偏差を求めて出力する.

恐らく,server/clientモデルで作った方がプロセスの終了などを管理しやすくなると思うのだが… Pipeを使ってもまぁまぁなものが作れるので,それはそれで便利かと思われる.

append

例のエキスパートPythonプログラミングによると, multiprocessingはthreadingのラップではなくos.fork+subprocessの組み合わせのようなものらしい…. プロセスが作られた瞬間自動的にforkしてプロセスを走らせるとのこと. また, 共有メモリスペースを提供するmultiprocessing.Arrayとmultiprocessing.Valueクラスは極力使わないほうが良い (pp365)と書かれている. 並列化のボトルネックやコードの複雑化を引き起こすから 17インチ サマータイヤ セット【適応車種:アテンザセダン(GH系)】HOT STUFF ラフィット LW-03 ブラックポリッシュ 7.0Jx17プロクセス CF2 215/50R17,だそうだ. もしデータをやりとりしたければ,ManagerのValue, dictやlist,あるいはctypeを使うべきだ,という事だろう.
(multiprocessingのValueではなく,managerクラスのValueやArrayを使うべきである.後者はProxyオブジェクトなので 安全に共有メモリにアクセスでき,必要であればロックを掛けることが出来る.)

実は最後のサンプルには落とし穴があった.処理に時間がかかる場合,L59のwhileループで詰まる. ここではサンプルサイズと経過時間をチェックしているのだが,elseの中つまり処理に時間を取られるとここで滞ってしまう. 従って目的の時間を大幅に過ぎてからループを抜ける可能性があるのだ.
そこでこれを回避するために,

ツバキ チェーン Tsubaki Chain kit FZS1000 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER (1000) 01-06

カロ/KARO フロアマット QUEST 品番:149 カラー:ウォームブラック,ウォームグレージュ,クールパープル フィアット 149レガッタ ハンドル:左 FF 1983年09月~, 15インチデックスM401FENKEI オール オールワン マシニングガンメタ 6.0Jx15ヨコハマ エコス ES31 185/55R15, RS-R RS-Rダウン 1台分 ダウンサス ウィンダム MCV21 T263D 取付セット アライメント込 RSR RS★R DOWN ダウンスプリング バネ ローダウン コイルスプリング【店頭受取対応商品】, カードでポイント最大34倍 3/21(木)20:00~3/26(火)1:59迄 DUNLOP ダンロップ エナセーブ EC204 ENASAVE サマータイヤ 215/45R18 KYOHO 共豊 スマック プライム ヴァニッシュ SMACK VANISH ホイールセット 4本 18インチ 18 X 7.5 +38 5穴 114.3, カヤバ KYB ローファースポーツ フロント(左右セット) ウィッシュ ZGE22W 3ZRFAE(2.0L) FF 09/4~ ショックアブソーバー フロントブレーキホースキット スモークホース メッキ アルミ MBX50/F 82~86年 スウェッジライン(SWAGE-LINE) HONDA, [3551535] DIXCEL PD ブレーキローター リヤ用 アテンザスポーツワゴン GH5AW 08/01~12/11, 本州送料無料!CLAZZIO クラッツィオツィールトヨタ ランドクルーザープラド 120系 5人乗り, 15インチ サマータイヤ セット【適応車種:ゼスト スパーク(JE系 ターボ車)】WEDS レオニス NAVIA 06 マットガンメタマシニングカット 4.5Jx15ヨコハマ Sdrive ES03 165/50R15, NA Metal Craft エヌエーメタルクラフト 車高調整関係 レイダウンリンクKIT W400 W650 W800 MMG YTX5L-BS ATV バッテリー Arctic Cat E-Ton Honda Kasea Kawasaki Replaces M32X5B (海外取寄せ品)

ツバキ チェーン Tsubaki Chain kit FZS1000 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER (1000) 01-06:17インチスイフトスポーツZC31SWEDS ウェッズスポーツ SA-72R ハイパーブラッククリア 7.5Jx17ZIEX ZE914F 215/40R17 GUTS CHROME ガッツ クローム MP700MC MOON EYES オリジナル Hi Flow エアクリーナー ガッツ クローム

ツバキ チェーン Tsubaki Chain kit FZS1000 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER (1000) 01-06.グロンドマン シグナスX SE44J(28S)前期[エンボスイエロー/黒ダブルステッチ](張替)■グロンドマン国産シートカバー[GH5556YC250SW10] マスカロード ロイヤルシルバーメタリック/バイオレットセンシティブブルーメタリック クリア/通常スクリーン シックデザイン CB400SF・Ver.S(92~98年)

ピーエムシー PMC YSS ツイン リアショック スポーツライン G366 94年以降 インパルス400 330mm 黒/赤 27N 116-601671S7 HD店 【イベント開催中!】 GPR GP EVOLUTION TITAN 【GPエボリューションチタン】 (HONDA CBR 600 F - SPORT 2001-07 WITH AND WITHOUT PROBE SLIP ON EXHAUST MUFFLER) スリップオンマフラー

,もうひとつ処理専門のプロセスを作りデータを投げてやるのだ.
それがこれ.multiproc_pipe03.py (要pyROOT)

メインのスクリプトでは,単にプロセスの管理だけを行なっている. 一定時間経過したあるいは一定のイベント数に到達したかを判断し,それに応じてプロセスに司令を与える.

で,コレで何が出来るかというと仕事の分担が出来る. 要するにデータ収集とデータ処理を並列しておこなうとき楽なのだ. 処理に時間がかかる場合,1つのスレッドで仕事をさせる場合DAQにデッドタイムが生じてしまう. 結局そんなシビアな条件を課すんだったら Dimotiv ディモーティヴ アジャスタブルショートレバー ブレーキ/クラッチセット タイプ3 エクステンションカラー:ダークグリーン ボディーカラー:ダークグリーン(アジャスターカラー:シルバー) CBR150R,

ツバキ チェーン Tsubaki Chain kit FZS1000 YAMAHA FAZER (530 kind SIGMA 2 XRS)【ヨーロッパ直輸入品】 16 39 FZS1000 FAZER (1000) 01-06

,Cppで書いちゃうけどね….

{yahoojp} {quark.kj.yamagata-u.ac.jp}
{yahoojp}jpprem01-zenjp40-wl-zd-32704