지난 포스트에서는 동시성과 병렬성에 대해 아래와 같이 다루어봤다.

  1.  동시성 병렬성의 개념, 스레드와 코루틴의 개념
  2. 스레드 심화. 스레드의 lock, queue를 이용하는 방법 

 

이번에는 책 '파이썬 코딩의 기술'의 7장, 파이썬 공식문서 내용을 포함해서 asyncio에 대해 비교적 자세히 다뤄보겠다. 책 리뷰보단 공식문서를 더 차용한 거 같아 이번 시리즈 제목은 간단하게 asyncio로 간다. 

 

파이썬 코루틴 공식문서 : https://docs.python.org/ko/3/library/asyncio-task.html 

지난글 1:  https://hi-lu.tistory.com/entry/파이썬-코딩의-기술-리뷰-동시성과-병렬성-1
지난 글 2: https://hi-lu.tistory.com/entry/Thread-파이썬-코딩의-기술-리뷰동시성과-병렬성-2 


 

0.  기본 asyncio 기능

asyncio 가 제공하는 함수에 대해서 자세히 알아보도록 하겠다. 

  1. async : 코루틴을 만드는 방법. 함수 선언 def 이전에 붙여서 실행한다.
  2. await : 코루틴이 중단되는 곳. await구문에서 코루틴이 일시 중단되고, awaitable을 해결한 다음 async 함수로 실행을 재개한다. 코루틴 인스턴스를 반환한다. (제너레이터 yield와 비슷함) 
  3. awaitable : 함수는 아니지만 await 표현식에서 사용될 수 있는 객체를 뜻한다. 
  4.  gather : async로 정의된 코루틴들을 실행하게끔 한다.  
  5. run : 코루틴을 실행, 결과를 반환한다. 새로운 event loop를 만들고 끝에는 이 event loop을 닫는다. 

 

 

 

 

여기까지를 파이썬 공식문서의 예시 코드를 살짝 변형해서 확인해보자. 

import asyncio
async def sleep_fcn(idx): #async로 코루틴 함수를 정의했다. 
    print(f'start {idx}')
    await asyncio.sleep(1) # sleep_fcn()코루틴은 이 await구문에서 일시 중단된다. 
    print(f'end {idx}')
    
async def sample1(): #async로 코루틴 함수를 정의했다. 
    tasks = []
    for i in range(5):
        task = sleep_fcn(i) #위에서 정의한 sleep_fcn 코루틴함수를 받아오고 있다. 이때 task에 저장되는 값은 await에 사용할 수 있는 코루틴 인스턴스다.
        tasks.append(task)
    await asyncio.gather(*tasks) # gather로 받아온 태스크들을 await구문으로 실행할 수 있다.

 

 

위 코드를 실행한다면 아래와 같다. 참고로 이번 코드들을 실습할 때 나처럼 jupyter notebook에서 실행한다면 asyncio.run으로 실행했을 때 runtime error가 날 것이다. jupyter notebook이 이미 event loop으로 동일 스레드에서 돌아가고 있기 때문이다. 

import time
start_time = time.time()
#asyncio.run(sample1()) # 그냥 파이썬 파일로 실행시킨다면 이걸 쓰면 됨 
await sample1() # jupyter notebook의 경우는 이미 주피터가 event loop에서 실행되고 있기 때문에, await으로 실행해주자.
print(time.time() - start_time)
####결과#####
#start 0
#start 1
#start 2
#start 3
#start 4
#end 0
#end 1
#end 2
#end 3
#end 4
#1.006260871887207

출력 값을 확인하면 코루틴의 동시성이 잘 활용되고 있음을 알 수 있다. 

 

 

 

1. asyncio 기능 2 

  1. wait : awaitable 객체를 동시에 실행하고 resturn_when 인자의 조건을 달성할 때까지 블록한다. 
  2. get_event_loop : 현 event loop을 가져온다.
  3. set_event_loop : 현 loop을 event loop으로 지정한다. 
  4. new_event_loop : 새로운 event loop 객체를 반환한다. 
  5. run_until_complete : 인자 loop가 끝날 때까지 실행한다. 인자가 corutine객체일 경우 asyncio.Task로 실행되게끔 예약한다.
  6. Task : 코루틴 실행이 가능한 객체로, 이벤트 루프는 한 번에 하나의 Task를 실행한다. 
  7. asyncio.create_task : 고수준 api. Task를 만든다.
  8.  loop.create_task : 저수준 api. task를 만들고, 코루틴의 실행을 예약한다. 

 

이벤트 루프로 코루틴의 비동기 태스크, 콜백, I/O연산, subprocess 관리 등을 할 수 있다. 위의 기능보단 저수준 api이다. 예시 코드를 task와 event_loop 두 가지로 작성해 보자. (실행하실 분들은 주피터 말고 cmd에서 python 파일로 실행할 것!)

def sample_loop1(): #저수준 api
    loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
    asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
    tasks = []
    for i in range(5):
        task = sleep_fcn(i)
        tasks.append(task)
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() #loop가 끝나면 닫아준다. 
    
def sample_loop2(): # 고수준 api인 asyncio.create_task를 이용해서 실행할 수도 있다. 
    for i in range(5):
        asyncio.create_task(sleep_fcn(i))
    
sample_loop1() #이 두 함수의 실행 결과는 (당연하게도) 같다. 
sample_loop2()

확실히 sample_loop1보다 sample_loop2가 코드가 짧고 좋아보일 수 있다. 하지만 저수준 api를 사용해야 할 순간이 분명 온다. 많은 데이터 파일을 한 번에 비동기로 받아오고 싶을 때와 같이 I/O 연산이 필요할 때, callback을 받아야 할 때 등 에서 event_loop을 사용하게 된다. 

 

 

 

 

 

2. asyncio 기능 3

  1. Lock : 태스크를 위한 mutex lock을 걸어주며, 주로 async with와 함께 사용한다. 공유 자원에 대한 독점 액세스를 보장할 때 사용한다. (== thread safe X. cf.) 스레드는 공유하는 데이터에 동시 접근을 막기 위해 lock을 걸어줬다. asyncio.Lock을 걸어주면 다른 스레드들이 접근할 때 안정성을 보장할 수 없게 된다.) 
  2. Semaphore : 마찬가지로 스레드 안정성이 없다. 호출될 때마다 내부 카운터가 하나씩 감소하는 형태로, 0이 된다면 block 한다. 

 

위 2개 개념을 사용해 새로운 예제를 작성해보자. 아래 sample_loop3은  lock을 걸었기 때문에 한 코루틴이 자원을 독점하여 이전처럼 1.x초만에 실행되지 않는다. 실행 순서도 한 coroutine이 끝난 후에 다른 코루틴이 실행되는 것을 알 수 있다. 

#lock을 사용하는 예제
async def sample_async(lock, semaphore, idx):
    result = 0
    async with lock, semaphore: #여기의 semaphore는 await문을 만나기 전에 semaphore.acquire()를, 그 후에는 semapohre.release()를 한다. 
    	print(lock)
        await sleep_fcn(idx) 
    return

def sample_loop3():
    loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
    asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
    lock = asyncio.Lock() # asyncio를 위한 mutex lock을 걸어준다. 
    semaphore = asyncio.Semaphore(value=1) #1번 호출할 수 있는 semaphore 생성

    tasks = []
    
    for i in range(5):
        task = sample_async(lock, semaphore, i)
        tasks.append(task)
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() #loop가 끝나면 닫아준다. 

st = time.time()
sample_loop3()
print(time.time() - st)

#### 결과
#start 0
#end 0
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:3]>
#start 1
#end 1
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:2]>
#start 2
#end 2
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:1]>
#start 3
#end 3
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked]>
#start 4
#end 4
#5.0180981159210205

 

 

sample_loop4는 semaphore에 대한 예시다.

내가 await으로 어떤 일들을 받는데, 호출 시도를 할 때마다 semaphore 내 카운터가 계속 감소하고, 그 task 호출을 완료할 때마다 count+=1을 하게 된다. 즉 예를 들어 Semaphore(value=20)일 때는 async task를 20번까지 호출 시도를 하겠다는 의미다. 아래 예시를 보면 semaphore가 await 호출될 땐 1씩 감소하고, 호출이 끝날 땐 +1이 되는 걸 알 수 있다.

async def sample_async_sema(semaphore, idx):
    result = 0
    async with semaphore:
        print(semaphore._value)
        await sleep_fcn(idx) 
        print(semaphore._value)
    return
    
def sample_loop4():
    loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
    asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
    semaphore = asyncio.Semaphore(value=20) #20번 호출할 수 있는 semaphore 생성
    tasks = []
    
    for i in range(5):
        task = sample_async_sema(semaphore, i)
        tasks.append(task)
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() #loop가 끝나면 닫아준다. 

st = time.time()
sample_loop4()
print(time.time() - st)

### 결과
#19
#start 0
#18
#start 1
#17
#start 2
#16
#start 3
#15
#start 4
#end 0
#15
#end 1
#16
#end 2
#17
#end 3
#18
#end 4
#19
#1.0043728351593018

그러면 semaphore의 value가 0일 땐? 동시성이 발현되지 않는 것처럼 보일 것이다!

 

 

 

4. 살짝의 응용

지금까지의 개념에 조금 더하여 비동기를 활용한 예제를 한 번 만들어보자.  여러 파일을 비동기로 읽는 프로그램을 작성해보자. 우선 5개 정도의 텍스트 파일을 만들자. 

def make_files(idxs): #예시로 읽어들일 파일 5개정도를 만들자. 
    for idx in range(idxs): 
        with open(f'python_example/example{idx}.txt', 'w') as f:
            f.write(f'start {idx}')
            f.close()

이제 example0~4.txt 파일을 만들었으니 이를 async를 통해 읽어보자. 

from multiprocessing import Pool #병렬성을 지닌 멀티프로세스를 추가해보자.
import aiofiles # 비동기일 때 파일을 읽을 수 있는 라이브러리
async def sample_read(semaphore, idx): #간단하게 example.txt파일을 읽는 비동기 프로그램이다. 
    async with semaphore, aiofiles.open(f'example{idx}.txt', 'r') as f:
        result = await f.readlines()
        f.close()
    return result
    
def run_task():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    semaphore = asyncio.Semaphore(value=20)
    tasks = [sample_read(semaphore, i) for i in range(5)]
    result = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    return result
    
def run():
    st = time.time()
    pool = Pool(1)
    asyncs = []
    result = []
    asyncs = pool.apply_async(run_task,(), callback=result.append) #이 callback에 비동기로 읽어온 결과가 담긴다.
    asyncs.wait()
    print(result)
    print(time.time() - st)

추가된 것은 multiprocessing 하나다. 이는 병렬로 함수를 실행할 수 있고, 비동기로 읽은 결과를 callback으로 받아올 수 있다.

우리가 def sample_read에서 읽은 결과들이 def run()의 result에 담길 것이다. 

if __name__ == '__main__':
    run()



### 결과 
#[[['start 0'], ['start 1'], ['start 2'], ['start 3'], ['start 4']]]
#0.0845940113067627

위 예제 같은 경우는 멀티프로세스의 worker를 1개로 했기 때문에,  pool을 사용하지 않아도 같은 결과를 얻을 수 있다.

if __name__ == '__main__':
    result = run_task()
    print(result)
    
###결과
#[['start 0'], ['start 1'], ['start 2'], ['start 3'], ['start 4']]

 

 


이번 포스트에서는 파이썬에서 비동기 프로그램을 작성하는 방법에 대해 자세히 다뤄봤다. 이 예제를 몇 안 되는 사람들이 읽겠지만, 실전에서 어려움 없이 asyncio 구문을 활용할 수 있기를 바라는 마음으로 작성해 보았다. ML엔지니어도 이렇게 비동기 함수를 작성할 일이 생긴다. 받고 싶은 데이터가 있는데 너무 클 때, 그 큰 데이터를 받는 중 쓰레기 값이 있을 때 함수를 변형해서 체크할 수도 있겠다. 

 

728x90

+ Recent posts