Sealed Pipeline Monitoring
The Sealed Pipeline is the central data processing backbone of the GOVERN monitoring system. Every event that GOVERN captures flows through this pipeline before appearing in any dashboard or report.
Pipeline Overview
- EMIT → An event enters the system
- ACCUMULATE → Events are batched in memory windows
- ROLLUP → Batches are aggregated into metrics
- INSCRIBE → Metrics are persisted to Supabase
- RECALL → Persisted data is retrieved for dashboards
- BUILD → Metrics drive convergence engine and skill evolution

The pipeline is “sealed” in that each stage has a guaranteed handoff to the next. No stage can be bypassed. If a stage fails, events are held and retried rather than dropped.
Stage 1 — EMIT
Where: API Gateway (packages/api-gateway/src/routes/monitoring.ts)
Trigger: External call to POST /api/monitoring/emit
Output: Event written to monitoring_events table + placed in accumulator queue
An event enters GOVERN when:
- A probe container captures an AI inference call
- A browser extension captures an AI-assisted page interaction
- A developer SDK call emits a custom governance event
- The GOVERN API itself emits a build event
Event schema
```typescript
interface MonitoringEvent {
  id: string;               // UUID, generated at emit time
  systemId: string;         // Which AI system this came from
  orgId: string;            // Which customer org
  eventType: 'inference' | 'assessment' | 'policy_eval' | 'custom';
  provider?: string;        // 'anthropic' | 'openai' | 'groq' | ...
  model?: string;           // Model identifier
  inputTokens?: number;
  outputTokens?: number;
  latencyMs?: number;
  policyResult?: 'pass' | 'flag' | 'block';
  policyFlags?: string[];
  payload: Record<string, unknown>;
  timestamp: string;        // UTC ISO-8601
  emittedAt: string;        // When GOVERN received it
}
```

Emit rate limits
The API enforces per-org rate limits on emit:
| Tier | Limit | Window |
|---|---|---|
| Starter | 1,000 events | per hour |
| Growth | 50,000 events | per hour |
| Enterprise | Unlimited | — |
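A client can budget against these limits before emitting. A minimal sketch (tier names and limits come from the table above; the helper itself is hypothetical):

```typescript
// Hypothetical helper: hourly emit limits per tier, per the table above.
// Infinity models the Enterprise tier's unlimited quota.
const HOURLY_EMIT_LIMITS: Record<string, number> = {
  starter: 1_000,
  growth: 50_000,
  enterprise: Infinity,
};

// Returns how many more events fit in this hour's window for the tier.
function remainingQuota(tier: string, emittedThisHour: number): number {
  const limit = HOURLY_EMIT_LIMITS[tier] ?? 0;
  return Math.max(0, limit - emittedThisHour);
}
```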
Rate limit headers are returned on every emit response:
```
X-RateLimit-Limit: 50000
X-RateLimit-Remaining: 49847
X-RateLimit-Reset: 1744502400
```

Stage 2 — ACCUMULATE
Where: packages/api-gateway/src/lib/monitoring-accumulator.ts
Trigger: Events arriving via EMIT
Output: Accumulated batch ready for ROLLUP
The accumulator collects events in 60-second windows per system and per org. At the end of each window, the accumulated batch is handed to ROLLUP.
```typescript
// Accumulator state (held in Durable Object memory)
interface AccumulatorState {
  orgId: string;
  systemId: string;
  windowStart: string;
  windowEnd: string;
  eventCount: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  totalLatencyMs: number;
  policyFlagCount: number;
  modelBreakdown: Record<string, number>;
  providerBreakdown: Record<string, number>;
  events: MonitoringEvent[];  // Held until ROLLUP
}
```

Monitoring the accumulator
The Internal Dashboard shows per-org accumulator state:
- Active windows — How many accumulator windows are currently open
- Window fill rate — Events per second flowing into open windows
- Queue depth — Events waiting to enter a window
- Window close lag — How long past the 60-second mark a window takes to close (should be < 5s)
Accumulator failure modes
| Failure | Detection | Recovery |
|---|---|---|
| DO memory overflow | Accumulator size > 10MB | Flush early, log warning |
| Window never closes | Window age > 10 minutes | Force-close, log error |
| Events arriving after close | Late event detected | Add to next window with flag |
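The failure-mode handling above can be sketched roughly as follows. The 10MB and 10-minute thresholds come from the table; the state shape is simplified and the functions themselves are hypothetical:

```typescript
// Simplified accumulator sketch. Real state lives in a Durable Object;
// thresholds mirror the failure-mode table above.
interface AccWindow {
  windowStart: number;              // epoch ms
  closed: boolean;
  eventCount: number;
  approxBytes: number;
  lateEvents: number;               // events flagged as arriving after close
}

const WINDOW_MS = 60_000;           // 60-second accumulation windows
const MAX_BYTES = 10 * 1024 * 1024; // flush early past 10MB
const MAX_AGE_MS = 10 * 60_000;     // force-close after 10 minutes

// Decide what to do with an incoming event of `sizeBytes` at time `now`.
function handleEvent(
  w: AccWindow,
  sizeBytes: number,
  now: number,
): 'accept' | 'flush-early' | 'next-window' {
  if (w.closed || now - w.windowStart >= WINDOW_MS) {
    w.lateEvents++;                 // late arrival: add to next window, flagged
    return 'next-window';
  }
  if (w.approxBytes + sizeBytes > MAX_BYTES) {
    return 'flush-early';           // DO memory overflow guard
  }
  w.eventCount++;
  w.approxBytes += sizeBytes;
  return 'accept';
}

// Watchdog: a window older than 10 minutes is force-closed.
function shouldForceClose(w: AccWindow, now: number): boolean {
  return !w.closed && now - w.windowStart > MAX_AGE_MS;
}
```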
Stage 3 — ROLLUP
Where: packages/api-gateway/src/lib/monitoring-rollup.ts
Trigger: Accumulator window closes
Output: Hourly, daily, weekly summary metrics
ROLLUP takes a closed accumulator batch and computes aggregate metrics:
```typescript
interface RollupResult {
  orgId: string;
  systemId: string;
  period: 'hourly' | 'daily' | 'weekly';
  periodStart: string;
  periodEnd: string;
  totalEvents: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  totalCostUsd: number;
  avgLatencyMs: number;
  p95LatencyMs: number;
  policyPassRate: number;   // 0.0–1.0
  flagRate: number;
  blockRate: number;
  topModels: Array<{ model: string; count: number }>;
  topProviders: Array<{ provider: string; count: number }>;
  qualityScore: number;     // Composite quality metric
}
```

Rollup hierarchy
Rollups are computed at three granularities:
- Hourly — Computed when each accumulator window closes
- Daily — Computed at midnight UTC from hourly rollups
- Weekly — Computed at Sunday midnight UTC from daily rollups
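As a sketch of how a closed batch becomes aggregate metrics, the latency and policy-rate fields might be computed like this (a naive nearest-rank p95 over a small batch; the production rollup code may differ):

```typescript
// Sketch: aggregate a closed batch into rollup metrics.
// Field names follow RollupResult; only the latency/policy subset is shown.
interface BatchEvent { latencyMs: number; policyResult: 'pass' | 'flag' | 'block' }

function rollup(events: BatchEvent[]) {
  const latencies = events.map(e => e.latencyMs).sort((a, b) => a - b);
  const p95Index = Math.ceil(0.95 * latencies.length) - 1;  // nearest-rank p95
  const count = (r: BatchEvent['policyResult']) =>
    events.filter(e => e.policyResult === r).length;
  return {
    totalEvents: events.length,
    avgLatencyMs: latencies.reduce((s, l) => s + l, 0) / events.length,
    p95LatencyMs: latencies[p95Index],
    policyPassRate: count('pass') / events.length,   // 0.0–1.0
    flagRate: count('flag') / events.length,
    blockRate: count('block') / events.length,
  };
}
```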
Stage 4 — INSCRIBE
Where: Supabase write operations in monitoring-rollup.ts
Trigger: ROLLUP completes
Output: Persisted rows in monitoring_rollup table
INSCRIBE writes the rollup results to Supabase for durable storage and querying. It uses upsert semantics so that re-processing a window doesn’t create duplicates.
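Because the write is keyed on (org_id, system_id, period, period_start), the in-memory rollup's camelCase fields must map onto the table's snake_case columns before the upsert. A hypothetical mapper:

```typescript
// Hypothetical mapper from the in-memory rollup shape to a
// monitoring_rollup row. The four key columns match the table's
// UNIQUE(org_id, system_id, period, period_start) constraint.
function toRollupRow(r: {
  orgId: string; systemId: string; period: string;
  periodStart: string; periodEnd: string;
  totalEvents: number; totalInputTokens: number;
  totalOutputTokens: number; totalCostUsd: number; qualityScore: number;
}) {
  return {
    org_id: r.orgId,
    system_id: r.systemId,
    period: r.period,
    period_start: r.periodStart,
    period_end: r.periodEnd,
    total_events: r.totalEvents,
    total_input_tokens: r.totalInputTokens,
    total_output_tokens: r.totalOutputTokens,
    total_cost_usd: r.totalCostUsd,
    quality_score: r.qualityScore,
  };
}
```

With supabase-js, such a row could then be passed to `.upsert(row, { onConflict: 'org_id,system_id,period,period_start' })` so reprocessing a window overwrites the existing row rather than duplicating it.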
```sql
-- monitoring_rollup table (abbreviated)
CREATE TABLE monitoring_rollup (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  org_id UUID NOT NULL REFERENCES orgs(id),
  system_id TEXT NOT NULL,
  period TEXT NOT NULL CHECK (period IN ('hourly', 'daily', 'weekly')),
  period_start TIMESTAMPTZ NOT NULL,
  period_end TIMESTAMPTZ NOT NULL,
  total_events INTEGER DEFAULT 0,
  total_input_tokens BIGINT DEFAULT 0,
  total_output_tokens BIGINT DEFAULT 0,
  total_cost_usd NUMERIC(10,4) DEFAULT 0,
  quality_score NUMERIC(5,4),
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now(),
  UNIQUE(org_id, system_id, period, period_start)
);
```

Stage 5 — RECALL
Where: GET /api/monitoring/rollup route
Trigger: Dashboard query or report generation
Output: Historical metrics returned to caller
RECALL queries the persisted rollup data and returns it for display. It supports time-range filtering, aggregation level selection, and org-scoped access.
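A caller might assemble the query string like this (parameter names match the example request; the base URL is illustrative):

```typescript
// Sketch: build a RECALL query URL. Parameter names (period, days,
// systemId) follow the documented GET /api/monitoring/rollup route.
function rollupQueryUrl(
  base: string,
  opts: { period: 'hourly' | 'daily' | 'weekly'; days: number; systemId: string },
): string {
  const params = new URLSearchParams({
    period: opts.period,
    days: String(opts.days),
    systemId: opts.systemId,
  });
  return `${base}/api/monitoring/rollup?${params}`;
}
```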
```shell
# Example: Get daily rollup for the last 7 days
curl "$API_URL/api/monitoring/rollup?period=daily&days=7&systemId=prod-system-001" \
  -H "Authorization: Bearer $AUTH_TOKEN"
```

Stage 6 — BUILD
Where: Convergence engine + skill evolution system
Trigger: INSCRIBE writes new rollup data
Output: Convergence score updates, skill evolution triggers
The BUILD stage closes the loop. Rollup data feeds:
- The convergence engine — updates V(Q) scores for each system
- The skill evolution system — triggers skill level increases when thresholds are met
- The cost governor — checks rollup totals against budget thresholds
- The deploy watchdog — correlates quality score changes with recent deploys
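As a sketch of one of these consumers, a budget check along the lines of the cost governor's might compare rollup spend totals against an org budget (the function name and the 80% early-warning line are hypothetical):

```typescript
// Hypothetical cost-governor check: compare a rollup's spend against
// an org's monthly budget and report how much of it is used.
function checkBudget(totalCostUsd: number, monthlyBudgetUsd: number) {
  const usedFraction = totalCostUsd / monthlyBudgetUsd;
  return {
    overBudget: totalCostUsd > monthlyBudgetUsd,
    usedFraction,                 // 0.0–1.0+ of budget consumed
    alert: usedFraction >= 0.8,   // hypothetical 80% early-warning threshold
  };
}
```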
Pipeline Health Metrics
The Internal Dashboard monitors these pipeline health indicators:
| Metric | Healthy | Warning | Critical |
|---|---|---|---|
| EMIT latency (P95) | < 50ms | 50–200ms | > 200ms |
| Accumulator queue depth | < 100 | 100–500 | > 500 |
| ROLLUP lag (time from window close to rollup complete) | < 30s | 30–120s | > 120s |
| INSCRIBE success rate | > 99.9% | 99–99.9% | < 99% |
| RECALL query time (P95) | < 200ms | 200–500ms | > 500ms |
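The thresholds above can be encoded as a small classifier. A sketch (bounds taken from the table; the helper functions are hypothetical):

```typescript
// Hypothetical classifier for the pipeline health table.
type Status = 'healthy' | 'warning' | 'critical';

// Higher-is-worse metrics: EMIT latency, queue depth, ROLLUP lag,
// RECALL query time. `warnAt` is the warning floor, `critAt` the
// boundary above which the metric turns critical.
function gradeHigherIsWorse(value: number, warnAt: number, critAt: number): Status {
  if (value > critAt) return 'critical';
  if (value >= warnAt) return 'warning';
  return 'healthy';
}

// Lower-is-worse metric: INSCRIBE success rate, as a percentage.
function gradeSuccessRate(pct: number): Status {
  if (pct < 99) return 'critical';
  if (pct <= 99.9) return 'warning';
  return 'healthy';
}
```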
Pipeline Monitoring SQL
```sql
-- Check pipeline health: events emitted in last hour
SELECT
  COUNT(*) AS event_count,
  AVG(EXTRACT(EPOCH FROM (inscribed_at - emitted_at)) * 1000) AS avg_pipeline_latency_ms
FROM monitoring_events
WHERE emitted_at > NOW() - INTERVAL '1 hour';
```
```sql
-- Check rollup lag
SELECT
  period,
  MAX(EXTRACT(EPOCH FROM (updated_at - period_end)) / 60) AS max_lag_minutes
FROM monitoring_rollup
WHERE period_end > NOW() - INTERVAL '24 hours'
GROUP BY period;
```