Dev.Chan64's Blog


gpt-4-turbo has translated this article into English.


Designing a Python System for Asynchronous Messaging and MSA


1. Overview

This document is based on practical experience in developing an AI-based robotic control system and operating handlers in a Function as a Service (FaaS) model.

In real-world systems, especially those involving robots or AI services, state changes are frequent and delayed external responses are routine. In such environments, when communication flow, state management, and processing logic are intertwined, the code becomes hard to test, state transitions become hard to trace, exception handling gets duplicated, and small changes ripple across the codebase.

This document aims to solve these problems by explaining how to design an asynchronous message architecture based on Python’s asyncio.

The focus is on structuring complex systems and building scalable architectures using message-based flows, a finite state machine (FSM) to manage state transitions, and queues to decouple communication from processing.


2. Understanding Message-driven Architecture

2.1 Problems with State-centric Structures

State-centric systems can encounter several issues:

2.2 Message-driven Processing Model

The message-driven processing approach includes:
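As an illustration of message-driven processing, each input can be represented as a self-describing message instead of a direct mutation of shared state. The sketch below assumes a hypothetical Message envelope with type, payload, version, and message_id fields; the type and version names mirror the fields used by later examples, but the schema itself is an assumption, not a fixed format.

```python
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass(frozen=True)
class Message:
    """Hypothetical message envelope; field names are illustrative."""
    type: str                                   # e.g. "start" or "stop"
    payload: dict[str, Any] = field(default_factory=dict)
    version: str = "v1"                         # lets a router pick an FSM version
    # Unique ID so one message can be traced across components
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex)


msg = Message("start", {"robot_id": "r1"})
record = asdict(msg)   # plain dict, ready for JSON serialization over a broker
print(record["type"], record["version"])  # start v1
```

frozen=True prevents fields from being reassigned after a message is dispatched, although the payload dict itself remains mutable.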

2.3 FSM-based Transition Model

The FSM-based model adheres to the following principles:

Message-based FSM model diagram:

stateDiagram-v2
    [*] --> Idle
    Idle --> Working : start(message)
    Working --> Idle : stop(message)
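The diagram can be expressed directly as data: a table keyed by (current state, message type). A minimal sketch, assuming string states and message types that match the diagram; TRANSITIONS, next_state, and InvalidTransition are illustrative names:

```python
# Transition table mirroring the state diagram:
# (current state, message type) -> next state
TRANSITIONS = {
    ("idle", "start"): "working",
    ("working", "stop"): "idle",
}


class InvalidTransition(Exception):
    """Raised when a message is not valid in the current state."""


def next_state(state: str, message_type: str) -> str:
    # Pure function with no I/O, so it is easy to unit-test
    try:
        return TRANSITIONS[(state, message_type)]
    except KeyError:
        raise InvalidTransition(
            f"{message_type!r} is not allowed in state {state!r}"
        ) from None


state = "idle"
for event in ["start", "stop"]:
    state = next_state(state, event)
    print(state)
# working
# idle
```

Keeping every transition in one table means an invalid message (for example, stop while idle) is rejected in one place instead of being scattered across if branches.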

2.4 Compatibility of async/await with Message-driven Processing

Python’s async/await syntax is highly suitable for message-based architectures:

In short, combining asyncio with an FSM and a message-driven model provides the flexibility and robustness that real-world systems need.
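To make this concrete: while one handler is suspended at await on a slow external call, the event loop keeps running other handlers on the same thread. A minimal sketch, where asyncio.sleep stands in for a hypothetical delayed external response:

```python
import asyncio


async def handle(name: str, delay: float, log: list[str]) -> None:
    # asyncio.sleep simulates waiting on a slow external service
    await asyncio.sleep(delay)
    log.append(name)


async def main() -> list[str]:
    log: list[str] = []
    # Both handlers run concurrently; the fast one is not blocked by the slow one
    await asyncio.gather(
        handle("slow", 0.3, log),
        handle("fast", 0.1, log),
    )
    return log


order = asyncio.run(main())
print(order)  # ['fast', 'slow']
```

If the handlers ran one after another, the log would read ['slow', 'fast'] instead.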


3. Introduction to Key Python Asynchronous Components

Python’s asynchronous programming revolves around asyncio, which provides several core components. Used together, they keep code well-structured while handling complex message flows.

3.1 Coroutine

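Coroutines, defined with async def, are the basic unit of asynchronous programming in Python. A coroutine pauses at each await and resumes when the awaited operation completes: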

import asyncio

async def fetch_data():
    await asyncio.sleep(1)
    return "data"

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())
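One detail worth showing: calling a coroutine function does not run its body. It returns a coroutine object that executes only when it is awaited or handed to the event loop. A small sketch:

```python
import asyncio
import inspect

calls = []


async def fetch_data():
    calls.append("ran")
    await asyncio.sleep(0)
    return "data"


coro = fetch_data()                       # creates a coroutine object; body has not run
print(inspect.iscoroutine(coro), calls)   # True []

result = asyncio.run(coro)                # the event loop drives it to completion
print(result, calls)                      # data ['ran']
```

Forgetting an await is a common bug; Python emits a "coroutine ... was never awaited" RuntimeWarning when such an object is garbage-collected without running.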

3.2 Basic asyncio API

asyncio is the standard-library module for running coroutines and scheduling tasks.

Key functions include:

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"Task {name} done")

async def main():
    await asyncio.gather(
        task("A", 1),
        task("B", 2)
    )

asyncio.run(main())
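Beyond gather, two functions matter for the external response delays described in the overview: asyncio.create_task schedules work in the background, and asyncio.wait_for bounds how long a call may take. A minimal sketch, where slow_call is a hypothetical stand-in for an external API:

```python
import asyncio


async def slow_call(delay: float) -> str:
    # Hypothetical external call that takes `delay` seconds
    await asyncio.sleep(delay)
    return "ok"


async def main() -> tuple[str, str]:
    # create_task starts the coroutine in the background right away
    background = asyncio.create_task(slow_call(0.1))

    # wait_for cancels the inner call if it exceeds the timeout
    try:
        timed = await asyncio.wait_for(slow_call(1.0), timeout=0.2)
    except asyncio.TimeoutError:
        timed = "timeout"

    # The background task finished while we were waiting above
    return await background, timed


result = asyncio.run(main())
print(result)  # ('ok', 'timeout')
```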

3.3 Using asyncio.Queue for Message Buffering

asyncio.Queue provides a structure for safely passing messages between multiple producer and consumer coroutines running in the same event loop. Note that it is not thread-safe.

Features:

import asyncio

queue = asyncio.Queue()

async def producer():
    for i in range(5):
        await queue.put(f"message {i}")
        print(f"Produced: message {i}")

async def consumer():
    while True:
        message = await queue.get()
        print(f"Consumed: {message}")
        queue.task_done()

async def main():
    consumer_task = asyncio.create_task(consumer())
    await producer()
    await queue.join()
    consumer_task.cancel()

asyncio.run(main())
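An unbounded asyncio.Queue() grows without limit if producers outpace consumers. Passing maxsize makes put() wait while the queue is full, which applies backpressure to the producer. A minimal sketch; the queue size and processing delay are arbitrary:

```python
import asyncio


async def main() -> tuple[list[str], int]:
    queue = asyncio.Queue(maxsize=2)   # put() waits once 2 items are queued
    consumed: list[str] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(5):
            await queue.put(f"message {i}")   # suspends while the queue is full
            peak = max(peak, queue.qsize())

    async def consumer() -> None:
        while True:
            message = await queue.get()
            await asyncio.sleep(0.01)         # simulate slow processing
            consumed.append(message)
            queue.task_done()

    consumer_task = asyncio.create_task(consumer())
    await producer()
    await queue.join()                        # wait until every item is processed
    consumer_task.cancel()
    return consumed, peak


consumed, peak = asyncio.run(main())
print(consumed, peak)
```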

3.4 Handling External Synchronous API Calls with run_in_executor

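Blocking calls, such as external synchronous APIs, can be moved off the event loop with loop.run_in_executor. It uses a thread pool by default; for CPU-bound work, passing a process pool is usually the better fit, because threads in CPython share the GIL.

Usage example: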

import asyncio
import time

def blocking_io():
    time.sleep(2)
    return "blocking result"

async def main():
    loop = asyncio.get_running_loop()  # preferred inside a coroutine
    result = await loop.run_in_executor(None, blocking_io)
    print(result)

asyncio.run(main())
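On Python 3.9+, asyncio.to_thread is a shorthand for running a blocking function in the default thread pool. The sketch below checks that the event loop keeps running another coroutine (a simple heartbeat) while the blocking call sleeps in a worker thread; the timings are arbitrary:

```python
import asyncio
import time


def blocking_io(beats: list[int]) -> int:
    # Blocking call; run directly on the loop, it would stall every coroutine
    time.sleep(0.3)
    # Number of heartbeats that ran while this worker thread was sleeping
    return len(beats)


async def heartbeat(beats: list[int]) -> None:
    for i in range(3):
        beats.append(i)
        await asyncio.sleep(0.05)


async def main() -> int:
    beats: list[int] = []
    seen, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, beats),  # Python 3.9+
        heartbeat(beats),
    )
    return seen


seen = asyncio.run(main())
print(f"heartbeats during blocking call: {seen}")
```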

4. Basic Message Handling Structure Design

Message-based asynchronous systems follow this basic processing flow:

  1. Message Reception: Asynchronously receive messages from external inputs (WebSocket, TCP, etc.).
  2. Queue Storage: Store received messages immediately in asyncio.Queue.
  3. Dispatcher Consumption: A separate Dispatcher consumes the Queue and passes messages to the FSM.
  4. FSM Processing: Branches messages based on the current state and calls the appropriate handler coroutine.
  5. Handler Execution: Performs business logic in the handler.

4.1 System Flow Diagram

flowchart LR
    A[Message Source] --> B[asyncio.Queue]
    B --> C[FSM Dispatcher]
    C --> D{State Decision}
    D --> |Idle → Working| E[Work Handler Coroutine]
    D --> |Working → Idle| F[Stop Handler Coroutine]

4.2 Message Reception → Queue → FSM Branching → Handler Processing

The structure is summarized as follows:

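import asyncio
import itertools
from dataclasses import dataclass

# Minimal message type (assumed for this example)
@dataclass
class Message:
    type: str

# Message queue
queue = asyncio.Queue()

# Placeholder receiver: replace with real WebSocket/TCP input.
# Here it just alternates "start" and "stop" once per second.
_sample_types = itertools.cycle(["start", "stop"])

async def receive_message():
    await asyncio.sleep(1)
    return Message(type=next(_sample_types))

# Message reception (e.g., WebSocket, TCP, etc.)
async def message_listener():
    while True:
        msg = await receive_message()  # Asynchronous reception
        await queue.put(msg)

# State-based FSM
class StateMachine:
    def __init__(self):
        self.state = "idle"

    async def handle(self, msg):
        if self.state == "idle" and msg.type == "start":
            self.state = "working"
            await self.do_work()
        elif self.state == "working" and msg.type == "stop":
            self.state = "idle"
            await self.stop_work()

    async def do_work(self):
        print("Starting work.")

    async def stop_work(self):
        print("Stopping work.")

# Dispatcher
async def dispatcher(fsm):
    while True:
        msg = await queue.get()
        await fsm.handle(msg)
        queue.task_done()

# Main loop
async def main():
    fsm = StateMachine()
    # Listener and dispatcher run concurrently until the process is stopped
    await asyncio.gather(
        message_listener(),
        dispatcher(fsm)
    )

asyncio.run(main())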

4.3 Code Structure Summary

| Component | Description |
|---|---|
| message_listener() | Asynchronously receives messages from external systems and stores them in the Queue |
| asyncio.Queue | Safely buffers messages so they are consumed in order |
| StateMachine | Branches on the current state and calls the matching handler |
| dispatcher() | Retrieves messages from the Queue and delivers them to the FSM |
| main() | Runs all asynchronous tasks concurrently |

5. Bad vs Good Practices

Complex systems often encounter structural issues due to intertwined communication, state, and processing logic.

This section compares commonly encountered bad structures (anti-patterns) with recommended architectural patterns.

5.1 Examples of Bad Structures (Anti-patterns)

Characteristics:

Example code:

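# Anti-pattern: reception, branching, and error handling live in one function.
# receive_message(), start_work(), stop_work(), error_detected(), and retry()
# are placeholders.
async def handle_request():
    msg = await receive_message()  # reception is coupled to processing

    # Branching ignores the current state: "stop" is accepted even when idle
    if msg.type == "start":
        await start_work()
    elif msg.type == "stop":
        await stop_work()
    else:
        print("Unknown message")

    # Ad-hoc error handling that tends to be duplicated in every handler
    if error_detected():
        await retry()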

Issues:

5.2 Examples of Recommended Structures

Characteristics:

Recommended structure flowchart:

flowchart TD
    A[Message Listener] --> B[asyncio.Queue]
    B --> C[Dispatcher]
    C --> D{FSM State Decision}
    D -->|Valid Transition| E[Handler Coroutine]
    D -->|Invalid Transition| F[Error Handling]

Recommended code example:

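import asyncio

queue = asyncio.Queue()  # shared message queue, as in section 4.2

async def dispatcher(fsm):
    while True:
        msg = await queue.get()
        await fsm.handle(msg)
        queue.task_done()

class StateMachine:
    def __init__(self):
        self.state = "idle"
        self.transitions = {  # (state, message type) -> (next state, handler)
            ("idle", "start"): ("working", self.start_work),
            ("working", "stop"): ("idle", self.stop_work),
        }

    async def handle(self, msg):
        handler = self.route(msg)
        if handler:
            await handler(msg)
        else:
            await self.handle_error(msg)

    def route(self, msg):
        # Select handler based on state and type; state is unchanged if invalid
        self.state, handler = self.transitions.get((self.state, msg.type), (self.state, None))
        return handler

    async def start_work(self, msg):
        print("Starting work.")

    async def stop_work(self, msg):
        print("Stopping work.")

    async def handle_error(self, msg):
        print(f"Unhandled message: {msg}")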

5.3 Comparison from a Testing/Debugging Perspective

| Item | Bad Structure | Recommended Structure |
|---|---|---|
| Testability | Mixed reception, state, and logic make testing difficult. | The FSM and handlers can be tested separately. |
| State Tracking | Complex branching makes the flow hard to follow. | State transition logs make the flow traceable. |
| Exception Handling | Exception handling is duplicated across the code. | Errors can be handled in one central place. |
| Maintainability | Small changes require extensive modifications. | Components can be changed independently. |
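To make the testability row concrete: because the FSM does not touch the network or the queue, it can be exercised by calling handle() directly with fake messages. A minimal sketch, assuming a simplified StateMachine in the style of section 4.2 whose handlers record calls instead of printing; types.SimpleNamespace stands in for a real message type:

```python
import asyncio
from types import SimpleNamespace


class StateMachine:
    def __init__(self):
        self.state = "idle"
        self.calls: list[str] = []   # records handler invocations for assertions

    async def handle(self, msg):
        if self.state == "idle" and msg.type == "start":
            self.state = "working"
            await self.do_work()
        elif self.state == "working" and msg.type == "stop":
            self.state = "idle"
            await self.stop_work()

    async def do_work(self):
        self.calls.append("do_work")

    async def stop_work(self):
        self.calls.append("stop_work")


async def run_scenario() -> StateMachine:
    fsm = StateMachine()
    # No listener, queue, or dispatcher is needed to test transitions
    for msg_type in ["start", "start", "stop"]:
        await fsm.handle(SimpleNamespace(type=msg_type))
    return fsm


fsm = asyncio.run(run_scenario())
print(fsm.state, fsm.calls)  # idle ['do_work', 'stop_work']
```

The second start is ignored because it is not a valid transition from working; a test like this pins that behavior down.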

Summary


6. Advanced Scalability Strategies

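asyncio.Queue may suffice for small systems, but larger systems often require separately maintained FSM logic, an external message broker, versioned FSM deployments, and message tracing.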

Here are some advanced scalability strategies.

6.1 Separating and Externalizing FSM Logic

Moving the FSM (finite state machine) logic into its own module offers several benefits:

Example using the third-party transitions library:

from transitions import Machine

class Worker:
    pass

worker = Worker()
machine = Machine(model=worker, states=["idle", "working"], initial="idle")
machine.add_transition(trigger="start", source="idle", dest="working")
machine.add_transition(trigger="stop", source="working", dest="idle")

6.2 Introducing an External Message Broker

asyncio.Queue works only within a single process (and a single event loop). To share messages between services, an external message broker is needed.

Key options include Kafka, NATS, and RabbitMQ.

Structure comparison:

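| Item | asyncio.Queue | Kafka/NATS/RabbitMQ |
|---|---|---|
| Scope | Within a single process | Across the network |
| Durability | In memory; messages are lost if the process stops | Can persist to disk (Kafka topics, NATS JetStream, RabbitMQ durable queues), so messages survive restarts and, depending on the broker, can be replayed |
| Scalability | Limited to one process | Horizontally scalable |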

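External broker integration example (NATS, using the nats-py client):

import asyncio
from nats.aio.client import Client as NATS  # nats-py client

queue = asyncio.Queue()  # local buffer consumed by the dispatcher

async def run():
    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    # Forward each broker message into the local queue
    async def message_handler(msg):
        data = msg.data.decode()
        await queue.put(data)

    await nc.subscribe("robot.events", cb=message_handler)

    try:
        # Keep the process alive; in a full system the dispatcher consumes here
        while True:
            data = await queue.get()
            print(f"Received: {data}")
            queue.task_done()
    finally:
        await nc.drain()  # flush pending messages and close the connection

asyncio.run(run())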
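One way to make the move from asyncio.Queue to a broker less invasive is to hide the transport behind a small interface, so the dispatcher does not care where messages come from. A minimal sketch; MessageBus and InMemoryBus are hypothetical names, and a NATS- or Kafka-backed class implementing the same two methods is assumed rather than shown:

```python
import asyncio
from typing import Protocol


class MessageBus(Protocol):
    # Hypothetical transport interface; a broker-backed class would implement the same methods
    async def publish(self, subject: str, data: bytes) -> None: ...
    async def next_message(self) -> tuple[str, bytes]: ...


class InMemoryBus:
    """In-process implementation backed by asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, subject: str, data: bytes) -> None:
        await self._queue.put((subject, data))

    async def next_message(self) -> tuple[str, bytes]:
        return await self._queue.get()


async def dispatcher(bus: MessageBus, count: int) -> list[str]:
    # Depends only on the MessageBus interface, not on the transport
    received = []
    for _ in range(count):
        subject, data = await bus.next_message()
        received.append(f"{subject}: {data.decode()}")
    return received


async def main() -> list[str]:
    bus = InMemoryBus()
    await bus.publish("robot.events", b"start")
    await bus.publish("robot.events", b"stop")
    return await dispatcher(bus, count=2)


received = asyncio.run(main())
print(received)  # ['robot.events: start', 'robot.events: stop']
```

Swapping InMemoryBus for a broker-backed implementation would then leave the dispatcher and FSM code unchanged.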

6.3 Containerizing and Versioning FSM

Containerizing the FSM itself allows the following configurations:

FSM routing example:

import aiohttp

# Message version -> FSM service endpoint
FSM_SERVICES = {
    "v1": "http://fsm-v1/api/fsm",
    "v2": "http://fsm-v2/api/fsm",
}

async def fsm_router(msg):
    version = msg.get("version", "v1")  # default to v1 when no version is given
    url = FSM_SERVICES.get(version)
    if url is None:
        raise ValueError(f"Unsupported FSM version: {version}")
    return await call_fsm_service(url, msg)

async def call_fsm_service(url, msg):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=msg) as resp:
            resp.raise_for_status()
            return await resp.json()

6.4 Message Tracing and Monitoring Systems

Large-scale systems require the ability to trace message flows.

Essential strategies:

Structure diagram:

flowchart TD
    A[Message Producer] --> B[Message Broker]
    B --> C[Dispatcher]
    C --> D[FSM Processor]
    D --> E[Handler Execution]
    D --> F[Tracing & Logging System]
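A common building block for the tracing path is a trace ID that follows each message through the dispatcher, FSM, and handlers. A minimal sketch using the standard contextvars and logging modules; the trace_id field and the log format are assumptions. Each asyncio task runs in its own copy of the context, so concurrently processed messages keep separate IDs:

```python
import asyncio
import contextvars
import logging
import uuid

# Holds the trace ID of the message currently being processed
trace_id_var = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    # Injects the current trace ID into every log record
    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_var.get()
        return True


logger = logging.getLogger("fsm")
handler = logging.StreamHandler()
handler.addFilter(TraceIdFilter())
handler.setFormatter(logging.Formatter("[%(trace_id)s] %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

seen: list[tuple[str, str]] = []


async def process(msg: dict) -> None:
    # Reuse an upstream trace ID if present, otherwise create one
    trace_id_var.set(msg.get("trace_id") or uuid.uuid4().hex[:8])
    logger.info("dispatching %s", msg["type"])
    await asyncio.sleep(0.01)          # another message may be processed here
    logger.info("handled %s", msg["type"])
    seen.append((msg["type"], trace_id_var.get()))


async def main() -> None:
    # gather wraps each coroutine in its own task with its own context copy
    await asyncio.gather(
        process({"type": "start", "trace_id": "abc123"}),
        process({"type": "stop", "trace_id": "def456"}),
    )


asyncio.run(main())
```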

7. Best Practices Checklist for Practical Application

When applying message-based asynchronous architecture to actual projects, it’s crucial to check the structural quality using the following checklist.

7.1 Design Phase

7.2 Development Phase

7.3 Operation/Expansion Phase




Tags: Design Philosophy Project