Cowork End-to-End Workflow
This guide walks through a complete integration in a single pipeline: receiving Cowork OTLP events, applying PII redaction, correlating events by prompt, generating dashboard metrics, and firing alerts.
Prerequisites
pip install briefcase-ai
The Pipeline
Cowork Agent → OTLP logs → CoworkEventReceiver → RedactionFilter
                                  │
               ┌──────────────────┼──────────────────┐
               ▼                  ▼                  ▼
       PromptCorrelation  CoworkDashboards  CoworkAlertManager
               │                  │                  │
               ▼                  ▼                  ▼
         Prompt traces      Cost / Tool /    PagerDuty / Slack
                             API / Cache       notifications
Full Example
"""
End-to-end Cowork event processing pipeline.
Demonstrates: receive → redact → correlate → dashboard → alert.
"""
from briefcase_ai.cowork import (
    CoworkEventReceiver,
    CoworkRedactionFilter,
    PromptCorrelationEngine,
    CoworkDashboards,
    CoworkAlertManager,
)
# ──────────────────────────────────────────────────────────────────
# Step 1: Configure PII redaction
# ──────────────────────────────────────────────────────────────────
redaction = CoworkRedactionFilter(
    enabled=True,
    redact_prompt_content=True,  # Full prompt replacement
    custom_patterns={
        # Add org-specific patterns
        "account_number": r"\bACCT-\d{10}\b",
    },
)
# ──────────────────────────────────────────────────────────────────
# Step 2: Configure alerts
# ──────────────────────────────────────────────────────────────────
alert_log = []
def on_alert(alert):
    """Handle alerts: send to PagerDuty, Slack, etc."""
    alert_log.append(alert)
    print(f"[{alert.severity.upper()}] {alert.alert_type}: {alert.message}")
alert_manager = CoworkAlertManager(
    handlers=[on_alert],
    cost_threshold_usd=0.10,  # Warn on expensive requests
)
# ──────────────────────────────────────────────────────────────────
# Step 3: Create the receiver with redaction + alert callback
# ──────────────────────────────────────────────────────────────────
receiver = CoworkEventReceiver(
    redaction_filter=redaction,
    on_event=lambda evt: alert_manager.evaluate(evt),
    validate_strict=False,
)
# ──────────────────────────────────────────────────────────────────
# Step 4: Simulate incoming OTLP log records from Cowork
# ──────────────────────────────────────────────────────────────────
# In production, these come from your OTLP logs endpoint.
# Here we simulate a typical prompt interaction.
prompt_id = "550e8400-e29b-41d4-a716-446655440000"
sample_records = [
    # User types a prompt
    {
        "body": {"event_type": "user_prompt"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:00Z",
            "event.sequence": 1,
            "prompt.id": prompt_id,
            "prompt": "Read the config file and update the database URL",
            "prompt_length": "48",  # character count of the prompt above
        },
        "resource": {
            "attributes": {
                "service.name": "cowork",
                "service.version": "1.0.25",
            }
        },
    },
    # Tool decision: allow Read
    {
        "body": {"event_type": "tool_decision"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:01Z",
            "event.sequence": 2,
            "prompt.id": prompt_id,
            "tool_name": "Read",
            "decision": "allow",
            "source": "auto",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
    # Tool result: Read succeeds
    {
        "body": {"event_type": "tool_result"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:02Z",
            "event.sequence": 3,
            "prompt.id": prompt_id,
            "tool_name": "Read",
            "success": "true",
            "duration_ms": "45",
            "tool_result_size_bytes": "2048",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
    # API request: LLM call
    {
        "body": {"event_type": "api_request"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:03Z",
            "event.sequence": 4,
            "prompt.id": prompt_id,
            "model": "claude-sonnet-4-20250514",
            "cost_usd": "0.0035",
            "duration_ms": "1450",
            "input_tokens": "1200",
            "output_tokens": "350",
            "cache_read_tokens": "800",
            "cache_creation_tokens": "200",
            "speed": "241.38",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
    # Tool result: Edit succeeds
    {
        "body": {"event_type": "tool_result"},
        "attributes": {
            "session.id": "sess-abc",
            "organization.id": "org-acme",
            "user.account_uuid": "acct-001",
            "user.id": "alice",
            "user.email": "alice@acme.com",
            "terminal.type": "vscode",
            "event.timestamp": "2026-03-01T10:00:04Z",
            "event.sequence": 5,
            "prompt.id": prompt_id,
            "tool_name": "Edit",
            "success": "true",
            "duration_ms": "12",
            "tool_result_size_bytes": "512",
        },
        "resource": {"attributes": {"service.name": "cowork"}},
    },
]
# ──────────────────────────────────────────────────────────────────
# Step 5: Process the batch
# ──────────────────────────────────────────────────────────────────
accepted = receiver.receive_batch(sample_records)
print(f"\nAccepted {len(accepted)} of {len(sample_records)} events")
print(f"Validation errors: {len(receiver.validation_errors)}")
# ──────────────────────────────────────────────────────────────────
# Step 6: Correlate events by prompt.id
# ──────────────────────────────────────────────────────────────────
engine = PromptCorrelationEngine()
engine.add_many(receiver.events)
for trace in engine.traces():
    print(f"\n── Prompt Trace: {trace.prompt_id} ──")
    print(f" User: {trace.user_id} ({trace.organization_id})")
    print(f" Events: {trace.event_count}")
    print(f" Cost: ${trace.total_cost_usd:.4f}")
    print(f" Tokens: {trace.total_input_tokens} in / {trace.total_output_tokens} out")
    print(f" Cache: {trace.total_cache_read_tokens} read / "
          f"{trace.total_cache_creation_tokens} created")
    print(f" Tools: {trace.tool_count} ({trace.tool_failure_count} failures)")
    print(f" API errors: {trace.api_error_count}")
    # The user prompt is redacted
    if trace.user_prompt:
        print(f" Prompt (redacted): {trace.user_prompt.attributes.get('prompt')}")
# ──────────────────────────────────────────────────────────────────
# Step 7: Generate dashboard metrics
# ──────────────────────────────────────────────────────────────────
dashboards = CoworkDashboards()
dashboards.ingest_many(receiver.events)
print("\n── Cost Summary ──")
total = dashboards.cost_total()
print(f" Total: ${total.total_cost_usd:.4f} across {total.request_count} requests")
print(f" Avg per request: ${total.avg_cost_per_request:.4f}")
for model, cost in total.models.items():
    print(f" {model}: ${cost:.4f}")
print("\n── Tool Usage ──")
for name, summary in dashboards.tool_usage().items():
    print(f" {name}: {summary.invocation_count} calls, "
          f"{summary.success_rate:.0%} success, "
          f"{summary.avg_duration_ms:.0f}ms avg")
print("\n── Cache Efficiency ──")
for model, summary in dashboards.cache_efficiency().items():
    print(f" {model}: {summary.cache_hit_ratio:.0%} hit ratio")
# ──────────────────────────────────────────────────────────────────
# Step 8: Check alerts
# ──────────────────────────────────────────────────────────────────
print(f"\n── Alerts Fired: {len(alert_log)} ──")
for alert in alert_log:
    print(f" [{alert.severity}] {alert.alert_type}: {alert.message}")
Expected Output
Accepted 5 of 5 events
Validation errors: 0
── Prompt Trace: 550e8400-e29b-41d4-a716-446655440000 ──
User: alice (org-acme)
Events: 5
Cost: $0.0035
Tokens: 1200 in / 350 out
Cache: 800 read / 200 created
Tools: 2 (0 failures)
API errors: 0
Prompt (redacted): [REDACTED_PROMPT]
── Cost Summary ──
Total: $0.0035 across 1 requests
Avg per request: $0.0035
claude-sonnet-4-20250514: $0.0035
── Tool Usage ──
Read: 1 calls, 100% success, 45ms avg
Edit: 1 calls, 100% success, 12ms avg
── Cache Efficiency ──
claude-sonnet-4-20250514: 80% hit ratio
── Alerts Fired: 0 ──
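The cost and cache figures above can be cross-checked by hand from the single api_request record in the batch. The sketch below is independent of briefcase-ai and assumes the hit ratio is cache reads divided by cache reads plus cache creations. That definition reproduces the 80% shown, but it is inferred from the sample numbers rather than taken from the library; the variable names are illustrative.

```python
# Values copied from the sample api_request record (OTLP attributes arrive as strings).
attrs = {
    "cost_usd": "0.0035",
    "cache_read_tokens": "800",
    "cache_creation_tokens": "200",
}

request_count = 1  # the batch contains exactly one api_request event
total_cost = float(attrs["cost_usd"])
avg_cost = total_cost / request_count

cache_read = int(attrs["cache_read_tokens"])
cache_created = int(attrs["cache_creation_tokens"])
# Assumed definition: reads / (reads + creations).
hit_ratio = cache_read / (cache_read + cache_created)

print(f"Avg per request: ${avg_cost:.4f}")
print(f"Cache hit ratio: {hit_ratio:.0%}")
```

The same arithmetic explains why no alert fires: the only request costs $0.0035, well under the 0.10 value passed as cost_threshold_usd.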
What Happens with Errors
When an API error or tool failure occurs, the alert manager fires automatically:
error_record = {
    "body": {"event_type": "api_error"},
    "attributes": {
        "session.id": "sess-abc",
        "organization.id": "org-acme",
        "user.account_uuid": "acct-001",
        "user.id": "alice",
        "user.email": "alice@acme.com",
        "terminal.type": "vscode",
        "event.timestamp": "2026-03-01T10:01:00Z",
        "event.sequence": 10,
        "prompt.id": "another-prompt-id",
        "model": "claude-sonnet-4-20250514",
        "error": "rate_limit_exceeded",
        "status_code": "429",
        "duration_ms": "50",
        "attempt": "1",
        "speed": "0",
    },
    "resource": {"attributes": {"service.name": "cowork"}},
}
receiver.receive(error_record)
# Output: [CRITICAL] api_error: API error: model=claude-sonnet-4-20250514 status=429 error=rate_limit_exceeded
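Once alerts fire, a handler usually needs to decide where each one goes. The following is a minimal sketch of severity-based routing, not part of briefcase-ai. The Alert dataclass is a hypothetical stand-in that only mirrors the three fields used in on_alert (severity, alert_type, message); the severity strings, the "cost_threshold" alert type, and the channel names are assumptions. No network calls are made: payloads are collected in a list where a real handler would post them.

```python
import json
from dataclasses import dataclass


@dataclass
class Alert:
    """Hypothetical stand-in for the library's alert object."""
    severity: str
    alert_type: str
    message: str


def build_notification(alert):
    """Choose a destination by severity and build a JSON payload for it."""
    # Assumption: "critical" pages someone; everything else goes to chat.
    channel = "pagerduty" if alert.severity.lower() == "critical" else "slack"
    text = f"[{alert.severity.upper()}] {alert.alert_type}: {alert.message}"
    return channel, json.dumps({"text": text})


sent = []  # stands in for real HTTP calls to PagerDuty or Slack


def route_alert(alert):
    """Handler with the same single-argument shape as on_alert."""
    sent.append(build_notification(alert))


route_alert(Alert("critical", "api_error", "API error: status=429"))
route_alert(Alert("warning", "cost_threshold", "Request cost above threshold"))

for channel, payload in sent:
    print(channel, payload)
```

Because route_alert takes a single alert argument like on_alert, it could plausibly be added to the handlers list passed to CoworkAlertManager.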
Next Steps
- Custom alert rules: Add domain-specific rules for slow tools, high error rates, etc.
- Storage: Pipe CoworkEvent objects into your data store via the on_event callback
- Grafana/Datadog: Use the dashboard metrics to build real-time monitoring panels
- Compliance: Combine with Briefcase compliance reports for audit trails
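As one illustration of the storage step, the sketch below writes each event as a line of JSON. It is a hypothetical sink, not a briefcase-ai class: JsonLinesSink and its behavior are assumptions, and it relies only on events exposing an attributes dict (as trace.user_prompt.attributes does in the example above). Plain dicts stand in for events so the snippet runs on its own.

```python
import io
import json


class JsonLinesSink:
    """Hypothetical sink: append one JSON object per event to a text stream."""

    def __init__(self, stream):
        self.stream = stream

    def __call__(self, event):
        # Assumption: real events expose an `attributes` dict. Plain dicts
        # (used in this demo) have no such attribute and are written as-is.
        attrs = getattr(event, "attributes", event)
        self.stream.write(json.dumps(attrs, sort_keys=True) + "\n")


buffer = io.StringIO()  # a file opened in append mode would work the same way
sink = JsonLinesSink(buffer)

# Dicts standing in for redacted events delivered by the receiver.
sink({"session.id": "sess-abc", "event.sequence": 1})
sink({"session.id": "sess-abc", "event.sequence": 2})

lines = buffer.getvalue().splitlines()
print(f"Stored {len(lines)} events")
```

Since the full example already uses on_event for alert evaluation, a small wrapper that calls both alert_manager.evaluate and the sink would be one way to combine them.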