본문으로 바로가기
*2월 한정! 홈페이지 신규 제작 20% 할인 + AI 챗봇 무료 제공지금 신청
ai-automation2026년 3월 13일·조회 3

메시지 큐 시스템의 공정한 슬롯 분배 전략: Bot별 리소스 할당 구현기

asyncio.Condition을 활용한 수신자별 공정 스케줄링과 긴급 메시지 우선처리 로직

SP

SpacePlanning

SpacePlanning AI Team

## 왜 공정한 리소스 분배가 중요한가 분산 시스템에서 여러 수신자(봇, 워커, 서비스)가 제한된 리소스를 공유할 때, 특정 수신자가 모든 슬롯을 독점하는 문제가 발생할 수 있습니다. 예를 들어 10개의 처리 슬롯이 있을 때, 한 봇이 대량의 메시지를 받으면 다른 봇들은 대기 상태에 빠지게 됩니다. 이번 글에서는 **수신자별 공정 슬롯 분배(Per-Receiver Fair Slot Distribution)** 전략을 Python asyncio로 구현한 경험을 공유합니다. ## 핵심 설계 원칙 ### 1. 수신자별 슬롯 제한 각 수신자(봇)는 일반 메시지에 대해 **최대 1개의 슬롯만** 사용할 수 있도록 제한합니다. 이를 통해: - 100개의 메시지를 받는 봇 A와 5개를 받는 봇 B가 동일한 기회 보장 - 전체 시스템 처리량 유지하면서 공정성 확보 ### 2. 긴급 메시지 우회 처리 긴급 메시지는 수신자별 제한을 **우회(bypass)**하여 즉시 처리됩니다. 비즈니스 크리티컬한 작업을 보장하기 위한 설계입니다. ## 구현 방법 ### asyncio.Condition 활용 Python의 `asyncio.Condition`을 사용하여 슬롯 할당을 동기화합니다: ```python import asyncio from collections import defaultdict class FairSlotManager: def __init__(self, max_slots=10): self.max_slots = max_slots self.current_slots = 0 self.bot_slot_tracker = defaultdict(int) # 봇별 사용 중인 슬롯 수 self.condition = asyncio.Condition() async def acquire_slot(self, bot_id, is_urgent=False): async with self.condition: # 긴급 메시지는 우회 if is_urgent: await self._wait_for_available_slot() self.current_slots += 1 return # 일반 메시지: 전체 슬롯 + 봇별 제한 확인 while (self.current_slots >= self.max_slots or self.bot_slot_tracker[bot_id] >= 1): await self.condition.wait() # PER_BOT_WAIT 로깅 지점 self.current_slots += 1 self.bot_slot_tracker[bot_id] += 1 # PER_BOT_RESERVE 로깅 async def release_slot(self, bot_id): async with self.condition: self.current_slots -= 1 if bot_id in self.bot_slot_tracker: self.bot_slot_tracker[bot_id] -= 1 self.condition.notify_all() # 대기 중인 모든 태스크 깨우기 ``` ### 긴급 메시지 탐지 로직 제목(subject)에서 긴급 메시지를 탐지하는 두 가지 패턴: ```python def is_urgent_message(subject: str) -> bool: if not subject: return False subject_upper = subject.upper() # 패턴 1: URG 접두사 if subject_upper.startswith('URG'): return True # 패턴 2: URGENT 키워드 포함 if 'URGENT' in subject_upper: return True return False ``` ## 프로덕션 검증 포인트 실제 운영 환경에서 다음 로그를 통해 동작을 확인했습니다: 1. **PER_BOT_WAIT**: 봇이 자신의 슬롯 제한으로 대기 중 2. **PER_BOT_RESERVE**: 봇이 슬롯을 성공적으로 확보 3. **긴급 메시지 우회**: 우회 처리된 메시지 건수 ## 알려진 제약사항 시스템 재시작 시 큐에 남아있는 메시지를 복구하는 로직(`recover_queued_messages`)은 **시작 시점에만** 실행되며, 5분 이상 된 메시지만 대상으로 합니다. 이는 중복 처리를 방지하기 위한 설계입니다. ## 결론 ### 핵심 요약 - **공정성**: 수신자별 슬롯 제한으로 리소스 독점 방지 - **우선순위**: 긴급 메시지는 제한 우회 - **동기화**: `asyncio.Condition`으로 안전한 상태 관리 ### 다음 단계 이 패턴은 메시지 큐뿐만 아니라 API 요청 제한, 데이터베이스 커넥션 풀, 워커 스케줄링 등 다양한 리소스 관리 시나리오에 적용할 수 있습니다. 특히 멀티테넌트 환경에서 테넌트별 공정성을 보장해야 할 때 유용합니다. 다음 글에서는 메시지 우선순위 큐와 백프레셔(backpressure) 처리 전략을 다루겠습니다.
#메시지큐#asyncio#공정스케줄링#Python#분산시스템#리소스관리
공유하기:

이 주제에 대해 더 알아보고 싶으신가요?

프로젝트 상담을 통해 맞춤형 솔루션을 제안받으세요.