Chat-based agents: how to handle message bursts
We are all so familiar with the ChatGPT assistant interface that we assume users will send one message at a time, but they actually chat in bursts. ChatGPT disables the “send” button while it is processing a message, which prevents users from sending multiple messages in quick succession. This design choice manages the flow of the conversation and ensures that the assistant can respond to each message in turn.
But WhatsApp-based chat interfaces operate differently: they allow users to send multiple messages in bursts, with irregular intervals between them. So what happens when you try to answer two messages as they arrive?
(This started after reading a post from Lucas Rolim on LinkedIn about this problem, where I shared my solution and how I arrived at it).
Scenario I
Your AI agent relies on some global state that breaks under concurrent execution, so responding to a second message while still working on the first can even crash your application.
User: Hi!
User: Is there any coupé available?
Agent: Hello! How can I assist you today?
User: Is there?
Agent: Hi, what are you looking for?
The second user message crashed the agent, which was still processing the first message. We failed to write the second message to the conversation history, and the agent lost the context of the conversation.
Scenario II
Your AI agent can answer messages in parallel, but the first message was “Hi!” and the later ones asked about specific cars. In this case, the agent sends a separate reply to each message: one asking about the user’s intent, and the others answering each question in isolation.
User: Hi!
User: Is there any coupé available?
User: Is there any 2020 Studebaker available?
Agent: Hello! How can I assist you today?
Agent: No, we don’t have any coupés available at the moment.
Agent: Yes, we have a 2020 Studebaker available. Would you like more details?
This can lead to very confusing conversations and a bad user experience, while your typical client wants an agent that can at least match human performance.
We must therefore ensure the agent:
Processes messages in the order they were received.
Understands that users send messages in series, and waits for all messages to arrive before responding.
If the user sends a message while the agent is processing, that message must be taken into account in the response (this requires either a retry or a cancellation mechanism).
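The second and third requirements amount to a debounce rule: messages separated by less than ~2 seconds belong to one burst and deserve one answer. As a sketch, here is a small pure function (a hypothetical helper, not tied to any framework) that groups timestamped messages into bursts:

```python
def group_into_bursts(messages, gap_seconds=2.0):
    """Group (timestamp, text) pairs into bursts: a new burst starts
    whenever the gap since the previous message exceeds gap_seconds."""
    bursts = []
    current = []
    last_ts = None
    for ts, text in sorted(messages):
        if last_ts is not None and ts - last_ts > gap_seconds:
            bursts.append(current)
            current = []
        current.append(text)
        last_ts = ts
    if current:
        bursts.append(current)
    return bursts
```

Every approach below is, one way or another, an implementation of this grouping under concurrency.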
Approach 1: Locks & Cancellation
We could use locks and cancellation to check if the agent is busy processing a message.
But using locks has several downsides:
Locks are hard to work with, especially if your team is not very experienced with concurrent programming.
Locks can lead to deadlocks, which can crash your application. Debugging deadlocks is hard.
Locks introduce overhead, which can slow down your application.
And cancellation has its own problems:
Your concurrent task must either check for cancellation or be interruptible. This implies either that 1) your programming language supports preemptive multitasking, or 2) you have to sprinkle your code with cancellation checks, which is error-prone and hard to maintain.
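To make option 2 concrete, here is a minimal sketch of cooperative cancellation in Python, using a `threading.Event` as the flag checked between steps (the function name and the `steps` argument are made up for illustration):

```python
import threading

def generate_response_cancellable(steps, cancelled: threading.Event):
    """Toy long-running task that checks a cancellation flag between
    steps. Returns None if cancelled, otherwise the joined result."""
    done = []
    for step in steps:
        if cancelled.is_set():  # the check we must sprinkle everywhere
            return None
        done.append(step)
    return " -> ".join(done)
```

If any single step blocks for a long time (say, an LLM call), the flag is not checked during it, which is exactly why this style is error-prone.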
Anyway, it can be done, for example in Python with threading locks (or any other lock, for that matter):
def process_message(user_id, message_role, message_text, message_timestamp, current_time):
    lock = get_user_lock(user_id)
    if lock.acquire(blocking=False):
        try:
            # Process the message
            response = generate_response(user_id, message_role, message_text)
            save_agent_response(user_id, response)
        finally:
            lock.release()
    else:
        # Agent is busy: cancel the in-flight task and queue
        # this message so it is included in the retried response
        cancel_current_task(user_id)
        queue_message(user_id, message_role, message_text, message_timestamp)
Approach 2: Real-time programming (Process with loops and timers)
In this approach, a process receives messages and handles them in a loop, using asynchronous constructs such as sleep calls, intervals, delays, and timers. For example, after receiving a message, we can wait 2 seconds to see whether any other messages arrive, and then process all messages in the order they were received.
This might sound over-engineered, but it is actually quite natural to implement in languages that support the actor model, such as Erlang or Elixir, or in other languages with suitable libraries. But chances are you are not using one of these languages, and implementing this in a language without the actor model can be tricky. Try writing it in Python with threads, and you will see what I mean.
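For contrast, here is roughly what that debounce looks like in Python with threads: a per-user buffer plus a `threading.Timer` that is cancelled and restarted on every new message. This is a sketch; `on_flush` stands in for the real agent call, and getting the lock discipline wrong here lands you back in Approach 1's problems.

```python
import threading

class Debouncer:
    """Buffer messages per user; flush a user's buffer once no new
    message has arrived for `delay` seconds."""

    def __init__(self, on_flush, delay=2.0):
        self.on_flush = on_flush
        self.delay = delay
        self.lock = threading.Lock()
        self.buffers = {}  # user_id -> list of pending messages
        self.timers = {}   # user_id -> pending threading.Timer

    def receive(self, user_id, message):
        with self.lock:
            self.buffers.setdefault(user_id, []).append(message)
            if user_id in self.timers:  # restart the countdown
                self.timers[user_id].cancel()
            timer = threading.Timer(self.delay, self._flush, args=(user_id,))
            self.timers[user_id] = timer
            timer.start()

    def _flush(self, user_id):
        with self.lock:
            messages = self.buffers.pop(user_id, [])
            self.timers.pop(user_id, None)
        if messages:
            self.on_flush(user_id, messages)
```

In Erlang, by contrast, the `receive ... after` construct gives you the same timeout behavior essentially for free: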
-module(chat_agent).
-export([start/0, receive_message/2, process_messages/1]).

start() ->
    spawn(fun() -> loop([]) end).

receive_message(Pid, Message) ->
    Pid ! {new_message, Message}.

loop(Messages) ->
    receive
        {new_message, Message} ->
            NewMessages = Messages ++ [Message],
            loop(NewMessages);
        process ->
            process_messages(Messages),
            loop([])
    after 2000 -> % no new message for 2 seconds
        if
            Messages =/= [] ->
                process_messages(Messages),
                loop([]);
            true ->
                loop(Messages)
        end
    end.

process_messages(_Messages) ->
    %% Generate and send the agent's reply here
    ok.
Or with GenServer and timers:
defmodule ChatAgent do
  use GenServer

  # Client API
  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def receive_message(message) do
    GenServer.cast(__MODULE__, {:new_message, message})
  end

  # Server Callbacks
  def init(state) do
    schedule_processing()
    {:ok, state}
  end

  def handle_cast({:new_message, message}, state) do
    # Prepend for O(1); reverse before processing to restore order
    {:noreply, [message | state]}
  end

  def handle_info(:process_messages, messages) do
    messages |> Enum.reverse() |> process_messages()
    schedule_processing()
    {:noreply, []}
  end

  defp schedule_processing do
    Process.send_after(self(), :process_messages, 2000)
  end

  defp process_messages(_messages) do
    # Generate and send the agent's reply here
  end
end
Approach 3: Check all customers with unanswered messages
This approach is perhaps the simplest to implement, but if implemented naively, it is not scalable.
The idea is to have a timer that runs every few seconds, and checks for all users with unanswered messages. If a user has an unanswered message, we check if the last message was sent more than 2 seconds ago. If so, we process all messages for that user.
Given a user id, and assuming the messages table is defined as follows:
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    user_id INT NOT NULL,
    message_role VARCHAR(10) NOT NULL, -- 'user' or 'agent'
    message_text TEXT NOT NULL,
    processed BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
A query such as the following can be used to fetch unprocessed messages for a specific user that were created more than 2 seconds ago:
SELECT message_role, message_text
FROM messages
WHERE messages.user_id = $1
  AND messages.processed = FALSE
  AND messages.created_at <= NOW() - INTERVAL '2 seconds'
ORDER BY messages.created_at ASC;
To look up users with unprocessed messages, you can use a query like this:
SELECT DISTINCT user_id
FROM messages
WHERE processed = FALSE
  AND created_at <= NOW() - INTERVAL '2 seconds';
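As a sketch of how these two queries fit together, here is a small Python function run against SQLite, whose `datetime('now', '-2 seconds')` stands in for the Postgres `NOW() - INTERVAL '2 seconds'` above:

```python
import sqlite3

def fetch_due_work(conn, gap="-2 seconds"):
    """Return {user_id: [(role, text), ...]} for users with unprocessed
    messages older than the debounce window."""
    users = conn.execute(
        "SELECT DISTINCT user_id FROM messages "
        "WHERE processed = 0 AND created_at <= datetime('now', ?)",
        (gap,),
    ).fetchall()
    work = {}
    for (user_id,) in users:
        work[user_id] = conn.execute(
            "SELECT message_role, message_text FROM messages "
            "WHERE user_id = ? AND processed = 0 "
            "AND created_at <= datetime('now', ?) "
            "ORDER BY created_at ASC",
            (user_id, gap),
        ).fetchall()
    return work
```

Each entry in the returned dict is one user's pending burst, ready to be answered in a single response.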
This is better because it ensures that every message is eventually processed (with respect to the 2-second rule, for example). It also allows very simple queries on the state of message processing.
A simple timer can trigger the responses, for example with Celery beat:
# beat config
from celery import Celery

app = Celery('agent', broker='redis://localhost:6379/0')

app.conf.beat_schedule = {
    'process-pending-messages': {
        'task': 'process_pending_messages',
        'schedule': 2.0,  # run every 2 seconds
    },
}

# message_service and generate_response are assumed to be defined elsewhere

@app.task(name='process_pending_messages')
def process_pending_messages():
    # Fetch users with unprocessed messages older than 2 seconds
    users = message_service.fetch_unprocessed_users()
    for user_id in users:
        # fire up a new task per user
        process_user_messages.delay(user_id)

@app.task
def process_user_messages(user_id):
    messages = message_service.fetch_unprocessed_messages(user_id)
    if messages:
        # Process the messages and generate a single response
        response = generate_response(messages)
        message_service.save_agent_response(user_id, response)
        message_service.mark_messages_as_processed(user_id)
This is far from ideal: it simply will not scale to hundreds of thousands of users, even if they are not all online at the same time, because every tick becomes a long table scan. It will also hammer either your users or your messages table with writes and updates, degrading your indexes' performance.
Approach 4: Use a table to keep track of users waiting for responses
This takes a single extra write per message, directed at a small auxiliary table whose rows are wiped as soon as the response is sent. This way, you can keep track of which users are waiting for responses without cluttering your main tables.
CREATE TABLE pending_users (
    user_id INT PRIMARY KEY,
    last_message_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
When a new message arrives, you can upsert the user into the pending_users table, and bump the last_message_at timestamp to the current time:
INSERT INTO pending_users (user_id, last_message_at)
VALUES ($1, CURRENT_TIMESTAMP)
ON CONFLICT (user_id)
DO UPDATE SET last_message_at = EXCLUDED.last_message_at;
And to process pending users, we use DELETE with RETURNING to fetch and remove users who have been waiting for more than 2 seconds (in Postgres, SKIP LOCKED goes in a locking subquery, since DELETE has no SKIP LOCKED clause of its own):
DELETE FROM pending_users
WHERE user_id IN (
    SELECT user_id FROM pending_users
    WHERE last_message_at <= NOW() - INTERVAL '2 seconds'
    FOR UPDATE SKIP LOCKED
)
RETURNING user_id;
This will return all user ids that are ready to be processed, and remove them from the pending_users table in a single atomic operation. This ensures that no two processes will handle the same user at the same time, preventing double processing.
Then, for each user id returned, you can fetch and process their messages as in the previous approach.
The important trick here is to combine upserts with DELETE ... SKIP LOCKED ... RETURNING, so we can drain this jobs table efficiently, removing every due user while keeping the ones that should wait a bit longer.
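The bookkeeping of this approach can be mimicked in plain Python, with a dict standing in for the pending_users table (there is no analogue of SKIP LOCKED here; this only illustrates the upsert-then-drain cycle):

```python
import time

class PendingUsers:
    """In-memory stand-in for the pending_users table: an upsert on
    each incoming message, an atomic "drain everyone due" on each tick."""

    def __init__(self, wait_seconds=2.0):
        self.wait_seconds = wait_seconds
        self.last_message_at = {}  # user_id -> timestamp of latest message

    def upsert(self, user_id, now=None):
        # INSERT ... ON CONFLICT DO UPDATE: always bump the timestamp
        self.last_message_at[user_id] = time.time() if now is None else now

    def drain_due(self, now=None):
        # DELETE ... RETURNING: remove and return every user whose
        # last message is older than the debounce window
        now = time.time() if now is None else now
        due = [uid for uid, ts in self.last_message_at.items()
               if ts <= now - self.wait_seconds]
        for uid in due:
            del self.last_message_at[uid]
        return due
```

A burst simply keeps bumping the user's timestamp, so the user is not drained until the burst goes quiet.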
This is the winning approach for me, since it balances simplicity, a guarantee that every single message is processed properly, and minimal impact on the database performance.
Conclusion
Handling message bursts in chat-based agents is a complex problem that requires careful consideration of concurrency, state management, and user experience. That said, we have solutions at hand that avoid engineering complex queuing systems, and that can be implemented with relative ease in most programming languages and frameworks.


