Is there an existing issue for this?
What happened?
Queue workers use inefficient polling with asyncio.sleep(0.1) in a tight loop, causing CPU waste and up to 100ms message processing delays.
Location: backend/app/core/orchestration/queue_manager.py:99-124
Current Behavior
async def _worker(self, worker_name: str):
queues = [...]
while self.running:
for queue in queues:
message = await queue.get(no_ack=False, fail=False)
if message:
# process message...
await asyncio.sleep(0.1) # ❌ Polls every 100ms even when empty
Impact:
- 30 polls/second (10 per worker × 3 workers) when queues are empty
- Up to 100ms delay before messages are processed
- Unnecessary CPU usage during idle periods
Expected Behavior
Workers should block and wait for messages instead of polling:
- Near-instant message processing (eliminate 100ms delay)
- ~90% reduction in CPU usage during idle
- Better resource efficiency
Proposed Solution
Use asyncio.wait() to block until messages arrive:
async def _worker(self, worker_name: str):
queues = [
await self.channel.declare_queue(self.queues[priority], durable=True)
for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW]
]
while self.running:
try:
done, pending = await asyncio.wait(
[queue.get(no_ack=False) for queue in queues],
return_when=asyncio.FIRST_COMPLETED,
timeout=1.0
)
if not self.running:
break
for task in done:
message = await task
if message:
item = json.loads(message.body.decode())
await self._process_item(item, worker_name)
await message.ack()
for task in pending:
task.cancel()
except asyncio.CancelledError:
return
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
await asyncio.sleep(1)
Testing Checklist
Performance Targets
- CPU reduction: ~90% during idle
- Latency: <10ms (from current ~100ms)
- Throughput: Maintain or improve
Additional Context
This is part of a broader performance optimization effort. The current implementation works but is inefficient.
Is there an existing issue for this?
What happened?
Queue workers use inefficient polling with
asyncio.sleep(0.1)in a tight loop, causing CPU waste and up to 100ms message processing delays.Location:
backend/app/core/orchestration/queue_manager.py:99-124Current Behavior
Impact:
Expected Behavior
Workers should block and wait for messages instead of polling:
Proposed Solution
Use
asyncio.wait()to block until messages arrive:Testing Checklist
Performance Targets
Additional Context
This is part of a broader performance optimization effort. The current implementation works but is inefficient.