"프로그래밍 언어의 개념과 흐름에 대한 고찰 - 파이썬답게 코딩하기" 학습중...


multiprocessing 기법에 대해 어렴풋이 알고 있었거나 몰랐던 내용들을 몇 가지 정리해봅니다.



## multiprocessing의 로그
# import logging
# import multiprocessing
#
# def worker(count) :
# print("count : %s" % count)
#
# def main() :
# multiprocessing.log_to_stderr() ## log_to_stderr()는 [%(levelname)s/%(processName)s] %(message)s 형식으로 로그를
## 만들어서 sys.stderr에 전달하는 역할.
# logger = multiprocessing.get_logger()
# logger.setLevel(logging.DEBUG)
# for i in range(5) :
# t = multiprocessing.Process(target = worker, args = (i,))
# t.start()
#
# if __name__ =="__main__" :
# main()



(실행 결과)


# [INFO/MainProcess] process shutting down
# [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
# [INFO/MainProcess] calling join() for process Process-2
# [INFO/Process-4] child process calling self.run()
# [INFO/Process-2] child process calling self.run()
# count : 1
# [INFO/Process-2] process shutting down
# [DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-2] running the remaining "atexit" finalizers
# [INFO/Process-2] process exiting with exitcode 0
# count : 3
# [INFO/Process-4] process shutting down
# [DEBUG/Process-4] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-4] running the remaining "atexit" finalizers
# [INFO/Process-4] process exiting with exitcode 0
# [INFO/Process-3] child process calling self.run()
# count : 2
# [INFO/Process-3] process shutting down
# [DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-3] running the remaining "atexit" finalizers
# [INFO/Process-5] child process calling self.run()
# count : 4
# [INFO/Process-5] process shutting down
# [DEBUG/Process-5] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-5] running the remaining "atexit" finalizers
# [INFO/Process-5] process exiting with exitcode 0
# [INFO/Process-3] process exiting with exitcode 0
# [INFO/MainProcess] calling join() for process Process-4
# [INFO/Process-1] child process calling self.run()
# count : 0
# [INFO/Process-1] process shutting down
# [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-1] running the remaining "atexit" finalizers
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] calling join() for process Process-5
# [INFO/MainProcess] calling join() for process Process-3
# [INFO/MainProcess] calling join() for process Process-1
# [DEBUG/MainProcess] running the remaining "atexit" finalizers




## Daemon Process
## 스레드와 동일하게 프로세스도 자식 프로세스가 종료되지 않으면 메인 프로그램도 종료되지 않는다. 그래서 프로세스도 데몬으로 띄워
## 메인의 종료에 영향을 주지 않으면서 처리할 수 있다.
# import time
# import multiprocessing
#
# def daemon() :
# print("start")
# time.sleep(5)
# print("exit")
#
# def main() :
# d = multiprocessing.Process(target = daemon, name = "daemon")
# d.daemon = True
# d.start()
# d.join()
#
# if __name__ == "__main__" :
# main()



(실행 결과)


# start
# exit

## process.daemon = True로 설정할 경우, parent process가 종료될 때 child process도 같이 종료된다.
## process.daemon = False인 경우, parent process가 종료되어도 child process가 자동으로 종료되지 않는다.
## 대신 parent process는 자신이 최종적으로 exit하기 전에 non-daemonic child process들을 join하면서 종료를 기다린다.
## 또한 daemon으로 생성된 child process는 스스로 child process(손자)를 가질 수 없다. 이는 parent가 종료될 때 자동으로
## daemonic child process가 종료되면서, 이 녀석이 생성한 손자 process가 orphaned process로 되는 것을 방지하기 위함이다.
## process.join()을 실행하면, 해당 process가 종료될 때까지 현재 process가 blocking 상태로 대기한다.
## 위 예제에서 start와 exit이 모두 출력된 것도 d.join()으로 daemon process의 종료를 기다렸기 때문이다.




## Process Exit
## 프로세스만의 특징. 스레드의 경우 프로세스 내에서 자식으로 띄운 스레드를 강제로 종료할 수 있는 방법이 없었다. 그러나 프로세스는
## 자식을 강제로 종료시키고(terminate), 상태도 확인하고(is_alive), 프로세스 수행 결과를 종료 코드(exitcode)로 반환 받을 수 있다.
# import sys
# import time
# import multiprocessing
#
# def good_job() :
# p = multiprocessing.current_process()
# print("start name : %s, pid : %s" % (p.name, p.pid))
# time.sleep(5)
# print("exit name : %s, pid : %s" % (p.name, p.pid))
# return 0
#
# def fail_job() :
# p = multiprocessing.current_process()
# print("start name : %s, pid : %s" % (p.name, p.pid))
# time.sleep(5)
# print("exit name : %s, pid : %s" % (p.name, p.pid))
# return 0
#
# def kill_job() :
# p = multiprocessing.current_process()
# print("start name : %s, pid : %s" % (p.name, p.pid))
# time.sleep(10)
# print("exit name : %s, pid : %s" % (p.name, p.pid))
# return 0
#
# def main() :
# process_list = []
# for func in [good_job, fail_job, kill_job] :
# p = multiprocessing.Process(name = func.__name__, target = func)
# process_list.append(p)
# print("process check : %s, %s" % (p, p.is_alive()))
# p.start()
# time.sleep(0.3)
# for p in process_list :
# print("process check : %s, %s" % (p, p.is_alive()))
# time.sleep(6)
# for p in process_list :
# print("process check : %s, %s" % (p, p.is_alive()))
# if p.is_alive() :
# print("termination process : %s" % p)
# p.terminate()
# for p in process_list :
# print("process check : %s exit code : %s" % (p, p.exitcode))
#
# if __name__ == "__main__" :
# main()



(실행 결과)


# process check : <Process(good_job, initial)>, False
# start name : good_job, pid : 14484
# process check : <Process(fail_job, initial)>, False
# start name : fail_job, pid : 15756
# process check : <Process(kill_job, initial)>, False
# start name : kill_job, pid : 17868
# process check : <Process(good_job, started)>, True
# process check : <Process(fail_job, started)>, True
# process check : <Process(kill_job, started)>, True
# exit name : good_job, pid : 14484
# exit name : fail_job, pid : 15756
# process check : <Process(good_job, stopped)>, False
# process check : <Process(fail_job, stopped)>, False
# process check : <Process(kill_job, started)>, True
# termination process : <Process(kill_job, started)>
# process check : <Process(good_job, stopped)> exit code : 0
# process check : <Process(fail_job, stopped)> exit code : 0
# process check : <Process(kill_job, started)> exit code : None




## Process Event
## 스레드와 마찬가지로 프로세스도 이벤트로 서로 통신할 수 있다. 다만 멀티프로세스에는 queue, pipe 같은 전용 통신 방법이 따로 있으므로 이벤트를 이용한 통신은 권장되는 사항은 아니다.
# import time
# import multiprocessing
#
# def first_wait(e1, e2) :
# p = multiprocessing.current_process()
# while not e1.is_set() :
# event = e1.wait(1)
# print("[%s] event status : (%s)" % (p.name, event))
# if event :
# print("[%s] e1 is set" % p.name)
# time.sleep(3)
# print("[%s] set e2" % p.name)
# e2.set()
#
# def second_wait(e2) :
# p = multiprocessing.current_process()
# while not e2.is_set() :
# event = e2.wait(1)
# print("[%s] event status : (%s)" % (p.name, event))
# if event :
# print("[%s] e2 is set" % p.name)
#
# def main() :
# e1 = multiprocessing.Event()
# e2 = multiprocessing.Event()
# p1 = multiprocessing.Process(name = "first", target = first_wait, args = (e1, e2))
# p1.start()
# p2 = multiprocessing.Process(name = "second", target = second_wait, args = (e2,))
# p2.start()
# print("wait...")
# time.sleep(5)
# print("set e1")
# e1.set()
# time.sleep(5)
# print("exit")
#
# if __name__ == "__main__" :
# main()



(실행 결과)


# wait...
# [first] event status : (False)
# [second] event status : (False)
# [first] event status : (False)
# [second] event status : (False)
# [first] event status : (False)
# [second] event status : (False)
# [first] event status : (False)
# [second] event status : (False)
# set e1
# [first] event status : (True)
# [first] e1 is set
# [second] event status : (False)
# [second] event status : (False)
# [second] event status : (False)
# [first] set e2
# [second] event status : (True)
# [second] e2 is set
# exit




## Process Communication
## multiprocessing은 프로세스 간의 통신 기능으로 queue와 pipe를 지원한다.
## 프로세스 간의 간단한 데이터 교환에는 queue를 많이 사용한다.
# import time
# import multiprocessing
#
# def set_data(q) :
# p = multiprocessing.current_process()
# msg = "hello world"
# q.put(msg)
# print("[%s] set queue data : %s" % (p.name, msg))
#
# def get_data(q) :
# time.sleep(1)
# p = multiprocessing.current_process()
# print("[%s] get queue data : %s" % (p.name, q.get()))
#
# def main() :
# queue = multiprocessing.Queue()
# p1 = multiprocessing.Process(name = "set_data", target = set_data, args = (queue,))
# p1.start()
# p2 = multiprocessing.Process(name = "get_data", target = get_data, args = (queue,))
# p2.start()
# p1.join()
# p2.join()
#
# if __name__ == "__main__" :
# main()



(실행 결과)


# [set_data] set queue data : hello world
# [get_data] get queue data : hello world




## pipe 방식은 기본적으로 1:1 통신 방식임. 아래의 예제로 확인
# import multiprocessing
#
# def child(pipe) :
# p = multiprocessing.current_process()
# msg = "hello world"
# pipe.send(msg)
# print("[%s] send a message to pipe %s:" % (p.name, msg))
#
# def main() :
# parent_pipe, child_pipe = multiprocessing.Pipe()
# p = multiprocessing.Process(name = "child", target = child, args = (child_pipe,))
# p.start()
# print("received message : %s" % parent_pipe.recv())
# p.join()
#
# if __name__ == "__main__" :
# main()



(실행 결과)


# [child] send a message to pipe hello world:
# received message : hello world



+ Recent posts