We serialize inbound auto-reply runs (all channels) through a tiny in-process queue to prevent multiple agent runs from colliding, while still allowing safe parallelism across sessions.
runEmbeddedPiAgent enqueues by session key (lane session:<key>) to guarantee only one active run per session.main by default) so overall parallelism is capped by agents.defaults.maxConcurrent.Inbound messages can steer the current run, wait for a followup turn, or do both:
steer: inject immediately into the current run (cancels pending tool calls after the next tool boundary). If not streaming, falls back to followup.followup: enqueue for the next agent turn after the current run ends.collect: coalesce all queued messages into a single followup turn (default). If messages target different channels/threads, they drain individually to preserve routing.steer-backlog (aka steer+backlog): steer now and preserve the message for a followup turn.interrupt (legacy): abort the active run for that session, then run the newest message.queue (legacy alias): same as steer.Steer-backlog means you can get a followup response after the steered run, so
streaming surfaces can look like duplicates. Prefer collect/steer if you want
one response per inbound message.
Send /queue collect as a standalone command (per-session) or set messages.queue.byChannel.discord: "collect".
Defaults (when unset in config):
collectConfigure globally or per channel via messages.queue:
{
messages: {
queue: {
mode: "collect",
debounceMs: 1000,
cap: 20,
drop: "summarize",
byChannel: { discord: "collect" },
},
},
}
Options apply to followup, collect, and steer-backlog (and to steer when it falls back to followup):
debounceMs: wait for quiet before starting a followup turn (prevents “continue, continue”).cap: max queued messages per session.drop: overflow policy (old, new, summarize).Summarize keeps a short bullet list of dropped messages and injects it as a synthetic followup prompt.
Defaults: debounceMs: 1000, cap: 20, drop: summarize.
/queue <mode> as a standalone command to store the mode for the current session./queue collect debounce:2s cap:25 drop:summarize/queue default or /queue reset clears the session override.main) is process-wide for inbound + main heartbeats; set agents.defaults.maxConcurrent to allow multiple sessions in parallel.cron, subagent) so background jobs can run in parallel without blocking inbound replies.