[Python] Queue를 이용한 Thread Pool의 BackgroundWorker 사용 Develop Tip

C#에서는 Component Model 로 BackgroundWorker 라는 객체가 있습니다.
하나를 생성하면 하나의 쓰레드가 만들어지면서 그 결과를
콜백으로 받는 것이지요.


#!/usr/bin/env python
#coding=utf8
import Queue
import threading
import thread
import time
import datetime
import random
###############################################################################
class cmdQ():
#==========================================================================
CMDS = {}
#==========================================================================
def __init__(self, cmd, *args, **kwargs):
if not cmdQ.CMDS.has_key(cmd):
raise ReferenceError('Invalid cmdQ.command<%s>' % cmd)
self.cmd = cmd
self.args = args
self.kwargs = kwargs
#==========================================================================
def __repr__(self):
sb = ['cmdQ instance{']
sb.append('cmd="%s"' % self.cmd)
for arg in self.args: sb.append(',%s' % arg)
for k,v in self.kwargs.items(): sb.append(',%s="%s"' % (k,v))
sb.append('}')
return ''.join(sb)
#==========================================================================
@staticmethod
def SetCommands(cmds):
if not isinstance(cmds,dict):
raise TypeError('cmdQ.SetCommands need only dictionary type parameters but <%s>' % type(cmds))
cmdQ.CMDS = cmds
###############################################################################
class BackgroundWorker(threading.Thread):
"""Background worker using threads"""
#==========================================================================
DO=0
DONE=1
EXCEPT=2
#==========================================================================
def __init__(self, Q):
threading.Thread.__init__(self)
self.Q = Q
self.is_doing = False
#==========================================================================
def run(self):
t_ident = str(thread.get_ident())[-4:]
print('[%s] BackgroundWorker.run Starting...' % t_ident)
while True:
cmdq = self.Q.get()
cmdq.kwargs['t_ident'] = t_ident
self.is_doing = True
try:
if not cmdQ.CMDS.has_key(cmdq.cmd):
raise LookupError('Cannot find command "%s"' % cmdq.cmd)
cmdQ.CMDS[cmdq.cmd][BackgroundWorker.DO](cmdq)
cmdQ.CMDS[cmdq.cmd][BackgroundWorker.DONE](cmdq)
except Exception, e:
cmdQ.CMDS[cmdq.cmd][BackgroundWorker.EXCEPT](cmdq,e)
finally:
# send done signal to Q
self.is_doing = False
self.Q.task_done()
print('[%s] BackgroundWorker.run end.' % t_ident)
#==========================================================================
def isDoing(self):
return self.is_doing
###############################################################################
class doJob():
def doA(self, cmdq):
print('[%s] %s doing~' % (datetime.datetime.now(),cmdq))
time.sleep(1)
def doneA(self, cmdq):
print('[%s] %s done!' % (datetime.datetime.now(),cmdq))
def doB(self, cmdq):
print('[%s] %s doing~' % (datetime.datetime.now(),cmdq))
time.sleep(2)
raise NotImplementedError('Test Error') # 테스트로 예외발생
def doneB(self, cmdq):
print('[%s] %s done!' % (datetime.datetime.now(),cmdq))
def doC(self, cmdq):
print('[%s] %s doing~' % (datetime.datetime.now(),cmdq))
time.sleep(3)
def doneC(self, cmdq):
print('[%s] %s done!' % (datetime.datetime.now(),cmdq))
def doFail(self, cmdq, e):
print('[%s] %s fail! <%s>' % (datetime.datetime.now(),cmdq,e))
###############################################################################
def main():
# 1) 실제 작업을 수행할 개체를 생성한다
job = doJob()
# 2) 쓰레드작업에서 수행할 명령어 및 콜백함수를 지정한다
#  '명령':(시작작업함수, 완료후작업함수, 실패시작업함수)
#  세가지 함수 모두 첫번째 인자로 cmdQ 객체가 넘어오며
#  실패시작업함수인 경우 두번째 인자로 Exception 객체가 넘어옴
cmds = {
'A':(job.doA,job.doneA,job.doFail),
'B':(job.doB,job.doneB,job.doFail),
'C':(job.doC,job.doneC,job.doFail),
}
# 위에서 지정한 명령을 설정 (클래스 함수)
cmdQ.SetCommands(cmds)
# 쓰레드 작업 개수 설정
NUM_WORKERS = 5
# 작업 대기큐
Q = Queue.Queue()
# 3) 실제 작업 쓰레드 생성
for _ in range(NUM_WORKERS):
bw = BackgroundWorker(Q)
bw.setDaemon(True)
bw.start()
# 4) 테스트로 10개의 작업을 작업큐에 넣음
for _ in range(10):
rand_cmd = cmds.keys()[random.randint(0,len(cmds)-1)]
cmdq = cmdQ(rand_cmd,param_cmd=rand_cmd)
Q.put(cmdq)
# 3초가 지난 후 대기 큐에 아무 작업도 없으면 종료
time.sleep(3)
Q.join()
###############################################################################
if __name__=='__main__':
main()

위와 같이 수행하면 그 결과는,

[9584] BackgroundWorker.run Starting...
[0064] BackgroundWorker.run Starting...
[7360] BackgroundWorker.run Starting...
[2336] BackgroundWorker.run Starting...
[2013-06-05 14:50:05.989471] cmdQ instance{cmd="C",t_ident="9584",param_cmd="C"} doing~
[2013-06-05 14:50:05.989534] cmdQ instance{cmd="B",t_ident="0064",param_cmd="B"} doing~
[2013-06-05 14:50:05.989573] cmdQ instance{cmd="B",t_ident="7360",param_cmd="B"} doing~
[2013-06-05 14:50:05.989609] cmdQ instance{cmd="A",t_ident="2336",param_cmd="A"} doing~
[4656] BackgroundWorker.run Starting...
[2013-06-05 14:50:05.989661] cmdQ instance{cmd="A",t_ident="4656",param_cmd="A"} doing~
[2013-06-05 14:50:06.991274] cmdQ instance{cmd="A",t_ident="2336",param_cmd="A"} done!
[2013-06-05 14:50:06.991405] cmdQ instance{cmd="A",t_ident="2336",param_cmd="A"} doing~
[2013-06-05 14:50:06.991439] cmdQ instance{cmd="A",t_ident="4656",param_cmd="A"} done!
[2013-06-05 14:50:06.991469] cmdQ instance{cmd="A",t_ident="4656",param_cmd="A"} doing~
[2013-06-05 14:50:07.991778] cmdQ instance{cmd="B",t_ident="0064",param_cmd="B"} fail! <Test Error>
[2013-06-05 14:50:07.991884] cmdQ instance{cmd="C",t_ident="0064",param_cmd="C"} doing~
[2013-06-05 14:50:07.991921] cmdQ instance{cmd="B",t_ident="7360",param_cmd="B"} fail! <Test Error>
[2013-06-05 14:50:07.991951] cmdQ instance{cmd="C",t_ident="7360",param_cmd="C"} doing~
[2013-06-05 14:50:07.991977] cmdQ instance{cmd="A",t_ident="2336",param_cmd="A"} done!
[2013-06-05 14:50:07.992006] cmdQ instance{cmd="A",t_ident="2336",param_cmd="A"} doing~
[2013-06-05 14:50:07.992031] cmdQ instance{cmd="A",t_ident="4656",param_cmd="A"} done!
[2013-06-05 14:50:08.991075] cmdQ instance{cmd="C",t_ident="9584",param_cmd="C"} done!
[2013-06-05 14:50:08.993271] cmdQ instance{cmd="A",t_ident="2336",param_cmd="A"} done!
[2013-06-05 14:50:10.993106] cmdQ instance{cmd="C",t_ident="0064",param_cmd="C"} done!
[2013-06-05 14:50:10.993238] cmdQ instance{cmd="C",t_ident="7360",param_cmd="C"} done!

와 같이 나옵니다.

백그라운드워커를 잘 사용하면 성능향상에 많은 도움을 줍니다.
꼭 위 코드의 내용을 이해하셔서 어느 분께는 도움이 되셨기를 바랍니다.


핑백

  • 지훈현서 : [Python] GIL 과 multiprocessing 2014-04-03 11:00:39 #

    ... 터 보시면 정말 딱 보고 알 수 있을 정도로쉽게 설명을 해 주셨습니다. 더불어 파이썬의 장점도 쉽게 설명되어 있습니다. 또한 일전에 살펴본 "Queue를 이용한 쓰레드 풀의 사용방법"에서와 같이 작업을 할 때 문제가 있을 수 있습니다. 따라서 기존의 Thread 작업이 있었다면,이를 거의 동일한 인터페이스의 프로세 ... more

덧글

댓글 입력 영역

구글애드텍스트