이번 포스트는 파이썬의 try / except / else / finally 블록에 대해 다루고자 한다. 전반적인 내용은 책 '파이썬 코딩의 기술' 8장인 강건성과 성능에 기반한다. 

 

  1. try/except
  2. finally
  3. else

 

 

이전 글 : 2022.03.01 - [파이썬] - Thread - 파이썬 코딩의 기술 리뷰[동시성과 병렬성 2]

 

Thread - 파이썬 코딩의 기술 리뷰[동시성과 병렬성 2]

저번 포스트에서 책의 7장 내용인 '동시성(concurrency)과 병렬성(parallelism)'에 대해 간략히 개념을 잡았다. 이번에는 thread의 약간 심화 버전으로 정리를 해보려고 한다. 지난 글:  https://hi-lu.tistory.c.

hi-lu.tistory.com


1. try / except

try 안 코드에서 에러가 발생하는 것을 except 블록에서 처리한다. 텍스트 파일을 읽는 예제를 생각해보자. 아래와 같이 파일을 읽는 구문을 try 블록에 넣는다. try except 구문을 쓰지 않았다면, 파일을 읽을 수 없을 때 Error가 발생하고 코드는 중지될 것이다. 

with open('sample.txt', 'r')as f:
    try:
        data = f.readlines()
    except:
        print('sample.txt 읽을 수 없음')

그러나 try/except 구문을 사용함으로써 이 에러는 except구문에서 잡히게 되고, 코드는 정상적으로 실행된다. 

 txt파일을 읽을 수 있다면 try 내 코드 잘 실행됨
반대로 txt파일을 읽다가 에러가 발생했다면 'sample.txt 읽을 수 없음'이 출력됨

 

 

2. finally

보통 try/except 블록에서 예외 (Exception)을 잡는다. finally 블록은 try 블록이 실행된 다음 항상 실행된다. 즉 finally 블록은 항상 실행된다. 

with open('sample.txt', 'r')as f:
    try:
        data = f.readlines()
    except: #try에서 에러가 발생할 시 except 블럭으로 넘어감
        print('sample.txt 읽을 수 없음')
    finally: #try블록 실행 후 실행. == 항상 실행
        f.close()

 

 

3. else

try에서 에러가 발생하면 except 블록으로 넘어갔다. 그렇다면 예외가 발생하지 않았다면? 그때는 else 블록으로 넘어갈 수 있다. 

이렇게 하면 try에 들어갈 코드가 줄어들어 어디서 에러가 생기는지 좀더 명확하게 볼 수 있다. 

with open('example0.txt', 'r')as f:
    try:
        data = f.readlines()
    except:
        print('sample.txt 읽을 수 없음')
    else:
        print(data)
    finally:
        f.close()
        print('파일 닫기')

이 else는 처리할 예외가 없을 경우에 실행하라는 뜻이다.  '예외(==except)가 발생하지 않았(==else)다면?' 

 

 

 

 

 

cf.) else는 재밌는 친구다. 아래와 같이 for문 밑에도 else를 사용할 수 있으며, 문법적으로 틀리지 않다. 코드를 실행시켜보면 알겠지만 for문이 끝나고 else구문이 실행된다! 즉, 위의 블록이 제대로 실행되면 else가 실행된다. 이는 if/else나 위의 try/except/else와는 반대로 보인다. 

for i in range(3):
    print(i)
else:
    print(i) # -> 출력됨

 

반면 아래와 같은 while문에 break가 있는 경우는 조금 다르다. else 위의 구문인 while가 정상적으로 실행되지 않고, break 문에서 중단되었다고 생각하는 것이다. ( 참고 : https://www.pythontutorial.net/python-basics/python-while-else/ )

i=0
while True:
    print(i)
    i+=1
    if i>=3:
        break
else:
    print(i) # -> 출력되지 않음

마찬가지로 위 for문에 break를 넣어 볼까? 그러면 else가 작동하지 않은 것을 알 수 있다. for문이 제대로 실행되지 않고 중단되었다고 본 것이다.

for i in range(3):
    print(i)
    if i==2:
        break
else:
    print(i) # -> 출력되지 않음

 

 

 

 


 

나는 try / except 구문은 자주 쓰지만 else, finally 블록과는 그리 친하지 않았다. 이번 기회에 실제로 프로그램을 짤 때 finally구문도 활용해 봐야겠다. 

 

 

 

 

728x90

지난 포스트에서는 동시성과 병렬성에 대해 아래와 같이 다루어봤다.

  1.  동시성 병렬성의 개념, 스레드와 코루틴의 개념
  2. 스레드 심화. 스레드의 lock, queue를 이용하는 방법 

 

이번에는 책 '파이썬 코딩의 기술'의 7장, 파이썬 공식문서 내용을 포함해서 asyncio에 대해 비교적 자세히 다뤄보겠다. 책 리뷰보단 공식문서를 더 차용한 거 같아 이번 시리즈 제목은 간단하게 asyncio로 간다. 

 

파이썬 코루틴 공식문서 : https://docs.python.org/ko/3/library/asyncio-task.html 

지난글 1:  https://hi-lu.tistory.com/entry/파이썬-코딩의-기술-리뷰-동시성과-병렬성-1
지난 글 2: https://hi-lu.tistory.com/entry/Thread-파이썬-코딩의-기술-리뷰동시성과-병렬성-2 


 

0.  기본 asyncio 기능

asyncio 가 제공하는 함수에 대해서 자세히 알아보도록 하겠다. 

  1. async : 코루틴을 만드는 방법. 함수 선언 def 이전에 붙여서 실행한다.
  2. await : 코루틴이 중단되는 곳. await구문에서 코루틴이 일시 중단되고, awaitable을 해결한 다음 async 함수로 실행을 재개한다. 코루틴 인스턴스를 반환한다. (제너레이터 yield와 비슷함) 
  3. awaitable : 함수는 아니지만 await 표현식에서 사용될 수 있는 객체를 뜻한다. 
  4.  gather : async로 정의된 코루틴들을 실행하게끔 한다.  
  5. run : 코루틴을 실행, 결과를 반환한다. 새로운 event loop를 만들고 끝에는 이 event loop을 닫는다. 

 

 

 

 

여기까지를 파이썬 공식문서의 예시 코드를 살짝 변형해서 확인해보자. 

import asyncio
async def sleep_fcn(idx): #async로 코루틴 함수를 정의했다. 
    print(f'start {idx}')
    await asyncio.sleep(1) # sleep_fcn()코루틴은 이 await구문에서 일시 중단된다. 
    print(f'end {idx}')
    
async def sample1(): #async로 코루틴 함수를 정의했다. 
    tasks = []
    for i in range(5):
        task = sleep_fcn(i) #위에서 정의한 sleep_fcn 코루틴함수를 받아오고 있다. 이때 task에 저장되는 값은 await에 사용할 수 있는 코루틴 인스턴스다.
        tasks.append(task)
    await asyncio.gather(*tasks) # gather로 받아온 태스크들을 await구문으로 실행할 수 있다.

 

 

위 코드를 실행한다면 아래와 같다. 참고로 이번 코드들을 실습할 때 나처럼 jupyter notebook에서 실행한다면 asyncio.run으로 실행했을 때 runtime error가 날 것이다. jupyter notebook이 이미 event loop으로 동일 스레드에서 돌아가고 있기 때문이다. 

import time
start_time = time.time()
#asyncio.run(sample1()) # 그냥 파이썬 파일로 실행시킨다면 이걸 쓰면 됨 
await sample1() # jupyter notebook의 경우는 이미 주피터가 event loop에서 실행되고 있기 때문에, await으로 실행해주자.
print(time.time() - start_time)
####결과#####
#start 0
#start 1
#start 2
#start 3
#start 4
#end 0
#end 1
#end 2
#end 3
#end 4
#1.006260871887207

출력 값을 확인하면 코루틴의 동시성이 잘 활용되고 있음을 알 수 있다. 

 

 

 

1. asyncio 기능 2 

  1. wait : awaitable 객체를 동시에 실행하고 resturn_when 인자의 조건을 달성할 때까지 블록한다. 
  2. get_event_loop : 현 event loop을 가져온다.
  3. set_event_loop : 현 loop을 event loop으로 지정한다. 
  4. new_event_loop : 새로운 event loop 객체를 반환한다. 
  5. run_until_complete : 인자 loop가 끝날 때까지 실행한다. 인자가 corutine객체일 경우 asyncio.Task로 실행되게끔 예약한다.
  6. Task : 코루틴 실행이 가능한 객체로, 이벤트 루프는 한 번에 하나의 Task를 실행한다. 
  7. asyncio.create_task : 고수준 api. Task를 만든다.
  8.  loop.create_task : 저수준 api. task를 만들고, 코루틴의 실행을 예약한다. 

 

이벤트 루프로 코루틴의 비동기 태스크, 콜백, I/O연산, subprocess 관리 등을 할 수 있다. 위의 기능보단 저수준 api이다. 예시 코드를 task와 event_loop 두 가지로 작성해 보자. (실행하실 분들은 주피터 말고 cmd에서 python 파일로 실행할 것!)

def sample_loop1(): #저수준 api
    loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
    asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
    tasks = []
    for i in range(5):
        task = sleep_fcn(i)
        tasks.append(task)
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() #loop가 끝나면 닫아준다. 
    
def sample_loop2(): # 고수준 api인 asyncio.create_task를 이용해서 실행할 수도 있다. 
    for i in range(5):
        asyncio.create_task(sleep_fcn(i))
    
sample_loop1() #이 두 함수의 실행 결과는 (당연하게도) 같다. 
sample_loop2()

확실히 sample_loop1보다 sample_loop2가 코드가 짧고 좋아보일 수 있다. 하지만 저수준 api를 사용해야 할 순간이 분명 온다. 많은 데이터 파일을 한 번에 비동기로 받아오고 싶을 때와 같이 I/O 연산이 필요할 때, callback을 받아야 할 때 등 에서 event_loop을 사용하게 된다. 

 

 

 

 

 

2. asyncio 기능 3

  1. Lock : 태스크를 위한 mutex lock을 걸어주며, 주로 async with와 함께 사용한다. 공유 자원에 대한 독점 액세스를 보장할 때 사용한다. (== thread safe X. cf.) 스레드는 공유하는 데이터에 동시 접근을 막기 위해 lock을 걸어줬다. asyncio.Lock을 걸어주면 다른 스레드들이 접근할 때 안정성을 보장할 수 없게 된다.) 
  2. Semaphore : 마찬가지로 스레드 안정성이 없다. 호출될 때마다 내부 카운터가 하나씩 감소하는 형태로, 0이 된다면 block 한다. 

 

위 2개 개념을 사용해 새로운 예제를 작성해보자. 아래 sample_loop3은  lock을 걸었기 때문에 한 코루틴이 자원을 독점하여 이전처럼 1.x초만에 실행되지 않는다. 실행 순서도 한 coroutine이 끝난 후에 다른 코루틴이 실행되는 것을 알 수 있다. 

#lock을 사용하는 예제
async def sample_async(lock, semaphore, idx):
    result = 0
    async with lock, semaphore: #여기의 semaphore는 await문을 만나기 전에 semaphore.acquire()를, 그 후에는 semapohre.release()를 한다. 
    	print(lock)
        await sleep_fcn(idx) 
    return

def sample_loop3():
    loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
    asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
    lock = asyncio.Lock() # asyncio를 위한 mutex lock을 걸어준다. 
    semaphore = asyncio.Semaphore(value=1) #1번 호출할 수 있는 semaphore 생성

    tasks = []
    
    for i in range(5):
        task = sample_async(lock, semaphore, i)
        tasks.append(task)
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() #loop가 끝나면 닫아준다. 

st = time.time()
sample_loop3()
print(time.time() - st)

#### 결과
#start 0
#end 0
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:3]>
#start 1
#end 1
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:2]>
#start 2
#end 2
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked, waiters:1]>
#start 3
#end 3
#<asyncio.locks.Lock object at 0x7fad60209eb0 [locked]>
#start 4
#end 4
#5.0180981159210205

 

 

sample_loop4는 semaphore에 대한 예시다.

내가 await으로 어떤 일들을 받는데, 호출 시도를 할 때마다 semaphore 내 카운터가 계속 감소하고, 그 task 호출을 완료할 때마다 count+=1을 하게 된다. 즉 예를 들어 Semaphore(value=20)일 때는 async task를 20번까지 호출 시도를 하겠다는 의미다. 아래 예시를 보면 semaphore가 await 호출될 땐 1씩 감소하고, 호출이 끝날 땐 +1이 되는 걸 알 수 있다.

async def sample_async_sema(semaphore, idx):
    result = 0
    async with semaphore:
        print(semaphore._value)
        await sleep_fcn(idx) 
        print(semaphore._value)
    return
    
def sample_loop4():
    loop = asyncio.new_event_loop() # 새 event loop을 만들었다.
    asyncio.set_event_loop(loop) # loop을 현 event loop으로 지정한다.
    semaphore = asyncio.Semaphore(value=20) #20번 호출할 수 있는 semaphore 생성
    tasks = []
    
    for i in range(5):
        task = sample_async_sema(semaphore, i)
        tasks.append(task)
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close() #loop가 끝나면 닫아준다. 

st = time.time()
sample_loop4()
print(time.time() - st)

### 결과
#19
#start 0
#18
#start 1
#17
#start 2
#16
#start 3
#15
#start 4
#end 0
#15
#end 1
#16
#end 2
#17
#end 3
#18
#end 4
#19
#1.0043728351593018

그러면 semaphore의 value가 0일 땐? 동시성이 발현되지 않는 것처럼 보일 것이다!

 

 

 

4. 살짝의 응용

지금까지의 개념에 조금 더하여 비동기를 활용한 예제를 한 번 만들어보자.  여러 파일을 비동기로 읽는 프로그램을 작성해보자. 우선 5개 정도의 텍스트 파일을 만들자. 

def make_files(idxs): #예시로 읽어들일 파일 5개정도를 만들자. 
    for idx in range(idxs): 
        with open(f'python_example/example{idx}.txt', 'w') as f:
            f.write(f'start {idx}')
            f.close()

이제 example0~4.txt 파일을 만들었으니 이를 async를 통해 읽어보자. 

from multiprocessing import Pool #병렬성을 지닌 멀티프로세스를 추가해보자.
import aiofiles # 비동기일 때 파일을 읽을 수 있는 라이브러리
async def sample_read(semaphore, idx): #간단하게 example.txt파일을 읽는 비동기 프로그램이다. 
    async with semaphore, aiofiles.open(f'example{idx}.txt', 'r') as f:
        result = await f.readlines()
        f.close()
    return result
    
def run_task():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    semaphore = asyncio.Semaphore(value=20)
    tasks = [sample_read(semaphore, i) for i in range(5)]
    result = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    return result
    
def run():
    st = time.time()
    pool = Pool(1)
    asyncs = []
    result = []
    asyncs = pool.apply_async(run_task,(), callback=result.append) #이 callback에 비동기로 읽어온 결과가 담긴다.
    asyncs.wait()
    print(result)
    print(time.time() - st)

추가된 것은 multiprocessing 하나다. 이는 병렬로 함수를 실행할 수 있고, 비동기로 읽은 결과를 callback으로 받아올 수 있다.

우리가 def sample_read에서 읽은 결과들이 def run()의 result에 담길 것이다. 

if __name__ == '__main__':
    run()



### 결과 
#[[['start 0'], ['start 1'], ['start 2'], ['start 3'], ['start 4']]]
#0.0845940113067627

위 예제 같은 경우는 멀티프로세스의 worker를 1개로 했기 때문에,  pool을 사용하지 않아도 같은 결과를 얻을 수 있다.

if __name__ == '__main__':
    result = run_task()
    print(result)
    
###결과
#[['start 0'], ['start 1'], ['start 2'], ['start 3'], ['start 4']]

 

 


이번 포스트에서는 파이썬에서 비동기 프로그램을 작성하는 방법에 대해 자세히 다뤄봤다. 이 예제를 몇 안 되는 사람들이 읽겠지만, 실전에서 어려움 없이 asyncio 구문을 활용할 수 있기를 바라는 마음으로 작성해 보았다. ML엔지니어도 이렇게 비동기 함수를 작성할 일이 생긴다. 받고 싶은 데이터가 있는데 너무 클 때, 그 큰 데이터를 받는 중 쓰레기 값이 있을 때 함수를 변형해서 체크할 수도 있겠다. 

 

728x90

저번 포스트에서 책의 7장 내용인 '동시성(concurrency)과 병렬성(parallelism)'에 대해 간략히 개념을 잡았다. 이번에는 thread의 약간 심화 버전으로 정리를 해보려고 한다.

 

지난 글: 
 https://hi-lu.tistory.com/entry/파이썬-코딩의-기술-리뷰-동시성과-병렬성-1

 

파이썬 코딩의 기술 리뷰- 동시성과 병렬성 1

이번에는 책의 7장 내용인 '동시성(concurrency)과 병렬성(parallelism)'에 대해 먼저 정리하겠다. 주로 개념들을 잡고 가는 기록이 될 듯하다. 동시성 , 병렬성 subprocess thread coroutine 개념잡기 - 동시성 &.

hi-lu.tistory.com

 


 

 

0.  간단 용어 설명

  • fan in : 프로세스 내 다음 단계로 가기 전, 동시 작업 단위(ex. thread)가 다 날 때까지 기다리는 과정
  • fan out : 여러 작업 단위가 동시에 실행되게 만드는 과정 

 

1.  Lock

스레드에는 GIL(Global Interperter Lock)이라는 전역 인터프리터 락이 있다. 이 락은 스레드들이 병렬적으로 실행되지 않도록 막는 용도로 지난 글에서 설명했다. 여기에 추가로 서로 다른 스레드들이 같은 데이터에도 접근하지 못하도록 막아줄 락이 필요하다. 

아래 실험은 간단히 10**5번씩 +=1을 하는 스레드들을 총 5개 실행한 결과다. 

### thread 실험
from threading import Thread
threads = []
class Sample:
    def __init__(self):
        self.sample = 0
    
    def add(self, count):
        self.sample += count
     
def thread_worker(dic, idx):
    for i in range(idx):
        dic.add(1)
        
#5개의 스레드를 만들어 Sample.sample에 1을 더하는 thread_worker함수를 돌린다고 가정하자. 
sample = Sample()
for i in range(5):
    thread = Thread(target=thread_worker, args=(sample, 10**5))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(sample.sample)# -> 342734. 실제로는 5 * 10**5가 나와야 함

 

이러한 결과가 나오는 이유는 a번 thread가 +=1을 하고 있는 도중 b번 thread가 실행되도록 중단되는 일이 발생했기 때문이다. 아래와 같이 Sample.sample에 1을 더하는 연산 중 어딘가에서 스레드가 일시 중단된 것이다. 

value = getattr(sample, 'sample') #Sample()로 가져온 클래스 오브젝트 sample의 'sample'애트리뷰트에 접근
value += 1
setattr(sample, 'sample', value)

 

이를 막기 위해서 Lock (mutex)를 사용하는 것이다. 코드를 다시 쓰면 다음과 같다. 

### thread 실험2
from threading import Thread, Lock
dic = {'sample':0}
threads = []
class SampleAndLock:
    def __init__(self):
        self.sample = 0
        self.lock = Lock()
    
    def add(self, count):
        with self.lock:
            self.sample += count
 
 
def thread_worker(dic, idx):
    for i in range(idx):
        dic.add(1)
        
#5개의 스레드를 만들어 dic['sample']에 1을 더하는 thread_worker함수를 돌린다고 가정하자. 
sample = SampleAndLock()
for i in range(5):
    thread = Thread(target=thread_worker, args=(sample, 10**5))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(sample.sample) #-> 5 * 10**5, 즉 500000이 잘 나온다.

 

cf) 이렇게 스레드와 Lock을 이용해서 풀 수 있는 예시 문제로는 voice recognition을 들 수 있다. 마이크모듈로 음성이 들어오고 있는지 계속 받아와야 하고(thread A) 해당 음성이 끝이 났는지 판별해야 하는(thread B) 경우를 생각할 때, thread A와 B 모두 음성이라는 동일 데이터에 접근하고 있다. 이럴 때 음성 데이터에 Lock이 잘 걸려있어야 함을 알 수 있다. 

 

 

 

2. queue

추가적으로, 스레드를 순차적으로 실행하기 위해 가급적 queue를 사용하자. deque를 사용하는것에 비해 다음과 같은 장점이 있다.

  1. thread 처리를 해야하는 데이터가 비어있을 때는 block을 해준다. (queue.get())
  2. 버퍼 크기를 정의할 수 있다. 즉, 실행시킬 thread 수를 지정하고 나머지 스레드는 잠시 받지 않을 수 있다. (Queue(max_len))
  3. 작업 진행을 추적할 수 있다. (queue.task_done())

cf) 파이썬의 리스트는 기본적으로 queue기능을 제공하기는 하지만, 여기서 말하는 queue는 내장 라이브러리를 의미한다. 

 

from thread import Thread
from queue import Queue
from queue import Queue
max_len = 100
my_queue = Queue(max_len) # queue가 100개가 넘으면 put을 블록

def ex_get():
    print('get start')
    my_queue.get() #먼저 실행되었지만 put()이 들어올 때 까지는 실행 X 
    print('get done')
    print('task')
    my_queue.task_done()
    print('task done')

def ex_put():
    print('put start')
    my_queue.put(object())
    print('put done')
    print('join') #task_done이 호출되기 전까지 끝나지 않게 기다림
    my_queue.join()
    print('join done')
    
threads = []
threads.append(Thread(target = ex_get))
threads.append(Thread(target = ex_put))
for thread in threads:
    thread.start()
    
#### 출력값 ####
#get start
#put start
#put done
#join
#get done
#
#task
#task done
#join done

 

 

3. ThreadpoolExecutor

I/O 동시성을 위해서는 asyncio를 사용해도 되지만, thread를 사용하는 방법도 존재한다. ThreadpoolExecutor는 exception을 자동으로 전파시켜준다. 간단하게 앞에서 사용한 예제를 이용해서 실험해보자. 

from concurrent.futures import ThreadPoolExecutor
MAX_WORKERS=10
class SampleAndLock:
    def __init__(self):
        self.sample = 0
        self.lock = Lock()
    
    def add(self, count):
        with self.lock:
            if self.sample + count >5:
                raise Exception('숫자가 5보다 큼')
            self.sample += count
    
    
def thread_worker(dic, idx):
    for i in range(idx):
        dic.add(1)
        
sample = SampleAndLock()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: # 사용가능한 규모를 미리 정해야 함
    for i in range(5):
        thread = Thread(target=thread_worker, args=(sample, i*2))
        thread.start()
        
        
####출력####
#Exception: 숫자가 5보다 큼
#Exception: 숫자가 5보다 큼
#Exception: 숫자가 5보다 큼

위 코드의 단점이라고는 max_workers를 사전에 정해서 나중에 스레드 수를 늘릴 수 없다는 점이다. 웬만하면 async를 이용하자. 

 


 

이번에는 thread에 대해 조금 깊이 들어가 보았다. 실제로 I/O를 생각하는 구현에서는 async를 조금 더 선호하게 되는 듯하다. 다음 포스트에선 async에 대한 심화를 들어가 보겠다. 

728x90

지난 포스트에 이어서 '파이썬 코딩의 기술' 책의 6장인 메타 클래스와 애트리뷰트에 대해 마저 기술해보도록 하겠다. 

 

  1. 메타클래스
  2. __init__subclass__
  3. __set_name__
  4. 클래스 데코레이터

 


0. 간단 정의

클래스 애트리뷰트 : 클래스 내 self가 붙어있는  친구들이라 보면 편하다. 클래스 내 object라고 보면 될 듯.

메타클래스 : 클래스를 넘어가는 개념, 클래스 문을 가로채서 특별한 동작을 진행할 수 있음. ex) 클래스 잘 구현됐는지 검증

 

1. 메타클래스

클래스가 잘 구현됐는지 검증하기 위해 메타클래스를 사용할 수 있다. __init__메서드에서 검증 코드를 실행하는 경우가 많은데, 이때의 장점은 클래스 모듈을 import 할 때와 같은 시점에 검증 코드가 실행되게 때문에 예외를 더 빨리 찾을 수 있다. 

 

메타클래스는 아래와 같이 정의할 수 있다. type을 상속받고, __new__메서드로 클래스 내용을 전달받는다. 

 

  • 메타클래스 사용법 1 : 클래스 검증
class MetaClass(type): #type을 상속해 정의
        def __new__(meta, name,bases, class_dict): #new 메서드로 자신과 연관된 클래스 내용 전달받음
            print(f'name {name}, meta {meta}')
            print(f'bases {bases}, class dict {class_dict}')
            if class_dict['cnt'] >1:
                raise ValueError('cnt 값 너무큼')
            
 class MyClass(metaclass = MetaClass):
    cnt = 1
    print(cnt)
    #1
    #name MyClass, meta <class '__main__.MetaClass'>
    #bases (), class dict {'__module__': '__main__', '__qualname__': 'MyClass', 'cnt': 1}
    #
    #

위와 같이 메타클래스에서 클래스들의 정보를 받아와, 검증에 사용할 수 있다. 

 

  • 단점: 한 클래스당 한 메타클래스만 사용할 수 있다.

 

 

 

 

 

2. __init_subclass__

위에서 메타클래스를 정의하고 사용하는 법에 대해 알아보는데, __init_subclass__를 사용하면 더 단순하게 클래스 검증에 사용할 수 있다. 

class LikeMetaClass:
    def __init_subclass__(cls):
        super().__init_subclass__()
        if cls.cnt >1:
            raise ValueError('cnt값 너무 큼')
    
    @classmethod
    def get_cnt(cls):
        print(cls.cnt)
        
class MyClass2(LikeMetaClass):
    cnt = 3
    
    
#Input In [9], in LikeMetaClass.__init_subclass__(cls)
#      3 super().__init_subclass__()
#      4 if cls.cnt >1:
#----> 5     raise ValueError('cnt값 너무 큼')
#
#ValueError: cnt값 너무 큼

 

 

3. __set_name__

클래스 애트리뷰트를 표시하는 방법 중 하나. 메타클래스 __new__ 에서 처리하는 일을 처리할 수 있는데 그 중 프로퍼티 이름을 변경할 수 있다. 어떻게 사용하는지만 알아두자. 

class Field:
    def __set_name__(self, owner, name):
        self.name = name
        self.internal_name = '_'+name

 

 

 

4. 클래스 데코레이터

functools.wrap으로 데코레이터를 생성할 수 있는데, 데코레이터를 각 메서드에 사용할 경우는 메서드마다 데코레이터를 써줘야 한다. 이러한 중복이 일어나기 때문에 가독성이 나빠진다.

from functools import wraps
#wrap으로 데코레이터 사용하는 예시
def trace_fcn(func):
    if hasattr(func, 'tracing'):
        return func
    
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = None
        try :
            result = func(*args, **kwargs)
            return
        except Exception as e:
            result = e
            raise
    wrapper.tracing = True
    print(func.__name__)
    return wrapper

 

이럴 때 메타클래스 내부에서 wrap을 계속 호출할 수 있다. 

import types
class TraceMeta(type):
    def __new__(meta, name, bases, class_dict):
        clas = super().__new__(meta, name, bases, class_dict)
        for k in dir(clas):
            value= getattr(clas, k)
            #print(value)
            if isinstance(value, types.BuiltinMethodType):
                wrapped = trace_fcn(value)
                print(wrapped)
                setattr(clas, k, wrapped)
        return clas

하지만 메타클래스를 여러개 사용할 수 없는 단점이 적용되기독 하고, 메타클래스를 사용하는 방법은 제약이 많은 편이라 클래스 데코레이터를 사용하는 편이 더 낫다.

#클래스 데코레이터 사용법
def class_decorator(clas):
        for k in dir(clas):
            value= getattr(clas, k)
            #print(value)
            if isinstance(value, types.BuiltinMethodType):
                wrapped = trace_fcn(value)
                print(wrapped)
                setattr(clas, k, wrapped)
        return clas

@class_decorator
class MyClass:
    pass

 

 

 

 


이번 장은 내가 개발할 때 사용할만한 직접적인 지식은 아니어서, 1)메타클래스와 애트리뷰트의 개념과 2) 사용법을 간략하게 알아보는 정도로 기록했다. 감만 잡고 나중에 필요할 때 더 깊이 공부해보도록 하자.

728x90

최근 파이썬으로 순열을 짜다가 놓치고 있는 게 있는 것 같아 글을 쓴다. 

 


 

다음과 같이 1번 코드, 2번 코드는 순열(permutation)을 재귀 함수로 푸는 코드이다. 차이점은 5번째 줄 arr.append()에 파라미터로 tmp를 주느냐, tmp[:]를 주느냐의 차이다. 

    ## 1번 코드
    def permute(self, nums: List[int]) -> List[List[int]]:
        arr = []
        def backtrack(tmp = []):
            if len(tmp) == len(nums):
                arr.append(tmp)
                return
            for n in nums:
                if n in tmp:
                    pass
                else:
                    tmp.append(n)
                    backtrack(tmp)
                    tmp.pop()
            return
        backtrack()
        return arr
    ## 2번 코드
    def permute(self, nums: List[int]) -> List[List[int]]:
        arr = []
        def backtrack(tmp = []):
            if len(tmp) == len(nums):
                arr.append(tmp[:]) #1번 코드와 다른점
                return
            for n in nums:
                if n in tmp:
                    pass
                else:
                    tmp.append(n)
                    backtrack(tmp)
                    tmp.pop()
            return
        backtrack()
        return arr

 

 

 

 

하지만 각 코드의 결과는 다르다.

#1번코드 결과
permute([1,2,3]) # [[],[],[],[],[],[]]

#2번코드 결과
permute([1,2,3]) # [[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]]

 

 

 

 

 

그이유는 arr 안에 들어가는 tmp는 유동적이지만 tmp의 주소는 바뀌지 않기 때문이다. tmp[:]은 주소는 제외하고 copy 할 수 있다. 따라서 1번 코드와 2번 코드의 6번째 줄에 print(arr)을 추가해보면 재밌는 결과가 나온다.

#1번코드
def combine(self, n: int, k: int) -> List[List[int]]:
    arr = []
        
    def backtrack(tmp = [], start=1):
        if len(tmp) == k:
            arr.append(tmp)
            print(arr)
            return
        for i in range(start, n+1)
        	tmp.append(i)
        	backtrack(tmp, i+1)
            tmp.pop()
        return
    backtrack()
    return arr
    
combine([1,2,3])
#[[1, 2]]
#[[1, 3], [1, 3]]
#[[1, 4], [1, 4], [1, 4]]
#[[2, 3], [2, 3], [2, 3], [2, 3]]
#[[2, 4], [2, 4], [2, 4], [2, 4], [2, 4]]
#[[3, 4], [3, 4], [3, 4], [3, 4], [3, 4], [3, 4]]
#2번코드
def combine(self, n: int, k: int) -> List[List[int]]:
    arr = []
        
    def backtrack(tmp = [], start=1):
        if len(tmp) == k:
            arr.append(tmp[:])
            print(arr)
            return
        for i in range(start, n+1)
        	tmp.append(i)
        	backtrack(tmp, i+1)
            tmp.pop()
        return
    backtrack()
    return arr
    
combine([1,2,3])
#[[1, 2]]
#[[1, 2], [1, 3]]
#[[1, 2], [1, 3], [1, 4]]
#[[1, 2], [1, 3], [1, 4], [2, 3]]
#[[1, 2], [1, 3], [1, 4], [2, 3], [2, 4]]
#[[1, 2], [1, 3], [1, 4], [2, 3], [2, 4], [3, 4]]

위와 같이 1번 코드는 tmp의 값이 재귀를 돌면서 계속 변하는데, 변한 마지막 값으로 같은 주소를 공유하는 tmp들이 다 변한다. 

 

 

 

 

 


 

파이썬으로 코딩하다보면 가끔 포인터, 주소에 대해 무신경해질 때가 가끔 있는 듯하다. 놓치지 말고 잘 챙기자.

728x90

책 '파이썬 코딩의 기술'의 6장인 메타클래스와 애트리뷰트에 대해서 정리해보고자 한다. 

  1. 애트리뷰트
  2. @property
  3. 동적 기능을 위한 애트리뷰트 메서드

0. 정의

  • 메타클래스 :
    • 정의 : 클래스를 넘어서는 개념, class문을 이용해 클래스가 정의될 때마다 특별한 동작 제공
    • 예시 : 동적으로 애트리뷰트 접근을 커스텀화해주는 내장 기능
    • 주의 : '최소 놀람의 법칙' 에 따라 뜻하지 않은 부작용을 피하기

 

1. 애트리뷰트

다른 언어를 사용할 때는 class 내부에 getter, setter를 명시해준다. 하지만 파이썬은 그럴 필요가 없다. 공개 애트리뷰트(self로 시작하는, 클래스에서 명시한 변수)를 이용하면 된다. 

# 1. setter, getter 사용 에시 
class OldOne:
    def __init__(self, x):
        self._x = x #애트리뷰트에 밑줄 하나는 보호 애트리뷰트!
    
    def get_x(self):
        return self._x
    
    def set_x(self, x):
        self._x = x
        
# 2. 공개 애트리뷰트만 사용할 때
class NewOne:
    def __init__(self, x):
        self.x = x
        self.y = 0
        self.z = 0

oo = OldOne(2)
oo.set_x(3)
print(oo.get_x()) # 3

no = NewOne(2)
no.x = 3
print(no.x) # 3

 

공개, 비공개 애트리뷰트가 헷갈린다면 이전 포스트의 PEP8 스타일을 참고하자 : https://hi-lu.tistory.com/entry/%ED%8C%8C%EC%9D%B4%EC%8D%AC-%EC%BD%94%EB%94%A9%EC%9D%98-%EA%B8%B0%EC%88%A0-%EB%A6%AC%EB%B7%B0-self%EC%99%80-cls-bytes%EC%99%80-str-f-%EB%AC%B8%EC%9E%90%EC%97%B4 

 

파이썬 코딩의 기술 리뷰 - self와 cls, bytes와 str, f-문자열, 왈러스 연산자

오늘은 책 '파이썬 코딩의 기술'을 읽고 파이썬으로 코딩하는 데에 유용한 내용들을 정리해보고자 한다. 이 책의 챕터는 다음과 같이 구성되어 있다. 파이썬답게 생각하기 리스트와 딕셔너리 함

hi-lu.tistory.com

 

2. @property

property

애트리뷰트에 접근할 때 특별한 동작을 위해서 setter, getter 메서드를 정의해야 할 일(ex. 계산)이 있을 것이다. 이렇게 특별한 기능을 수행할 때는 데코레이터 @property를 사용하면 된다. 다만 너무 과하게 프로퍼티를 사용할 때는 차라리 코드를 리펙토링 하는 것을 권장하고 있다. 

 

위 OldOne, NewOne 클래스 예시를 사용해서 다음과 같이 빌드업할 수 있다. 1번은 @property를 사용했을 때 클래스로, 공개애트리뷰트 z 값을 setter로 하여금 계산할 수 있게 되었다. 

# proptery 사용 
class PropertyOne(NewOne): 
	def __init__(self, x): 
    	super().__init__(x) 
        self._y = 0 
        
    @property 
    def y(self): 
    	return self._y 
        
    @y.setter 
    def y(self, y): 
    	self._y = y 
        self.z = self._y + self.x 
    
# property 미사용. 잘못된 예. npo.y를 하면 NewOne의 self.y가 호출될 것 
class NoPropertyOne(NewOne): 
	def __init__(self, x): 
    	super().__init__(x) 
        self._y = 0 
        
    def y(self, y):
    	self._y = y 
        self.z = self._y + self.x 
        return self._y
        

po = PropertyOne(3) 
print(po.z) # 0
po.y = 10 
print(po.z) #13 
npo =NoPropertyOne(3) 
print(npo.z) # 0
npo.y = 10 
print(npo.z) # 0

디스크립터

 

디스크립터 프로토콜 : 파이썬에서 애트리뷰트 접근을 해석하는 방법을 정의하는데, @property는 다른 클래스 간 공유하지 못하기 때문에 이럴 때 사용할 수 있는 방법이다.

 

__get__, __set__ 메서드를 제공하는데, 여기서 주의해야 할 점은 한 클래스를 애트리뷰트로 가지는 모든 인스턴스들이 공유한다는 것이다.

class EXAttribute:
    def __init__(self):
        self._value = 0
    
    def __get__(self, instance):
        return self._value
    
    def __set__(self, instance, value):
        self._value = value
        

class EXClass:
    exa1 = EXAttribute()
    exa2 = EXAttribute()
    
exc = EXClass
exc.exa1 = 10
print(exc.exa1) # 10
exc2 = EXClass
exc2.exa1 = 20
print(exc.exa1) # 20. 10이 나올거라 생각했지만 틀림. EXAttribute() 인스턴스가 EXClass 내에서 한번만 생성되었기 떄문

 

이를 옳게 표현하기 위해서는 다음과 같이 디스크립터를 사용하면 된다. (솔직히 def__init__해서 self로 클래스를 받아주면 해결되긴 하지만 디스크립터 기능을 사용할 수는 없었다. 만약 def __init__(self)로 해주면  __get__과 __set__이 호출이 안 되는 걸 확인할 수 있었다.)

from weakref import WeakKeyDictionary
class RightEXAttribute:
    def __init__(self):
        self._values = WeakKeyDictionary()#메모리 누수를 막기 위해 {} 대신 사용. _values가 __set__호출에 전달된 모든 인스턴스에 대해 참조를 저장하기 떄문. 
        
    def __get__(self, instance, instance_type):
        print(instance) # -> ExClass2의 object 반환
        if instance is None:
            return self
        return self._values.get(instance, 0)
    
    def __set__(self, instance, value):
        self._values[instance] = value
        

class EXClass2:
    exa1 = RightEXAttribute()
    exa2 = RightEXAttribute()
    
exc = EXClass2()
exc.exa1 = 10
print(exc.exa1) # 10
exc2 = EXClass2()
exc2.exa1 = 20
print(exc.exa1, exc2.exa1) # 10, 20

 

3. 동적 기능을 위한 애트리뷰트 메서드

위에서 살펴본 @property, 디스크립터를 사용하기 적합(사전 정의로 애트리뷰트를 가져오기) 하지 않는 경우, 즉 지연 계산 애트리뷰트가 필요하다면 사용할 수 있는 메서드는 다음과 같다. 

__getattr__

인스턴스 딕셔너리에서 존재하지 않는 애트리뷰트를 접근할 때 호출되는 매서드이다. 아래 예시처럼 1) 존재하지 않은 애트리뷰트를 가 호출될 때 __getattr__가 호출되고, 2) 그 안에서 setattr가 호출되어 인스턴스에 추가해준다.

class LazyRecord:
    def __init__(self):
        self.exists = 5
    
    def __getattr__(self, name):
        value = f'{name}!'
        setattr(self, name, value)
        return value
        
data =LazyRecord()
print(data.__dict__) #{'exists': 5}
print(data.no) #no!
print(data.__dict__) #{'exists': 5, 'no': 'no!'}
print(hasattr(data, '1')) # True를 출력함과 동시에 __dict__에 {'1' : '1!'}이 추가됨

 

__getattribute__

__getattr__처럼 애트리뷰트가 존재하지 않을 때만 호출되지 않고, 객체 애트리뷰트에 접근할 때 항상 호출된다. AttributeError가 발생할 때 setattr를 호출하면 __getattr__의 기능도 수행할 수 있다. 

 

이때 재귀를 피하기 위해서는 super().__getattribute__를 사용하면 된다. 재귀가 나타나는 이유는 self._data를 호출할 때 __getattribute__를 다시 호출하기 때문이다.(WrongGetAttributeRecord -> __getattribute__ -> self._data  -> __getattribute__ ->......)

# 잘못된 예
class WrongGetAttributeRecord:
    def __init__(self, data):
        self._data = data
    
    def __getattribute__(self, name):
        print(f'{name!r}')
        return self._data[name]
        
        
class RightGetAttributeRecord:
    def __init__(self, data):
        self._data = data
    
    def __getattribute__(self, name):
        print(f'{name!r}')
        data_dict = super().__getattribute__('_data')
        return data_dict[name]
        
data = WrongGetAttributeRecord({'name' : 1})
data.name # '_data'를 출력하며 무한재귀

rdata = RightGetAttributeRecord({'name' : 1})
rdata.name

 

다음 글에서는 아래 순서대로 이어서 정리를 해볼까 한다. 

  1. __init__subclass__
  2. __set_name__
  3. 클래스 데코레이터

애트리뷰트에 대한 근본적인 이해를 할 수 있는 챕터라고 생각한다. 올해도 꼬물꼬물 차근차근 힘 내보자.

 

728x90

+ Recent posts