[Python] 비동기 작업큐와 Global 객체에 관한 고찰 Develop Tip

지난번 비동기 작업 큐를 살펴보면서 Celery 대신 간단한 파이썬의 RQ를 살펴본 적이 있습니다.
이것을 사용하다가 이번에 새로운 곳에 진행을 하려다 보니
다음과 같은 경우가 생겼습니다.


G_H = None

def mytask(*args, **kwargs):
    if G_H is None:
        G_H = MyHandle()
    G_H.do()

위와 유사한 경우가 생겼습니다.

즉, G_H가 어떤 글로벌 인스턴스를 갖고
이를 mytask 라는 태스크 수행 함수에서
초기화 된 것을 계속 이용한다는 개념입니다.

Singleton 패턴에서 잘 사용하고는 하지요.
위에서는 글로벌로 해결하려고 합니다만...

위와 같은 접근이 RQ에서는 전혀 동작하지 않습니다.

작업 워커는 위와 같은 코드가 수행될 때마다 fork()해서 
실행하고 종료시키기 때문입니다.

실제 초기화하는데 시간이 많이 걸린다면,
아주 낭패를 볼 만한 상황이 발생합니다.

그래서 다시 샐러리를 이용하여 다각도로 접근해보았습니다.
여러 가지를 해 본 것에 대한 경험을 정리해 보겠습니다.

우선 기본 샐러리 환경을 구축해 봅니다.
방법은 샐러리 매뉴얼을 참조했습니다.

우선 PATHONPATH에 추가된 곳에

proj 라는 파이썬 패키지를 만듦니다.
어려운 것은 아니구요, 

$ export PYTHONPATH=$HOME/work
라고 되어 있으면

$ cd $HOME/work
$ mkdir proj
$ cd proj
$ touch __init__.py

라는 식으로 하시면 됩니다.

그리고 그 안에 우선 tasks_app.py 라는 파일을 만들어
아래와 같은 내용을 넣습니다.

################################################################################
from __future__ import absolute_import, unicode_literals
from celery import Celery

################################################################################
app = Celery('proj',
             broker='redis://',
             backend='redis://',
             include=['proj.tasks'])

################################################################################
# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

################################################################################
if __name__ == '__main__':
    app.start()

디폴트 큐 대신 redis를 이용하여 브로커 및 결과 BackEnd로 이용하였습니다.
셀러리로 구동시킬 app 을 정의하는 부분입니다.

그 다음에는 실제 작업을 진행할 task를 정의해 보겠습니다.

우선 tasks_global.py 라는 이름으로 다음의 내용을 넣어줍니다.

################################################################################
from __future__ import absolute_import, unicode_literals
from proj.tasks_app import app
import os
import datetime

################################################################################
G_HANDLE = None


################################################################################
@app.task
def add(x, y):
    global G_HANDLE
    if G_HANDLE is None:
        G_HANDLE = datetime.datetime.now()
    print("[%s]<<<add>>:G_HANDLE<%s>" % (os.getpid(), G_HANDLE))
    return x + y


################################################################################
@app.task
def mul(x, y):
    global G_HANDLE
    if G_HANDLE is None:
        G_HANDLE = datetime.datetime.now()
    print("[%s]<<<mul>>:G_HANDLE<%s>" % (os.getpid(), G_HANDLE))
    return x * y


################################################################################
@app.task
def xsum(numbers):
    global G_HANDLE
    if G_HANDLE is None:
        G_HANDLE = datetime.datetime.now()
    print("[%s]<<<xsum>>:G_HANDLE<%s>" % (os.getpid(), G_HANDLE))
    return sum(numbers)

테스트로 add, mul, xsum 함수를 만들어 놓았습니다.
여기서 G_HANDLE은 그냥 datetime 클래스 인스턴스로 동일 객체가
사용되는지 확인하는 것입니다.

위와 같은 상황에서 

$ cd $HOME/work/proj
$ ln -s tasks_global.py tasks.py

라고 tasks.py 라고 지정하고 다음과 celery를 돌려 서비스를 동작시킵니다.

$ celery -A proj.tasks_app worker --concurrency=4  --logleve info

--concurrency는 4개의 프로세스로 워커를 돌린다는 개념입니다.
(생략하면 디폴트로 CPU 코어 개수만큼 돌립니다)

그러면, 다음과 같

결과가 잘 실행되었다고 메시지와 함게 볼 수 있습니다.
실제 4개의 워크가 실행되고 있는 상태입니다.

이제 호출해보는 코드를 작성해 봅니다.

call.py 라는 파일에 다음의 내용을 넣습니다.

from __future__ import absolute_import, unicode_literals
from proj.tasks import add, mul, xsum

ar = add.delay(4, 4)
print(ar.successful())
r = ar.get()
print("add r = %s" % r)

ar = mul.delay(4, 4)
print(ar.successful())
r = ar.get()
print("mul r = %s" % r)

ar = xsum.delay((4, 4, 4))
print(ar.successful())
r = ar.get()
print("xsum r = %s" % r)

ar = add.delay(4, 4)
print(ar.successful())
r = ar.get()
print("add r = %s" % r)

$ python call.py 
를 실행하고 이전 celery 를 돌렸던 창에서 확인해 보면,

두 개의 워커가 대답을 하지만 Global 객체 G_HANDLE<...> 내용을 살펴보면 두개의 워크가 대답을 했습니다.
[...] PID를 확인해도 알 수 있습니다.


만약 개별 Global 로 동작하고 worker 마다 singleton이 필요없다면 위와 같이 동작해도 무방할 것입니다만,
만약 singleton 개념으로 동작해야 한다면 (즉 worker의 task에서 critical section이 있어 한 시간에 오직 하나의 
worker만 돌아야 한다면) 

다음과 같이 워커를 한개 동작시키는 방법이 존재하거나,

$ celery -A proj.tasks_app worker --concurrency=1  --logleve info

라고 실행시키고,

$ python call.py 
를 실행하면,

위에서는 단일 워커가 하나의 글로벌 인스턴스로 동작합니다.
(뭐 당연한 것이겠지요)

위와 같이 global을 이용하기는 하는데 초기화를 다른 방식으로 설정할 수 있습니다.

tasks_signals.py 라는 파일을 만들어 아래의 내용을 넣습니다.

################################################################################
from __future__ import absolute_import, unicode_literals
from proj.tasks_app import app
from celery.signals import worker_process_init, worker_process_shutdown
import os
import datetime

################################################################################
G_HANDLE = None


################################################################################
@worker_process_init.connect
def init_worker(**kwargs):
    global G_HANDLE
    G_HANDLE = datetime.datetime.now()
    print('[%s]Initializing G_HANDLE<%s> for worker.<kwargs=%s>'
          % (os.getpid(), G_HANDLE, kwargs))


################################################################################
@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global G_HANDLE
    if G_HANDLE is not None:
        G_HANDLE = None
        print('[%s]Closing G_HANDLE for worker.<kwargs=%s>'
              % (os.getpid(), kwargs))


################################################################################
@app.task
def add(x, y):
    print("[%s]<<<add>>:G_HANDLE<%s>" % (os.getpid(), G_HANDLE))
    return x + y


################################################################################
@app.task
def mul(x, y):
    print("[%s]<<<mul>>:G_HANDLE<%s>" % (os.getpid(), G_HANDLE))
    return x * y


################################################################################
@app.task
def xsum(numbers):
    print("[%s]<<<xsum>>:G_HANDLE<%s>" % (os.getpid(), G_HANDLE))
    return sum(numbers)


@worker_process_init.connect 에서 worker Process가 실행될 때 동작할 내용
(global G_HANDLE 초기화) 및
@worker_process_shutdown.connect 에서 종료되면서 close 될 작업을 진행하면 됩니다.

데이터베이스 풀 등의 이용할 때 연결해 놓고 사용하면 되겠군요.

이제 

$ cd $HOME/work/proj
$ ln -s tasks_signals.py tasks.py

라고 tasks.py 라고 지정하고 다음과 celery를 돌려 서비스를 동작시킵니다.

$ celery -A proj.tasks_app worker --concurrency=4  --logleve info

라고 동작시키면,


위와 같이 worker가 초기화 될 때 생성되는 것을 확인할 수 있고,

역시 그 결과도 global을 이용할 때와 동일합니다.


이제는 동일한 것을 다른 방식으로 사용해 보겠습니다.

tasks_class.py 를 만들고 아래의 내용을 넣습니다.

################################################################################
from __future__ import absolute_import, unicode_literals
from proj.tasks_app import app
import os
import datetime

from celery import Task


################################################################################
# noinspection PyAbstractClass
class MyTask(Task):
    _gv = None
    @property
    def gv(self):
        if self._gv is None:
            self._gv = datetime.datetime.now()
            print("new self._gv=%s" % self._gv)
        return self._gv


################################################################################
@app.task(base=MyTask)
def add(x, y):
    print("[%s]<<<add>>:gv<%s>" % (os.getpid(), add.gv))
    return x + y


################################################################################
@app.task(base=MyTask)
def mul(x, y):
    print("[%s]<<<mul>>:gv<%s>" % (os.getpid(), mul.gv))
    return x * y


################################################################################
@app.task(base=MyTask)
def xsum(numbers):
    print("[%s]<<<xsum>>:gv<%s>" % (os.getpid(), xsum.gv))
    return sum(numbers)


MyTask라는 클래스를 베이스 클래스로 하여 호출되는데 gv 라는 프라퍼티를 호출하면
해당 _gv 라는 것을 클래스 변수로 이용하여 global 처럼 동작합니다.

재미있는 것은 해당 함수명.gv (Property) 를 호출할 수 있다는 점이군요.
셀러리 매뉴얼에서 Task Instanctiation으로 해당 설명이 나옵니다.

위와 같은 상황에서 결국 여러 worker를 돌린다고 하면서도
singleton이 필요하다면

celery-singleton 을 이용하는 것도 한 방법이 되겠습니다.

프로그램을 하면 할 수록 느끼는 것이지만,
특정 상황에 맞게 처음부터 모든 것을 고려하면서 작성할 수 없습니다.

하지만 상황 상황 마다 해결 방법을 빠르게 모색하여
해결책을 적용하는 능력은 어쩌면 최초 동작하는 것을 보여주는 것
이상으로 중요한 프로그래머의 능력이라 생각되네요...


어느 분께는 도움이 되시기를...



덧글

댓글 입력 영역

구글애드텍스트