저번 포스트에서 책의 7장 내용인 '동시성(concurrency)과 병렬성(parallelism)'에 대해 간략히 개념을 잡았다. 이번에는 thread의 약간 심화 버전으로 정리를 해보려고 한다.
지난 글:
https://hi-lu.tistory.com/entry/파이썬-코딩의-기술-리뷰-동시성과-병렬성-1
파이썬 코딩의 기술 리뷰- 동시성과 병렬성 1
이번에는 책의 7장 내용인 '동시성(concurrency)과 병렬성(parallelism)'에 대해 먼저 정리하겠다. 주로 개념들을 잡고 가는 기록이 될 듯하다. 동시성 , 병렬성 subprocess thread coroutine 개념잡기 - 동시성 &.
hi-lu.tistory.com
0. 간단 용어 설명
- fan in : 프로세스 내 다음 단계로 가기 전, 동시 작업 단위(ex. thread)가 다 끝날 때까지 기다리는 과정
- fan out : 여러 작업 단위가 동시에 실행되게 만드는 과정
1. Lock
스레드에는 GIL(Global Interperter Lock)이라는 전역 인터프리터 락이 있다. 이 락은 스레드들이 병렬적으로 실행되지 않도록 막는 용도로 지난 글에서 설명했다. 여기에 추가로 서로 다른 스레드들이 같은 데이터에도 접근하지 못하도록 막아줄 락이 필요하다.
아래 실험은 간단히 10**5번씩 +=1을 하는 스레드들을 총 5개 실행한 결과다.
### thread 실험
from threading import Thread
threads = []
class Sample:
def __init__(self):
self.sample = 0
def add(self, count):
self.sample += count
def thread_worker(dic, idx):
for i in range(idx):
dic.add(1)
#5개의 스레드를 만들어 Sample.sample에 1을 더하는 thread_worker함수를 돌린다고 가정하자.
sample = Sample()
for i in range(5):
thread = Thread(target=thread_worker, args=(sample, 10**5))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(sample.sample)# -> 342734. 실제로는 5 * 10**5가 나와야 함
이러한 결과가 나오는 이유는 a번 thread가 +=1을 하고 있는 도중 b번 thread가 실행되도록 중단되는 일이 발생했기 때문이다. 아래와 같이 Sample.sample에 1을 더하는 연산 중 어딘가에서 스레드가 일시 중단된 것이다.
value = getattr(sample, 'sample') #Sample()로 가져온 클래스 오브젝트 sample의 'sample'애트리뷰트에 접근
value += 1
setattr(sample, 'sample', value)
이를 막기 위해서 Lock (mutex)를 사용하는 것이다. 코드를 다시 쓰면 다음과 같다.
### thread 실험2
from threading import Thread, Lock
dic = {'sample':0}
threads = []
class SampleAndLock:
def __init__(self):
self.sample = 0
self.lock = Lock()
def add(self, count):
with self.lock:
self.sample += count
def thread_worker(dic, idx):
for i in range(idx):
dic.add(1)
#5개의 스레드를 만들어 dic['sample']에 1을 더하는 thread_worker함수를 돌린다고 가정하자.
sample = SampleAndLock()
for i in range(5):
thread = Thread(target=thread_worker, args=(sample, 10**5))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(sample.sample) #-> 5 * 10**5, 즉 500000이 잘 나온다.
cf) 이렇게 스레드와 Lock을 이용해서 풀 수 있는 예시 문제로는 voice recognition을 들 수 있다. 마이크모듈로 음성이 들어오고 있는지 계속 받아와야 하고(thread A) 해당 음성이 끝이 났는지 판별해야 하는(thread B) 경우를 생각할 때, thread A와 B 모두 음성이라는 동일 데이터에 접근하고 있다. 이럴 때 음성 데이터에 Lock이 잘 걸려있어야 함을 알 수 있다.
2. queue
추가적으로, 스레드를 순차적으로 실행하기 위해 가급적 queue를 사용하자. deque를 사용하는것에 비해 다음과 같은 장점이 있다.
- thread 처리를 해야하는 데이터가 비어있을 때는 block을 해준다. (queue.get())
- 버퍼 크기를 정의할 수 있다. 즉, 실행시킬 thread 수를 지정하고 나머지 스레드는 잠시 받지 않을 수 있다. (Queue(max_len))
- 작업 진행을 추적할 수 있다. (queue.task_done())
cf) 파이썬의 리스트는 기본적으로 queue기능을 제공하기는 하지만, 여기서 말하는 queue는 내장 라이브러리를 의미한다.
from thread import Thread
from queue import Queue
from queue import Queue
max_len = 100
my_queue = Queue(max_len) # queue가 100개가 넘으면 put을 블록
def ex_get():
print('get start')
my_queue.get() #먼저 실행되었지만 put()이 들어올 때 까지는 실행 X
print('get done')
print('task')
my_queue.task_done()
print('task done')
def ex_put():
print('put start')
my_queue.put(object())
print('put done')
print('join') #task_done이 호출되기 전까지 끝나지 않게 기다림
my_queue.join()
print('join done')
threads = []
threads.append(Thread(target = ex_get))
threads.append(Thread(target = ex_put))
for thread in threads:
thread.start()
#### 출력값 ####
#get start
#put start
#put done
#join
#get done
#
#task
#task done
#join done
3. ThreadpoolExecutor
I/O 동시성을 위해서는 asyncio를 사용해도 되지만, thread를 사용하는 방법도 존재한다. ThreadpoolExecutor는 exception을 자동으로 전파시켜준다. 간단하게 앞에서 사용한 예제를 이용해서 실험해보자.
from concurrent.futures import ThreadPoolExecutor
MAX_WORKERS=10
class SampleAndLock:
def __init__(self):
self.sample = 0
self.lock = Lock()
def add(self, count):
with self.lock:
if self.sample + count >5:
raise Exception('숫자가 5보다 큼')
self.sample += count
def thread_worker(dic, idx):
for i in range(idx):
dic.add(1)
sample = SampleAndLock()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: # 사용가능한 규모를 미리 정해야 함
for i in range(5):
thread = Thread(target=thread_worker, args=(sample, i*2))
thread.start()
####출력####
#Exception: 숫자가 5보다 큼
#Exception: 숫자가 5보다 큼
#Exception: 숫자가 5보다 큼
위 코드의 단점이라고는 max_workers를 사전에 정해서 나중에 스레드 수를 늘릴 수 없다는 점이다. 웬만하면 async를 이용하자.
이번에는 thread에 대해 조금 깊이 들어가 보았다. 실제로 I/O를 생각하는 구현에서는 async를 조금 더 선호하게 되는 듯하다. 다음 포스트에선 async에 대한 심화를 들어가 보겠다.
'python > 파이썬 코딩의 기술' 카테고리의 다른 글
[python] try/ except /finally/else (0) | 2022.04.13 |
---|---|
Python Asyncio (0) | 2022.03.05 |
python3 애트리뷰트, 메타클래스 - 2 (0) | 2022.02.20 |
파이썬 리스트 append 연산 시 arr와 arr[:] 차이점 (0) | 2022.02.12 |
파이썬 코딩의 기술 리뷰 - 메타클래스와 애트리뷰트 1 (0) | 2022.02.01 |