Dev.Chan64's Blog

홈으로 가기
Show Cover Slide Show Cover Slide

비동기 메시지와 MSA를 위한 Python 시스템 설계

1. 개요 (Overview)

본 문서는 AI 기반 로봇 제어 시스템을 개발하고,
핸들러를 FaaS(Function as a Service) 형태로 운영한 실전 경험을 바탕으로 작성하였습니다.

실세계 시스템, 특히 로봇이나 AI 서비스를 다루는 환경에서는
상태 변화가 빈번하고, 외부 응답 지연이 일상적으로 발생합니다.
이러한 환경에서는 통신 흐름, 상태 관리, 처리 로직이 서로 얽히기 쉬워,
결과적으로 다음과 같은 문제를 야기하게 됩니다:

이 문서에서는 이러한 문제를 해결하기 위해,
Python asyncio 기반의 비동기 메시지 아키텍처를 설계하는 방법을 설명하고자 합니다.

메시지 기반 흐름, FSM(Finite State Machine)을 통한 상태 전이 관리,
Queue를 통한 통신과 처리 분리 전략을 중심으로,
복잡한 시스템을 구조적으로 정리하고 확장 가능한 아키텍처를 구축하는 방법을 다루고 있습니다.


2. 메시지 기반 구조 이해 (Message-driven Architecture)

2.1 상태 중심 구조의 문제점

상태 중심 시스템(State-centric System)에서는 다음과 같은 문제가 발생할 수 있습니다:

2.2 메시지 기반 처리 모델

메시지 중심 처리(Message-driven Processing)에서는 다음과 같은 접근 방식을 택합니다:

2.3 FSM 기반 전이 모델

FSM(유한 상태 기계) 기반 모델은 다음과 같은 원칙을 따릅니다:

메시지 기반 FSM 모델 다이어그램은 다음과 같습니다:

stateDiagram-v2
    [*] --> Idle
    Idle --> Working : start(message)
    Working --> Idle : stop(message)

2.4 async/await와 메시지 기반 처리의 궁합

Python의 async/await 구문은 메시지 기반 아키텍처에 매우 적합합니다:

결론적으로,
asyncio 기반 구조는 FSM + 메시지 기반 모델과 결합하여
실세계 시스템에서 필요한 유연성과 견고함을 모두 제공합니다.


3. Python 비동기 구성요소 소개 (Key Components)

Python의 비동기 프로그래밍은 asyncio를 중심으로 다양한 핵심 구성요소를 제공합니다.
이를 효과적으로 활용하면, 복잡한 메시지 흐름을 관리하면서도 구조적인 코드를 유지할 수 있습니다.

이 장에서는 주요 비동기 구성요소를 소개하고, 메시지 기반 아키텍처 설계에 어떻게 적용할 수 있는지 설명드립니다.

3.1 Coroutine

Coroutine은 Python 비동기 프로그래밍의 기본 단위입니다.

async def fetch_data():
    await asyncio.sleep(1)
    return "data"

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

3.2 asyncio 기본 API

asyncio는 코루틴 실행과 관리, 태스크 스케줄링을 위한 표준 라이브러리입니다.

주요 함수는 다음과 같습니다:

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"Task {name} done")

async def main():
    await asyncio.gather(
        task("A", 1),
        task("B", 2)
    )

asyncio.run(main())

3.3 asyncio.Queue를 활용한 메시지 버퍼링

asyncio.Queue는 다수의 프로듀서와 컨슈머 간에 안전하게 메시지를 전달할 수 있는 구조를 제공합니다.

특징:

import asyncio

queue = asyncio.Queue()

async def producer():
    for i in range(5):
        await queue.put(f"message {i}")
        print(f"Produced: message {i}")

async def consumer():
    while True:
        message = await queue.get()
        print(f"Consumed: {message}")
        queue.task_done()

async def main():
    consumer_task = asyncio.create_task(consumer())
    await producer()
    await queue.join()
    consumer_task.cancel()

asyncio.run(main())

3.4 run_in_executor를 이용한 외부 동기 API 처리

CPU 바운드 작업이나 외부 동기 API 호출은 asynciorun_in_executor를 통해 별도 스레드 또는 프로세스에서 처리할 수 있습니다.

활용 예시:

import asyncio
import time

def blocking_io():
    time.sleep(2)
    return "blocking result"

async def main():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

asyncio.run(main())

4. 기본 메시지 처리 구조 설계 (Design Pattern)

메시지 기반 비동기 시스템은 다음과 같은 기본 처리 흐름을 따릅니다:

  1. 메시지 수신: 외부 입력(WebSocket, TCP 등)으로부터 메시지를 비동기로 수신합니다.
  2. Queue 저장: 수신한 메시지는 즉시 asyncio.Queue에 저장합니다.
  3. Dispatcher 소비: 별도의 Dispatcher가 Queue를 소비하며 FSM으로 메시지를 전달합니다.
  4. FSM 처리: 현재 상태에 따라 메시지를 분기하고, 적절한 핸들러 코루틴을 호출합니다.
  5. 핸들러 실행: 비즈니스 로직을 핸들러에서 수행합니다.

4.1 시스템 흐름 다이어그램

flowchart LR
    A[Message Source] --> B[asyncio.Queue]
    B --> C[FSM Dispatcher]
    C --> D{State Decision}
    D --> |Idle → Working| E[Work Handler Coroutine]
    D --> |Working → Idle| F[Stop Handler Coroutine]

4.2 메시지 수신 → Queue → FSM 분기 → 핸들러 처리

구조를 요약하면 다음과 같습니다:

import asyncio

# 메시지 큐
queue = asyncio.Queue()

# 메시지 수신 (예: WebSocket, TCP 등)
async def message_listener():
    while True:
        msg = await receive_message()  # 비동기 수신
        await queue.put(msg)

# 상태 기반 FSM
class StateMachine:
    def __init__(self):
        self.state = "idle"

    async def handle(self, msg):
        if self.state == "idle" and msg.type == "start":
            self.state = "working"
            await self.do_work()
        elif self.state == "working" and msg.type == "stop":
            self.state = "idle"
            await self.stop_work()

    async def do_work(self):
        print("작업을 시작합니다.")

    async def stop_work(self):
        print("작업을 종료합니다.")

# Dispatcher
async def dispatcher(fsm):
    while True:
        msg = await queue.get()
        await fsm.handle(msg)
        queue.task_done()

# 메인 루프
async def main():
    fsm = StateMachine()
    await asyncio.gather(
        message_listener(),
        dispatcher(fsm)
    )

asyncio.run(main())

4.3 코드 구성 요약

구성 요소 설명
message_listener() 외부 시스템으로부터 비동기적으로 메시지를 수신하고 Queue에 저장합니다
asyncio.Queue 메시지를 안전하게 버퍼링하고 순차적으로 소비합니다
StateMachine 상태에 따라 메시지를 분기하고 핸들러를 호출합니다
dispatcher() Queue에서 메시지를 꺼내 FSM에 전달합니다
main() 전체 비동기 작업을 병렬 실행합니다

5. 잘못된 구조 vs 권장 구조 (Bad vs Good Practices)

복잡한 시스템에서는 통신, 상태, 처리 로직이 서로 얽혀
구조적인 문제를 일으키기 쉽습니다.

이 장에서는 흔히 발생하는 잘못된 구조(안티패턴)와
권장하는 아키텍처 패턴을 비교하여 설명드리겠습니다.

5.1 잘못된 구조(안티패턴) 예시

특징:

예시 코드:

async def handle_request():
    msg = await receive_message()

    if msg.type == "start":
        await start_work()
    elif msg.type == "stop":
        await stop_work()
    else:
        print("Unknown message")

    if error_detected():
        await retry()

문제점:

5.2 권장 구조

특징:

권장 구조 흐름도:

flowchart TD
    A[Message Listener] --> B[asyncio.Queue]
    B --> C[Dispatcher]
    C --> D{FSM State Decision}
    D -->|Valid Transition| E[Handler Coroutine]
    D -->|Invalid Transition| F[Error Handling]

권장 코드 예시:

async def dispatcher(fsm):
    while True:
        msg = await queue.get()
        await fsm.handle(msg)
        queue.task_done()

class StateMachine:
    async def handle(self, msg):
        handler = self.route(msg)
        if handler:
            await handler(msg)
        else:
            await self.handle_error(msg)

    def route(self, msg):
        # 상태와 타입에 따라 핸들러 선택
        ...

    async def handle_error(self, msg):
        print(f"Unhandled message: {msg}")

5.3 테스트/디버깅 관점 비교

항목 잘못된 구조 권장 구조
테스트 용이성 수신/상태/로직이 섞여 테스트가 어렵습니다. FSM/핸들러를 별도로 테스트할 수 있습니다.
상태 추적 복잡한 분기문으로 흐름을 파악하기 어렵습니다. 상태 전이 기록으로 흐름을 추적할 수 있습니다.
예외 처리 코드마다 중복된 예외 처리가 발생합니다. 전역 오류 핸들링이 가능합니다.
유지보수성 작은 변경에도 광범위한 수정이 필요합니다. 각 구성요소를 독립적으로 변경할 수 있습니다.

정리


6. 고급 확장 전략 (Scalability Strategies)

작은 시스템에서는 asyncio.Queue만으로 충분할 수 있습니다.
그러나 시스템 규모가 커질수록 다음과 같은 요구가 발생하게 됩니다:

이러한 요구를 해결하기 위한 고급 확장 전략을 소개드립니다.

6.1 FSM 로직 분리 및 외부화

FSM(State Machine) 로직을 별도 모듈로 분리하면 다음과 같은 이점이 있습니다:

라이브러리 예시:

from transitions import Machine

class Worker:
    pass

worker = Worker()
machine = Machine(model=worker, states=["idle", "working"], initial="idle")
machine.add_transition(trigger="start", source="idle", dest="working")
machine.add_transition(trigger="stop", source="working", dest="idle")

6.2 외부 메시지 브로커 도입

asyncio.Queue는 프로세스 내부에서만 동작합니다.
서비스 간 메시지를 공유하려면 외부 브로커를 도입하는 것이 필요합니다.

주요 선택지는 다음과 같습니다:

구조 비교:

항목 asyncio.Queue Kafka/NATS/RabbitMQ
범위 프로세스 내부 네트워크 전역
내구성 메모리 기반, 유실 가능 디스크 기반, 재처리 가능
확장성 제한적 수평 확장 가능

외부 브로커 연동 예시 (NATS):

import asyncio
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    async def message_handler(msg):
        data = msg.data.decode()
        await queue.put(data)

    await nc.subscribe("robot.events", cb=message_handler)

asyncio.run(run())

6.3 FSM 컨테이너화 및 버전 분리

FSM 자체를 컨테이너화하면 다음과 같은 구성을 만들 수 있습니다:

FSM 라우팅 예시:

async def fsm_router(msg):
    version = msg.get("version", "v1")
    if version == "v1":
        await call_fsm_service("http://fsm-v1/api/fsm", msg)
    elif version == "v2":
        await call_fsm_service("http://fsm-v2/api/fsm", msg)

async def call_fsm_service(url, msg):
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=msg) as resp:
            return await resp.json()

6.4 메시지 트레이싱 및 모니터링 체계

대규모 시스템에서는 메시지 흐름을 추적할 수 있어야 합니다.

필수 전략:

구조 다이어그램:

flowchart TD
    A[Message Producer] --> B[Message Broker]
    B --> C[Dispatcher]
    C --> D[FSM Processor]
    D --> E[Handler Execution]
    D --> F[Tracing & Logging System]

7. 실전 적용시 체크리스트 (Best Practices Checklist)

메시지 기반 비동기 아키텍처를 실제 프로젝트에 적용할 때에는
다음 체크리스트를 통해 구조적 품질을 점검하는 것이 중요합니다.

7.1 설계 단계

7.2 개발 단계

7.3 운영/확장 단계


8. 참고 자료 (References)

본 문서에서 다룬 개념과 기술은 다음 자료들을 참고하여 구성하였습니다.

8.1 공식 문서

8.2 심화 학습 자료

8.3 관련 키워드


홈으로 가기
태그: 설계철학 프로젝트