HiveBoard Integration Guide for Agentic Frameworks #
A layer-by-layer technical guide to making your AI agents observable.
This guide is for developers who build or maintain agentic AI frameworks -- systems where an LLM reasons, selects tools, executes them, and repeats until a task is complete. The guide assumes Python but the concepts apply to any language with an HiveLoop SDK.
The integration follows a layered approach. Each layer builds on the previous one, and each one unlocks new dashboard capabilities. You can ship after any layer and come back later.
| Layer | What you add | What the dashboard shows | Effort |
|---|---|---|---|
| 0 | SDK init + agent registration | Heartbeat, online/offline, stuck detection | ~10 lines |
| 1 | Task boundaries + action tracking | Task table, timelines, success/failure rates | ~30 lines |
| 2a | LLM call tracking | Cost explorer, token usage, model breakdown | ~10 lines per LLM call site |
| 2b | Tool execution tracking | Tool nodes in timeline with inputs/outputs | ~15 lines at tool dispatch |
| 2c | Rich events | Plans, escalations, approvals, issues, TODOs, queue state | ~5 lines per event site |
| 3 | Advanced observability events | Insights tab: learning, compaction, config, memory ops | ~10 lines per event site |
| 4 | Client-side detectors | Insights tab: cycle detection, prompt bloat, state drift | ~50 lines per detector |
Prerequisites #
pip install hiveloop
You need:
- A HiveBoard server (self-hosted or cloud) with an API key (
hb_live_...orhb_test_...) - A project created on the server (a slug like
"my-project")
Layer 0: Initialization and Agent Registration #
Goal: Get agents visible on the dashboard with heartbeats and stuck detection.
Step 1: Initialize the SDK #
Call hiveloop.init() once at application startup -- before any agents are created.
import hiveloop
hb = hiveloop.init(
api_key="hb_live_your_key_here",
endpoint="https://your-hiveboard-server.com",
environment="production",
)
This creates a singleton client that buffers events in memory and flushes them to the server every 5 seconds via a background thread. It never blocks your application code. If the server is unreachable, events are queued (up to 10,000) and retried with exponential backoff.
Where to put this: In your application's entry point -- the main() function, the FastAPI lifespan, the CLI command handler, or wherever your framework boots up.
Testing: hiveloop.init() is a singleton -- subsequent calls log a warning and return the existing instance. In test suites where you need a fresh SDK state between tests, call hiveloop.reset() in your teardown to flush pending events, stop background threads, and clear the singleton.
hiveloop.init() parameters:
| Parameter | Type | Default | Notes |
|---|---|---|---|
api_key |
str | required | Starts with hb_live_, hb_test_, or hb_read_ |
endpoint |
str | "https://api.hiveboard.io" |
Your HiveBoard server URL |
environment |
str | "production" |
Filterable on dashboard. Use "development", "staging", etc. |
flush_interval |
float | 5.0 |
Seconds between automatic flushes |
batch_size |
int | 100 |
Max events per HTTP request (server caps at 500) |
max_queue_size |
int | 10000 |
Events buffered in memory before oldest are dropped |
debug |
bool | False |
Logs SDK operations to stderr |
Step 2: Register agents #
For each agent in your framework, call hb.agent(). This is idempotent -- calling it twice with the same agent_id returns the same handle.
hiveloop_agent = hb.agent(
agent_id="sales",
type="sales", # role or classification
version="claude-sonnet-4-5", # model or agent version
framework="my-framework", # your framework name
heartbeat_interval=30, # seconds between heartbeats (0 to disable)
stuck_threshold=300, # seconds without heartbeat before "stuck" badge
)
Where to put this: Wherever your framework creates or initializes agent instances. If agents are created lazily, call hb.agent() right after the agent object is constructed.
Store the handle. You'll need it later for task tracking, tool tracking, and event reporting. Common patterns:
# Option A: Store on the agent object
agent._hiveloop = hb.agent(agent_id=agent.id, ...)
# Option B: Store in a registry dict
_hiveloop_agents[agent.id] = hb.agent(agent_id=agent.id, ...)
hb.agent() parameters:
| Parameter | Type | Default | Notes |
|---|---|---|---|
agent_id |
str | required | Unique identifier, max 256 chars |
type |
str | "general" |
Agent classification (role, department, etc.) |
version |
str | None |
Agent or model version string |
framework |
str | "custom" |
Your framework name |
heartbeat_interval |
int | 30 |
Seconds. Set to 0 to disable heartbeats |
stuck_threshold |
int | 300 |
Seconds. Agent shows STUCK badge if no heartbeat within this window |
heartbeat_payload |
Callable | None |
() -> dict called each heartbeat, included in payload |
queue_provider |
Callable | None |
() -> dict returning queue state each heartbeat |
Heartbeat payload: Rich agent telemetry #
The heartbeat_payload callback is called every heartbeat cycle (default 30s). Use it to attach runtime metrics that flow into the dashboard's agent cards and the Insights tab. The callback must return a dict — keep it small (under 2KB) and fast (under 10ms).
def make_heartbeat_payload(agent_obj):
"""Build a heartbeat payload callback for an agent."""
_boot_time = time.time()
def payload():
return {
# Uptime and lifecycle
"uptime_seconds": int(time.time() - _boot_time),
"version": agent_obj.config.version,
# Cumulative token/cost counters (since boot)
"total_tokens_in": agent_obj.metrics.total_tokens_in,
"total_tokens_out": agent_obj.metrics.total_tokens_out,
"total_cost_usd": round(agent_obj.metrics.total_cost, 4),
# Error counters
"total_errors": agent_obj.metrics.error_count,
"consecutive_failures": agent_obj.metrics.consecutive_failures,
"last_error": agent_obj.metrics.last_error_message,
# Current state
"current_model": agent_obj.current_model,
"tasks_completed": agent_obj.metrics.tasks_completed,
"tasks_failed": agent_obj.metrics.tasks_failed,
}
return payload
# At agent registration
hiveloop_agent = hb.agent(
agent_id="sales",
type="sales",
version="claude-sonnet-4-5",
framework="my-framework",
heartbeat_payload=make_heartbeat_payload(agent_obj),
queue_provider=make_queue_provider("sales"),
)
What this unlocks: The Insights tab uses heartbeat payload fields for fleet-wide health views — total cost across all agents, error rate trends, uptime monitoring, and context pressure metrics. Without a rich heartbeat payload, these views show empty or incomplete data.
Keep it lean. The callback runs every 30 seconds per agent. Don't call external APIs, read files, or do anything that could block or fail. Read from in-memory counters only. If a field isn't available yet, omit it rather than returning None.
What you see on the dashboard after Layer 0 #
- Agent cards appear in The Hive panel with IDLE status
- Heartbeat sparklines pulse every 30 seconds
- STUCK detection fires if an agent stops responding
- Connection indicator shows green "Connected"
Safety contract #
Every HiveLoop call should be wrapped so it never crashes your agent:
# Guard pattern -- use everywhere
if hiveloop_agent:
try:
hiveloop_agent.some_method(...)
except Exception:
pass # observability must never break the agent
This is non-negotiable. Observability is a side channel. If the HiveBoard server goes down, your agents must continue running identically.
Layer 1: Task Boundaries and Action Tracking #
Goal: Get task timelines, success/failure tracking, and duration measurement.
Concept: What is a "task"? #
A task is a single unit of work your agent performs from start to finish. In most frameworks this maps to:
- One execution of the agentic loop (receive message -> reason -> use tools -> respond)
- One event being processed from a queue
- One API request being handled
- One scheduled job running
The key question: "Where does a unit of work start and end?" Find that boundary in your code.
Step 3: Wrap task execution with agent.task() #
agent.task() is a context manager. It emits task_started on entry and task_completed or task_failed on exit (depending on whether an exception was raised).
def execute_agent(agent, message, event_context=None):
hiveloop_agent = agent._hiveloop # the handle from Layer 0
if hiveloop_agent is not None:
task_id = generate_unique_task_id() # must be unique per execution
with hiveloop_agent.task(
task_id,
project="my-project",
type=event_context.get("source", "api") if event_context else "api",
) as task:
# Make the task handle available deeper in the call stack
set_current_task(task)
set_hiveloop_agent(hiveloop_agent)
try:
result = agent.run(message)
finally:
clear_current_task()
clear_hiveloop_agent()
return result
else:
return agent.run(message)
Critical: Task IDs must be unique per execution. If you reuse task IDs (like using a session ID), the dashboard will show only one row per ID instead of one row per run. Generate unique IDs using the event ID, a UUID, or a combination:
def generate_unique_task_id(agent_id, event_id=None):
if event_id:
return f"{agent_id}-{event_id}"
return f"{agent_id}-{uuid.uuid4().hex[:8]}"
agent.task() parameters:
| Parameter | Type | Default | Notes |
|---|---|---|---|
task_id |
str | required | Unique per execution |
project |
str | None |
Groups tasks in the dashboard |
type |
str | None |
Classification: "heartbeat", "webhook", "human", "api", etc. |
Alternative: Manual task lifecycle with agent.start_task() #
If your framework doesn't support context managers (e.g., callback-driven or event-sourced architectures where start and end happen in different functions), use start_task() with explicit complete()/fail() calls:
task = hiveloop_agent.start_task(
task_id,
project="my-project",
type="webhook",
)
set_current_task(task)
# ... later, when the work finishes:
try:
result = do_work()
task.complete() # emits task_completed
except Exception as exc:
task.fail(exception=exc) # emits task_failed
finally:
clear_current_task()
agent.start_task() accepts the same parameters as agent.task(). The caller is responsible for calling task.complete() or task.fail() -- if neither is called, the task will appear stuck on the dashboard.
Step 4: Plumb the task handle through the call stack #
The task handle created by agent.task() needs to be accessible from functions deep in the call stack -- your LLM call wrapper, your tool executor, your reflection module, etc. Don't pass it as a parameter through every function signature. Use contextvars.
Create a small plumbing module:
# observability.py
import contextvars
from typing import Any, Optional
_current_task = contextvars.ContextVar("hiveloop_task", default=None)
_current_agent = contextvars.ContextVar("hiveloop_agent", default=None)
def set_current_task(task: Any) -> None:
_current_task.set(task)
def get_current_task() -> Optional[Any]:
return _current_task.get()
def clear_current_task() -> None:
_current_task.set(None)
def set_hiveloop_agent(agent: Any) -> None:
_current_agent.set(agent)
def get_hiveloop_agent() -> Optional[Any]:
return _current_agent.get()
def clear_hiveloop_agent() -> None:
_current_agent.set(None)
Why two contextvars? They have different lifetimes. The task exists only during one execution. The agent handle persists for the agent's entire lifecycle. Some methods (report_issue, todo, queue_snapshot) are agent-level and work outside any task context. Others (llm_call, plan, escalate) are task-level and require an active task.
Set both when entering the task context, clear both in the finally block. This ensures downstream code can safely call get_current_task() or get_hiveloop_agent() and get None if no context is active.
Step 5: Track key functions with @agent.track() #
For functions whose names are known at definition time (not dynamically dispatched tools), use the @agent.track() decorator:
@hiveloop_agent.track("reflect")
def reflect(self, context):
# ... reflection logic ...
This adds action nodes to the task timeline. The "5-7 nodes" rule: Track 5-7 high-value functions, not 30. Good candidates are functions that call external services, make decisions, take >100ms, or could fail independently. Internal utilities, fast helpers, and functions called hundreds of times per task are bad candidates.
If your functions are class methods where the agent handle isn't available at import time, use track_context() instead (covered in Layer 2b).
What you see on the dashboard after Layer 1 #
- Task Table populates with rows: task ID, agent, type, status, duration
- Timeline shows task start/end with action nodes in between
- Stats Ribbon shows: Processing count, Success Rate, Avg Duration, Errors
- Activity Stream shows
task_started,task_completed,task_failed, and action events - Agent cards show PROCESSING badge and current task ID during execution
Layer 2a: LLM Call Tracking #
Goal: Get cost visibility, token usage, and model breakdown.
Step 6: Find all LLM call sites #
Search your codebase for every place an LLM API is called. Common patterns:
# Direct API calls
response = client.chat.completions.create(...)
response = client.messages.create(...)
# Framework wrappers
response = llm_client.complete(...)
response = llm_client.complete_with_tools(...)
response = llm_client.complete_json(...)
Build a catalog. A typical agentic framework has 3-8 LLM call sites:
| # | Location | Purpose | Example name |
|---|---|---|---|
| 1 | Main loop - reasoning | Agent decides what to do | "reasoning" |
| 2 | Main loop - tool use | Agent generates tool parameters | "tool_use" |
| 3 | Reflection module | Agent evaluates its progress | "reflection" |
| 4 | Planning module | Agent creates execution plan | "create_plan" |
| 5 | Summary/compression | Context window management | "context_compaction" |
| 6 | Classification/routing | Routing to sub-agents | "classify" |
Step 7: Determine token extraction for each site #
Different LLM clients expose tokens differently. Inspect each response object:
# Discovery helper -- run once per client type
for attr in dir(response):
if any(k in attr.lower() for k in ['token', 'usage', 'cost', 'model']):
print(f"response.{attr} = {getattr(response, attr, '?')}")
Common patterns:
| Pattern | Example | How to extract |
|---|---|---|
| On response object | Anthropic SDK | response.usage.input_tokens, response.usage.output_tokens |
| On client after call | Some wrappers | client._last_input_tokens, client._last_output_tokens |
| In response metadata | LangChain | response.response_metadata['token_usage'] |
| Not available | Some wrappers | Send name and model only -- tokens are optional |
Step 8: Add task.llm_call() at each site #
The standard pattern:
import time
from your_app.observability import get_current_task
# 1. Time the call
_start = time.perf_counter()
response = llm_client.complete(prompt=prompt, system=system, max_tokens=2048)
_elapsed_ms = (time.perf_counter() - _start) * 1000
# 2. Extract tokens (adapt to your response shape)
tokens_in = response.usage.input_tokens
tokens_out = response.usage.output_tokens
# 3. Report to HiveLoop
_task = get_current_task()
if _task:
try:
_task.llm_call(
"reasoning", # descriptive name (not the model name)
model=response.model, # model identifier
tokens_in=tokens_in,
tokens_out=tokens_out,
cost=estimate_cost(response.model, tokens_in, tokens_out),
duration_ms=round(_elapsed_ms),
)
except Exception:
pass
Cost estimation helper #
The SDK doesn't calculate costs -- you provide them. Build a lookup table for the models you use:
# Prices as of early 2026 -- update when pricing changes
COST_PER_MILLION = {
# Anthropic
"claude-opus-4-6": {"input": 15.00, "output": 75.00},
"claude-sonnet-4-5-20250929": {"input": 3.00, "output": 15.00},
"claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
# OpenAI
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
# Add your models here
}
def estimate_cost(model: str, tokens_in: int, tokens_out: int) -> float | None:
rates = COST_PER_MILLION.get(model)
if not rates:
return None
return (tokens_in * rates["input"] / 1_000_000) + (tokens_out * rates["output"] / 1_000_000)
Optional: Prompt and response previews #
task.llm_call() supports prompt_preview and response_preview parameters. These make Timeline LLM nodes clickable with real content. But prompts can contain PII, credentials, or internal context -- every string flows through the network and gets stored.
Recommendation: Don't add them everywhere. Gate them behind a config flag, and truncate aggressively (300 chars max). Only add them to the most opaque call sites where seeing what the agent asked is valuable for debugging:
_task.llm_call(
"reasoning",
model=response.model,
tokens_in=tokens_in,
tokens_out=tokens_out,
cost=estimate_cost(response.model, tokens_in, tokens_out),
duration_ms=round(_elapsed_ms),
# Only when config flag is enabled
prompt_preview=prompt[:300] if config.log_prompts else None,
response_preview=str(response.content)[:300] if config.log_prompts else None,
)
task.llm_call() parameters:
| Parameter | Type | Required | Notes |
|---|---|---|---|
name |
str | yes | Descriptive label for the Timeline node. Use the purpose, not the model name |
model |
str | yes | Model identifier. Used for Cost Explorer grouping |
tokens_in |
int | no | Input token count |
tokens_out |
int | no | Output token count |
cost |
float | no | USD cost. If omitted, Cost Explorer won't aggregate |
duration_ms |
int | no | Wall-clock LLM latency |
prompt_preview |
str | no | Truncated prompt for debugging. Consider PII implications |
response_preview |
str | no | Truncated response for debugging |
metadata |
dict | no | Arbitrary key-value pairs |
LLM call metadata: Unlocking advanced analytics #
The metadata parameter on task.llm_call() accepts an arbitrary dict. Use it to capture dimensions that power the Insights tab's optimization views — cache efficiency, context window pressure, turn-level cost tracking, and stop reason analysis.
_task = get_current_task()
if _task:
try:
_task.llm_call(
"reasoning",
model=response.model,
tokens_in=tokens_in,
tokens_out=tokens_out,
cost=estimate_cost(response.model, tokens_in, tokens_out),
duration_ms=round(_elapsed_ms),
metadata={
# Turn tracking — which turn of the agentic loop is this?
"turn_number": turn_count,
# Cache tokens — how much prompt was cached by the provider?
"cache_creation_input_tokens": getattr(
response.usage, "cache_creation_input_tokens", 0
),
"cache_read_input_tokens": getattr(
response.usage, "cache_read_input_tokens", 0
),
# Stop reason — did the model finish, hit max tokens, or use a tool?
"stop_reason": response.stop_reason, # "end_turn", "max_tokens", "tool_use"
# Context window utilization — how full is the context?
"context_window_size": model_context_limit, # e.g., 200000
"context_used_tokens": tokens_in,
"context_utilization_pct": round(
tokens_in / model_context_limit * 100, 1
),
# Prompt composition — what makes up the input tokens?
"system_prompt_tokens": count_tokens(system_prompt),
"history_tokens": count_tokens(messages),
"tool_results_tokens": count_tokens(tool_results),
},
)
except Exception:
pass
Field conventions: Use snake_case keys. The Insights tab looks for these specific field names in metadata:
| Metadata field | Insights tab usage |
|---|---|
turn_number |
Turn-level cost and token charts (Q14) |
cache_read_input_tokens |
Cache hit ratio analysis (Q12) |
cache_creation_input_tokens |
Cache efficiency trends (Q12) |
stop_reason |
Max-token truncation detection (Q20) |
context_utilization_pct |
Context pressure monitoring (Q16) |
context_window_size |
Context capacity analysis (Q16) |
system_prompt_tokens |
Prompt composition breakdown (Q19) |
history_tokens |
Prompt growth analysis (Q19) |
Don't over-instrument. Start with turn_number and stop_reason — they're the most universally useful. Add cache and context fields only if your LLM provider exposes them. Anthropic's API returns cache token counts directly; OpenAI requires separate calculation.
What you see on the dashboard after Layer 2a #
- Cost Explorer is fully functional: cost by model, cost by agent, total spend
- Task Table LLM column shows call count, COST column shows dollar amounts
- Timeline has purple LLM nodes with model badges
- Stats Ribbon Cost (1h) populates
- Activity Stream "llm" filter works
Layer 2b: Tool Execution Tracking #
Goal: Get tool nodes in the timeline showing what tools the agent used, with inputs and outputs.
The problem #
In agentic frameworks, the LLM picks tools at runtime. You can't use @agent.track("tool_name") because the tool name isn't known at definition time. track_context() solves this by accepting the name as a runtime parameter.
Step 9: Wrap tool dispatch with agent.track_context() #
Find the place in your code where tools are dispatched -- typically a single function that takes a tool name and parameters and routes to the correct implementation.
from your_app.observability import get_hiveloop_agent
def execute_tool(tool_name, parameters):
_agent = get_hiveloop_agent()
if _agent is not None:
with _agent.track_context(tool_name) as ctx:
result = tool_registry.execute(tool_name, parameters)
ctx.set_payload({
"args": {k: str(v)[:100] for k, v in parameters.items()},
"result_preview": str(result)[:200],
"success": result.success,
"error": result.error,
})
return result
else:
return tool_registry.execute(tool_name, parameters)
track_context() automatically:
- Emits
action_startedwhen entering thewithblock - Measures duration
- Emits
action_completedon clean exit (with the payload you set) - Emits
action_failedon exception (capturing exception type and message) - Handles nesting (if one tool calls another, the child becomes a nested node)
Separated lifecycle pattern #
If your tool dispatch doesn't use exceptions for error signaling (e.g., it returns a result object with a success flag), separate the __enter__ and __exit__ calls to ensure the tool always executes exactly once:
_ctx = None
if _agent is not None:
try:
_ctx = _agent.track_context(tool_name)
_ctx.__enter__()
except Exception:
_ctx = None
# Tool always executes exactly once, regardless of tracking
result = tool_registry.execute(tool_name, parameters)
if _ctx is not None:
try:
_ctx.set_payload({
"args": {k: str(v)[:100] for k, v in parameters.items()},
"result_preview": (result.output or "")[:200],
"success": result.success,
"error": result.error,
})
_ctx.__exit__(None, None, None)
except Exception:
pass
This avoids the double-execute bug where a naive with-based fallback could run the tool twice if track_context() succeeds but the tool fails.
Standardized tool payloads with tool_payload() #
Building the payload dict for ctx.set_payload() requires deciding which fields to include, how long to let strings grow, and stripping empty values. The tool_payload() helper standardizes this:
from hiveloop import tool_payload
def execute_tool(tool_name, parameters):
_agent = get_hiveloop_agent()
if _agent is not None:
with _agent.track_context(tool_name) as ctx:
result = tool_registry.execute(tool_name, parameters)
ctx.set_payload(tool_payload(
args=parameters,
result=result.data,
success=result.ok,
error=result.error,
tool_category="crm",
http_status=result.status_code,
result_size_bytes=len(result.raw),
))
return result
else:
return tool_registry.execute(tool_name, parameters)
tool_payload() parameters:
| Parameter | Type | Default | Notes |
|---|---|---|---|
args |
dict | None |
Tool arguments. Values truncated to args_max_len |
result |
Any | None |
Tool result (stringified & truncated to result_max_len) |
success |
bool | True |
Whether the call succeeded |
error |
str | None |
Error message on failure |
duration_ms |
int | None |
Elapsed milliseconds |
tool_category |
str | None |
Grouping label (e.g. "crm", "search") |
http_status |
int | None |
HTTP status code for API-backed tools |
result_size_bytes |
int | None |
Size of the raw result |
args_max_len |
int | 500 |
Max chars per arg value before truncation |
result_max_len |
int | 1000 |
Max chars for the result string |
The function strips all None-valued optional fields from the output so payloads stay compact.
What you see on the dashboard after Layer 2b #
- Timeline has blue tool nodes with names matching the actual tools called
- Clicking a tool node shows the payload (args, result preview, success/error)
- Activity Stream shows
action_completedandaction_failedfor each tool call - Failed tools appear as red nodes with error details
Layer 2c: Rich Events #
Goal: Plans, escalations, approvals, issues, retries, TODOs, queue state, and schedules.
These are the narrative events that tell the story of what your agent is doing and why. Each one lights up a specific dashboard feature.
Plans and plan steps #
If your framework breaks complex tasks into ordered steps (a plan), report the plan structure and step transitions.
When a plan is created:
_task = get_current_task()
if _task:
try:
step_descriptions = [step.description for step in plan.steps]
_task.plan(goal=task_description, steps=step_descriptions)
# Report first step started
_task.plan_step(step_index=0, action="started", summary=plan.steps[0].description)
except Exception:
pass
When a step completes and the next one starts:
if _task:
try:
_task.plan_step(step_index=current_step.index, action="completed",
summary=result_summary, turns=current_step.turns_taken)
_task.plan_step(step_index=next_step.index, action="started",
summary=next_step.description)
except Exception:
pass
When a step fails or is blocked:
if _task:
try:
_task.plan_step(step_index=step.index, action="failed",
summary=f"Blocked: {reason}")
except Exception:
pass
When the plan is revised (replanned):
if _task:
try:
_task.plan(goal=revised_goal, steps=new_step_descriptions, revision=revision_count)
_task.plan_step(step_index=0, action="started", summary=new_steps[0].description)
except Exception:
pass
Dashboard effect: A plan progress bar appears above the Timeline: green segments for completed steps, red for failed, gray for pending. This is the fastest way for an operator to see "where did it go wrong?"
task.plan() parameters:
| Parameter | Type | Required | Notes |
|---|---|---|---|
goal |
str | yes | What the plan achieves |
steps |
list[str] | yes | Ordered step descriptions |
revision |
int | no | Default 0. Increment on each replan |
task.plan_step() parameters:
| Parameter | Type | Required | Notes |
|---|---|---|---|
step_index |
int | yes | Zero-based position in the steps list |
action |
str | yes | "started", "completed", "failed", "skipped" |
summary |
str | yes | Outcome description |
turns |
int | no | LLM turns spent on this step |
tokens |
int | no | Tokens spent on this step |
Escalations #
When the agent decides it cannot handle something and hands it to a human.
Typical trigger: Your reflection or self-evaluation module returns a "give up" or "escalate" decision.
if reflection.decision == "escalate":
_task = get_current_task()
if _task:
try:
_task.escalate(
f"Agent escalated: {reflection.reasoning[:200]}",
assigned_to="human",
)
except Exception:
pass
Dashboard effect: Escalation node appears in the Timeline. The agent card may show a warning indicator.
task.escalate() parameters:
| Parameter | Type | Required | Notes |
|---|---|---|---|
reason |
str | yes | Why the agent is escalating |
assigned_to |
str | no | Who should handle it |
Approvals #
When the agent needs human approval before proceeding.
This is architecturally tricky: the request and the response often happen in different execution contexts (different threads, different HTTP requests). You may not have an active task context when the approval is granted.
Inside a task context (the standard case):
_task = get_current_task()
if _task:
try:
_task.request_approval(
f"Approval needed: {event_title}",
approver="human",
)
except Exception:
pass
When a human approves (inside a task context):
_task = get_current_task()
if _task:
try:
_task.approval_received(
f"Event '{event_id}' approved by operator",
approved_by="human",
decision="approved",
)
except Exception:
pass
When a human rejects (inside a task context):
_task = get_current_task()
if _task:
try:
_task.approval_received(
f"Event '{event_id}' dropped by operator",
approved_by="human",
decision="rejected",
)
except Exception:
pass
Outside a task context (e.g., approval granted from a separate HTTP handler or queue consumer where no task is active). Use agent.event() to emit the raw event types:
_hl_agent = getattr(get_agent(agent_id), "_hiveloop", None)
if _hl_agent:
try:
_hl_agent.event(
"approval_requested",
payload={"summary": f"Approval needed: {event_title}", "approver": "human"},
)
except Exception:
pass
# Later, when the human responds:
if _hl_agent:
try:
_hl_agent.event(
"approval_received",
payload={"summary": f"Event '{event_id}' approved", "approved_by": "human", "decision": "approved"},
)
except Exception:
pass
Dashboard effect: Agent card shows WAITING badge. The Pipeline tab shows the pending approval. After resolution, the approval decision appears in the Activity Stream.
Issues #
Persistent problems the agent encounters -- broken credentials, API failures, configuration errors. Unlike tool failures (which are per-execution), issues represent ongoing operational problems.
When the agent detects a persistent problem:
_agent = get_hiveloop_agent()
if _agent:
try:
_agent.report_issue(
summary="CRM API returning 403",
severity="high", # critical, high, medium, low
category="permissions", # permissions, connectivity, configuration, etc.
issue_id="crm-api-403", # stable ID for deduplication
context={"api": "crm", "status_code": 403},
occurrence_count=error_count,
)
except Exception:
pass
When the issue is resolved (human action or automatic recovery):
if _agent:
try:
_agent.resolve_issue(
summary="CRM API recovered",
issue_id="crm-api-403",
)
except Exception:
pass
Important: Always use issue_id for recurring issues. Without it, the SDK deduplicates on summary text. If the summary contains timestamps or variable data, each occurrence looks like a new issue.
Dashboard effect: Agent card shows a red issue badge with count. The Pipeline tab lists open issues by severity. Resolving an issue removes the badge.
agent.report_issue() parameters:
| Parameter | Type | Required | Notes |
|---|---|---|---|
summary |
str | yes | Human-readable description |
severity |
str | yes | "critical", "high", "medium", "low" |
category |
str | no | "permissions", "connectivity", "configuration", "data_quality", "rate_limit", "other" |
issue_id |
str | no | Stable ID for lifecycle tracking and dedup |
context |
dict | no | Debugging data (API name, status code, etc.) |
occurrence_count |
int | no | How many times the agent has seen this |
Severity guidelines:
| Severity | When to use | Example |
|---|---|---|
critical |
Agent cannot function at all | No API key configured |
high |
Major capability degraded | CRM API returning 403 |
medium |
Output quality reduced | Fallback model in use |
low |
Informational | Cache miss rate high |
Log forwarding with HiveBoardLogHandler #
Every Python project uses logging. Instead of manually calling agent.report_issue() at every log site, attach a HiveBoardLogHandler and let WARNING+ log records flow to HiveBoard automatically.
from hiveloop.contrib.log_handler import HiveBoardLogHandler
agent = hb.agent("my-agent", ...)
logging.getLogger("my_app").addHandler(HiveBoardLogHandler(agent))
Level mapping: WARNING → medium, ERROR → high, CRITICAL → critical. INFO and below are ignored by default (configurable via level).
Deduplication: Each log record gets a stable issue_id of f"log-{logger_name}-{level_name}", so repeated warnings from the same logger deduplicate naturally on the dashboard.
Context: Each forwarded record includes logger, filename, lineno, and funcName in the issue context dict, so operators can jump straight to the source.
Safety: emit() never raises — it inherits the SDK's safety contract.
HiveBoardLogHandler constructor parameters:
| Parameter | Type | Default | Notes |
|---|---|---|---|
agent |
Agent | required | The HiveLoop agent handle from hb.agent() |
level |
int | logging.WARNING |
Minimum log level to forward |
category |
str | "log" |
Issue category string |
Retries #
When a failed task creates a follow-up for retry.
_task = get_current_task()
if _task:
try:
_task.retry(
f"Failed ({status}): {message[:100]}",
attempt=1,
)
except Exception:
pass
task.retry() parameters:
| Parameter | Type | Required | Notes |
|---|---|---|---|
reason |
str | yes | Why the retry is happening |
attempt |
int | yes | 1-based attempt number |
backoff_seconds |
float | no | Wait time before next attempt |
Retry patterns for common failure modes #
Structured output parse failure: When the LLM returns malformed JSON or doesn't follow the required schema, retry with the error message fed back into the prompt.
MAX_PARSE_RETRIES = 3
for attempt in range(1, MAX_PARSE_RETRIES + 1):
response = llm_client.complete(prompt=prompt, system=system)
try:
result = json.loads(response.content)
validate_schema(result)
break # success
except (json.JSONDecodeError, ValidationError) as e:
_task = get_current_task()
if _task:
try:
_task.retry(
f"Parse failure (attempt {attempt}): {type(e).__name__}: {str(e)[:100]}",
attempt=attempt,
)
except Exception:
pass
if attempt == MAX_PARSE_RETRIES:
raise # all retries exhausted
# Feed the error back into the prompt for the next attempt
prompt += f"\n\nYour previous response was invalid: {e}. Please fix and try again."
API rate limit / transient failure: When an external API returns 429 or 5xx.
for attempt in range(1, 4):
try:
result = external_api.call(params)
break
except RateLimitError as e:
backoff = 2 ** attempt # 2s, 4s, 8s
_task = get_current_task()
if _task:
try:
_task.retry(
f"Rate limited by {api_name}: {e}",
attempt=attempt,
backoff_seconds=backoff,
)
except Exception:
pass
time.sleep(backoff)
Dashboard effect: Retry events appear as nodes in the Timeline. The Insights tab aggregates retry rates per agent and per failure type, helping identify agents with chronic parse failures or unreliable external dependencies.
TODOs #
Agent-managed work items -- retries, follow-ups, pending actions.
_agent = get_hiveloop_agent()
if _agent:
try:
_agent.todo(
todo_id="td_001",
action="created", # created, completed, failed, dismissed, deferred
summary="Retry: Process lead TechNova",
priority="high", # optional
source="failed_run", # optional: who/what created it
context="error=timeout", # optional: extra detail
)
except Exception:
pass
Report at every lifecycle point: when a TODO is created, when it's completed, when it's dismissed.
Dashboard effect: The Pipeline tab shows active TODOs. Completed/dismissed TODOs clear from the list.
Queue snapshots #
If your agents have a work queue (inbox, event queue, task queue), report its state so operators can see if work is piling up.
Best approach: Use the queue_provider callback at agent registration. The SDK calls this callback every heartbeat cycle (30s) and includes the result in the heartbeat payload automatically.
def make_queue_provider(agent_id):
def provider():
queue = get_queue_for_agent(agent_id)
if not queue:
return {"depth": 0}
return {
"depth": len(queue),
"oldest_age_seconds": queue.oldest_age_seconds(),
"items": [
{
"id": item.id,
"priority": item.priority,
"source": item.source,
"summary": item.title or item.message[:80],
"queued_at": item.timestamp.isoformat(),
}
for item in queue[:10] # cap at 10 items
],
"processing": {
"id": current.id,
"summary": current.title or current.message[:80],
"started_at": current.timestamp.isoformat(),
} if (current := get_currently_processing(agent_id)) else None,
}
return provider
# At agent registration
hiveloop_agent = hb.agent(
agent_id="sales",
# ... other params ...
queue_provider=make_queue_provider("sales"),
)
Note: The queue_provider is called lazily by the SDK on each heartbeat. It can safely reference runtime state that doesn't exist at registration time (like a runtime that starts later). Just return {"depth": 0} if the state isn't available yet.
Alternative: Manual snapshots with agent.queue_snapshot()
If the callback pattern doesn't fit your architecture, call agent.queue_snapshot() directly at any point:
_agent = get_hiveloop_agent()
if _agent:
try:
_agent.queue_snapshot(
depth=len(queue),
oldest_age_seconds=queue.oldest_age_seconds(),
items=[{"id": i.id, "summary": i.title[:80]} for i in queue[:10]],
processing={"id": current.id, "summary": current.title[:80]} if current else None,
)
except Exception:
pass
This is useful when queue state changes at irregular intervals (e.g., after each enqueue/dequeue) rather than on a fixed heartbeat schedule.
Dashboard effect: Queue depth badge (Q:8) on agent cards. The Pipeline tab shows queue contents. Operators can see if work is piling up.
Scheduled work #
If your agents have recurring tasks (periodic syncs, cleanup jobs, heartbeat timers), report them once at startup so the Pipeline tab shows the schedule.
_agent = getattr(agent_obj, "_hiveloop", None)
if _agent:
try:
_agent.scheduled(items=[
{
"id": "hb_crm_sync",
"name": "CRM Sync",
"interval": "10m",
"enabled": True,
"last_status": None,
},
{
"id": "hb_email_check",
"name": "Email Check",
"interval": "15m",
"enabled": True,
"last_status": None,
},
])
except Exception:
pass
Where to put this: In your agent start/boot function, after scheduled work is configured.
Update on completion. After each scheduled job runs, call scheduled() again with last_status set to "success" or "failed" and last_run set to the ISO timestamp. This lets the dashboard show when each job last ran and whether it succeeded.
# After a scheduled job completes
if _agent:
try:
_agent.scheduled(items=[
{
"id": "hb_crm_sync",
"name": "CRM Sync",
"interval": "10m",
"enabled": True,
"last_status": "success",
"last_run": datetime.utcnow().isoformat() + "Z",
},
# ... other scheduled items unchanged ...
])
except Exception:
pass
Note for frameworks with built-in schedulers: If your framework already has a scheduler (e.g., APScheduler, Celery Beat, or a custom timer loop), you already know all the schedule metadata. Just call scheduled() once at boot with the full list, then again after each job runs. You don't need to change how your scheduler works — just report what it already knows.
Layer 3: Advanced Observability Patterns #
Goal: Unlock Insights tab analytics with custom events that capture agent behavior patterns — learning, context management, errors, runtime lifecycle, and configuration.
All patterns in this layer use agent.event() or task.event() — the same generic event methods listed in Layer 2c. The SDK doesn't need any changes. You're simply emitting custom events with specific event_type values and structured payloads.
Key principle: task.event() is for events that happen during a task (context compaction, parse errors, learning moments). agent.event() is for events that happen outside any task (startup, config changes, session lifecycle).
Learning and self-correction events #
When your agent detects it made a mistake and corrects itself, or when your reflection module identifies a pattern for future improvement, emit a learning event. These power the Insights tab's "self-correction rate" metric.
_task = get_current_task()
if _task:
try:
_task.event("custom", payload={
"kind": "learning",
"data": {
"trigger": "reflection", # what triggered the learning
"category": "tool_selection", # what was learned about
"description": "Selected search_contacts instead of search_deals",
"correction_applied": True,
"turn_number": turn_count,
},
})
except Exception:
pass
When to emit: After your reflection module identifies an error and the agent adjusts its approach. Also when the agent backtracks on a plan step or changes strategy mid-task.
Context compaction tracking #
When your framework compresses, summarizes, or truncates the conversation history to fit within the context window, emit a compaction event. These power the Insights tab's "context pressure" and "prompt bloat" analysis.
_task = get_current_task()
if _task:
try:
_task.event("custom", payload={
"kind": "context_compaction",
"data": {
"tokens_before": tokens_before_compaction,
"tokens_after": tokens_after_compaction,
"tokens_removed": tokens_before_compaction - tokens_after_compaction,
"compression_ratio": round(tokens_after_compaction / tokens_before_compaction, 2),
"method": "summary", # "summary", "truncation", "sliding_window"
"turn_number": turn_count,
"messages_removed": num_messages_dropped,
},
})
except Exception:
pass
Where to put this: In your context window management function — the code that runs when the prompt is about to exceed the model's context limit.
Rich error context #
Tool failures and task failures are already captured by track_context() and task(). But some failures carry important diagnostic context that deserves its own event — the specific API response, the validation error, the state that led to the failure.
# When an external API returns an error with useful diagnostic data
_task = get_current_task()
if _task:
try:
_task.event("custom", payload={
"kind": "error_context",
"data": {
"error_type": type(exc).__name__, # "ValidationError", "HTTPError"
"error_message": str(exc)[:500],
"api_name": "crm",
"status_code": response.status_code,
"response_body": response.text[:200],
"retry_eligible": is_retryable(exc),
"turn_number": turn_count,
"action_name": current_tool_name,
},
})
except Exception:
pass
When to emit: Don't emit this for every error — track_context() already captures tool failures. Emit error_context when you have additional diagnostic information that wouldn't fit in the action payload, or for errors that happen outside a tool call (e.g., prompt construction failures, response parsing failures).
Runtime lifecycle events #
Emit events at key moments in your agent's lifecycle — startup, shutdown, configuration reloads, model switches. These power the Insights tab's "agent lifecycle" timeline and help correlate behavior changes with configuration changes.
# At agent startup
_agent = getattr(agent_obj, "_hiveloop", None)
if _agent:
try:
_agent.event("custom", payload={
"kind": "runtime",
"data": {
"event": "agent_started",
"config": {
"model": agent_obj.config.model,
"max_turns": agent_obj.config.max_turns,
"temperature": agent_obj.config.temperature,
"tools_enabled": [t.name for t in agent_obj.tools],
},
"environment": {
"python_version": sys.version.split()[0],
"framework_version": framework.__version__,
},
},
})
except Exception:
pass
# When the model changes mid-session (e.g., fallback to cheaper model)
if _agent:
try:
_agent.event("custom", payload={
"kind": "runtime",
"data": {
"event": "model_switched",
"from_model": previous_model,
"to_model": new_model,
"reason": "cost_optimization", # or "rate_limit", "fallback", "user_request"
},
})
except Exception:
pass
Session and memory operations #
If your agents maintain persistent memory (vector stores, conversation histories, knowledge bases), emit events when memory is read or written. These help diagnose stale memory issues and track memory utilization.
# After writing to persistent memory
_task = get_current_task()
if _task:
try:
_task.event("custom", payload={
"kind": "memory_op",
"data": {
"operation": "write", # "read", "write", "delete", "search"
"store": "vector_db", # "vector_db", "session_history", "knowledge_base"
"key_or_query": memory_key[:100],
"result_count": num_results, # for reads/searches
"latency_ms": round(elapsed_ms),
},
})
except Exception:
pass
Configuration snapshots #
Emit the full agent configuration at startup and whenever it changes. This lets operators compare "what was the config when this agent was working" vs "what changed when it broke."
# At agent boot or after config reload
_agent = getattr(agent_obj, "_hiveloop", None)
if _agent:
try:
_agent.event("custom", payload={
"kind": "config_snapshot",
"data": {
"model": agent_obj.config.model,
"temperature": agent_obj.config.temperature,
"max_turns": agent_obj.config.max_turns,
"max_tokens": agent_obj.config.max_tokens,
"tools": [t.name for t in agent_obj.tools],
"system_prompt_hash": hashlib.md5(
system_prompt.encode()
).hexdigest()[:8],
"system_prompt_tokens": count_tokens(system_prompt),
"guardrails_enabled": agent_obj.config.guardrails_enabled,
"version": agent_obj.config.version,
},
})
except Exception:
pass
When to emit: At agent startup, and again whenever configuration changes at runtime (model switch, tool list change, temperature adjustment). The system_prompt_hash lets operators detect prompt changes without logging the full prompt.
Layer 4: Client-Side Detection Patterns #
Goal: Detect behavioral anomalies in your agentic loop and report them to HiveBoard.
These patterns require logic that runs in your framework, not in the SDK. The SDK is the reporting channel — you implement the detection, then emit the findings via task.event() or agent.event(). These patterns power the Insights tab's "Smart Detectors" (contradiction detector, silent drop detector, prompt bloat detector, etc.).
Loop and cycle detection #
Agentic loops can get stuck — calling the same tool with the same arguments, or alternating between two tools without making progress. Detect this by tracking recent actions and looking for repetition.
# Cycle detection state — maintain per task
_recent_actions = [] # list of (tool_name, args_hash) tuples
MAX_HISTORY = 20
CYCLE_THRESHOLD = 3 # same action 3+ times = probable loop
def detect_cycle(tool_name: str, arguments: dict) -> bool:
"""Returns True if a cycle is detected."""
args_hash = hashlib.md5(
json.dumps(arguments, sort_keys=True).encode()
).hexdigest()[:8]
_recent_actions.append((tool_name, args_hash))
if len(_recent_actions) > MAX_HISTORY:
_recent_actions.pop(0)
# Check for exact repetition
recent = _recent_actions[-CYCLE_THRESHOLD:]
if len(recent) == CYCLE_THRESHOLD and len(set(recent)) == 1:
return True
# Check for A-B-A-B alternation
if len(_recent_actions) >= 4:
last4 = _recent_actions[-4:]
if last4[0] == last4[2] and last4[1] == last4[3] and last4[0] != last4[1]:
return True
return False
# In your tool dispatch function
def execute_tool(tool_name, arguments):
if detect_cycle(tool_name, arguments):
_task = get_current_task()
if _task:
try:
_task.event("custom", payload={
"kind": "anomaly",
"data": {
"detector": "cycle",
"tool_name": tool_name,
"args_hash": hashlib.md5(
json.dumps(arguments, sort_keys=True).encode()
).hexdigest()[:8],
"cycle_length": CYCLE_THRESHOLD,
"turn_number": turn_count,
"message": f"Agent called {tool_name} {CYCLE_THRESHOLD}x with same args",
},
})
except Exception:
pass
# Optional: break the cycle by failing the task or injecting guidance
# raise CycleDetectedError(f"Loop detected: {tool_name}")
result = tool_registry.execute(tool_name, arguments)
return result
What to do when a cycle is detected: Reporting the anomaly is mandatory. Breaking the cycle is your design choice — you can raise an exception to fail the task, inject a system message ("you are repeating yourself"), reduce max_turns, or let it continue but alert the operator.
Prompt composition breakdown #
Track what percentage of the context window is consumed by each component: system prompt, conversation history, tool results, and user message. This detects "prompt bloat" — when one component grows to dominate the context window.
def analyze_prompt_composition(
system_prompt: str,
messages: list,
tool_results: list | None = None,
model_context_limit: int = 200_000,
) -> dict:
"""Analyze token distribution across prompt components."""
sys_tokens = count_tokens(system_prompt)
history_tokens = count_tokens(messages)
tool_tokens = count_tokens(tool_results) if tool_results else 0
total = sys_tokens + history_tokens + tool_tokens
breakdown = {
"system_prompt_tokens": sys_tokens,
"history_tokens": history_tokens,
"tool_results_tokens": tool_tokens,
"total_input_tokens": total,
"context_utilization_pct": round(total / model_context_limit * 100, 1),
"largest_component": max(
[("system", sys_tokens), ("history", history_tokens), ("tools", tool_tokens)],
key=lambda x: x[1],
)[0],
}
# Detect bloat: any single component > 60% of total
for name, tokens in [("system", sys_tokens), ("history", history_tokens), ("tools", tool_tokens)]:
pct = (tokens / total * 100) if total > 0 else 0
breakdown[f"{name}_pct"] = round(pct, 1)
if pct > 60:
breakdown["bloat_warning"] = f"{name} is {pct:.0f}% of prompt"
return breakdown
# Before each LLM call
composition = analyze_prompt_composition(system_prompt, messages, tool_results)
_task = get_current_task()
if _task and composition.get("context_utilization_pct", 0) > 50:
try:
_task.event("custom", payload={
"kind": "prompt_composition",
"data": {
**composition,
"turn_number": turn_count,
},
})
except Exception:
pass
When to emit: Every turn is too noisy. Emit when context utilization exceeds 50%, or when the composition changes significantly (e.g., tool results jump from 10% to 40% of the prompt). The Insights tab aggregates these to show how prompt composition evolves across turns within a task.
State mutation tracking #
If your agents maintain mutable state (working memory, scratchpads, accumulated results), track significant mutations. This helps diagnose "the agent forgot what it learned" or "the state silently changed."
import copy
class StateMutationTracker:
"""Track changes to agent working state and report significant mutations."""
def __init__(self):
self._previous_state = {}
self._mutation_count = 0
def check_mutation(self, current_state: dict, turn_number: int) -> dict | None:
"""Compare current state to previous. Returns diff if changed."""
if not self._previous_state:
self._previous_state = copy.deepcopy(current_state)
return None
changes = {}
all_keys = set(list(self._previous_state.keys()) + list(current_state.keys()))
for key in all_keys:
old_val = self._previous_state.get(key)
new_val = current_state.get(key)
if old_val != new_val:
changes[key] = {
"action": "added" if old_val is None else "removed" if new_val is None else "modified",
"old_type": type(old_val).__name__ if old_val is not None else None,
"new_type": type(new_val).__name__ if new_val is not None else None,
}
if changes:
self._mutation_count += 1
self._previous_state = copy.deepcopy(current_state)
return {
"mutation_number": self._mutation_count,
"turn_number": turn_number,
"keys_changed": list(changes.keys()),
"changes": changes,
"state_size": len(current_state),
}
return None
# Usage — at the end of each turn
_tracker = StateMutationTracker() # one per task
def after_turn(agent_state: dict, turn_number: int):
mutation = _tracker.check_mutation(agent_state, turn_number)
if mutation:
_task = get_current_task()
if _task:
try:
_task.event("custom", payload={
"kind": "state_mutation",
"data": mutation,
})
except Exception:
pass
What to track: Track your agent's working memory, not internal SDK state. Good candidates: accumulated search results, extracted entities, plan progress, user preferences collected during conversation. Don't track conversation history (that's covered by prompt composition) or transient loop variables.
Privacy note: Don't include actual state values in the mutation event — only keys, types, and whether they were added/modified/removed. State values may contain PII or sensitive data.
Putting It All Together: Complete Turn Instrumentation #
Here's how all layers combine in a single agentic loop turn:
import time
import json
from your_app.observability import get_current_task, get_hiveloop_agent, estimate_cost
def run_turn(messages, tool_catalog, system_prompt):
"""One turn of the agentic loop with full HiveLoop instrumentation."""
# --- LLM CALL (Layer 2a) ---
_start = time.perf_counter()
response = llm_client.chat(
messages=messages,
tools=tool_catalog,
system=system_prompt,
)
_elapsed_ms = (time.perf_counter() - _start) * 1000
tokens_in = response.usage.input_tokens
tokens_out = response.usage.output_tokens
_task = get_current_task()
if _task:
try:
_task.llm_call(
"agent_turn",
model=response.model,
tokens_in=tokens_in,
tokens_out=tokens_out,
cost=estimate_cost(response.model, tokens_in, tokens_out),
duration_ms=round(_elapsed_ms),
)
except Exception:
pass
# --- TOOL EXECUTION (Layer 2b) ---
if response.tool_calls:
_agent = get_hiveloop_agent()
for tool_call in response.tool_calls:
if _agent is not None:
with _agent.track_context(tool_call.name) as ctx:
result = tool_registry.execute(tool_call.name, tool_call.arguments)
ctx.set_payload({
"args": {k: str(v)[:100] for k, v in tool_call.arguments.items()},
"result_preview": str(result)[:200],
"success": result.success,
})
else:
result = tool_registry.execute(tool_call.name, tool_call.arguments)
messages.append({"role": "tool", "content": str(result)})
return response
Architecture Patterns #
Single-agent system #
The simplest case. One agent, one hb.agent() call, one task context at a time.
hb = hiveloop.init(api_key="...", endpoint="...")
agent = hb.agent(agent_id="main", ...)
while True:
message = queue.get()
with agent.task(f"task-{uuid.uuid4().hex[:8]}", project="my-app") as task:
set_current_task(task)
set_hiveloop_agent(agent)
try:
process(message)
finally:
clear_current_task()
clear_hiveloop_agent()
Multi-agent system #
Each agent gets its own hb.agent() handle. The contextvars ensure that concurrent agents (in threads or async tasks) don't cross-contaminate their telemetry.
hb = hiveloop.init(api_key="...", endpoint="...")
for agent_config in config.agents:
agent = create_agent(agent_config)
agent._hiveloop = hb.agent(
agent_id=agent_config.id,
type=agent_config.role,
version=agent_config.model,
framework="my-framework",
)
API-driven agents (FastAPI, Flask) #
Each HTTP request is a task. Set the task context in middleware or at the handler level.
@app.post("/agents/{agent_id}/run")
async def run_agent(agent_id: str, body: RunRequest):
agent = get_agent(agent_id)
_hl = getattr(agent, "_hiveloop", None)
if _hl:
with _hl.task(f"{agent_id}-{uuid.uuid4().hex[:8]}", project="my-app") as task:
set_current_task(task)
set_hiveloop_agent(_hl)
try:
result = agent.run(body.message)
finally:
clear_current_task()
clear_hiveloop_agent()
else:
result = agent.run(body.message)
return {"response": result}
Framework callbacks (LangChain, CrewAI, AutoGen) #
If your framework provides callback hooks, wire HiveLoop into them:
# LangChain example
class HiveLoopCallbackHandler(BaseCallbackHandler):
def on_llm_start(self, serialized, prompts, **kwargs):
self._llm_start = time.perf_counter()
def on_llm_end(self, response, **kwargs):
elapsed = (time.perf_counter() - self._llm_start) * 1000
_task = get_current_task()
if _task:
_task.llm_call("llm_call", model=response.model,
tokens_in=response.usage.input_tokens,
tokens_out=response.usage.output_tokens,
duration_ms=round(elapsed))
def on_tool_start(self, serialized, input_str, **kwargs):
self._tool_ctx = get_hiveloop_agent().track_context(serialized["name"])
self._tool_ctx.__enter__()
def on_tool_end(self, output, **kwargs):
if self._tool_ctx:
self._tool_ctx.set_payload({"result_preview": str(output)[:200]})
self._tool_ctx.__exit__(None, None, None)
Incremental Adoption Strategy #
Don't do everything at once. Ship in tiers:
Tier 1 (highest value, lowest effort) #
hiveloop.init()+hb.agent()(Layer 0)agent.task()wrapping (Layer 1)task.llm_call()at your main LLM call sites (Layer 2a)agent.report_issue()at persistent error detection points (Layer 2c)
Result: Agent cards with heartbeats, task table with durations, cost explorer, issue badges. This alone answers "are my agents running?", "how much are they costing?", and "is anything broken?"
Tier 2 (unlocks debugging) #
agent.track_context()at tool dispatch (Layer 2b)task.plan()+task.plan_step()if your framework uses planning (Layer 2c)task.escalate()at escalation decision points (Layer 2c)
Result: Full timeline with tool nodes, plan progress bars, escalation tracking. Now operators can answer "what went wrong in this specific run?"
Tier 3 (unlocks operations) #
queue_providercallback for queue snapshots (Layer 2c)- Approval request/received events (Layer 2c)
task.retry()at retry points (Layer 2c)
Result: Queue depth visibility, WAITING badges, retry tracking. Operators can answer "is work piling up?" and "does anything need my attention?"
Tier 4 (polish) #
agent.todo()at TODO lifecycle points (Layer 2c)agent.scheduled()at agent startup (Layer 2c)prompt_preview/response_previewon key LLM calls (Layer 2a)
Result: Pipeline tab fully populated. Complete operational picture.
Tier 5 (advanced analytics) #
- LLM call
metadatawith cache tokens, turn numbers, stop reasons (Layer 2a) - Context compaction and learning events (Layer 3)
- Configuration snapshots at startup (Layer 3)
- Loop/cycle detection in tool dispatch (Layer 4)
- Prompt composition breakdown (Layer 4)
- State mutation tracking (Layer 4)
Result: Insights tab fully powered. Anomaly detection, cost optimization recommendations, context pressure monitoring, and behavioral analysis all active.
Common Mistakes #
Reusing task IDs. If two executions share a task ID, the dashboard merges them into one row. Always generate unique IDs.
Forgetting the
finallyblock. If you setset_current_task(task)but don't clear it infinally, a failed task leaves stale context for the next execution.Tracking too many functions. 30+ action nodes per task makes the timeline unreadable. Track 5-7 high-value functions.
Reporting issues for every single failure. Use a threshold (3+ consecutive failures) before calling
report_issue(). Individual tool failures are already captured bytrack_context().Not using
issue_id. Without it, the SDK deduplicates on summary text. Timestamps or variable data in the summary defeats dedup.Forgetting to resolve issues. An unresolved issue shows a red badge forever. Wire
resolve_issue()into your recovery paths.Calling task-level methods outside task context.
get_current_task()returnsNonebetween tasks. Always check before calling. Agent-level methods (report_issue,todo,queue_snapshot,scheduled) work anywhere.Double-execute bug. If your
with track_context()fallback pattern runs the tool both inside and outside thewithblock on failure, the tool executes twice. Use the separated__enter__/__exit__pattern instead.Not passing
cost=. Without it, the Cost Explorer works but shows $0. Always passcost=estimate_cost(...).Plan step indices off by one.
step_indexis zero-based. Step 1 in your plan isstep_index=0.
Shutdown #
Call hiveloop.shutdown() when your application exits to flush any remaining buffered events:
import atexit
atexit.register(hiveloop.shutdown)
Or in your framework's shutdown handler:
def on_shutdown():
hiveloop.shutdown(timeout=10) # waits up to 10s for flush
Quick Reference: All SDK Methods #
Module-level #
| Method | Purpose |
|---|---|
hiveloop.init(**kwargs) |
Initialize SDK (singleton) |
hiveloop.shutdown(timeout=10) |
Flush and stop |
hiveloop.flush() |
Force flush without stopping |
hiveloop.reset() |
Flush, stop, clear singleton (testing) |
hiveloop.tool_payload(**kw) |
Build standardized tool payload dict |
Agent-level (hb.agent() returns this) #
| Method | Purpose |
|---|---|
agent.task(task_id, **kw) |
Context manager for task tracking |
agent.start_task(task_id, **kw) |
Manual task lifecycle (caller must complete/fail) |
agent.track(name) |
Decorator for action tracking |
agent.track_context(name) |
Context manager for dynamic action tracking |
agent.report_issue(summary, **kw) |
Report persistent operational issue |
agent.resolve_issue(summary, **kw) |
Resolve a reported issue |
agent.todo(todo_id, action, summary, **kw) |
Report TODO lifecycle event |
agent.queue_snapshot(**kw) |
Report queue state |
agent.scheduled(items=[...]) |
Report scheduled work |
agent.event(event_type, payload) |
Custom agent-level event |
agent.llm_call(name, model, **kw) |
LLM call outside task context |
Task-level (agent.task() yields this) #
| Method | Purpose |
|---|---|
task.llm_call(name, model, **kw) |
Log LLM call within task |
task.plan(goal, steps, **kw) |
Declare execution plan |
task.plan_step(step_index, action, summary, **kw) |
Update plan step |
task.escalate(reason, **kw) |
Escalate to human |
task.request_approval(summary, **kw) |
Request human approval |
task.approval_received(summary, **kw) |
Record approval decision |
task.retry(reason, **kw) |
Record retry attempt |
task.event(event_type, payload) |
Custom task-level event |
task.set_payload(dict) |
Add payload to completion event |
task.complete(status="success", payload=None) |
Manual completion (for start_task() flow) |
task.fail(exception=None, payload=None) |
Manual failure (for start_task() flow) |
Context manager (agent.track_context() yields this) #
| Method | Purpose |
|---|---|
ctx.set_payload(dict) |
Attach metadata to the action_completed event |
Contrib (hiveloop.contrib) #
| Class | Purpose |
|---|---|
HiveBoardLogHandler(agent, level, category) |
logging.Handler that forwards WARNING+ to report_issue() |
Custom event kind values (Layer 3 & 4) #
Use agent.event("custom", payload={"kind": "...", "data": {...}}) or task.event(...) with these kinds:
| Kind | Layer | Purpose |
|---|---|---|
learning |
3 | Self-correction or strategy change detected |
context_compaction |
3 | Context window compressed/summarized |
error_context |
3 | Rich diagnostic data for a failure |
runtime |
3 | Agent startup, shutdown, model switch |
memory_op |
3 | Read/write to persistent memory store |
config_snapshot |
3 | Full configuration at boot or change |
anomaly |
4 | Loop/cycle detected in tool dispatch |
prompt_composition |
4 | Token breakdown by prompt component |
state_mutation |
4 | Working state keys changed between turns |