HiveLoop — User Manual Part 7: Operational Events Integration Guide #
Version: 0.1.0 Last updated: 2026-02-12
Plans, escalations, issues, retries, and queue snapshots — the methods that turn a timeline into a narrative your ops team can act on.
Table of Contents #
- What This Guide Covers
- The Integration Mindset
- Plans — task.plan() and task.plan_step()
- Escalations — task.escalate()
- Issues — agent.report_issue()
- Queue Snapshots — agent.queue_snapshot()
- Retries — task.retry()
- Tool Execution Tracking — agent.track_context()
- Putting It All Together
- Finding Integration Points in Your Codebase
- Validation Checklists
- Common Mistakes
1. What This Guide Covers #
This guide covers six SDK methods that add operational context to your agent's telemetry. These are Layer 2 events — they build on top of Layer 0 (init + heartbeat) and Layer 1 (tasks + actions), adding the narrative that answers "why did it fail?" and "what is it waiting for?"
| Method | Scope | What it answers |
|---|---|---|
| task.plan() + task.plan_step() | Task | What is the agent's strategy? Which step failed? How far did it get? |
| task.escalate() | Task | When did the agent decide it needs help? Who did it hand off to? |
| agent.report_issue() | Agent | What persistent problems has the agent detected? |
| agent.queue_snapshot() | Agent | How deep is the work queue? Is the agent falling behind? |
| task.retry() | Task | How many retries? What triggered them? How much time is lost? |
| agent.track_context() | Task | Which tools were called? How long did each take? Which failed? |
Prerequisites #
Before integrating these methods, you should have:
- [x] Layer 0 working — hiveloop.init() + hb.agent(), agents visible with heartbeats
- [x] Layer 1 working — agent.task() + @agent.track(), tasks and actions in the timeline
- [x] A plumbing pattern in place — contextvars, parameter passing, or framework context (see Part 3, Section 5)
If Layer 1 isn't working yet, go back to Part 3 (Instrumentation Guide) and Part 4 (Layer 1 — What to Expect). These operational events build on task context and are meaningless without it.
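The examples throughout this guide call a get_current_task() helper from your own observability module. A minimal contextvars-based version of that plumbing — an illustrative sketch of the pattern Part 3 describes, not SDK code — looks like:

```python
# observability.py — hypothetical contextvars plumbing for task access.
# set_current_task() is called where the task is created (inside the
# `with agent.task(...)` block); get_current_task() retrieves it anywhere
# downstream on the same logical call stack, including in async code.
from contextvars import ContextVar
from typing import Any, Optional

_current_task: ContextVar[Optional[Any]] = ContextVar("hiveloop_task", default=None)

def set_current_task(task: Any) -> None:
    _current_task.set(task)

def get_current_task() -> Optional[Any]:
    # Returns None when no task is active — callers must guard with `if task:`
    return _current_task.get()
```

Because the default is None, every call site can safely do `task = get_current_task()` followed by `if task:` — instrumentation degrades to a no-op outside a task.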
2. The Integration Mindset #
These methods share a pattern that's different from task.llm_call() and @agent.track():
LLM calls and action tracking are about measuring — they record something that already happened (a function ran, a model was called). You find the call site and add a line after it.
Operational events are about narrating — they describe the agent's decisions, problems, and state transitions. The code that triggers them often doesn't look like a function call you can easily spot. You need to understand the agent's decision-making flow to know where to add them.
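To make the contrast concrete, here is a hedged sketch — the record_* helpers are stand-ins for SDK calls, not real API. Measuring adds a line next to work that already runs; narrating adds a statement at a decision branch:

```python
# A tiny in-memory event log stands in for the HiveLoop SDK (hypothetical):
events: list = []

def record_action(name: str, ok: bool) -> None:
    events.append(f"action:{name}:{'ok' if ok else 'failed'}")

def record_event(kind: str, summary: str) -> None:
    events.append(f"{kind}:{summary}")

# Measuring — instrument an existing call site; the work already happens:
def fetch_report(url: str) -> str:
    result = f"report from {url}"            # existing work
    record_action("fetch_report", ok=True)   # line added after the fact
    return result

# Narrating — emit at the decision branch; there is no call to wrap:
def route(confidence: float, threshold: float = 0.5) -> str:
    if confidence < threshold:
        record_event("escalated", f"confidence {confidence:.2f} below threshold")
        return "human"
    return "auto"
```

Finding the `if confidence < threshold:` branch is the hard part — that is what the table below helps you search for.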
What to look for #
| Method | Code pattern to search for |
|---|---|
| task.plan() | Step lists, strategy objects, workflow definitions, phase arrays |
| task.plan_step() | Progress tracking, step iteration, phase transitions |
| task.escalate() | Handoffs, delegation, "needs human review" logic |
| agent.report_issue() | Error detection outside of task failures, degraded conditions, health checks |
| agent.queue_snapshot() | Work queues, job lists, pending items, inbox polling |
| task.retry() | Retry loops, backoff logic, tenacity decorators, for attempt in range(N) |
3. Plans — task.plan() and task.plan_step() #
3.1 What it does #
When your agent creates a multi-step strategy — "first do X, then Y, then Z" — task.plan() makes that strategy visible on the dashboard. task.plan_step() updates each step's status as the agent progresses.
On the dashboard, this renders as a plan progress bar above the timeline:
[■ Search CRM] [■ Score lead] [▪ Generate email] [ Update CRM ]
completed completed in progress not started
3.2 The API #
task.plan(goal, steps) — declare the plan:
task.plan(
"Process and route incoming lead", # goal — what the plan achieves
[ # steps — ordered list of step descriptions
"Search CRM for existing record",
"Score lead against criteria",
"Generate follow-up email",
"Update CRM with outcome",
],
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| goal | str | Yes | What the plan aims to achieve |
| steps | list[str] | Yes | Ordered step descriptions. Order determines step indices (0, 1, 2, ...) |
| revision | int | No | Plan revision number. Default 0. Increment when the agent replans |
task.plan_step(step_index, action, summary) — update a step:
task.plan_step(0, "started", "Searching CRM for lead #4801")
# ... step executes ...
task.plan_step(0, "completed", "Found existing CRM record",
turns=2, tokens=3200)
| Parameter | Type | Required | Description |
|---|---|---|---|
| step_index | int | Yes | Zero-based position in the steps list |
| action | str | Yes | "started", "completed", "failed", "skipped" |
| summary | str | Yes | Outcome or status note |
| total_steps | int | No | Auto-inferred from task.plan() if previously called |
| turns | int | No | LLM turns spent on this step |
| tokens | int | No | Total tokens spent on this step |
| plan_revision | int | No | Correlates with task.plan() revision |
3.3 Finding WHERE to add plans #
Search your codebase for code that:
Creates a sequence of steps to execute. Look for lists, arrays, or objects that define a strategy:

```python
# Pattern A — explicit step list
steps = ["fetch data", "analyze", "generate report", "send email"]

# Pattern B — workflow/pipeline definition
pipeline = [FetchStage(), AnalyzeStage(), ReportStage()]

# Pattern C — LLM-generated plan
plan = llm.create_plan(objective)  # returns structured steps
```

Iterates through stages or phases. The agent processes work in a defined order:

```python
for i, step in enumerate(pipeline.stages):
    step.execute(context)
```

Tracks progress through sequential work. Counters, phase variables, or state machines:

```python
current_phase = "scoring"
# ... later ...
current_phase = "routing"
```
3.4 Integration pattern #
Once you've found the plan creation point and the step execution loop, add instrumentation:
from myproject.observability import get_current_task
def execute_plan(objective, data):
task = get_current_task()
# Step 1: Agent creates its plan
steps = planner.generate_steps(objective)
if task:
task.plan(objective, [s.description for s in steps])
# Step 2: Execute each step, tracking progress
for i, step in enumerate(steps):
if task:
task.plan_step(i, "started", step.description)
try:
result = step.execute(data)
if task:
task.plan_step(i, "completed", f"{step.description} — {result.summary}")
except Exception as e:
if task:
task.plan_step(i, "failed", f"{step.description} — {e}")
raise
3.5 Replanning #
If the agent changes its plan mid-execution (e.g., after a step fails and it creates a new strategy), call task.plan() again with an incremented revision:
# Original plan failed at step 2
task.plan_step(2, "failed", "Email API returned 403")
# Agent replans
new_steps = planner.replan(objective, failed_step=2)
task.plan("Revised: route to manual review", [s.description for s in new_steps], revision=1)
# Continue with new plan
task.plan_step(0, "started", "Notifying manager")
The dashboard shows the latest plan. Previous plan events remain in the timeline for the full history.
3.6 loopCore example #
In loopCore, the planning system creates execution plans with explicit step lists:
Plan creation — planning.py:
# After the LLM generates a plan:
plan_steps = parse_plan_response(plan_response)
task = get_current_task()
if task:
try:
task.plan(
objective,
[step["description"] for step in plan_steps],
)
except Exception:
pass
Step execution — loop.py:
# As each step is processed in the agent loop:
task = get_current_task()
if task:
try:
task.plan_step(step_index, "started", f"Executing: {step_name}")
except Exception:
pass
# ... step executes ...
if task:
try:
task.plan_step(step_index, "completed", step_result_summary,
turns=turns_used, tokens=tokens_spent)
except Exception:
pass
3.7 Dashboard impact #
| Element | What appears |
|---|---|
| Timeline | Plan progress bar above the main track. Green = completed, blue = in progress, red = failed, gray = not started |
| Timeline detail | Click a plan node to see goal, step count, and revision |
| Activity Stream | plan_created and plan_step events with step index and status |
4. Escalations — task.escalate() #
4.1 What it does #
task.escalate() records the moment an agent decides it cannot handle something alone. This is a critical operational signal — it means the agent needs human intervention, a different agent, or a different approach.
4.2 The API #
task.escalate(
"Lead score 0.12 — below threshold, needs manual review",
assigned_to="senior-sales",
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| summary | str | Yes | Why the agent escalated — this appears on the timeline and in the Activity Stream |
| assigned_to | str | No | Who or what receives the escalation (person, team, queue, agent) |
| reason | str | No | Additional reason detail (if summary isn't enough) |
4.3 Finding WHERE to add escalations #
Escalation points are where the agent says "I can't handle this." Search for:
Threshold checks that route to humans:

```python
if confidence < MIN_CONFIDENCE:
    notify_human(result)  # ← escalation point
```

Error handling that delegates:

```python
except PermissionError:
    queue_for_manual_review(item)  # ← escalation point
```

Decision logic with a "give up" branch:

```python
if retries_exhausted:
    hand_off_to_senior(task)  # ← escalation point
```

Explicit escalation methods or flags:

```python
def escalate_to_human(self, reason):  # ← the method name itself is the clue
    self.status = "escalated"
```

LLM-driven escalation decisions:

```python
if llm_decision == "escalate":
    assign_to_human(context)  # ← escalation point
```
4.4 Integration pattern #
Add task.escalate() at the point where the escalation decision is made — before the actual handoff logic:
from myproject.observability import get_current_task
def handle_low_confidence_result(item, score):
task = get_current_task()
if task:
try:
task.escalate(
f"Score {score:.2f} below threshold — needs manual review",
assigned_to="senior-sales",
)
except Exception:
pass
# Existing escalation logic (unchanged):
queue_for_review(item, assignee="senior-sales")
4.5 loopCore example #
In loopCore, escalation happens when the reflection engine decides the agent should hand off:
loop.py — reflection returns "escalate":
if reflection_decision == "escalate":
task = get_current_task()
if task:
try:
task.escalate(
f"Agent escalated: {reflection_reason}",
assigned_to="human-reviewer",
)
except Exception:
pass
# Existing escalation flow continues:
create_escalation_event(agent, reason=reflection_reason)
4.6 Escalation vs. approval #
These are related but different:
| Concept | When to use | Example |
|---|---|---|
| Escalation | Agent hands off work entirely | "I can't handle this billing dispute — routing to senior support" |
| Approval request | Agent pauses and waits for permission | "I want to issue a $500 credit — need manager approval before proceeding" |
If the agent stops working and waits, use task.request_approval() (see Part 5, Section 5). If the agent passes the work to someone else and moves on, use task.escalate().
Some workflows involve both — escalate, then wait for approval:
task.escalate("Complex case — needs senior review", assigned_to="senior-support")
task.request_approval("Approval needed for account credit", approver="support-lead")
# ... agent waits ...
task.approval_received("Credit approved", approved_by="support-lead", decision="approved")
4.7 Dashboard impact #
| Element | What appears |
|---|---|
| Timeline | Amber escalation node with the summary text |
| Activity Stream | escalated event, visible under the "human" filter |
| Agent card | No direct badge change (escalation doesn't block the agent, unlike approval) |
5. Issues — agent.report_issue() #
5.1 What it does #
agent.report_issue() lets agents self-report persistent problems. Unlike task failures (which are automatic and per-task), issues are agent-level and persistent — they represent ongoing conditions like "CRM API is returning 403s" or "data quality is degrading."
Issues stay active on the dashboard until explicitly resolved with agent.resolve_issue().
5.2 The API #
Report an issue:
agent.report_issue(
summary="CRM API returning 403 for workspace queries",
severity="high",
issue_id="crm-403",
category="permissions",
context={"api": "salesforce", "error_code": 403, "last_seen": "2026-02-12T14:30:00Z"},
occurrence_count=3,
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| summary | str | Yes | Issue description. Used for dedup if no issue_id |
| severity | str | Yes | "critical", "high", "medium", "low" |
| issue_id | str | No | Stable identifier for lifecycle tracking. Strongly recommended |
| category | str | No | Classification: "permissions", "connectivity", "configuration", "data_quality", "rate_limit", "other" |
| context | dict | No | Arbitrary debugging data — API names, error codes, timestamps |
| occurrence_count | int | No | Agent-tracked count of how many times this has happened |
Resolve an issue:
agent.resolve_issue(
"CRM API recovered — returning 200 again",
issue_id="crm-403",
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| summary | str | Yes | Resolution message |
| issue_id | str | No | Must match the original report for lifecycle tracking |
5.3 Issues vs. task failures — when to use which #
This is the most common source of confusion. Here's the distinction:
| | Task failure | Agent issue |
|---|---|---|
| Scope | One specific task | The agent overall |
| Trigger | Exception inside agent.task() | Agent detects an ongoing problem |
| Lifecycle | Automatic — emitted when the task's context manager catches an exception | Manual — you call report_issue() and resolve_issue() |
| Duration | Instantaneous — a single failed attempt | Persistent — stays until resolved |
| Example | "Task #4801 failed: ConnectionError" | "CRM API has been returning 403 for the last 30 minutes" |
| Dashboard | Red dot on the task row, task_failed in Activity Stream | Red badge on the agent card, Issues table in Pipeline tab |
Rule of thumb: If the problem would go away by retrying the task, it's a task failure. If the problem persists across multiple tasks, it's an issue.
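The rule of thumb can be sketched as a small tracker — names and the threshold are illustrative, not SDK API. The first couple of failures stay task failures; once the same dependency fails repeatedly across tasks, it crosses into issue territory:

```python
from collections import defaultdict

FAILURE_THRESHOLD = 3  # consecutive failures before we treat it as an issue

class ErrorTracker:
    """Counts consecutive failures per dependency, across tasks."""

    def __init__(self):
        self._consecutive = defaultdict(int)

    def on_failure(self, dep: str) -> str:
        self._consecutive[dep] += 1
        # Below the threshold: let the task fail (and retry) normally.
        # At or above: the problem persists across tasks — report an issue.
        return "issue" if self._consecutive[dep] >= FAILURE_THRESHOLD else "task_failure"

    def on_success(self, dep: str) -> None:
        # Recovery — a natural place to call agent.resolve_issue()
        self._consecutive[dep] = 0
```

Section 5.5 below shows the same detect/report/resolve shape wired to the real report_issue() and resolve_issue() calls.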
5.4 Finding WHERE to add issue reporting #
Look for code that detects persistent problems — not individual errors, but patterns:
Retry exhaustion with circuit-breaker logic:

```python
if consecutive_failures >= FAILURE_THRESHOLD:
    self.circuit_open = True  # ← issue point
    log.warning("Circuit breaker opened for CRM API")
```

Health check failures:

```python
def health_check(self):
    if not self.api_client.ping():
        log.warning("API health check failed")  # ← issue point
```

Rate limit detection:

```python
if response.status_code == 429:
    self.rate_limited = True  # ← issue point
    self.backoff_until = time.time() + retry_after
```

Data quality checks:

```python
if invalid_records / total_records > 0.10:
    log.warning("10%+ records invalid")  # ← issue point
```

Configuration problems detected at runtime:

```python
if not os.environ.get("API_KEY"):
    log.error("API_KEY not configured")  # ← issue point
```
5.5 Integration pattern #
The typical pattern has three parts: detect, report, and resolve.
# Detection — when the agent discovers a problem:
def on_api_error(self, error, api_name):
self._error_counts[api_name] = self._error_counts.get(api_name, 0) + 1
if self._error_counts[api_name] >= 3:
hiveloop_agent = get_hiveloop_agent(self.agent_name)
if hiveloop_agent:
try:
hiveloop_agent.report_issue(
summary=f"{api_name} consistently failing: {error}",
severity="high",
issue_id=f"api-error-{api_name}",
category="connectivity",
context={
"api": api_name,
"error": str(error),
"consecutive_failures": self._error_counts[api_name],
},
occurrence_count=self._error_counts[api_name],
)
except Exception:
pass
# Resolution — when the problem goes away:
def on_api_success(self, api_name):
if self._error_counts.get(api_name, 0) >= 3:
hiveloop_agent = get_hiveloop_agent(self.agent_name)
if hiveloop_agent:
try:
hiveloop_agent.resolve_issue(
f"{api_name} recovered",
issue_id=f"api-error-{api_name}",
)
except Exception:
pass
self._error_counts[api_name] = 0
5.6 Severity guidelines #
| Severity | When to use | Example |
|---|---|---|
| critical | Agent cannot function at all | "No API key configured", "Database unreachable" |
| high | Agent can work but a major capability is degraded | "CRM API returning 403", "LLM rate limited" |
| medium | Agent works but output quality is reduced | "Enrichment data stale", "Fallback model in use" |
| low | Informational — something the ops team should know | "Cache miss rate high", "Slow response times" |
5.7 The issue_id pattern #
Always use issue_id for issues that can be resolved. Without it, deduplication is hash-based on the summary text, which is fragile.
Good issue_id patterns:
issue_id="crm-api-403" # API + error code
issue_id="rate-limit-openai" # category + service
issue_id=f"data-quality-{table}" # category + entity
Avoid:
issue_id=str(uuid.uuid4()) # unique per occurrence — defeats dedup
issue_id="error" # too generic — all issues collapse into one
5.8 loopCore example #
In loopCore, the issue reporting tool (report_issue) is already a first-class agent capability. The agent calls it when it detects problems with its tools:
issue_tools.py — when a tool consistently fails:
def on_tool_failure(agent_name, tool_name, error, consecutive_count):
hiveloop_agent = get_hiveloop_agent(agent_name)
if hiveloop_agent and consecutive_count >= 3:
try:
hiveloop_agent.report_issue(
summary=f"Tool '{tool_name}' failing: {error}",
severity="high",
issue_id=f"tool-failure-{tool_name}",
category="connectivity",
context={
"tool": tool_name,
"error": str(error),
"consecutive_failures": consecutive_count,
},
occurrence_count=consecutive_count,
)
except Exception:
pass
Resolution — when the tool succeeds again:
def on_tool_success(agent_name, tool_name):
hiveloop_agent = get_hiveloop_agent(agent_name)
if hiveloop_agent:
try:
hiveloop_agent.resolve_issue(
f"Tool '{tool_name}' recovered",
issue_id=f"tool-failure-{tool_name}",
)
except Exception:
pass
5.9 Dashboard impact #
| Element | What appears |
|---|---|
| Agent card | Red issue badge (e.g. "● 1 issue") — visible in The Hive |
| Pipeline tab | Active Issues table with severity, category, occurrence count |
| Activity Stream | Issue events with warning icons; resolved issues also appear |
| Stats | active_issues count in agent stats |
Issues persist until explicitly resolved. If you report an issue and never resolve it, the red badge stays on the agent card permanently. This is by design — persistent problems should remain visible until someone addresses them.
6. Queue Snapshots — agent.queue_snapshot() #
6.1 What it does #
agent.queue_snapshot() reports the current state of the agent's work queue. This gives the dashboard visibility into how much work is pending, how old the oldest item is, and what's currently being processed.
6.2 The API #
agent.queue_snapshot(
depth=4,
oldest_age_seconds=120,
items=[
{"id": "job-001", "priority": "high", "source": "human",
"summary": "Review contract", "queued_at": "2026-02-12T14:28:00Z"},
{"id": "job-002", "priority": "normal", "source": "webhook",
"summary": "Process CRM update", "queued_at": "2026-02-12T14:29:00Z"},
],
processing={"id": "job-003", "summary": "Sending email",
"started_at": "2026-02-12T14:29:30Z", "elapsed_ms": 4500},
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| depth | int | Yes | Number of items in the queue |
| oldest_age_seconds | int | No | Age of the oldest queued item |
| items | list[dict] | No | The actual queue entries (max ~10 for readability) |
| processing | dict | No | What's currently being processed |
Each item in items:
| Field | Type | Description |
|---|---|---|
| id | str | Item identifier |
| priority | str | "low", "normal", "high", "urgent" |
| source | str | Where the item came from ("human", "webhook", "scheduled", "agent") |
| summary | str | What the item is about |
| queued_at | str | ISO 8601 timestamp of when it was queued |
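A sketch of building the items payload from in-memory entries — the QueueItem dataclass and to_snapshot_items helper are illustrative, not part of the SDK; the field names follow the table above:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class QueueItem:
    id: str
    priority: str
    source: str
    summary: str
    queued_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

def to_snapshot_items(queue, limit: int = 10):
    # Cap at ~10 entries for readability; serialize timestamps as ISO 8601.
    return [
        {
            "id": item.id,
            "priority": item.priority,
            "source": item.source,
            "summary": item.summary[:100],
            "queued_at": item.queued_at.isoformat(),
        }
        for item in queue[:limit]
    ]
```

The resulting list drops straight into the items= argument of agent.queue_snapshot() or a queue_provider callback.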
6.3 Two ways to report queue state #
Option A — Automatic via queue_provider callback (recommended):
Register a callback when creating the agent. It's called every heartbeat cycle:
agent = hb.agent(
"my-agent",
type="processor",
queue_provider=lambda: {
"depth": work_queue.qsize(),
"oldest_age_seconds": get_oldest_age(),
"items": [
{"id": item.id, "priority": item.priority, "summary": item.summary}
for item in list(work_queue.queue)[:10]
],
},
)
This is fire-and-forget — once registered, it reports queue state automatically every heartbeat (default 30 seconds).
Option B — Explicit calls (for non-standard queues):
Call agent.queue_snapshot() directly at any point:
hiveloop_agent = get_hiveloop_agent(agent_name)
if hiveloop_agent:
try:
hiveloop_agent.queue_snapshot(
depth=len(pending_items),
oldest_age_seconds=oldest_age,
items=[...],
)
except Exception:
pass
Use Option B when:
- The queue state is expensive to compute and you don't want to do it every heartbeat
- The queue is external (a database table, a Redis list, an SQS queue) and requires async access
- You want to report queue state at specific moments (e.g., after each dequeue)
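For the first bullet — expensive queue state — one way to avoid recomputing on every heartbeat is a throttled reporter. A sketch with hypothetical names, calling the real agent.queue_snapshot() only when enough time has passed:

```python
import time
from typing import Callable, Optional

class ThrottledQueueReporter:
    """Runs an expensive depth query at most once per min_interval seconds."""

    def __init__(self, compute_depth: Callable[[], int], min_interval: float = 60.0):
        self._compute_depth = compute_depth   # e.g. a COUNT(*) against a jobs table
        self._min_interval = min_interval
        self._last_report: Optional[float] = None

    def maybe_report(self, hiveloop_agent) -> bool:
        now = time.monotonic()
        if self._last_report is not None and now - self._last_report < self._min_interval:
            return False  # too soon — skip the expensive query
        self._last_report = now
        if hiveloop_agent:
            try:
                hiveloop_agent.queue_snapshot(depth=self._compute_depth())
            except Exception:
                pass  # telemetry must never break the agent
        return True
```

Call maybe_report() from wherever is convenient (each poll cycle, each dequeue); the throttle guarantees the expensive query runs at most once per interval.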
6.4 Finding WHERE to add queue snapshots #
If your agent processes a work queue, the integration point depends on how the queue is structured:
In-memory queue (threading.Queue, asyncio.Queue, list):

```python
# Option A — callback (best):
agent = hb.agent("worker", queue_provider=lambda: {"depth": q.qsize()})

# Option B — explicit, after each dequeue:
item = queue.get()
agent.queue_snapshot(depth=queue.qsize())
```

Database-backed queue (polling a table):

```python
# Report after each poll cycle:
pending = db.query("SELECT * FROM jobs WHERE status = 'pending'")
agent.queue_snapshot(
    depth=len(pending),
    items=[{"id": j.id, "summary": j.description} for j in pending[:10]],
)
```

External message queue (SQS, RabbitMQ, Redis):

```python
# Report periodically or after each message:
attrs = sqs_client.get_queue_attributes(
    QueueUrl=url, AttributeNames=["ApproximateNumberOfMessages"]
)
agent.queue_snapshot(depth=int(attrs["Attributes"]["ApproximateNumberOfMessages"]))
```

No explicit queue, but work arrives via events/callbacks:

```python
# Maintain a counter:
class Agent:
    def __init__(self):
        self._pending_count = 0

    def on_new_event(self, event):
        self._pending_count += 1

    def on_event_processed(self, event):
        self._pending_count -= 1

    def get_queue_depth(self):
        return self._pending_count

# Register callback:
agent = hb.agent("my-agent", queue_provider=lambda: {"depth": my_agent.get_queue_depth()})
```
6.5 loopCore example #
In loopCore, agents process events from an inbox. The queue state is the pending events list:
Agent registration with queue provider:
hiveloop_agent = hb.agent(
agent.name,
type=agent.type,
framework="loopcore",
queue_provider=lambda: {
"depth": len(agent.inbox.pending),
"oldest_age_seconds": agent.inbox.oldest_age(),
"items": [
{
"id": evt.id,
"priority": evt.priority,
"source": evt.source,
"summary": evt.summary[:100],
}
for evt in agent.inbox.pending[:10]
],
},
)
The queue_provider callback is called every heartbeat (30 seconds). No explicit queue_snapshot() calls needed — the SDK handles it.
6.6 Dashboard impact #
| Element | What appears |
|---|---|
| Agent card | Queue badge (e.g. "Q:4" in blue, "Q:8" in amber if >5) |
| Pipeline tab | Queue table with item details (ID, priority, source, summary, age) |
| Agent stats | queue_depth field in stats_1h response |
| Activity Stream | queue_snapshot events under the "pipeline" filter |
7. Retries — task.retry() #
7.1 What it does #
task.retry() records when an agent retries a failed operation. This makes retry patterns visible — how many retries happen, what causes them, and how much time is lost to backoff.
7.2 The API #
task.retry(
"Retrying after CRM API timeout",
attempt=2,
backoff_seconds=4.0,
)
| Parameter | Type | Required | Description |
|---|---|---|---|
| summary | str | Yes | What's being retried and why |
| attempt | int | No | Attempt number (1-based) |
| backoff_seconds | float | No | How long before the next attempt |
7.3 Finding WHERE to add retries #
Search for retry patterns in your code:
Explicit retry loops:

```python
for attempt in range(max_retries):
    try:
        result = call_api()
        break
    except TransientError:
        time.sleep(2 ** attempt)  # ← retry point
```

Retry decorators (tenacity, backoff):

```python
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def call_api():  # ← each retry is invisible without instrumentation
    return api_client.get(url)
```

Conditional re-execution:

```python
while not success and retries < MAX:
    success = try_operation()
    if not success:
        retries += 1  # ← retry point
```

Queue-based retry (re-enqueue on failure):

```python
except ProcessingError:
    item.retry_count += 1
    queue.put(item)  # ← retry point
```
7.4 Integration pattern #
from myproject.observability import get_current_task
for attempt in range(1, max_retries + 1):
try:
result = call_external_api()
break
except TransientError as e:
if attempt < max_retries:
backoff = 2 ** attempt
task = get_current_task()
if task:
try:
task.retry(
f"Retrying after {type(e).__name__}: {e}",
attempt=attempt,
backoff_seconds=backoff,
)
except Exception:
pass
time.sleep(backoff)
else:
raise # final attempt — let the exception propagate
For tenacity-based retries, use a callback:
import tenacity
def on_retry(retry_state):
task = get_current_task()
if task:
try:
task.retry(
f"Retry attempt {retry_state.attempt_number}: {retry_state.outcome.exception()}",
attempt=retry_state.attempt_number,
backoff_seconds=retry_state.next_action.sleep if hasattr(retry_state.next_action, 'sleep') else None,
)
except Exception:
pass
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(),
before_sleep=on_retry,
)
def call_api():
return api_client.get(url)
7.5 loopCore example #
In loopCore, retries happen when tool execution fails and the agent decides to try again:
loop.py — after a failed tool execution:
if should_retry and attempt < max_retries:
task = get_current_task()
if task:
try:
task.retry(
f"Retrying tool '{tool_name}' after failure: {error}",
attempt=attempt,
backoff_seconds=backoff,
)
except Exception:
pass
time.sleep(backoff)
agent.py — when a failed run creates a retry TODO:
task = get_current_task()
if task:
try:
task.retry(
f"Scheduling retry: {failure_reason}",
attempt=retry_count,
)
except Exception:
pass
7.6 Dashboard impact #
| Element | What appears |
|---|---|
| Timeline | Retry nodes showing attempt count and backoff |
| Activity Stream | retry_started events with attempt number |
Retries help you answer: "Is this agent spending most of its time retrying? Which operation causes the most retries? Is the backoff strategy appropriate?"
8. Tool Execution Tracking — agent.track_context() #
8.1 The problem #
Most agentic frameworks follow a loop: the LLM reasons, decides which tool(s) to call, then the framework executes those tools one at a time. A single turn may have zero or more tool calls. The tool name is determined at runtime by the LLM — you don't know at code-definition time which tool will be called.
Without instrumentation, tool execution is invisible on the dashboard. You see the LLM call (via task.llm_call()) and the task lifecycle, but the actual work — searching a CRM, sending an email, querying a database — is a black box.
8.2 The method #
agent.track_context(tool_name) is a context manager that wraps any code block and emits action_started + action_completed (or action_failed) events automatically.
with agent.track_context(tool_call.name) as ctx:
result = execute_tool(tool_call.name, tool_call.args)
This is the right choice over the alternatives:
| Method | Why it's not ideal for tool dispatch |
|---|---|
| @agent.track("name") | Decorator — requires the tool name at function definition time. In agentic loops, the LLM picks the tool at runtime |
| task.event("tool_used", ...) | Raw event — no automatic duration, no start/complete pairing, no nesting |
| task.llm_call() | Wrong scope — for the LLM API call itself, not the tool execution after it |
8.3 What you get automatically #
Each track_context() block gives you:
| Feature | How |
|---|---|
| Tool name on timeline | Passed as the string argument — shown on the blue action node |
| Duration | Automatic — measured from enter to exit |
| Success/failure | Automatic — exceptions propagate but get recorded as action_failed |
| Nesting | Automatic — if you're already inside a tracked action, tool calls become children in the action tree |
| Function name | Not available (unlike @agent.track() which captures fn.__qualname__). Use ctx.set_payload() if needed |
8.4 The full turn pattern #
A typical agentic turn has two parts: the LLM call and the tool execution(s). Here's how to instrument both:
import time
from myproject.observability import get_current_task
def run_turn(hiveloop_agent, messages, tool_definitions):
# 1. LLM call — the agent reasons and decides what tools to use
start = time.perf_counter()
response = llm.chat(messages, tools=tool_definitions)
elapsed = (time.perf_counter() - start) * 1000
task = get_current_task()
if task:
try:
task.llm_call(
"agent_turn",
model=response.model,
tokens_in=response.usage.input_tokens,
tokens_out=response.usage.output_tokens,
duration_ms=round(elapsed),
)
except Exception:
pass
# 2. Tool execution — zero or more per turn
for tool_call in response.tool_calls:
with hiveloop_agent.track_context(tool_call.name) as ctx:
result = tool_registry.execute(tool_call.name, tool_call.arguments)
On the dashboard timeline, this produces:
[■ agent_turn] → [● search_crm] → [● score_lead] → [■ agent_turn] → [● send_email]
claude-sonnet 0.8s 0.2s claude-sonnet 1.1s
Purple LLM nodes for the reasoning, blue action nodes for each tool, all in sequence with durations.
8.5 Attaching tool metadata #
Use ctx.set_payload() inside the context manager to add tool arguments, results, or other data. This shows up when you click the action node on the timeline:
with hiveloop_agent.track_context(tool_call.name) as ctx:
result = tool_registry.execute(tool_call.name, tool_call.arguments)
ctx.set_payload({
"args": {k: str(v)[:100] for k, v in tool_call.arguments.items()},
"result_preview": str(result)[:200],
})
Important: set_payload() adds data to the action_completed event. If the tool throws before set_payload() is reached, the action_failed event still captures the exception automatically — you don't need to handle that case.
8.6 Turns with zero tool calls #
If the LLM decides not to call any tools (e.g., a final answer turn), the for tool_call in response.tool_calls loop simply doesn't execute. No action events are emitted — only the LLM call. This is correct: the timeline shows a reasoning node with no tool execution, which tells the operator "the agent answered without using tools."
8.7 Error handling #
track_context() never swallows exceptions. If a tool throws, the exception propagates normally — but a red action_failed node appears on the timeline with the exception type and message:
with hiveloop_agent.track_context("crm_search") as ctx:
result = crm_client.search(query) # raises ConnectionError
# ConnectionError propagates — but the timeline now shows:
# [● crm_search] (red, failed)
# exception_type: ConnectionError
# exception_message: Connection refused
# duration: 2.1s
If you want to catch the error and continue (e.g., to try the next tool), wrap the context manager in your own try/except:
for tool_call in response.tool_calls:
try:
with hiveloop_agent.track_context(tool_call.name) as ctx:
result = tool_registry.execute(tool_call.name, tool_call.arguments)
except ToolError as e:
results.append({"error": str(e)})
continue
The failed action still appears on the timeline (red node), but execution continues.
8.8 Nested tool calls #
If a tool internally calls another tool (or another tracked function), the nesting is captured automatically:
with hiveloop_agent.track_context("process_lead") as ctx:
# This tool internally calls sub-tools:
with hiveloop_agent.track_context("crm_search") as ctx2:
record = crm_client.search(lead.email)
with hiveloop_agent.track_context("score_lead") as ctx3:
score = scorer.score(lead, record)
The timeline shows process_lead as a parent action with crm_search and score_lead as children, rendered as a branching tree.
8.9 loopCore example #
In loopCore, tools are dispatched by the Phase 2 loop. Each tool call from the LLM response is executed sequentially:
from loop_core.observability import get_hiveloop_agent
# In the Phase 2 tool execution loop:
for tool_use in phase2_response.tool_calls:
hiveloop_agent = get_hiveloop_agent(agent.name)
if hiveloop_agent:
with hiveloop_agent.track_context(tool_use.name) as ctx:
result = tool_runner.execute(tool_use.name, tool_use.input)
ctx.set_payload({"result_preview": str(result)[:200]})
else:
result = tool_runner.execute(tool_use.name, tool_use.input)
If you don't want to duplicate the execute call, restructure:
from contextlib import nullcontext

for tool_use in phase2_response.tool_calls:
    hiveloop_agent = get_hiveloop_agent(agent.name)
    ctx_mgr = hiveloop_agent.track_context(tool_use.name) if hiveloop_agent else nullcontext()
    with ctx_mgr as ctx:
        result = tool_runner.execute(tool_use.name, tool_use.input)
        if ctx is not None and hasattr(ctx, "set_payload"):
            ctx.set_payload({"result_preview": str(result)[:200]})
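If this pattern recurs, you can factor the conditional tracking into a small helper so each call site stays one line. A sketch under stated assumptions — `tracked_execute` and the `Fake*` classes are illustrative names (not part of HiveLoop), and the stand-in agent exists only so the sketch runs without the SDK:

```python
from contextlib import contextmanager, nullcontext

def tracked_execute(hiveloop_agent, name, fn, *args, **kwargs):
    """Run fn under track_context() when an agent is available, bare otherwise."""
    ctx_mgr = hiveloop_agent.track_context(name) if hiveloop_agent else nullcontext()
    with ctx_mgr as ctx:
        result = fn(*args, **kwargs)
        if ctx is not None and hasattr(ctx, "set_payload"):
            ctx.set_payload({"result_preview": str(result)[:200]})
        return result

# Stand-in agent/context so this sketch is runnable on its own:
class FakeCtx:
    def set_payload(self, payload):
        self.payload = payload

class FakeAgent:
    @contextmanager
    def track_context(self, name):
        yield FakeCtx()

print(tracked_execute(FakeAgent(), "crm_search", str.upper, "acme"))  # ACME
print(tracked_execute(None, "crm_search", str.upper, "acme"))         # ACME
```

Either way, the untracked path and the tracked path execute the exact same tool call, so adding or removing observability never changes agent behavior.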
8.10 Dashboard impact #
| Element | What appears |
|---|---|
| Timeline | Blue action nodes for each tool call, with tool name and duration |
| Timeline (failed) | Red action node with exception type and message |
| Timeline (nested) | Parent-child branching for nested tool calls |
| Activity Stream | action_started, action_completed, action_failed events |
| Activity Stream — "action" filter | Shows only tool execution events |
8.11 Validation checklist #
- [ ] Trigger a turn with 1+ tool calls
- [ ] Timeline: Blue action nodes appear with correct tool names
- [ ] Timeline: Duration is shown on each node
- [ ] Trigger a tool failure — verify red node with exception details
- [ ] Activity Stream: `action_started` / `action_completed` events appear
- [ ] Verify tool nodes appear between LLM call nodes in the correct sequence
9. Putting It All Together #
Here's a complete example showing all methods integrated into a single agent task. This demonstrates how the events interleave to create a full operational narrative.
import time
import hiveloop
from myproject.observability import get_current_task, get_hiveloop_agent
hb = hiveloop.init(api_key="hb_live_xxx", endpoint="http://localhost:8000")
agent = hb.agent(
"lead-qualifier",
type="sales",
queue_provider=lambda: {"depth": work_queue.qsize()}, # queue snapshot (automatic)
)
def process_lead(lead):
with agent.task(f"lead-{lead.id}", project="sales-pipeline", type="lead_processing") as task:
# Create a plan
task.plan("Qualify and route lead", [
"Search CRM for existing record",
"Score lead against criteria",
"Enrich with external data",
"Route to sales rep",
])
# Step 0: CRM search — tracked as a tool execution
task.plan_step(0, "started", "Searching CRM")
with agent.track_context("search_crm") as ctx:
crm_record = search_crm(lead.email)
task.plan_step(0, "completed", f"Found: {crm_record is not None}")
# Step 1: Score lead — tracked as a tool execution
task.plan_step(1, "started", "Scoring lead")
with agent.track_context("score_lead") as ctx:
score = score_lead(lead, crm_record)
task.plan_step(1, "completed", f"Score: {score}")
# Step 2: Enrich — with retry on failure, each attempt tracked
task.plan_step(2, "started", "Enriching lead data")
for attempt in range(1, 4):
try:
with agent.track_context("enrich_lead") as ctx:
enrichment = enrich_lead(lead)
task.plan_step(2, "completed", "Enrichment succeeded")
break
except APITimeoutError as e:
if attempt < 3:
task.retry("Enrichment API timeout", attempt=attempt, backoff_seconds=2.0)
time.sleep(2.0)
else:
task.plan_step(2, "failed", "Enrichment failed after 3 attempts")
# Report persistent issue
agent.report_issue(
summary="Enrichment API consistently timing out",
severity="high",
issue_id="enrichment-timeout",
category="connectivity",
context={"api": "clearbit", "timeout_ms": 5000},
)
# Step 3: Route — with escalation for low scores
task.plan_step(3, "started", "Routing lead")
if score < 0.2:
task.escalate(
f"Lead score {score:.2f} — below threshold, needs manual review",
assigned_to="senior-sales",
)
task.plan_step(3, "completed", "Escalated to senior sales")
else:
with agent.track_context("assign_to_rep") as ctx:
assign_to_rep(lead, score)
task.plan_step(3, "completed", f"Assigned to {get_rep(score)}")
# Later, when the API recovers:
agent.resolve_issue("Enrichment API recovered", issue_id="enrichment-timeout")
On the dashboard, this task's timeline would show:
PLAN: [■ Search CRM] [■ Score lead] [■ Enrich data] [■ Route lead]
completed completed completed completed
TIMELINE:
[started] → [● search_crm 0.8s] → [● score_lead 0.2s]
→ [● enrich_lead ✗] → [retry #1] → [● enrich_lead ✗] → [retry #2]
→ [● enrich_lead 1.1s] → [▲ escalate]
→ [completed]
Blue ● nodes are tool executions (from `track_context`), red ✗ marks failed attempts, amber ▲ is the escalation. This run took the low-score branch; a high-score run would show [● assign_to_rep 0.3s] in place of the escalation node, since the two paths are mutually exclusive.
Plus:
- Agent card shows queue badge from `queue_provider`
- If enrichment fails 3 times, red issue badge appears on the agent card
- Escalation event appears in Activity Stream under "human" filter
- Retry nodes show attempt count and backoff timing
- Each tool node shows duration — click to see payload details
10. Finding Integration Points in Your Codebase #
Here's a systematic approach to finding where each method belongs in any agentic framework:
10.1 Use this prompt with your LLM #
Ask Claude (or your preferred LLM) to analyze your codebase:
"In [your codebase], trace the execution path from the agent's main loop to task completion. For each of the following, identify the file and line where it would go:
- Where does the agent create a plan or strategy? → `task.plan()`
- Where does the agent iterate through plan steps? → `task.plan_step()`
- Where does the agent decide to hand off to a human? → `task.escalate()`
- Where does the agent detect persistent problems (not single failures)? → `agent.report_issue()`
- Where is the work queue managed? → `agent.queue_snapshot()` or `queue_provider`
- Where does the agent retry after failure? → `task.retry()`

For each, give the file path, line number, and a code snippet showing the integration point."
10.2 Search patterns by framework type #
| Framework type | Plans | Escalations | Issues | Queue | Retries |
|---|---|---|---|---|---|
| Custom loop | Look for step lists in the main loop | Look for threshold checks or "give up" logic | Look for circuit breakers or error counters | Look for the queue data structure | Look for for attempt in range or while retry loops |
| LangChain | Agent's `plan()` method or chain-of-thought | `HumanApprovalCallbackHandler` usage | Custom tool error handling | `CallbackManager` or custom queue | `RetryOutputParser` or custom retry logic |
| CrewAI | Crew's task planning phase | `human_input=True` on tasks | Agent error handling callbacks | Crew's task queue | Built-in retry mechanisms |
| AutoGen | Multi-agent conversation planning | `human_input_mode="ALWAYS"` | Agent failure handling | Message queue between agents | `max_consecutive_auto_reply` |
| FastAPI agent | Request processing pipeline | Error responses that route to humans | Health check endpoints | Request queue (Redis, SQS) | Middleware retry logic |
10.3 The priority order #
If you're adding all five, do it in this order (highest value first):
1. `agent.report_issue()` — low effort, high value. Find 2-3 places where you log warnings about persistent problems and add `report_issue()`. Immediate Pipeline tab visibility.
2. `task.plan()` + `task.plan_step()` — medium effort. If your agent creates plans, this gives you the progress bar. If it doesn't create plans, skip this entirely.
3. `task.escalate()` — low effort. Find the handoff point(s) and add one line each. Immediate Activity Stream visibility.
4. `agent.queue_snapshot()` or `queue_provider` — low effort if you have a queue. Register the callback at agent creation time and forget about it.
5. `task.retry()` — medium effort. Depends on how many retry patterns exist. Start with the most common retry loop.
11. Validation Checklists #
After adding each method, verify on the dashboard.
11.1 Plans #
- [ ] Trigger a task that creates a plan
- [ ] Timeline: Plan progress bar appears above the timeline track
- [ ] Timeline: Steps show correct colors (gray → blue → green or red)
- [ ] Activity Stream: `plan_created` event appears with goal and step count
- [ ] Activity Stream: `plan_step` events appear as steps progress
- [ ] Trigger a step failure — verify red segment in plan bar
11.2 Escalations #
- [ ] Trigger a task that escalates
- [ ] Timeline: Amber escalation node appears with summary text
- [ ] Activity Stream: `escalated` event appears
- [ ] Activity Stream: "human" filter includes the escalation event
11.3 Issues #
- [ ] Trigger an issue report (e.g., fail an API call 3 times)
- [ ] Agent card: Red issue badge appears (e.g., "● 1 issue")
- [ ] Pipeline tab: Issue appears with correct severity and category
- [ ] Resolve the issue — verify badge disappears
- [ ] Activity Stream: Both report and resolve events appear
11.4 Queue snapshots #
- [ ] Register `queue_provider` on agent
- [ ] Wait 30 seconds (one heartbeat cycle)
- [ ] Agent card: Queue badge appears (e.g., "Q:3")
- [ ] Pipeline tab: Queue section shows items (if `items` array provided)
- [ ] Add items to queue — verify badge count increases on next heartbeat
11.5 Retries #
- [ ] Trigger a task that retries an operation
- [ ] Timeline: Retry nodes appear with attempt number
- [ ] Activity Stream: `retry_started` events appear
11.6 Tool execution tracking #
- [ ] Trigger a turn with 1+ tool calls
- [ ] Timeline: Blue action nodes appear with correct tool names
- [ ] Timeline: Duration is shown on each node
- [ ] Trigger a tool failure — verify red node with exception details
- [ ] Activity Stream: `action_started` / `action_completed` events appear
- [ ] Verify tool nodes appear between LLM call nodes in the correct sequence
12. Common Mistakes #
12.1 Reporting issues inside task failure handlers #
# ❌ Wrong — this reports an issue for every single failure:
except Exception as e:
agent.report_issue(summary=str(e), severity="high")
Issues are for persistent problems, not individual failures. Track failure counts and only report when a threshold is crossed:
# ✅ Correct — only report after repeated failures:
except Exception as e:
self.failure_count += 1
if self.failure_count >= 3:
agent.report_issue(
summary=f"API consistently failing: {e}",
severity="high",
issue_id="api-failure",
occurrence_count=self.failure_count,
)
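The `self.failure_count` bookkeeping can live in a small wrapper so every call site gets threshold behavior for free. A runnable sketch — `FailureTracker` and `FakeAgent` are illustrative names, not part of HiveLoop, and the stand-in agent exists only so the example runs without the SDK. Note the counter resets on success, so a service that recovers stops accumulating toward the threshold:

```python
class FailureTracker:
    """Count consecutive failures; report an issue once a threshold is hit."""
    def __init__(self, agent, issue_id, threshold=3):
        self.agent = agent
        self.issue_id = issue_id
        self.threshold = threshold
        self.failure_count = 0

    def run(self, fn, *args, **kwargs):
        try:
            result = fn(*args, **kwargs)
            self.failure_count = 0  # success — one-off blips never report
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.threshold:
                self.agent.report_issue(
                    summary=f"API consistently failing: {e}",
                    severity="high",
                    issue_id=self.issue_id,
                    occurrence_count=self.failure_count,
                )
            raise  # the individual failure still propagates normally

# Stand-in agent that records report_issue calls, so the sketch runs standalone:
class FakeAgent:
    def __init__(self):
        self.issues = []
    def report_issue(self, **kwargs):
        self.issues.append(kwargs)

agent = FakeAgent()
tracker = FailureTracker(agent, issue_id="api-failure")

def flaky():
    raise TimeoutError("upstream timeout")

for _ in range(3):
    try:
        tracker.run(flaky)
    except TimeoutError:
        pass

print(len(agent.issues))  # 1 — reported only when the threshold was crossed
```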
12.2 Forgetting to resolve issues #
If you report an issue but never resolve it, the red badge stays on the agent card permanently. Always pair report_issue() with resolve_issue():
# Report
agent.report_issue(..., issue_id="crm-403")
# Later, when the problem goes away:
agent.resolve_issue("CRM API recovered", issue_id="crm-403")
12.3 Using queue_snapshot() too frequently #
Don't call queue_snapshot() on every enqueue/dequeue — it generates events. Use queue_provider for automatic periodic reporting, or call queue_snapshot() at most once per processing cycle.
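If your code doesn't have a clean "once per cycle" point, a small throttle enforces the cap for you: wrap the snapshot call and invoke the wrapper as often as you like. `Throttle` is an illustrative helper, not part of the SDK, and the lambda stands in for a real `queue_snapshot()` call:

```python
import time

class Throttle:
    """Invoke fn at most once per min_interval seconds; extra calls are dropped."""
    def __init__(self, fn, min_interval=30.0):
        self.fn = fn
        self.min_interval = min_interval
        self._last = float("-inf")  # so the very first call always fires

    def __call__(self, *args, **kwargs):
        now = time.monotonic()
        if now - self._last < self.min_interval:
            return False  # suppressed — too soon since the last snapshot
        self._last = now
        self.fn(*args, **kwargs)
        return True

depths = []  # stand-in event sink for this sketch
snapshot = Throttle(lambda depth: depths.append(depth), min_interval=30.0)
snapshot(depth=3)  # fires
snapshot(depth=4)  # suppressed — within the 30s window
print(depths)      # [3]
```

This lets enqueue/dequeue paths call `snapshot(...)` freely while guaranteeing at most one event per interval — though if `queue_provider` fits your setup, it remains the simpler option.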
12.4 Plan step indices off by one #
task.plan() creates zero-indexed steps. The first step is index 0, not 1:
task.plan("My plan", ["Step A", "Step B", "Step C"])
task.plan_step(0, "started", "Step A") # ✅ correct
task.plan_step(1, "started", "Step A") # ❌ wrong — this is Step B
12.5 Escalating when you mean to request approval #
If the agent stops and waits, use task.request_approval(), not task.escalate(). Escalation means the agent hands off and moves on. Approval means the agent pauses until a human responds.
12.6 Not using issue_id #
Without issue_id, the server deduplicates issues by hashing the summary text. If the summary includes variable data (timestamps, counts), each report creates a new issue instead of updating the existing one:
# ❌ Every call creates a new issue (summary changes each time):
agent.report_issue(
summary=f"API failed {count} times as of {datetime.now()}",
severity="high",
)
# ✅ Updates the same issue each time:
agent.report_issue(
summary="API consistently failing",
severity="high",
issue_id="api-failure",
occurrence_count=count,
)
12.7 Calling task methods outside a task context #
task.plan(), task.escalate(), and task.retry() are task-scoped — they require an active task. If you call them outside a task context, they won't work.
agent.report_issue() and agent.queue_snapshot() are agent-scoped — they work anywhere, with or without a task.
# ✅ Task-scoped methods — call on the task object:
with agent.task(task_id) as task:
task.plan(...)
task.escalate(...)
task.retry(...)
# ✅ Agent-scoped methods — call on the agent object:
agent.report_issue(...) # works anywhere
agent.queue_snapshot(...) # works anywhere