[Python] 스케줄러 (at, interval, cron) 작업에 대한 고찰 Develop Tip

프로그램 작업을 하다보면 다음과 같이 호스트 시스템에서 동작하는
시간 관련 스케줄러 작업이 필요할 경우가 있습니다.

at : 특정 지정 시간에 작업을 수행함
interval : 주기적으로 주어진 초/분/시간 등 만큼 매번 수행되는 작업
cron : 특정 정해진 요일, 날짜, 시간, 분, 초 등의 작업
(주의! 현재 cron 작업에서는 분단위가 가장 작은 시간 단위입니다)

이렇게 시간 관련된 스케줄러 작업을 진행하는 여러가지 파이썬 방법이 있습니다.

물론 파이썬에서 분산 작업 큐 역할을 담당하는 celery와 같은 프레임워크나,
비동기-IO를 이용한 이벤트 방식의 네트워크 엔진 프레임워크인 twisted 도 있습니다만,
한 호스트 안에서 위와 같은 스케줄러로만 이용하기에는 너무 무겁거나 배보다 배꼽이
더 커지는 경향이 발생하기도 합니다.

따라서 살펴본 것 중에서는, Advanced Python Scheduler 라는 것이 있었습니다.

설치는,
$ sudo apt-get install python-pip
$ sudo pip install apscheduler
로 간단히 설치할 수 있습니다.

자세한 사항은 직접 내용을 읽어보시면 될 것 같고,
이것을 간단한 예제로 클래스화 하여 보았습니다.

다음을 확인해 보시면,

와 같이 작성할 수 있습니다. schedule_job.py (해당 파일)

일단 간단히 살펴봅니다.

import apscheduler.scheduler
from apscheduler.scheduler import Scheduler
###############################################################################
logger = logging.getLogger('test_sh01')
logger.setLevel(logging.DEBUG)
LOGFILE = './foo.log'
LOGSIZE = 1024*100
LOGBACKUP_COUNT = 5
if not logger.handlers:
    loghandler = logging.handlers.RotatingFileHandler(LOGFILE,
                maxBytes=LOGSIZE, backupCount=LOGBACKUP_COUNT)
    formatter = logging.Formatter('%(asctime)s-%(name)s-%(levelname)s-%(message)s')
    loghandler.setFormatter(formatter)
    logger.addHandler(loghandler)
apscheduler.scheduler.logger = logger

위 부분은 생략 가능합니다만,
로거를 지정하고 해당 로거를 스케줄러에 할당하는 부분입니다.
위에 샘플은 현재 폴더에 foo.log 이름으로 100K씩 저장을 하다가,
100K가 넘어가면 5개까지 .1, .2 등이 붙으며 이전 로그를 logrotate 식으로
저장하는 방법입니다.

마지막 줄이 스케줄러의 로거를 자신의 로거로 할당하는 문장입니다.

###############################################################################
class ScheduleJob(object):
    def __init__(self):
        self.sched = Scheduler()
        self.sched.start()
        self.job_count = self.cron_count = 0
        self.sched.add_interval_job(self.job_function, seconds=1)
        self.sched.add_cron_job(self.cron_function, day_of_week='mon-fri', second='*/3')
    def __del__(self):
        self.shutdown()
    def shutdown(self):
        self.sched.shutdown()
    def job_function(self):
        self.job_count += 1
        sleep_period = random.randint(5,15) / 10.0
        t_ident = str(thread.get_ident())[-4:]
        print "[%s:%s:%s] ScheduleJob.job_function: and will sleep(%s)" % (
            self.job_count, t_ident, datetime.now(), sleep_period)
        time.sleep(sleep_period)
    def cron_function(self):
        self.cron_count += 1
        sleep_period = random.randint(25,35) / 10.0
        t_ident = str(thread.get_ident())[-4:]
        print ">>>[%s:%s:%s] ScheduleJob.cron_function: and will sleep(%s)" % (
            self.cron_count, t_ident, datetime.now(), sleep_period)
        time.sleep(sleep_period)

위와 같이 ScheduleJob 이라는 간단한 클래스를 하나 만들었습니다.

기능은 class 인스턴스를 만들 때, 스케줄러를 만들어 시작하고,
멤머 함수인 job_function을 1초마다의 인터벌 작업으로, 또한 cron_function 멤머함수를
월~금 사이에 3초마다 작업을 수행하도록 지정하였습니다.

###############################################################################
if __name__=='__main__':
    t_ident = str(thread.get_ident())[-4:]
    print "[%s:%s] main starting..." % (t_ident, datetime.now())
    sj = ScheduleJob()
    for _ in xrange(30): time.sleep(1)
    sj.shutdown()
    print "[%s:%s] main ended." % (t_ident, datetime.now())

그러면 상단과 같이 간단한 메인을 만들어 테스트 해 보겠습니다.

$ python schedule_job.py 
[9120:2013-06-17 12:21:18.273737] main starting...
[1:3968:2013-06-17 12:21:19.276465] ScheduleJob.job_function: and will sleep(1.0)
>>>[1:8560:2013-06-17 12:21:21.003058] ScheduleJob.cron_function: and will sleep(2.7)
[2:5856:2013-06-17 12:21:21.278811] ScheduleJob.job_function: and will sleep(1.3)
[3:1264:2013-06-17 12:21:23.277466] ScheduleJob.job_function: and will sleep(1.4)
>>>[2:5856:2013-06-17 12:21:24.002717] ScheduleJob.cron_function: and will sleep(3.0)
[4:8560:2013-06-17 12:21:25.277048] ScheduleJob.job_function: and will sleep(1.0)
[5:4384:2013-06-17 12:21:27.278801] ScheduleJob.job_function: and will sleep(1.1)
[6:1264:2013-06-17 12:21:29.277231] ScheduleJob.job_function: and will sleep(1.5)
>>>[3:4384:2013-06-17 12:21:30.002517] ScheduleJob.cron_function: and will sleep(2.5)
[7:3968:2013-06-17 12:21:31.277416] ScheduleJob.job_function: and will sleep(0.5)
[8:1264:2013-06-17 12:21:32.277334] ScheduleJob.job_function: and will sleep(1.4)
>>>[4:3968:2013-06-17 12:21:33.002146] ScheduleJob.cron_function: and will sleep(3.4)
[9:4384:2013-06-17 12:21:34.277400] ScheduleJob.job_function: and will sleep(1.3)
[10:1264:2013-06-17 12:21:36.277566] ScheduleJob.job_function: and will sleep(0.8)
[11:5856:2013-06-17 12:21:37.277328] ScheduleJob.job_function: and will sleep(1.0)
>>>[5:3968:2013-06-17 12:21:39.001857] ScheduleJob.cron_function: and will sleep(3.0)
...

등과 같이 1초마다 나오는 job_function화면 출력과 3초마다 나오는 cron_function 화면 출력이 있습니다.

여기서 주의할 사항은 다음과 같이 두 가지 입니다.

자동으로 쓰레드 풀이 생성되어 각각의 함수 호출은 별도의 쓰레드 풀에서 호출되어 별개의 쓰레드로
구동되어 집니다.

만약 주어진 인터벌이나 크론작업이 해당 주기 보다 더 크다면, 즉 다음 실행될 시간을 너머서 이전 작업이
종료되지 않았다면 그 해당 타임의 작업은 건너 뛰게 됩니다.

위의 예제에서는 30초를 돌다가, 해당 SchedulerJob 의 shutdown 을 호출하였고,
그러면 해당 스케줄러의 shutdown() 이 호출되며 모든 스케줄 작업이 종료되게 됩니다.

아주 간단한 예제 이지만 다른 서버 작업 등과 더불어 아주 유용하게 사용될 수 있는 작업입니다.

참고로, foo.log 내용은 다음과 같이,

2013-06-17 12:21:48,002 - test_sh01 - DEBUG - Looking for jobs to run
2013-06-17 12:21:48,003 - test_sh01 - WARNING - Execution of job "ScheduleJob.cron_function (trigger: cron[day_of_week='mon-fri', second='*/3'], next run at: 2013-06-17 12:21:48)" skipped: maximum number of running instances reached (1)
2013-06-17 12:21:48,004 - test_sh01 - DEBUG - Next wakeup is due at 2013-06-17 12:21:48.274876 (in 0.272255 seconds)
2013-06-17 12:21:48,207 - test_sh01 - INFO - Job "ScheduleJob.cron_function (trigger: cron[day_of_week='mon-fri', second='*/3'], next run at: 2013-06-17 12:21:51)" executed successfully
2013-06-17 12:21:48,277 - test_sh01 - DEBUG - Looking for jobs to run
2013-06-17 12:21:48,277 - test_sh01 - DEBUG - Next wakeup is due at 2013-06-17 12:21:49.274876 (in 0.997337 seconds)
2013-06-17 12:21:48,278 - test_sh01 - WARNING - Execution of job "ScheduleJob.job_function (trigger: interval[0:00:01], next run at: 2013-06-17 12:21:49.274876)" skipped: maximum number of running instances reached (1)
2013-06-17 12:21:48,343 - test_sh01 - INFO - Scheduler has been shut down
2013-06-17 12:21:48,579 - test_sh01 - INFO - Job "ScheduleJob.job_function (trigger: interval[0:00:01], next run at: 2013-06-17 12:21:49.274876)" executed successfully

과 같이 나오는 것을 확인 할 수 있습니다.

패키지 소개 글에서와 같이,

- 다른 큰 프레임워크와 같은 의존도가 없습니다.
- 쓰레드에 안전한 API 입니다.
- CPython 2.4~2.7, 3.1~3.2, Jython 2.5.2, PyPy 1.4.1~1.5 등과 같은 다양한 파이썬 인터프리터에서 테스트 완료
- 스케줄링 작업 트리거 방법:
  - Cron 과 같은 스케줄링 (초 단위까지)
  - Unix의 at 명령과 같은 특정 시각의 작업 수행
  - 인터벌 시간을 둔 주기적인 작업 수행

등과 같은 장점이 있습니다.
스케줄링의 작업이 메모리 뿐만 아니라,
파일을 이용하는 shelve나 SQLAlchemy와 같은 ORM 혹은 MongoDB 등과 같은
저장소에 작업 스케줄링을 넣고 작업을 할 수 있습니다.
(Out of this blog's scope...)

어느분께는 도움이 되셨기를...

핑백

  • 한글이 잘 보이는지..? | parannamu 2015-05-21 05:31:02 #

    ... 030528 Pycharm을 이용한 원격디버깅 : http://mcchae.egloos.com/11065987 스케줄러 작업에 대한 고찰 : http://mcchae.egloos.com/11030317 pydev +virtualenv + autoenv : https://dobest.io/how-to-set-python-dev-env/ ... more

  • 지훈현서 : [Python] APScheduler 2.x 대신 3.x로 이용 하기 2016-08-22 19:42:45 #

    ... 지난번 파이썬 스케줄러에 대한 고찰에서 APScheduler를 살펴본 적이 있습니다.그런데 그 당시 이용했던 2.1.2 버전을 이용하다가최근에 다시 $ pip install ... more

덧글

  • 펭귄 2013/12/11 11:01 # 삭제 답글

    스케쥴링 프로그램이 정말 필요했는데 어떻게 짤지 고민하다가 덕분에 좋은거 배워갑니다 ㅋㅋ
  • 지훈현서아빠 2013/12/11 13:30 #

    도움이 되셨다니 저의 보람입니다~ ^^
댓글 입력 영역

구글애드텍스트