
Sealed Pipeline Monitoring

The Sealed Pipeline is the central data processing backbone of the GOVERN monitoring system. Every event that GOVERN captures flows through this pipeline before appearing in any dashboard or report.

Pipeline Overview

EMIT → An event enters the system
ACCUMULATE → Events are batched in memory windows
ROLLUP → Batches are aggregated into metrics
INSCRIBE → Metrics are persisted to Supabase
RECALL → Persisted data is retrieved for dashboards
BUILD → Metrics drive convergence engine and skill evolution

The pipeline is “sealed” in that each stage has a guaranteed handoff to the next. No stage can be bypassed. If a stage fails, events are held and retried rather than dropped.

Stage 1 — EMIT

Where: API Gateway (packages/api-gateway/src/routes/monitoring.ts)
Trigger: External call to POST /api/monitoring/emit
Output: Event written to monitoring_events table + placed in accumulator queue

An event enters GOVERN when:

  • A probe container captures an AI inference call
  • A browser extension captures an AI-assisted page interaction
  • A developer SDK call emits a custom governance event
  • The GOVERN API itself emits a build event

Event schema

interface MonitoringEvent {
  id: string;              // UUID, generated at emit time
  systemId: string;        // Which AI system this came from
  orgId: string;           // Which customer org
  eventType: 'inference' | 'assessment' | 'policy_eval' | 'custom';
  provider?: string;       // 'anthropic' | 'openai' | 'groq' | ...
  model?: string;          // Model identifier
  inputTokens?: number;
  outputTokens?: number;
  latencyMs?: number;
  policyResult?: 'pass' | 'flag' | 'block';
  policyFlags?: string[];
  payload: Record<string, unknown>;
  timestamp: string;       // UTC ISO-8601
  emittedAt: string;       // When GOVERN received it
}
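As a sketch of what a conforming emit payload looks like, the helper below fills in the server-assigned fields (id, timestamps) before the POST. The `buildEvent` helper and its defaults are illustrative, not part of an official GOVERN SDK, and the type is trimmed to the required fields of the schema above:

```typescript
// Sketch: building a well-formed event before POSTing it to /api/monitoring/emit.
import { randomUUID } from "node:crypto";

type EventType = "inference" | "assessment" | "policy_eval" | "custom";

interface EmitEvent {
  id: string;
  systemId: string;
  orgId: string;
  eventType: EventType;
  payload: Record<string, unknown>;
  timestamp: string;        // when the event happened (UTC ISO-8601)
  emittedAt: string;        // when GOVERN received it
  [extra: string]: unknown; // optional fields: provider, model, tokens, ...
}

function buildEvent(
  orgId: string,
  systemId: string,
  eventType: EventType,
  extra: Record<string, unknown> = {}
): EmitEvent {
  const now = new Date().toISOString();
  return {
    id: randomUUID(), // UUID generated at emit time
    orgId,
    systemId,
    eventType,
    payload: {},
    timestamp: now,   // caller may override with the true capture time
    emittedAt: now,
    ...extra,
  };
}

const ev = buildEvent("org-123", "prod-system-001", "inference", {
  provider: "anthropic",
  inputTokens: 412,
  outputTokens: 96,
  latencyMs: 840,
});
```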

Emit rate limits

The API enforces per-org rate limits on emit:

Tier         Limit           Window
Starter      1,000 events    per hour
Growth       50,000 events   per hour
Enterprise   Unlimited       n/a

Rate limit headers are returned on every emit response:

X-RateLimit-Limit: 50000
X-RateLimit-Remaining: 49847
X-RateLimit-Reset: 1744502400
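A client can use these headers to back off before hitting the limit. This is a minimal sketch; the helper names and the 5% throttle threshold are assumptions, not GOVERN-defined behavior:

```typescript
// Sketch: interpreting the X-RateLimit-* headers from an emit response.
// X-RateLimit-Reset is a Unix epoch timestamp in seconds.
function secondsUntilReset(resetEpochSeconds: number, nowMs: number = Date.now()): number {
  return Math.max(0, resetEpochSeconds - Math.floor(nowMs / 1000));
}

function shouldThrottle(remaining: number, limit: number, threshold = 0.05): boolean {
  // Assumed policy: slow down when fewer than 5% of the window's events remain.
  return remaining / limit < threshold;
}
```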

Stage 2 — ACCUMULATE

Where: packages/api-gateway/src/lib/monitoring-accumulator.ts
Trigger: Events arriving via EMIT
Output: Accumulated batch ready for ROLLUP

The accumulator collects events in 60-second windows per system and per org. At the end of each window, the accumulated batch is handed to ROLLUP.
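Assigning an event to a window amounts to flooring its timestamp to the 60-second boundary. The function below is an illustrative sketch of that bucketing, not the accumulator's actual code:

```typescript
// Sketch: computing the 60-second accumulator window an event belongs to.
const WINDOW_MS = 60_000;

function windowBounds(timestamp: string): { windowStart: string; windowEnd: string } {
  const t = Date.parse(timestamp);
  const start = Math.floor(t / WINDOW_MS) * WINDOW_MS; // floor to the minute
  return {
    windowStart: new Date(start).toISOString(),
    windowEnd: new Date(start + WINDOW_MS).toISOString(),
  };
}
```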

// Accumulator state (held in Durable Object memory)
interface AccumulatorState {
  orgId: string;
  systemId: string;
  windowStart: string;
  windowEnd: string;
  eventCount: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  totalLatencyMs: number;
  policyFlagCount: number;
  modelBreakdown: Record<string, number>;
  providerBreakdown: Record<string, number>;
  events: MonitoringEvent[]; // Held until ROLLUP
}
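Folding an event into this state is a simple reducer: counts and token totals add, breakdown maps increment, and the raw event is retained for ROLLUP. The sketch below trims the types to the fields the reducer touches; the function itself is illustrative:

```typescript
// Sketch: folding one event into the accumulator state.
interface Ev {
  inputTokens?: number;
  outputTokens?: number;
  latencyMs?: number;
  policyFlags?: string[];
  model?: string;
  provider?: string;
}

interface Acc {
  eventCount: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  totalLatencyMs: number;
  policyFlagCount: number;
  modelBreakdown: Record<string, number>;
  providerBreakdown: Record<string, number>;
  events: Ev[];
}

function accumulate(state: Acc, ev: Ev): Acc {
  state.eventCount += 1;
  state.totalInputTokens += ev.inputTokens ?? 0;
  state.totalOutputTokens += ev.outputTokens ?? 0;
  state.totalLatencyMs += ev.latencyMs ?? 0;
  state.policyFlagCount += ev.policyFlags?.length ?? 0;
  if (ev.model) state.modelBreakdown[ev.model] = (state.modelBreakdown[ev.model] ?? 0) + 1;
  if (ev.provider) state.providerBreakdown[ev.provider] = (state.providerBreakdown[ev.provider] ?? 0) + 1;
  state.events.push(ev); // held until ROLLUP
  return state;
}

const empty: Acc = {
  eventCount: 0, totalInputTokens: 0, totalOutputTokens: 0, totalLatencyMs: 0,
  policyFlagCount: 0, modelBreakdown: {}, providerBreakdown: {}, events: [],
};

const acc = [
  { model: "claude-sonnet", inputTokens: 400, outputTokens: 90, latencyMs: 800 },
  { model: "claude-sonnet", inputTokens: 200, outputTokens: 40, latencyMs: 600, policyFlags: ["pii"] },
].reduce(accumulate, empty);
```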

Monitoring the accumulator

The Internal Dashboard shows per-org accumulator state:

  • Active windows — How many accumulator windows are currently open
  • Window fill rate — Events per second flowing into open windows
  • Queue depth — Events waiting to enter a window
  • Window close lag — How long after the 60s mark before a window closes (should be < 5s)

Accumulator failure modes

Failure                       Detection                  Recovery
DO memory overflow            Accumulator size > 10MB    Flush early, log warning
Window never closes           Window age > 10 minutes    Force-close, log error
Events arriving after close   Late event detected        Add to next window with flag
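The detection column of the table reduces to three guard checks. The thresholds below come from the table; the function names are illustrative:

```typescript
// Sketch: the detection checks behind the failure-mode table.
const MAX_STATE_BYTES = 10 * 1024 * 1024;  // DO memory overflow threshold (10MB)
const MAX_WINDOW_AGE_MS = 10 * 60 * 1000;  // force-close threshold (10 minutes)

function shouldFlushEarly(stateBytes: number): boolean {
  return stateBytes > MAX_STATE_BYTES;
}

function shouldForceClose(windowStartMs: number, nowMs: number): boolean {
  return nowMs - windowStartMs > MAX_WINDOW_AGE_MS;
}

function isLateEvent(eventTsMs: number, windowEndMs: number): boolean {
  // Late events are added to the next window, carrying a flag.
  return eventTsMs >= windowEndMs;
}
```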

Stage 3 — ROLLUP

Where: packages/api-gateway/src/lib/monitoring-rollup.ts
Trigger: Accumulator window closes
Output: Hourly, daily, weekly summary metrics

ROLLUP takes a closed accumulator batch and computes aggregate metrics:

interface RollupResult {
  orgId: string;
  systemId: string;
  period: 'hourly' | 'daily' | 'weekly';
  periodStart: string;
  periodEnd: string;
  totalEvents: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  totalCostUsd: number;
  avgLatencyMs: number;
  p95LatencyMs: number;
  policyPassRate: number; // 0.0–1.0
  flagRate: number;
  blockRate: number;
  topModels: Array<{ model: string; count: number }>;
  topProviders: Array<{ provider: string; count: number }>;
  qualityScore: number; // Composite quality metric
}
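Two of these fields deserve a concrete sketch: p95LatencyMs and the policy rates. The p95 below uses the nearest-rank method, which is an assumption; the real implementation may use a different percentile estimator:

```typescript
// Sketch: computing the latency percentile and policy-rate fields of a rollup.
function p95(latencies: number[]): number {
  const sorted = [...latencies].sort((a, b) => a - b);
  const rank = Math.ceil(0.95 * sorted.length); // nearest-rank percentile
  return sorted[Math.max(0, rank - 1)];
}

function policyRates(results: Array<"pass" | "flag" | "block">): {
  policyPassRate: number; flagRate: number; blockRate: number;
} {
  const n = results.length || 1; // avoid division by zero on empty batches
  const frac = (r: string) => results.filter((x) => x === r).length / n;
  return { policyPassRate: frac("pass"), flagRate: frac("flag"), blockRate: frac("block") };
}
```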

Rollup hierarchy

Rollups are computed at three granularities:

  1. Hourly — Computed when each accumulator window closes
  2. Daily — Computed at midnight UTC from hourly rollups
  3. Weekly — Computed at Sunday midnight UTC from daily rollups
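When rolling a lower granularity up into a higher one, sums add directly but averages must be weighted by event count, or low-traffic hours would skew the daily figure. A minimal sketch of that aggregation (field names follow RollupResult; the function is illustrative):

```typescript
// Sketch: rolling hourly summaries up into a daily summary.
interface Slice { totalEvents: number; totalCostUsd: number; avgLatencyMs: number; }

function dailyFromHourly(hours: Slice[]): Slice {
  const totalEvents = hours.reduce((s, h) => s + h.totalEvents, 0);
  const totalCostUsd = hours.reduce((s, h) => s + h.totalCostUsd, 0);
  // Weight each hour's average latency by its event count.
  const weighted = hours.reduce((s, h) => s + h.avgLatencyMs * h.totalEvents, 0);
  return {
    totalEvents,
    totalCostUsd,
    avgLatencyMs: totalEvents === 0 ? 0 : weighted / totalEvents,
  };
}
```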

Stage 4 — INSCRIBE

Where: Supabase write operations in monitoring-rollup.ts
Trigger: ROLLUP completes
Output: Persisted rows in monitoring_rollup table

INSCRIBE writes the rollup results to Supabase for durable storage and querying. It uses upsert semantics so that re-processing a window doesn’t create duplicates.

-- monitoring_rollup table (abbreviated)
CREATE TABLE monitoring_rollup (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  org_id UUID NOT NULL REFERENCES orgs(id),
  system_id TEXT NOT NULL,
  period TEXT NOT NULL CHECK (period IN ('hourly', 'daily', 'weekly')),
  period_start TIMESTAMPTZ NOT NULL,
  period_end TIMESTAMPTZ NOT NULL,
  total_events INTEGER DEFAULT 0,
  total_input_tokens BIGINT DEFAULT 0,
  total_output_tokens BIGINT DEFAULT 0,
  total_cost_usd NUMERIC(10,4) DEFAULT 0,
  quality_score NUMERIC(5,4),
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now(),
  UNIQUE(org_id, system_id, period, period_start)
);
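The upsert semantics key off the UNIQUE constraint: re-processing a window updates the existing row instead of inserting a duplicate. This is a sketch of that write, not the exact query GOVERN runs:

```sql
-- Sketch: idempotent INSCRIBE write keyed on the table's unique constraint.
INSERT INTO monitoring_rollup (org_id, system_id, period, period_start, period_end,
                               total_events, total_input_tokens, total_output_tokens,
                               total_cost_usd, quality_score)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (org_id, system_id, period, period_start)
DO UPDATE SET
  total_events        = EXCLUDED.total_events,
  total_input_tokens  = EXCLUDED.total_input_tokens,
  total_output_tokens = EXCLUDED.total_output_tokens,
  total_cost_usd      = EXCLUDED.total_cost_usd,
  quality_score       = EXCLUDED.quality_score,
  updated_at          = now();
```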

Stage 5 — RECALL

Where: GET /api/monitoring/rollup route
Trigger: Dashboard query or report generation
Output: Historical metrics returned to caller

RECALL queries the persisted rollup data and returns it for display. It supports time-range filtering, aggregation level selection, and org-scoped access.

# Example: Get daily rollup for the last 7 days
curl "$API_URL/api/monitoring/rollup?period=daily&days=7&systemId=prod-system-001" \
  -H "Authorization: Bearer $AUTH_TOKEN"

Stage 6 — BUILD

Where: Convergence engine + skill evolution system
Trigger: INSCRIBE writes new rollup data
Output: Convergence score updates, skill evolution triggers

The BUILD stage closes the loop. Rollup data feeds:

  • The convergence engine — updates V(Q) scores for each system
  • The skill evolution system — triggers skill level increases when thresholds are met
  • The cost governor — checks rollup totals against budget thresholds
  • The deploy watchdog — correlates quality score changes with recent deploys
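As one concrete consumer, the cost governor's check amounts to comparing rollup spend against an org budget. This is a hedged sketch: the function name, the "warning" tier, and the 0.8 warning level are assumptions, not documented GOVERN behavior:

```typescript
// Sketch: a cost-governor check over rollup totals.
type BudgetStatus = "ok" | "warning" | "over";

function budgetStatus(totalCostUsd: number, budgetUsd: number): BudgetStatus {
  if (totalCostUsd > budgetUsd) return "over";
  if (totalCostUsd > 0.8 * budgetUsd) return "warning"; // assumed 80% warning level
  return "ok";
}
```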

Pipeline Health Metrics

The Internal Dashboard monitors these pipeline health indicators:

Metric                                                  Healthy    Warning      Critical
EMIT latency (P95)                                      < 50ms     50–200ms     > 200ms
Accumulator queue depth                                 < 100      100–500      > 500
ROLLUP lag (time from window close to rollup complete)  < 30s      30–120s      > 120s
INSCRIBE success rate                                   > 99.9%    99–99.9%     < 99%
RECALL query time (P95)                                 < 200ms    200–500ms    > 500ms
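Mapping a reading onto these bands is mechanical, with one wrinkle: for INSCRIBE success rate, lower is worse, so the comparison flips. The thresholds below come from the table; the classifier itself is illustrative:

```typescript
// Sketch: classifying a metric reading against the health thresholds above.
type Health = "healthy" | "warning" | "critical";

function classify(value: number, warnAt: number, critAt: number, higherIsWorse = true): Health {
  // Negate all three numbers when lower values are worse, so one comparison works.
  const v = higherIsWorse ? value : -value;
  const w = higherIsWorse ? warnAt : -warnAt;
  const c = higherIsWorse ? critAt : -critAt;
  if (v > c) return "critical";
  if (v >= w) return "warning";
  return "healthy";
}

// EMIT latency (P95): healthy < 50ms, warning 50–200ms, critical > 200ms
const emitHealth = classify(42, 50, 200); // "healthy"
// INSCRIBE success rate: healthy > 99.9%, warning 99–99.9%, critical < 99%
const inscribeHealth = classify(99.5, 99.9, 99, false);
```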

Pipeline Monitoring SQL

-- Check pipeline health: events emitted in last hour
SELECT
  COUNT(*) AS event_count,
  AVG(EXTRACT(EPOCH FROM (inscribed_at - emitted_at)) * 1000) AS avg_pipeline_latency_ms
FROM monitoring_events
WHERE emitted_at > NOW() - INTERVAL '1 hour';

-- Check rollup lag
SELECT
  period,
  MAX(EXTRACT(EPOCH FROM (updated_at - period_end)) / 60) AS max_lag_minutes
FROM monitoring_rollup
WHERE period_end > NOW() - INTERVAL '24 hours'
GROUP BY period;