External Data Tracking

Use external data tracking to snapshot and monitor upstream dependencies that influence AI decisions.

Overview

External data tracking manages data fetched from APIs, databases, and files. It captures snapshots according to configurable policies, detects drift in those sources, and enforces retention windows for operational review.

What Engineers Use It For

  • Track upstream data changes that can alter model behavior
  • Compare current inputs to historical snapshots
  • Trigger review when drift exceeds configured thresholds
  • Keep retention and snapshot policies explicit in code

Features

  • Flexible Snapshot Policies - Configure frequency, retention, and compression
  • Drift Detection - Identify meaningful changes in external data
  • Multi-Source Tracking - API calls, database queries, file fetches
  • Automatic Retention - Enforce maximum snapshot counts and age limits
  • Change Threshold - Configure sensitivity for drift detection
  • Compression - Optional gzip compression of snapshots

Quick Start

from briefcase_ai.external_data import ExternalDataTracker, SnapshotPolicy, SnapshotFrequency

# Create tracker with policy
tracker = ExternalDataTracker()

# Track API call
result = tracker.track_api_call(
    api_name="risk_scoring_api",
    endpoint="/api/v1/events",
    method="GET",
    response_data={"status": "success", "data": [...]},
    store_snapshot=True
)

# Detect drift
drift_report = tracker.detect_drift(
    source="risk_scoring_api",
    change_threshold=0.15  # 15% change
)

if drift_report.has_drift:
    print(f"Drift detected: {drift_report.summary}")

# Enforce retention
cleanup = tracker.enforce_retention(source="risk_scoring_api")
print(f"Cleaned up {cleanup.snapshots_removed} old snapshots")

Snapshot Policies

SnapshotFrequency Enum

Controls how often snapshots are taken:

class SnapshotFrequency(Enum):
    EVERY_CALL  # Snapshot on every data fetch
    ON_CHANGE   # Snapshot only when data changes
    HOURLY      # One snapshot per hour
    DAILY       # One snapshot per day
    WEEKLY      # One snapshot per week

SnapshotPolicy Configuration

class SnapshotPolicy:
    frequency: SnapshotFrequency  # Snapshot frequency
    retention_days: int           # Keep snapshots for N days
    change_threshold: float       # Drift sensitivity (0.0-1.0)
    max_snapshots: int            # Maximum snapshots to retain
    compress: bool                # Gzip compress snapshots

Example policies:

# High-frequency tracking
# High-frequency tracking
strict_policy = SnapshotPolicy(
    frequency=SnapshotFrequency.EVERY_CALL,
    retention_days=90,
    max_snapshots=1000,
    compress=True
)

# Event-driven tracking
event_policy = SnapshotPolicy(
    frequency=SnapshotFrequency.ON_CHANGE,
    retention_days=365,
    change_threshold=0.1,  # 10% change triggers snapshot
    max_snapshots=500
)

# Daily batch tracking
batch_policy = SnapshotPolicy(
    frequency=SnapshotFrequency.DAILY,
    retention_days=180,
    max_snapshots=365,
    compress=True
)

Tracking Operations

track_api_call

Capture data from an external API:

result = tracker.track_api_call(
    api_name="payment_api",       # required
    endpoint="/v1/transactions",  # required
    method="GET",                 # required
    response_data={               # required
        "transactions": [...],
        "total": 1500.00,
        "currency": "USD"
    },
    version="v1.2",               # optional
    status_code=200,              # optional, default 200
    record_count=42,              # optional
    store_snapshot=True           # optional, default True
)

# Returns dict with snapshot ID, data hash, and metadata
print(f"Snapshot: {result['snapshot_id']}")
print(f"Hash: {result['data_hash']}")

track_db_query

Capture data from a database query:

result = tracker.track_db_query(
    source="claims_db",
    query="SELECT * FROM claims WHERE status='pending'",
    result_count=42,
    rows=[...]
)

track_file_fetch

Capture data from a file source:

result = tracker.track_file_fetch(
    source="s3_policies",
    path="s3://bucket/policies/v2/",
    file_count=156,
    total_bytes=52428800  # 50 MB
)

Drift Detection

detect_drift

Identify meaningful changes in external data:

report = tracker.detect_drift(
    source="pricing_api",
    change_threshold=0.05  # 5% change threshold
)

# DriftReport properties
print(f"Has drift: {report.has_drift}")
print(f"Change magnitude: {report.change_magnitude:.2%}")
print(f"Summary: {report.summary}")

if report.has_drift:
    for change in report.changes:
        print(f"  {change.field}: {change.old_value} -> {change.new_value}")
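To build intuition for what a fractional change threshold means, the sketch below defines change magnitude as the share of top-level fields whose values differ between two snapshots. This is illustrative only: `change_magnitude` is a hypothetical helper, not the SDK's actual drift algorithm.

```python
# Hypothetical helper, not part of briefcase_ai: one simple way to
# quantify drift between two snapshots of the same source.
def change_magnitude(old: dict, new: dict) -> float:
    """Fraction of fields whose values differ between two snapshots."""
    keys = set(old) | set(new)
    if not keys:
        return 0.0
    changed = sum(1 for k in keys if old.get(k) != new.get(k))
    return changed / len(keys)

old = {"base_rate": 0.02, "region": "US", "tier": "gold"}
new = {"base_rate": 0.03, "region": "US", "tier": "gold"}
magnitude = change_magnitude(old, new)
# One of three fields changed, so magnitude is ~0.33, above a 0.05 threshold
print(f"magnitude={magnitude:.2f}, exceeds 5% threshold: {magnitude > 0.05}")
```

Under this definition, a `change_threshold` of 0.05 flags drift whenever more than one field in twenty changes between snapshots.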

Retention Management

enforce_retention

Automatically clean up old snapshots:

cleanup = tracker.enforce_retention(
    source="api_data",
    policy=retention_policy
)

# CleanupResult properties
print(f"Snapshots removed: {cleanup.snapshots_removed}")
print(f"Storage freed: {cleanup.bytes_freed / 1024 / 1024:.1f} MB")
print(f"Retention policy: {cleanup.policy_applied}")
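The sketch below shows one way `retention_days` and `max_snapshots` can combine: drop snapshots older than the window, then cap what remains at the count limit, newest first. It is an illustration of the policy semantics, not the SDK's internal implementation; `apply_retention` is a hypothetical helper.

```python
from datetime import datetime, timedelta

# Hypothetical helper, not part of briefcase_ai: combine an age window
# with a count cap, mirroring retention_days and max_snapshots.
def apply_retention(snapshots, retention_days, max_snapshots, now=None):
    """Return the snapshots to keep: drop entries older than the
    retention window, then cap the remainder at max_snapshots,
    keeping the newest first."""
    now = now or datetime.now()
    cutoff = now - timedelta(days=retention_days)
    fresh = [s for s in snapshots if s["taken_at"] >= cutoff]
    fresh.sort(key=lambda s: s["taken_at"], reverse=True)
    return fresh[:max_snapshots]

now = datetime(2024, 6, 1)
# Six snapshots taken 0, 10, 20, 30, 40, and 50 days ago
snapshots = [{"id": i, "taken_at": now - timedelta(days=i * 10)} for i in range(6)]
kept = apply_retention(snapshots, retention_days=30, max_snapshots=2, now=now)
print([s["id"] for s in kept])  # [0, 1]
```

Note that both limits apply: the 30-day window leaves four snapshots, and the count cap then trims those to the two newest.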

Constructor

ExternalDataTracker(
    versioned_client=None,  # Optional lakeFS client
    repository=None,        # Optional repository name
    branch="main",          # Branch name (default "main")
    default_policy=None     # Optional SnapshotPolicy
)

Usage Patterns

API Response Monitoring

from briefcase_ai.external_data import ExternalDataTracker

tracker = ExternalDataTracker()

def get_claims(claim_id):
    api_response = external_api.get(f"/claims/{claim_id}")
    tracker.track_api_call(
        api_name="claims_api",
        endpoint=f"/claims/{claim_id}",
        method="GET",
        response_data=api_response,
        store_snapshot=True
    )
    return api_response

Database Change Tracking

def get_pending_claims():
    results = db.query("SELECT * FROM claims WHERE status='pending'")
    tracker.track_db_query(
        source="claims_db",
        query="SELECT * FROM claims WHERE status='pending'",
        result_count=len(results),
        rows=results
    )
    return results
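When many fetchers need tracking, the calls can be factored into a decorator so each fetcher stays a one-liner. The `tracked` decorator below is a hypothetical convenience wrapper, not part of the SDK; the stub tracker exists only so the sketch runs stand-alone, and in real code you would pass an `ExternalDataTracker` instance instead.

```python
import functools

def tracked(tracker, api_name):
    """Hypothetical decorator (not part of briefcase_ai): snapshot a
    fetcher's return value via tracker.track_api_call on every call."""
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            data = fn(*args, **kwargs)
            tracker.track_api_call(
                api_name=api_name,
                endpoint=fn.__name__,
                method="GET",
                response_data=data,
                store_snapshot=True,
            )
            return data
        return inner
    return wrap

# Stub standing in for an ExternalDataTracker so the sketch is runnable
class StubTracker:
    def __init__(self):
        self.calls = []
    def track_api_call(self, **kwargs):
        self.calls.append(kwargs)

stub = StubTracker()

@tracked(stub, api_name="claims_api")
def get_pending_claims():
    return {"claims": [], "total": 0}

get_pending_claims()
print(stub.calls[0]["api_name"])  # claims_api
```

This keeps tracking concerns out of fetcher bodies, at the cost of one decorator per data source.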

Installation

External data tracking is included in the core SDK:

pip install "briefcase-ai"

No additional dependencies are needed.