지난 포스트에서는 동시성과 병렬성에 대해 아래와 같이 다루어봤다.
- 동시성 병렬성의 개념, 스레드와 코루틴의 개념
- 스레드 심화. 스레드의 lock, queue를 이용하는 방법
이번에는 책 '파이썬 코딩의 기술'의 7장, 파이썬 공식문서 내용을 포함해서 asyncio에 대해 비교적 자세히 다뤄보겠다. 책 리뷰보단 공식문서를 더 차용한 거 같아 이번 시리즈 제목은 간단하게 asyncio로 간다.
파이썬 코루틴 공식문서 : https://docs.python.org/ko/3/library/asyncio-task.html
지난글 1: https://hi-lu.tistory.com/entry/파이썬-코딩의-기술-리뷰-동시성과-병렬성-1
지난 글 2: https://hi-lu.tistory.com/entry/Thread-파이썬-코딩의-기술-리뷰동시성과-병렬성-2
0. 기본 asyncio 기능
asyncio 가 제공하는 함수에 대해서 자세히 알아보도록 하겠다.
- async : 코루틴을 만드는 방법. 함수 선언 def 이전에 붙여서 실행한다.
- await : 코루틴이 중단되는 곳. await구문에서 코루틴이 일시 중단되고, awaitable을 해결한 다음 async 함수로 실행을 재개한다. 코루틴 인스턴스를 반환한다. (제너레이터 yield와 비슷함)
- awaitable : 함수는 아니지만 await 표현식에서 사용될 수 있는 객체를 뜻한다.
- gather : async로 정의된 코루틴들을 실행하게끔 한다.
- run : 코루틴을 실행, 결과를 반환한다. 새로운 event loop를 만들고 끝에는 이 event loop을 닫는다.
여기까지를 파이썬 공식문서의 예시 코드를 살짝 변형해서 확인해보자.
import asyncio
async def sleep_fcn(idx): #async로 코루틴 함수를 정의했다.
print(f'start {idx}')
await asyncio.sleep(1) # sleep_fcn()코루틴은 이 await구문에서 일시 중단된다.
print(f'end {idx}')
async def sample1(): #async로 코루틴 함수를 정의했다.
tasks = []
for i in range(5):
task = sleep_fcn(i) #위에서 정의한 sleep_fcn 코루틴함수를 받아오고 있다. 이때 task에 저장되는 값은 await에 사용할 수 있는 코루틴 인스턴스다.
tasks.append(task)
await asyncio.gather(*tasks) # gather로 받아온 태스크들을 await구문으로 실행할 수 있다.
위 코드를 실행한다면 아래와 같다. 참고로 이번 코드들을 실습할 때 나처럼 jupyter notebook에서 실행한다면 asyncio.run으로 실행했을 때 runtime error가 날 것이다. jupyter notebook이 이미 event loop으로 동일 스레드에서 돌아가고 있기 때문이다.
import time
start_time = time.time()
#asyncio.run(sample1()) # 그냥 파이썬 파일로 실행시킨다면 이걸 쓰면 됨
await sample1() # jupyter notebook의 경우는 이미 주피터가 event loop에서 실행되고 있기 때문에, await으로 실행해주자.
print(time.time() - start_time)
####결과#####
#start 0
#start 1
#start 2
#start 3
#start 4
#end 0
#end 1
#end 2
#end 3
#end 4
#1.006260871887207
출력 값을 확인하면 코루틴의 동시성이 잘 활용되고 있음을 알 수 있다.
1. asyncio 기능 2
- wait : awaitable 객체를 동시에 실행하고 resturn_when 인자의 조건을 달성할 때까지 블록한다.
- get_event_loop : 현 event loop을 가져온다.
- set_event_loop : 현 loop을 event loop으로 지정한다.
- new_event_loop : 새로운 event loop 객체를 반환한다.
- run_until_complete : 인자 loop가 끝날 때까지 실행한다. 인자가 corutine객체일 경우 asyncio.Task로 실행되게끔 예약한다.
- Task : 코루틴 실행이 가능한 객체로, 이벤트 루프는 한 번에 하나의 Task를 실행한다.
- asyncio.create_task : 고수준 api. Task를 만든다.
- loop.create_task : 저수준 api. task를 만들고, 코루틴의 실행을 예약한다.
이벤트 루프로 코루틴의 비동기 태스크, 콜백, I/O연산, subprocess 관리 등을 할 수 있다. 위의 기능보단 저수준 api이다. 예시 코드를 task와 event_loop 두 가지로 작성해 보자. (실행하실 분들은 주피터 말고 cmd에서 python 파일로 실행할 것!)
def sample_loop1(): #저수준 api
loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
tasks = []
for i in range(5):
task = sleep_fcn(i)
tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close() #loop가 끝나면 닫아준다.
def sample_loop2(): # 고수준 api인 asyncio.create_task를 이용해서 실행할 수도 있다.
for i in range(5):
asyncio.create_task(sleep_fcn(i))
sample_loop1() #이 두 함수의 실행 결과는 (당연하게도) 같다.
sample_loop2()
확실히 sample_loop1보다 sample_loop2가 코드가 짧고 좋아보일 수 있다. 하지만 저수준 api를 사용해야 할 순간이 분명 온다. 많은 데이터 파일을 한 번에 비동기로 받아오고 싶을 때와 같이 I/O 연산이 필요할 때, callback을 받아야 할 때 등 에서 event_loop을 사용하게 된다.
2. asyncio 기능 3
- Lock : 태스크를 위한 mutex lock을 걸어주며, 주로 async with와 함께 사용한다. 공유 자원에 대한 독점 액세스를 보장할 때 사용한다. (== thread safe X. cf.) 스레드는 공유하는 데이터에 동시 접근을 막기 위해 lock을 걸어줬다. asyncio.Lock을 걸어주면 다른 스레드들이 접근할 때 안정성을 보장할 수 없게 된다.)
- Semaphore : 마찬가지로 스레드 안정성이 없다. 호출될 때마다 내부 카운터가 하나씩 감소하는 형태로, 0이 된다면 block 한다.
위 2개 개념을 사용해 새로운 예제를 작성해보자. 아래 sample_loop3은 lock을 걸었기 때문에 한 코루틴이 자원을 독점하여 이전처럼 1.x초만에 실행되지 않는다. 실행 순서도 한 coroutine이 끝난 후에 다른 코루틴이 실행되는 것을 알 수 있다.
#lock을 사용하는 예제
async def sample_async(lock, semaphore, idx):
result = 0
async with lock, semaphore: #여기의 semaphore는 await문을 만나기 전에 semaphore.acquire()를, 그 후에는 semapohre.release()를 한다.
print(lock)
await sleep_fcn(idx)
return
def sample_loop3():
loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
lock = asyncio.Lock() # asyncio를 위한 mutex lock을 걸어준다.
semaphore = asyncio.Semaphore(value=1) #1번 호출할 수 있는 semaphore 생성
tasks = []
for i in range(5):
task = sample_async(lock, semaphore, i)
tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close() #loop가 끝나면 닫아준다.
st = time.time()
sample_loop3()
print(time.time() - st)
#### 결과
#start 0
#end 0
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:3]>
#start 1
#end 1
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:2]>
#start 2
#end 2
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:1]>
#start 3
#end 3
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked]>
#start 4
#end 4
#5.0180981159210205
sample_loop4는 semaphore에 대한 예시다.
내가 await으로 어떤 일들을 받는데, 호출 시도를 할 때마다 semaphore 내 카운터가 계속 감소하고, 그 task 호출을 완료할 때마다 count+=1을 하게 된다. 즉 예를 들어 Semaphore(value=20)일 때는 async task를 20번까지 호출 시도를 하겠다는 의미다. 아래 예시를 보면 semaphore가 await 호출될 땐 1씩 감소하고, 호출이 끝날 땐 +1이 되는 걸 알 수 있다.
async def sample_async_sema(semaphore, idx):
result = 0
async with semaphore:
print(semaphore._value)
await sleep_fcn(idx)
print(semaphore._value)
return
def sample_loop4():
loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
semaphore = asyncio.Semaphore(value=20) #20번 호출할 수 있는 semaphore 생성
tasks = []
for i in range(5):
task = sample_async_sema(semaphore, i)
tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close() #loop가 끝나면 닫아준다.
st = time.time()
sample_loop4()
print(time.time() - st)
### 결과
#19
#start 0
#18
#start 1
#17
#start 2
#16
#start 3
#15
#start 4
#end 0
#15
#end 1
#16
#end 2
#17
#end 3
#18
#end 4
#19
#1.0043728351593018
그러면 semaphore의 value가 0일 땐? 동시성이 발현되지 않는 것처럼 보일 것이다!
4. 살짝의 응용
지금까지의 개념에 조금 더하여 비동기를 활용한 예제를 한 번 만들어보자. 여러 파일을 비동기로 읽는 프로그램을 작성해보자. 우선 5개 정도의 텍스트 파일을 만들자.
def make_files(idxs): #예시로 읽어들일 파일 5개정도를 만들자.
for idx in range(idxs):
with open(f'python_example/example{idx}.txt', 'w') as f:
f.write(f'start {idx}')
f.close()
이제 example0~4.txt 파일을 만들었으니 이를 async를 통해 읽어보자.
from multiprocessing import Pool #병렬성을 지닌 멀티프로세스를 추가해보자.
import aiofiles # 비동기일 때 파일을 읽을 수 있는 라이브러리
async def sample_read(semaphore, idx): #간단하게 example.txt파일을 읽는 비동기 프로그램이다.
async with semaphore, aiofiles.open(f'example{idx}.txt', 'r') as f:
result = await f.readlines()
f.close()
return result
def run_task():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
semaphore = asyncio.Semaphore(value=20)
tasks = [sample_read(semaphore, i) for i in range(5)]
result = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
return result
def run():
st = time.time()
pool = Pool(1)
asyncs = []
result = []
asyncs = pool.apply_async(run_task,(), callback=result.append) #이 callback에 비동기로 읽어온 결과가 담긴다.
asyncs.wait()
print(result)
print(time.time() - st)
추가된 것은 multiprocessing 하나다. 이는 병렬로 함수를 실행할 수 있고, 비동기로 읽은 결과를 callback으로 받아올 수 있다.
우리가 def sample_read에서 읽은 결과들이 def run()의 result에 담길 것이다.
if __name__ == '__main__':
run()
### 결과
#[[['start 0'], ['start 1'], ['start 2'], ['start 3'], ['start 4']]]
#0.0845940113067627
위 예제 같은 경우는 멀티프로세스의 worker를 1개로 했기 때문에, pool을 사용하지 않아도 같은 결과를 얻을 수 있다.
if __name__ == '__main__':
result = run_task()
print(result)
###결과
#[['start 0'], ['start 1'], ['start 2'], ['start 3'], ['start 4']]
이번 포스트에서는 파이썬에서 비동기 프로그램을 작성하는 방법에 대해 자세히 다뤄봤다. 이 예제를 몇 안 되는 사람들이 읽겠지만, 실전에서 어려움 없이 asyncio 구문을 활용할 수 있기를 바라는 마음으로 작성해 보았다. ML엔지니어도 이렇게 비동기 함수를 작성할 일이 생긴다. 받고 싶은 데이터가 있는데 너무 클 때, 그 큰 데이터를 받는 중 쓰레기 값이 있을 때 함수를 변형해서 체크할 수도 있겠다.
'python > 파이썬 코딩의 기술' 카테고리의 다른 글
[python] try/ except /finally/else (0) | 2022.04.13 |
---|---|
Thread - 파이썬 코딩의 기술 리뷰[동시성과 병렬성 2] (0) | 2022.03.01 |
python3 애트리뷰트, 메타클래스 - 2 (0) | 2022.02.20 |
파이썬 리스트 append 연산 시 arr와 arr[:] 차이점 (0) | 2022.02.12 |
파이썬 코딩의 기술 리뷰 - 메타클래스와 애트리뷰트 1 (0) | 2022.02.01 |