fix(consolidation): use checkpoint-based trim instead of relative count
**Problem:** When memory_consolidate is called mid-turn, deferred trim was using a relative count (_pending_trim) that gets applied after the turn completes. This caused the trim to recalculate the cut point based on the FINAL session size (after messages were added), and _trim_to_clean_boundary would walk backward to find a user message, often landing at the START of the current turn and wiping all prior history. Example: Session with 426 messages, consolidate sets pending_trim=10, turn grows to 440 messages, trim calculates cut=430, finds no user messages in 430-439 (all tool chain), walks back to position 426 (current turn start), wipes messages 0-425. **Solution:** Replace relative count with absolute checkpoint position: - At consolidation time: calculate checkpoint = len(session) - keep_count - Find clean boundary at or before checkpoint (not after turn completes) - Store absolute position in session._trim_checkpoint - At trim time: simply slice session.messages[checkpoint:] This preserves the intended trim point regardless of messages added during the remainder of the turn. **Testing:** Hot-patched and verified: - Before: consolidation wiped all history, kept only current turn (16 msgs) - After: consolidation preserved history correctly (23 msgs from before consolidation) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
+36
-10
@@ -589,15 +589,16 @@ class AgentLoop:
|
||||
session.add_raw_message(chain_msg)
|
||||
self.sessions.save(session)
|
||||
|
||||
# Deferred trim: if memory_consolidate ran mid-turn, it set a pending trim
|
||||
# flag instead of mutating the live session. Now that the turn's tool chain
|
||||
# is fully saved, we can safely trim at a clean boundary.
|
||||
pending = getattr(session, '_pending_trim', 0)
|
||||
if pending > 0:
|
||||
session.messages = self._trim_to_clean_boundary(session.messages, pending)
|
||||
session._pending_trim = 0
|
||||
# Deferred trim: if memory_consolidate ran mid-turn, it set a checkpoint
|
||||
# marking where to trim. Now that the turn's tool chain is fully saved,
|
||||
# we can safely trim to that checkpoint.
|
||||
checkpoint = getattr(session, '_trim_checkpoint', None)
|
||||
if checkpoint is not None:
|
||||
old_size = len(session.messages)
|
||||
session.messages = session.messages[checkpoint:]
|
||||
session._trim_checkpoint = None
|
||||
self.sessions.save(session)
|
||||
logger.info(f"Deferred trim applied, session now {len(session.messages)} messages")
|
||||
logger.info(f"Deferred trim applied: {old_size} -> {len(session.messages)} messages (checkpoint={checkpoint})")
|
||||
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
@@ -810,6 +811,26 @@ class AgentLoop:
|
||||
metadata=outbound_metadata,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _find_clean_boundary_before(messages: list[dict], target_pos: int) -> int:
|
||||
"""Find a clean user message boundary at or before target position.
|
||||
|
||||
Returns the index of a user message at or before target_pos,
|
||||
or target_pos if no user message is found.
|
||||
"""
|
||||
if not messages or target_pos <= 0:
|
||||
return 0
|
||||
if target_pos >= len(messages):
|
||||
return len(messages)
|
||||
|
||||
# Walk backward from target to find a user message
|
||||
for i in range(target_pos, -1, -1):
|
||||
if messages[i].get("role") == "user":
|
||||
return i
|
||||
|
||||
# No user message found, return target position
|
||||
return target_pos
|
||||
|
||||
@staticmethod
|
||||
def _trim_to_clean_boundary(messages: list[dict], keep_count: int) -> list[dict]:
|
||||
"""Trim messages to approximately keep_count, starting at a user message boundary.
|
||||
@@ -870,8 +891,13 @@ class AgentLoop:
|
||||
logger.info("Mem0 consolidation done, session cleared (archive_all)")
|
||||
else:
|
||||
keep_count = min(10, max(2, self.memory_window // 2))
|
||||
session._pending_trim = keep_count
|
||||
logger.info(f"Mem0 consolidation done, trim deferred (keep={keep_count})")
|
||||
# Set checkpoint at current session size minus keep_count
|
||||
# This preserves the intended trim point regardless of messages added later
|
||||
checkpoint = max(0, len(session.messages) - keep_count)
|
||||
# Find clean boundary at or before checkpoint
|
||||
checkpoint = self._find_clean_boundary_before(session.messages, checkpoint)
|
||||
session._trim_checkpoint = checkpoint
|
||||
logger.info(f"Mem0 consolidation done, trim deferred (checkpoint={checkpoint}, current_size={len(session.messages)})")
|
||||
return
|
||||
else:
|
||||
memory = MemoryStore(self.workspace)
|
||||
|
||||
Reference in New Issue
Block a user