State Schema for Resumable Agents
from dataclasses import dataclass, asdict
from typing import Optional
import json
@dataclass
class AgentState:
    """Structured state that survives session boundaries.

    Instances are persisted through a ``storage`` object that must provide
    ``save(key=..., value=..., ttl_hours=...)`` and ``load(key)``. Every
    field is required; there are no defaults.
    """

    # Lifetime of a persisted state, in hours. Used both as the storage TTL
    # and as the staleness cut-off in load(). Deliberately unannotated so the
    # dataclass machinery does not treat it as a field.
    STATE_TTL_HOURS = 72

    # Identity
    run_id: str
    created_at: str
    updated_at: str

    # Task tracking
    original_goal: str
    current_phase: str  # "exploration" | "planning" | "implementation" | "verification"
    completed_tasks: list[str]
    failed_tasks: list[dict]  # one dict per failure, e.g. {"task": ..., "error": ...}
    remaining_tasks: list[str]

    # Confirmed findings (structured, not prose)
    confirmed_facts: dict  # {fact_key: fact_value}
    decisions_made: list[str]
    files_examined: list[str]

    # Quality tracking
    iteration_count: int
    error_count: int
    last_successful_op: str

    def save(self, storage) -> None:
        """Persist this state under the key ``agent_state:<run_id>``.

        The state is converted to a plain dict with ``asdict`` and handed to
        ``storage.save`` together with a TTL of ``STATE_TTL_HOURS``.
        """
        storage.save(
            key=f"agent_state:{self.run_id}",
            value=asdict(self),
            ttl_hours=self.STATE_TTL_HOURS,  # State expires after 3 days
        )

    @classmethod
    def load(cls, storage, run_id: str) -> Optional['AgentState']:
        """Load the state for ``run_id``, or return ``None``.

        ``None`` is returned when storage has nothing (or a falsy value) for
        the key, and when the stored ``updated_at`` is more than
        ``STATE_TTL_HOURS`` hours old.
        """
        data = storage.load(f"agent_state:{run_id}")
        if not data:
            return None

        # Check expiration ourselves rather than relying only on the storage
        # TTL. calculate_age_hours is not defined in this class; it must be
        # available at module level.
        age_hours = calculate_age_hours(data["updated_at"])
        if age_hours > cls.STATE_TTL_HOURS:
            return None  # Stale state

        return cls(**data)

    def to_briefing_prompt(self) -> str:
        """Convert state to a prompt for session rehydration.

        The prompt starts and ends with a newline. Only the first five
        remaining tasks are listed; an empty decision or task list renders
        as an empty line under its heading.
        """
        decisions = "\n".join(f"- {d}" for d in self.decisions_made)
        next_tasks = "\n".join(f"- {t}" for t in self.remaining_tasks[:5])
        lines = [
            "",  # leading newline
            f"RESUMING RUN {self.run_id}",
            f"ORIGINAL GOAL: {self.original_goal}",
            f"CURRENT PHASE: {self.current_phase}",
            "PROGRESS:",
            f"- Completed: {len(self.completed_tasks)} tasks",
            f"- Failed: {len(self.failed_tasks)} tasks",
            f"- Remaining: {len(self.remaining_tasks)} tasks",
            "CONFIRMED FACTS:",
            json.dumps(self.confirmed_facts, indent=2),
            "DECISIONS MADE:",
            decisions,
            "NEXT TASKS TO COMPLETE:",
            next_tasks,
            "Continue from this exact point. Do not redo completed work.",
            "",  # trailing newline
        ]
        return "\n".join(lines)

    def to_final_report(self) -> dict:
        """Return the final state as a plain dict plus a ``summary`` of counts.

        The result holds every field (deep-copied by ``asdict``, so mutating
        the report does not affect this state) and a ``"summary"`` entry with
        the number of completed, failed and remaining tasks.
        """
        report = asdict(self)
        report["summary"] = {
            "completed": len(self.completed_tasks),
            "failed": len(self.failed_tasks),
            "remaining": len(self.remaining_tasks),
        }
        return report
Checkpoint Strategy
class CheckpointManager:
    """Save state at key decision points, not just at the end.

    The manager runs an agent's remaining tasks one by one and persists the
    state after every task, whether it succeeded or failed.
    """

    # Names of events considered worth a checkpoint. Not referenced by
    # run_with_checkpoints in this class.
    CHECKPOINT_EVENTS = {
        "task_completed",
        "phase_changed",
        "significant_finding",
        "error_occurred",
    }

    def __init__(self, storage=None):
        """Remember the storage backend that checkpoints are written to.

        Args:
            storage: Object passed to ``state.save``. Defaults to ``None`` so
                existing code that constructs the manager without arguments
                and assigns ``storage`` afterwards keeps working.
        """
        self.storage = storage

    async def run_with_checkpoints(self, agent, initial_state: AgentState) -> dict:
        """Execute every remaining task, checkpointing after each one.

        Args:
            agent: Object with an awaitable ``execute_task(task, state)``.
            initial_state: State to run from; it is mutated in place.

        Returns:
            Whatever ``state.to_final_report()`` returns once no tasks remain.

        Raises:
            Any exception raised while saving a checkpoint. Exceptions raised
            by ``agent.execute_task`` are recorded in ``failed_tasks`` and do
            not stop the run.
        """
        state = initial_state
        while state.remaining_tasks:
            task = state.remaining_tasks[0]

            # Only the task execution is guarded. Bookkeeping and storage
            # errors must not be misreported as a task failure.
            try:
                await agent.execute_task(task, state)
            except Exception as e:
                state.failed_tasks.append({"task": task, "error": str(e)})
                state.error_count += 1
            else:
                state.completed_tasks.append(task)
                state.last_successful_op = task
                state.iteration_count += 1

            # The task leaves the queue either way, so the loop always advances.
            state.remaining_tasks.remove(task)

            # Checkpoint after every task, failures included. The timestamp is
            # refreshed on both paths so a failure still counts as activity.
            # utcnow is not defined in this class; it must be available at
            # module level.
            state.updated_at = utcnow()
            state.save(self.storage)

        return state.to_final_report()
Key Takeaways
- Structured state, not raw history — persist decisions, facts, and progress rather than the transcript
- Checkpoint after every task — not just at the end
- State expiration — check timestamps before rehydrating, and discard state that has gone stale
- to_briefing_prompt() — convert state to a usable session initialization prompt
- Separate fast and slow state — keep the current session's working state in memory, and write to storage only what must survive across sessions