
Event Publishing

Briefcase emits typed BriefcaseEvent records that downstream consumers can subscribe to. Events are published via WebhookEmitter (HTTP) or KafkaPublisher (Confluent Kafka).

BriefcaseEvent

from briefcase.events import BriefcaseEvent

event = BriefcaseEvent(
    event_type="decision.captured",
    payload={"decision_id": "abc123", "confidence": 0.91},
)

Standard event types:

| Event type | Emitted when |
| --- | --- |
| decision.captured | A decision record is captured by a handler |
| decision.low_confidence | Confidence is below the routing threshold |
| decision.drift_detected | Statistical drift is detected across decisions |

Convenience Emitters

from briefcase.events import emit, emit_low_confidence, emit_drift_detected

# Emit a generic event
emit(event_type="decision.captured", payload={"decision_id": "abc123"})

# Emit a low-confidence event
emit_low_confidence(payload={"decision_id": "abc123", "confidence": 0.72})

# Emit a drift event
emit_drift_detected(payload={"window": "1h", "drift_score": 0.18})

These functions read the event_bus from BriefcaseConfig and publish to whichever emitters are registered.


WebhookEmitter

Sends BriefcaseEvent records as HTTP POST requests using the CloudEvents 1.0 event format with optional HMAC-SHA256 request signing.

Constructor

from briefcase.events import WebhookEmitter

emitter = WebhookEmitter(
    url="https://hooks.example.com/briefcase",
    secret="",               # HMAC-SHA256 signing secret (empty = unsigned)
    subscribed_events=None,  # None = all events; list = filtered
    timeout=10.0,            # Request timeout in seconds
)

| Parameter | Default | Description |
| --- | --- | --- |
| url | required | Webhook endpoint URL |
| secret | "" | HMAC-SHA256 signing secret. If empty, no signature header is sent. |
| subscribed_events | None | List of event types to filter. None sends all events. |
| timeout | 10.0 | HTTP request timeout in seconds |
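The subscribed_events semantics can be pictured with a small sketch. The helper name `should_send` is hypothetical and is not part of the briefcase API; it also assumes that an empty list matches nothing, which the parameter table does not state.

```python
from typing import Optional, Sequence


def should_send(event_type: str, subscribed_events: Optional[Sequence[str]]) -> bool:
    """Hypothetical filter mirroring the documented subscribed_events behaviour."""
    if subscribed_events is None:
        # None means "no filter": every event type is delivered.
        return True
    # Assumption: a list (even an empty one) restricts delivery to its members.
    return event_type in subscribed_events


print(should_send("decision.captured", None))
print(should_send("decision.captured", ["decision.low_confidence"]))
```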

Sent Headers

Content-Type: application/json
ce-specversion: 1.0
ce-type: decision.captured
ce-source: briefcase-ai
ce-id: <uuid>
X-Briefcase-Signature: sha256=<hmac> (only when secret is set)
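To make the header list concrete, here is a minimal sketch of how such a request could be assembled on the sending side. `build_webhook_request` is a hypothetical helper, not the library's implementation. It assumes the request body is the JSON-encoded payload and that the signature is computed over the raw body bytes, which matches the receiver-side verification shown later on this page.

```python
import hashlib
import hmac
import json
import uuid
from typing import Dict, Tuple


def build_webhook_request(
    event_type: str, payload: dict, secret: str = ""
) -> Tuple[Dict[str, str], bytes]:
    """Hypothetical helper: build headers and body resembling the list above."""
    # Assumption: the body is the payload serialized as compact JSON.
    body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "ce-specversion": "1.0",
        "ce-type": event_type,
        "ce-source": "briefcase-ai",
        "ce-id": str(uuid.uuid4()),
    }
    if secret:
        # Sign the exact bytes that will be sent, so the receiver can recompute it.
        digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
        headers["X-Briefcase-Signature"] = "sha256=" + digest
    return headers, body


headers, body = build_webhook_request(
    "decision.captured", {"decision_id": "abc123"}, secret="my-webhook-secret"
)
for name, value in headers.items():
    print(f"{name}: {value}")
```

With an empty secret the sketch omits X-Briefcase-Signature entirely, mirroring the "only when secret is set" note.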

Example

import asyncio
from briefcase.events import WebhookEmitter, BriefcaseEvent

emitter = WebhookEmitter(
    url="https://hooks.example.com/decisions",
    secret="my-webhook-secret",
    subscribed_events=["decision.low_confidence"],  # only low-confidence events
)

event = BriefcaseEvent(
    event_type="decision.low_confidence",
    payload={"decision_id": "abc123", "confidence": 0.72},
)
asyncio.run(emitter.emit(event))

Signature Verification (Receiver Side)

import hashlib
import hmac

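def verify_signature(body: bytes, secret: str, received_sig: str) -> bool:
    # Recompute the signature over the raw request body.
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time, which avoids leaking timing information.
    return hmac.compare_digest(expected, received_sig)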
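A short round trip shows how the check behaves. The secret and body below are illustrative values only, and the signature is computed locally to stand in for the X-Briefcase-Signature header a sender would supply. Verification should run on the raw request bytes, before any JSON parsing, since re-serializing the body can change its bytes.

```python
import hashlib
import hmac


def verify_signature(body: bytes, secret: str, received_sig: str) -> bool:
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, received_sig)


secret = "my-webhook-secret"  # illustrative value
body = b'{"decision_id": "abc123", "confidence": 0.72}'

# Stand-in for the header value a sender would attach.
signature = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

ok = verify_signature(body, secret, signature)
tampered = verify_signature(body + b" ", secret, signature)
wrong_secret = verify_signature(body, "other-secret", signature)
print(ok, tampered, wrong_secret)  # True False False
```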

KafkaPublisher

Publishes BriefcaseEvent records to a Kafka topic using the confluent-kafka client. The Kafka producer is lazily initialized on the first publish call.

Prerequisites

pip install confluent-kafka

Constructor

from briefcase.events import KafkaPublisher

publisher = KafkaPublisher(
    brokers=["localhost:9092"],  # List of broker addresses
    topic="briefcase-events",    # Kafka topic name
)

| Parameter | Description |
| --- | --- |
| brokers | List of Kafka broker addresses (host:port) |
| topic | Topic to publish events to |

Example

import asyncio
from briefcase.events import KafkaPublisher, BriefcaseEvent

publisher = KafkaPublisher(
    brokers=["kafka-1:9092", "kafka-2:9092"],
    topic="ai-decisions",
)

event = BriefcaseEvent(
    event_type="decision.captured",
    payload={"decision_id": "abc123", "function_name": "claims_review"},
)
asyncio.run(publisher.emit(event))

Events are serialized as JSON and published with decision_id as the message key, so that events for the same decision are routed to the same partition.
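The keying scheme can be sketched without a running broker. `to_kafka_message` is a hypothetical helper, not the publisher's actual code. The JSON envelope shape and the choice of a None key when the payload has no decision_id (as with drift events) are both assumptions.

```python
import json
from typing import Optional, Tuple


def to_kafka_message(event_type: str, payload: dict) -> Tuple[Optional[bytes], bytes]:
    """Hypothetical helper: derive (key, value) bytes for one event."""
    decision_id = payload.get("decision_id")
    # Assumption: events without a decision_id are sent with no key.
    key = str(decision_id).encode("utf-8") if decision_id is not None else None
    # Assumption: the value is a JSON envelope holding the type and payload.
    value = json.dumps({"event_type": event_type, "payload": payload}).encode("utf-8")
    return key, value


key, value = to_kafka_message(
    "decision.captured", {"decision_id": "abc123", "function_name": "claims_review"}
)
print(key)
print(value.decode("utf-8"))
```

With confluent-kafka, a pair like this would typically be handed to `Producer.produce(topic, value=value, key=key)`. Kafka's default partitioner hashes the key, so messages sharing a key go to the same partition as long as the topic's partition count does not change.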


Configuring via setup()

from briefcase.config import setup
from briefcase.events import WebhookEmitter

setup(
    webhook_url="https://hooks.example.com/decisions",
    webhook_secret="my-secret",
    events=["decision.low_confidence", "decision.drift_detected"],
)

Or register an event bus directly:

setup(event_bus=WebhookEmitter(url="...", secret="..."))
