Skip to main content

Cowork End-to-End Workflow

This guide walks through a complete integration: receiving Cowork OTLP events, applying PII redaction, correlating by prompt, generating dashboard metrics, and firing alerts — all in a single pipeline.

Prerequisites

pip install briefcase-ai

The Pipeline

Cowork Agent  →  OTLP logs  →  CoworkEventReceiver  →  RedactionFilter
                                        │
                     ┌──────────────────┼──────────────────┐
                     ▼                  ▼                  ▼
             PromptCorrelation  CoworkDashboards  CoworkAlertManager
                     │                  │                  │
                     ▼                  ▼                  ▼
               Prompt traces      Cost / Tool /    PagerDuty / Slack
                                   API / Cache       notifications

Full Example

"""
End-to-end Cowork event processing pipeline.

Demonstrates: receive → redact → correlate → dashboard → alert.
"""

from briefcase_ai.cowork import (
CoworkEventReceiver,
CoworkRedactionFilter,
PromptCorrelationEngine,
CoworkDashboards,
CoworkAlertManager,
)


# ──────────────────────────────────────────────────────────────────
# Step 1: Configure PII redaction
# ──────────────────────────────────────────────────────────────────

# PII redaction settings handed to the receiver in Step 3.
redaction = CoworkRedactionFilter(
    custom_patterns={
        # Organisation-specific identifiers: label -> regular expression.
        "account_number": r"\bACCT-\d{10}\b",
    },
    redact_prompt_content=True,  # replace the entire prompt text
    enabled=True,
)


# ──────────────────────────────────────────────────────────────────
# Step 2: Configure alerts
# ──────────────────────────────────────────────────────────────────

# Every alert passed to on_alert is kept here so Step 8 can report on it.
alert_log = []


def on_alert(alert):
    """Handle alerts — send to PagerDuty, Slack, etc.

    Appends the alert to ``alert_log`` and prints a one-line summary.

    Args:
        alert: Object exposing ``severity`` (must support ``.upper()``),
            ``alert_type`` and ``message`` attributes.
    """
    alert_log.append(alert)
    print(f"[{alert.severity.upper()}] {alert.alert_type}: {alert.message}")

# Alerts are delivered to on_alert; the cost threshold is expressed in USD.
alert_manager = CoworkAlertManager(
    cost_threshold_usd=0.10,  # warn on expensive requests
    handlers=[on_alert],
)


# ──────────────────────────────────────────────────────────────────
# Step 3: Create the receiver with redaction + alert callback
# ──────────────────────────────────────────────────────────────────

# Each event the receiver passes to on_event is evaluated by the alert
# manager. The bound method is passed directly: a lambda that only forwards
# its single argument adds nothing.
receiver = CoworkEventReceiver(
    redaction_filter=redaction,
    on_event=alert_manager.evaluate,
    validate_strict=False,
)


# ──────────────────────────────────────────────────────────────────
# Step 4: Simulate incoming OTLP log records from Cowork
# ──────────────────────────────────────────────────────────────────

# In production, these come from your OTLP logs endpoint.
# Here we simulate a typical prompt interaction.

# All five records below share this prompt.id so they correlate into one trace.
prompt_id = "550e8400-e29b-41d4-a716-446655440000"

# Each record is a dict with "body", "attributes" and "resource" keys.
sample_records = [
    # User types a prompt
    {
        "body": {"event_type": "user_prompt"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:00Z",
            "event.sequence": 1,
            "prompt.id": prompt_id,
            "prompt": "Read the config file and update the database URL",
            "prompt_length": "48",  # character count of the prompt above
        },
        "resource": {
            "attributes": {
                "service.name": "cowork",
                "service.version": "1.0.25",
            }
        },
    },
    # Tool decision: allow Read
    {
        "body": {"event_type": "tool_decision"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:01Z",
            "event.sequence": 2,
            "prompt.id": prompt_id,
            "tool_name": "Read",
            "decision": "allow",
            "source": "auto",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
    # Tool result: Read succeeds
    {
        "body": {"event_type": "tool_result"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:02Z",
            "event.sequence": 3,
            "prompt.id": prompt_id,
            "tool_name": "Read",
            "success": "true",
            "duration_ms": "45",
            "tool_result_size_bytes": "2048",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
    # API request: LLM call
    {
        "body": {"event_type": "api_request"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:03Z",
            "event.sequence": 4,
            "prompt.id": prompt_id,
            "model": "claude-sonnet-4-20250514",
            "cost_usd": "0.0035",
            "duration_ms": "1450",
            "input_tokens": "1200",
            "output_tokens": "350",
            "cache_read_tokens": "800",
            "cache_creation_tokens": "200",
            "speed": "241.38",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
    # Tool result: Edit succeeds
    {
        "body": {"event_type": "tool_result"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:04Z",
            "event.sequence": 5,
            "prompt.id": prompt_id,
            "tool_name": "Edit",
            "success": "true",
            "duration_ms": "12",
            "tool_result_size_bytes": "512",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
]


# ──────────────────────────────────────────────────────────────────
# Step 5: Process the batch
# ──────────────────────────────────────────────────────────────────

# Push all simulated records through the receiver in a single call.
accepted = receiver.receive_batch(sample_records)
accepted_count, submitted_count = len(accepted), len(sample_records)
print(f"\nAccepted {accepted_count} of {submitted_count} events")

# Read the error list only after the batch has been processed.
validation_error_count = len(receiver.validation_errors)
print(f"Validation errors: {validation_error_count}")


# ──────────────────────────────────────────────────────────────────
# Step 6: Correlate events by prompt.id
# ──────────────────────────────────────────────────────────────────

# Load every event the receiver holds into the correlation engine.
engine = PromptCorrelationEngine()
engine.add_many(receiver.events)

# Print one summary per trace the engine yields.
for trace in engine.traces():
    print(f"\n── Prompt Trace: {trace.prompt_id} ──")
    print(f" User: {trace.user_id} ({trace.organization_id})")
    print(f" Events: {trace.event_count}")
    print(f" Cost: ${trace.total_cost_usd:.4f}")
    print(f" Tokens: {trace.total_input_tokens} in / {trace.total_output_tokens} out")
    print(f" Cache: {trace.total_cache_read_tokens} read / "
          f"{trace.total_cache_creation_tokens} created")
    print(f" Tools: {trace.tool_count} ({trace.tool_failure_count} failures)")
    print(f" API errors: {trace.api_error_count}")

    # The user prompt is redacted
    if trace.user_prompt:
        print(f" Prompt (redacted): {trace.user_prompt.attributes.get('prompt')}")


# ──────────────────────────────────────────────────────────────────
# Step 7: Generate dashboard metrics
# ──────────────────────────────────────────────────────────────────

# Feed the same events into the dashboard aggregator.
dashboards = CoworkDashboards()
dashboards.ingest_many(receiver.events)

# Overall spend, then a per-model breakdown.
print("\n── Cost Summary ──")
total = dashboards.cost_total()
print(f" Total: ${total.total_cost_usd:.4f} across {total.request_count} requests")
print(f" Avg per request: ${total.avg_cost_per_request:.4f}")
for model, cost in total.models.items():
    print(f" {model}: ${cost:.4f}")

# One line per tool name.
print("\n── Tool Usage ──")
for name, summary in dashboards.tool_usage().items():
    print(f" {name}: {summary.invocation_count} calls, "
          f"{summary.success_rate:.0%} success, "
          f"{summary.avg_duration_ms:.0f}ms avg")

# One line per model.
print("\n── Cache Efficiency ──")
for model, summary in dashboards.cache_efficiency().items():
    print(f" {model}: {summary.cache_hit_ratio:.0%} hit ratio")


# ──────────────────────────────────────────────────────────────────
# Step 8: Check alerts
# ──────────────────────────────────────────────────────────────────

# Report everything the on_alert handler collected in alert_log.
print(f"\n── Alerts Fired: {len(alert_log)} ──")
for alert in alert_log:
    print(f" [{alert.severity}] {alert.alert_type}: {alert.message}")

Expected Output

Accepted 5 of 5 events
Validation errors: 0

── Prompt Trace: 550e8400-e29b-41d4-a716-446655440000 ──
User: alice (org-acme)
Events: 5
Cost: $0.0035
Tokens: 1200 in / 350 out
Cache: 800 read / 200 created
Tools: 2 (0 failures)
API errors: 0
Prompt (redacted): [REDACTED_PROMPT]

── Cost Summary ──
Total: $0.0035 across 1 requests
Avg per request: $0.0035
claude-sonnet-4-20250514: $0.0035

── Tool Usage ──
Read: 1 calls, 100% success, 45ms avg
Edit: 1 calls, 100% success, 12ms avg

── Cache Efficiency ──
claude-sonnet-4-20250514: 80% hit ratio

── Alerts Fired: 0 ──

What Happens with Errors

When an API error or tool failure occurs, the alert manager fires automatically:

# A single api_error record in the same body/attributes/resource layout as
# the sample records in the full example.
error_record = {
    "body": {"event_type": "api_error"},
    "attributes": {
        # Session and user context
        "session.id": "sess-abc",
        "organization.id": "org-acme",
        "user.account_uuid": "acct-001",
        "user.id": "alice",
        "user.email": "alice@acme.com",
        "terminal.type": "vscode",
        # Event ordering and correlation
        "event.timestamp": "2026-03-01T10:01:00Z",
        "event.sequence": 10,
        "prompt.id": "another-prompt-id",
        # Details of the failed API call
        "model": "claude-sonnet-4-20250514",
        "error": "rate_limit_exceeded",
        "status_code": "429",
        "duration_ms": "50",
        "attempt": "1",
        "speed": "0",
    },
    "resource": {
        "attributes": {"service.name": "cowork"},
    },
}

# Send the single error record through the receiver created in the full example.
receiver.receive(error_record)
# The line below matches the format printed by the on_alert handler
# (presumably reached via the receiver's on_event callback — confirm).
# Output: [CRITICAL] api_error: API error: model=claude-sonnet-4-20250514 status=429 error=rate_limit_exceeded

Next Steps

  • Custom alert rules: Add domain-specific rules for slow tools, high error rates, etc.
  • Storage: Pipe CoworkEvent objects into your data store via the on_event callback
  • Grafana/Datadog: Use the dashboard metrics to build real-time monitoring panels
  • Compliance: Combine with Briefcase compliance reports for audit trails