Skip to content

refactor: tech-debt cleanup + partial modularization (coordinator + provider)#444

Merged
Gajesh2007 merged 24 commits into
masterfrom
chore/tech-debt-modularize
Jun 22, 2026
Merged

refactor: tech-debt cleanup + partial modularization (coordinator + provider)#444
Gajesh2007 merged 24 commits into
masterfrom
chore/tech-debt-modularize

Conversation

@Gajesh2007

@Gajesh2007 Gajesh2007 commented Jun 22, 2026

Copy link
Copy Markdown
Member

Summary

Behavior-preserving tech-debt cleanup + first-pass modularization across the coordinator (Go) and provider (Swift), done in parallel in an isolated worktree. 20 commits, every one green (go build/vet/test, swift build/test, gofmt). No protocol/telemetry wire changes; libs/ untouched; no root files changed.

Highlights:

  • ~3,300 LOC of dead code removed — coordinator's parallel score-based routing brain (ScoreProvider/FindProvider/FindProviderWithTrust + orphans) and its false-confidence tests; the dead vllm-mlx full-stack test; dead r2SitePackagesCDNURL config; the envconfig re-export shim; provider's dead single-request inference/SSE/formatter stack; unused dev-only targets (vlm-smoke, kv-engine-demo). The 40 MB tracked coordinator binary is now untracked.
  • ~4.4k LOC out of the shipped provider binaryBenchmark/ (incl. fatalError stub caches) extracted into its own ProviderBenchmark SwiftPM target, no longer linked into every provider.
  • Duplication collapsed — coordinator alias-fallback helpers merged into one; provider's 4-way Harmony channel stripping and 4-way formatDuration each unified to a single helper.
  • Silent failures fixed — coordinator no longer swallows Stripe/settlement/credit/WS-send errors; provider no longer emits silent {} on encode failure (both now log + emit telemetry).
  • Security-critical test gaps closed — coordinator's first-ever auth/ tests (incl. JWT alg-confusion) + env/telemetry; provider telemetry wire-symmetry test (makes the Go symmetry test's cross-language claim real) + darkbloom-publish hashing tests.
  • Modularization startedconsumer.go 5,700→4,884 (request_introspection.go + apikey_handlers.go); ProviderLoop.swift 3,660→3,577 (ProviderLoopConfig + ProviderLogger).

The deepest god-file splits (routing-gate pipeline unification, runInferenceRequest orchestrator, store.Store interface segregation, registry.go/provider.go splits; deep ProviderLoop/BatchScheduler actor splits) are intentionally deferred to the stacked follow-up branch chore/tech-debt-godfiles (base = this branch) — they're either high-drift-risk or require an access-control tradeoff that warrants its own review.

Before / After

Behavior — externally unchanged; dead paths and silent failures removed

flowchart LR
  subgraph Before
    R1[dispatch] --> P1["ReserveProviderEx → selectBestCandidate"]
    R1 -. "dead (tests only)" .-> S1["ScoreProvider / FindProvider"]
    B1["Stripe webhook / settlement"] --> E1["errors silently swallowed"]
  end
  subgraph After
    R2[dispatch] --> P2["ReserveProviderEx → selectBestCandidate"]
    B2["Stripe webhook / settlement"] --> E2["errors logged + DogStatsD metric"]
  end
Loading

Code structure

flowchart TB
  subgraph Before
    C1["api/consumer.go ~5,700 LOC (inference + keys + introspection)"]
    PC1["ProviderCore library"] --> BM1["Benchmark/ ~4.4k LOC + fatalError stubs"]
    PL1["ProviderLoop.swift 3,660 (inline config/logger)"]
    D1["dead: single-request inference / SSE / formatter"]
  end
  subgraph After
    C2["api/consumer.go 4,884"] --> RI["request_introspection.go"]
    C2 --> AK["apikey_handlers.go"]
    PC2["ProviderCore library"]
    BM2["ProviderBenchmark target (bench / CLI only)"]
    PL2["ProviderLoop.swift 3,577"] --> CFG["ProviderLoopConfig.swift"]
    PL2 --> LOG["ProviderLogger.swift"]
  end
Loading

Test plan

  • go build ./... + go vet ./... clean
  • go test ./... green (registry suite also verified under -race)
  • swift build green; swift test no new failures (1052 swift-testing + new XCTest suites, 0 fail)
  • gofmt -l empty; branch scoped to coordinator/ + provider-swift/ only
  • Reviewer pass

Notes

  • No protocol/telemetry wire-format changes — the Swift Protocol/ mirror needs nothing.
  • Deferred deep splits land on stacked branch chore/tech-debt-godfiles (base = this branch).

View with Codesmith Autofix with Codesmith
Need help on this PR? Tag /codesmith with what you need. Autofix is disabled.

Gajesh2007 and others added 20 commits June 21, 2026 21:27
…r stack

The production inference path is MultiModelBatchSchedulerEngine+Translation
+ BatchScheduler. The single-request driver, its SSE formatter, and the
chat-prompt formatter were only referenced by tests. Remove them and the
types they orphaned:

- delete SingleRequestInference.swift, SSEChunkFormatting.swift,
  ChatPromptFormatting.swift
- drop orphaned InferenceUsage + UsageAccumulator (UsageAccounting.swift),
  keeping the live StreamedGenerationUsage / CancelledGenerationTerminal
- drop orphaned SSEChunk + streaming ChatCompletionChunk/ChunkChoice/
  ChunkDelta (ChatRequest.swift); keep ChunkUsage + ChatCompletionResponse
  (still used by the live response path / InferenceLiveTests)
- trim InferenceTests.swift to the live cancellation-registry test
- fix the now-dangling ChatPromptFormatter mention in the translate() doc

Behavior-preserving: swift build green; swift test 1049 pass / 0 fail
(baseline 1054 minus the 5 removed dead tests).

Co-authored-by: Cursor <cursoragent@cursor.com>
Remove the score-based FindProvider / FindProviderWithTrust / ScoreProvider
cluster (plus the now-unused TrustMultiplier and the MaxConcurrentRequests
alias) from the registry. Production dispatch routes through the cost-based
ReserveProviderEx -> selectBestCandidate path; these functions were a second
routing brain reachable only from tests, giving false confidence by testing
behavior that diverged from production (6m vs 16m challenge freshness window,
score-vs-cost selection, crashed-provider routing).

Tests that exercised only the dead path (scoring math, score-based selection,
dead-specific challenge boundaries, crashed-only routing) are deleted. Tests
that used FindProvider as a routability probe for gates SHARED with production
(privacy caps, manifest/SIP checks, trust, catalog, eviction, challenge
freshness, concurrency cap) are repointed to a findRoutableProvider helper that
drives the real ReserveProviderEx path; stale challenge times were bumped past
the 16m production freshness window to preserve staleness coverage.

No production behavior change. build/vet/test/gofmt all green.

Co-authored-by: Cursor <cursoragent@cursor.com>
…es config

- git rm --cached coordinator/coordinator: the 40MB built binary was tracked
  despite already being in .gitignore. Stop committing the artifact.
- Remove Server.r2SitePackagesCDNURL plus its ServerConfig field and env wiring
  (EIGENINFERENCE_R2_SITE_PACKAGES_CDN_URL). It was written from env but never
  read: install.sh stopped substituting __DARKBLOOM_R2_SITE_PACKAGES_CDN_URL__
  post-Swift-cutover (v0.5.0+), so the field was pure dead config.

Co-authored-by: Cursor <cursoragent@cursor.com>
The 40MB coordinator/coordinator artifact was tracked despite already being
listed in .gitignore. Untrack it so builds don't keep committing the binary.

Co-authored-by: Cursor <cursoragent@cursor.com>
config/config.go only re-exported env helpers (EnvOr/FirstNonEmpty/EnvFloat/
EnvInt). FirstNonEmpty/EnvFloat/EnvInt had zero callers, and EnvOr was used only
by app_config.go in the same package. Point app_config.go at env.EnvOr directly,
move the package doc onto app_config.go, and delete the shim so there is a single
env-helper source of truth.

Co-authored-by: Cursor <cursoragent@cursor.com>
…elper

maybeFallbackAliasCapacity and maybeFallbackAliasTTFT were near-identical
copies differing only in the TTFT ceiling check and the failure-path return
model. Merge into maybeFallbackAlias(parsed, mode, ...) with an aliasFallbackMode
(capacity vs ttft) flag; capacity callers pass ttftThreshold=0. Failure-path
return semantics are preserved exactly (capacity reports currentModel, TTFT
reports the probed previous build). No behavior change.

Co-authored-by: Cursor <cursoragent@cursor.com>
…target

Benchmark/ (~5.2k LOC, incl. fatalError protocol-conformance stub caches)
was compiled into the ProviderCore library and thus linked into every
provider/enclave binary. Move it into a dedicated `ProviderBenchmark`
SwiftPM target that depends on ProviderCore, so only the benchmark-bearing
executables link it (darkbloom `benchmark`, kv-quant-gate, kv-attn-selftest,
kv-engine-demo) and the shipped enclave/publish/core surface no longer
carries benchmark code.

- new target `ProviderBenchmark` (Sources/ProviderBenchmark), 18 files moved
- the two engine-facing types the live scheduler needs stay in ProviderCore:
  KVQuantCandidateMode (split out of KVQuantTypes.swift) and the
  self-contained KVQuantPolicy.swift, now under ProviderCore/Inference/KVQuant
- widen KVEstimation / parseModelArchitecture / resolvedKVBytesPerToken /
  ModelArchitecture to public (already used by the live scheduler) so the
  benchmark target can reach them across the module boundary
- wire ProviderBenchmark into the four executables + the four benchmark
  test files; add `import ProviderCore` to the moved files

Behavior-preserving: swift build green; swift test 1049 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
…comments

- Delete api/fullstack_integration_test.go (~1155 LOC): it spawned real vllm-mlx
  subprocesses (the retired subprocess backend), was gated behind
  LIVE_FULLSTACK_TEST=1 (never run in CI), and the repo-root e2e/ harness covers
  the current MLX-Swift provider over the WebSocket relay.
- Refresh stale vllm-mlx comments to MLX-Swift / E2E-relay reality in consumer.go,
  protocol/messages.go, and registry.go (comments only; no wire-type changes).
- Fix install.sh path comment (internal/api -> api) in server.go.
- Reword the telemetry symmetry test comment so it states the Go test pins the
  shape and the Swift/TS mirrors must match, without asserting mirror tests this
  package cannot see.

Co-authored-by: Cursor <cursoragent@cursor.com>
…nonical

coordinator/protocol/telemetry_symmetry_test.go references a Swift mirror that
did not exist. Add Tests/ProviderCoreTests/TelemetrySymmetryTests.swift to pin:
- enum casing (source=provider, severity=error, kind=backend_crash)
- snake_case keys + nil-optional omission (machine_id/account_id/request_id/
  stack/fields), matching Go omitempty
- the TelemetryKind set + count (mirror of Go KnownKinds())
- TelemetrySource/TelemetrySeverity raw values

Add CaseIterable to TelemetryKind (the Swift equivalent of KnownKinds()) and
fix two stale `coordinator/internal/protocol|api/...` comment paths to the
current `coordinator/protocol|api/...` locations.

swift build green; swift test 1052 pass / 0 fail (+3 new).

Co-authored-by: Cursor <cursoragent@cursor.com>
… errors

Replace `_ = ...` discards with explicit error handling + a DogStatsD counter:

- Stripe webhook: CompleteBillingSession / Referral.Apply failures now log an
  error and increment billing.{session_complete,referral_apply}_failed instead
  of vanishing after a successful deposit.
- Settlement: failed consumer refund and platform-fee Credit calls now log an
  error and increment billing.credit_failed{op} — these are financial and must
  never be silent (a dropped refund over-charges the consumer).
- Provider WS: best-effort EnqueueText sends (runtime/trust status) now log at
  debug and increment provider.enqueue_failed{msg} so a wedged send is visible.

Co-authored-by: Cursor <cursoragent@cursor.com>
The assistant-message Harmony wrapper (role guard + content/thinking/
reasoning_content field iteration) was duplicated in JinjaSanitization and
TemplateRenderCheck. Centralize it in ProviderCoreFoundation alongside the
existing string transform:

- add `harmonyAssistantTextFields` + `stripHarmonyFramingFromAssistantMessage`
  in HarmonySanitization.swift (Linux-buildable, no Apple deps)
- TemplateRenderCheck now calls the shared helper (drops its private copy)
- JinjaSanitization iterates the shared field-list constant
- BatchScheduler already routes through the shared `stripHarmonyChannelFraming`

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
…uting scope

- auth: first unit tests for the previously-untested package — PEM/key parsing
  (NewPrivyAuth), ES256 JWT verification incl. expiry/issuer/audience/wrong-key/
  alg-confusion rejection (VerifyToken), user provisioning (GetOrCreateUser), and
  Config.Check. Uses a real in-memory store and real generated keys; no mocks of
  the unit under test.
- env: 100% coverage of EnvOr/FirstNonEmpty/EnvFloat/EnvInt/EnvBool.
- telemetry: Emitter default-fill, metric counting (capturing fake sink),
  nil-emitter/nil-sink safety, and version defaulting.
- registry: document that pendingModelLoads is consulted ONLY by warm-pool/swap
  planning and never participates in routing/admission (see AGENTS.md state model).

Co-authored-by: Cursor <cursoragent@cursor.com>
Four near-identical duration formatters had diverged in user-visible ways
(download progress shows seconds; idle/status logs use compact "2h30m";
the scheduler uses spaced "2h 30m"; trailing zero-minute elision differed).
Add one `DurationFormatting.compact` helper whose flags reproduce each style
exactly, and delegate the four call-site helpers to it. Output is unchanged
at every call site; the logic now lives in one place.

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
…sumer.go

Move the pure request-introspection / token-estimation cluster out of the 5.7k-LOC
consumer.go into a focused request_introspection.go (token + billing cost
estimation, media/tool detection, cache-affinity keys, provider-serial allowlist
parsing, and the media-cost constants). Same package, no behavior change — these
functions have no Server dependency. consumer.go drops from ~5700 to ~5311 LOC.

Co-authored-by: Cursor <cursoragent@cursor.com>
…ilent {}

Heartbeat / outbound / inference-error encoding used `try?` and degraded to
hardcoded `{}` or minimal JSON on failure — silent protocol corruption the
coordinator can't attribute. Replace with do/catch that logs at error and
emits a `protocol_error` telemetry event, and make the inference-error path
fall back to a parseable, correctly-typed payload (carrying the real
request_id/error/status_code) rather than `{}`.

Behavior-preserving on the success path: swift build green; swift test
1052 pass / 0 fail.

Co-authored-by: Cursor <cursoragent@cursor.com>
…timeout

- PersistentEnclaveKey: guard the Keychain `result` optional and throw a typed
  keyLookupFailed instead of force-unwrapping nil (the CFTypeRef->SecKey cast
  is compiler-guaranteed).
- ProtocolSafeQuantizedKVCache.copy(): assert the inner-copy type via guard +
  preconditionFailure with a clear message rather than a blind `as!`.
- ProviderLoop.waitForPreloads: replace `DispatchQueue.global().asyncAfter`
  with a structured `Task.sleep(for:)` timeout (OneShotBoolContinuation still
  dedupes, so the first-resume-wins race is unchanged).

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
Move the consumer-facing API key management endpoints and their
validation/response helpers (create/list/get/update/rotate/delete,
apiKeyToResponse, validateKeyLimitInputs, keyModelAllowed, checkKeySpendCap,
applyKeyPatch) into apikey_handlers.go, matching the existing
apikey_handlers_test.go. Same package, no behavior change. consumer.go drops
from ~5311 to ~4884 LOC.

Co-authored-by: Cursor <cursoragent@cursor.com>
…m ProviderLoop

First, low-risk step of breaking up the ProviderLoop god file: move the two
standalone top-level value types out of the 3.6k-line ProviderLoop.swift into
their own single-responsibility files (no actor internals touched, no access
loosened). The loop file now holds the actor + its behavior, not its inputs
or its logger type.

Note: the actor's instance-method sections are intentionally NOT split across
files here — ProviderLoop deliberately keeps method extensions in-file to reach
its `private` model registry without loosening ~40 members to `internal` (see
the documented rationale at the local-endpoint extension). That deeper split is
deferred pending an explicit encapsulation decision.

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
darkbloom-publish (the security-critical model-manifest hasher used by the
registry publish flow) had zero tests on its CLI surface. Add a
DarkbloomPublishTests target exercising the `hash` subcommand end-to-end
against a temp snapshot dir:
- writes a manifest.json that decodes back to the expected id/version/files/
  64-hex aggregate
- hashing is deterministic across runs on identical bytes
- rejects unsafe model ids and version tags (spaces, '..', '/', empty) before
  any hashing happens

(The ManifestBuilder library itself remains covered by ProviderCoreFoundationTests.)

swift build green; swift test 1052 swift-testing + 4 new XCTest pass / 0 fail.

Co-authored-by: Cursor <cursoragent@cursor.com>
vlm-smoke (explicitly "Safe to delete") and kv-engine-demo (DAR-318 capacity
demo, "NOT a product") are dev-only executables referenced nowhere outside
Package.swift — no CI, scripts, Makefile, or docs build them. Delete both
(~1040 LOC) to shrink the default build surface.

Kept kv-se-harness: it is the only way to exercise the Secure-Enclave KEK +
Keychain round-trip on real SE hardware (a path `swift test` cannot reach), so
it retains test value despite not being a product.

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
@vercel

vercel Bot commented Jun 22, 2026

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
d-inference Ready Ready Preview Jun 22, 2026 6:12am
d-inference-console-ui-dev Ready Ready Preview Jun 22, 2026 6:12am
d-inference-landing Ready Ready Preview Jun 22, 2026 6:12am

Request Review

Resolve coordinator/api/consumer.go conflict: master (#443 inference
reliability) inserted the new remote-media-URL rejection helpers into the
request-introspection region that this branch had already relocated out of
consumer.go. Keep the branch's modular structure and move master's five new
funcs (isInlineDataURI, mediaPartURLString, validateMediaParts,
visionRejectRemoteEnabled, and the *Server method rejectRemoteMediaURLs) into
coordinator/api/request_introspection.go next to detectMediaRequirement/
isMediaPartType. The os import moves with them; the strconv import stays in
consumer.go (used elsewhere).

Behavior-preserving: master's stable-identity ejection breaker calls and both
rejectRemoteMediaURLs call sites (handleChatCompletions, handleGenericInference)
auto-merged unchanged and are retained. go build/vet/test all green.

Co-authored-by: Cursor <cursoragent@cursor.com>
@github-actions

github-actions Bot commented Jun 22, 2026

Copy link
Copy Markdown

This PR is a large refactoring / dead-code removal pass with several correctness fixes; it does not introduce new attack surface and makes minor positive changes to billing error visibility and SE key resilience, but leaves all previously-open threat mitigations unchanged.


Trust Boundaries Touched

  • TB-002 — coordinator→provider WebSocket (registry routing refactor, registry.go)
  • TB-003 — provider operator vs process (PersistentEnclaveKey.swift)
  • TB-007 — provider inference engine (Swift inference types, ProviderLoop.swift, BatchScheduler+KVEstimation.swift)
  • TB-008 — coordinator→payments (provider.go billing credit error handling)
  • TB-009 — Apple attestation chain (PersistentEnclaveKey.swift)

Per-Threat Assessment

T-034 — Provider runs modified code while advertising a trusted identity (coordinator/api/provider.go, coordinator/registry/registry.go)

ℹ️ Neutral. The large deletion in registry.go removes ScoreProvider, FindProvider, FindProviderWithTrust, TrustMultiplier, and MaxConcurrentRequests. The comment added at pendingModelLoads (line ~1154) explicitly documents that this map is not read on the routing hot path — this is a documentation improvement but not a security posture change. The CodeAttested gate and providerSupportsPrivateTextLocked are not in the diff, so the APNs-based code-identity round-trip (the actual T-034 mitigation) is untouched.

The three EnqueueText sites now log and metric on error instead of silently dropping (_ =). This does not affect the security of the attestation chain.

T-033 / T-035 — Attestation blob replay / SE key identity (PersistentEnclaveKey.swift)

Strengthens mitigation (minor). The force-unwrap on errSecSuccess with a nil result is replaced by a guarded throw (PersistentEnclaveKeyError.keyLookupFailed). Previously, a Keychain contract violation (Apple bug or race) would crash the process with no diagnostic — the crash could be exploited as a reboot trigger to force a re-enrollment cycle. Now it surfaces as a typed error that the caller can handle cleanly. This is a robustness fix, not a new control, but it closes a subtle reliability gap in the SE key lifecycle path.

No change to key creation, access group, or label — SLDQ2GJ6TL.io.darkbloom.provider / io.darkbloom.provider.attestation-signing.v2 are untouched. The existing open finding (same-team binary can load the key) remains.

T-008 — Provider sends plaintext SSE chunks on encryption failure (ProviderLoop.swift)

ℹ️ Neutral. The ProviderLogger struct and ProviderLoopConfig struct are removed (dead code extracted elsewhere). The encryption failure path (SEC-016) is not touched in this diff. The open finding stands.

T-009 / T-010 — Swift provider excluded from private routing / cancellation (ProviderLoop.swift)

ℹ️ Neutral. The one substantive change to ProviderLoop.swift is replacing DispatchQueue.global().asyncAfter with a structured Task { try? await Task.sleep(for:) } for the APNs challenge timeout (line ~2144). This is a concurrency hygiene fix. OneShotBoolContinuation deduplication means the race semantics are identical; the timeout still fires and resumes the continuation with false. No impact on cancellation wiring or privacy-flag reporting.

T-007 / T-027 — Model weight substitution / manipulation (BatchScheduler+KVEstimation.swift, BatchSchedulerTypes.swift, KVQuantCandidateMode.swift)

ℹ️ Neutral. Changes are access-level promotions (internalpublic) on KVEstimation, parseModelArchitecture, resolvedKVBytesPerToken, and ModelArchitecture, plus a new KVQuantCandidateMode enum. These expose types to benchmark/gate tooling. The weight-hash computation path (WeightHasher.swift) is not changed. SEC-007 (fail-open weight hash enforcement) remains open. The new KVQuantCandidateMode enum is pure data/logic with no trust-boundary crossing.

T-005 / T-030 — Billing correctness (coordinator/api/provider.go)

Strengthens observability (minor positive). Three _ = s.store.Credit(...) calls are replaced with explicit error checks that log at Error level and emit Datadog metrics (billing.credit_failed). This is not a security control but makes silent financial failures detectable in monitoring, which matters for T-030 (duplicate Stripe webhook double-credit) and T-005 (billing repudiation). The underlying atomicity issues (SEC-012) are not addressed here.

T-017 / T-025 — SSRF, stale release cache

ℹ️ Not touched by this diff.


New Attack Surface

None identified. The public visibility promotions on KVEstimation and ModelArchitecture are module-internal to the Swift package and do not expose any network or IPC surface. The new KVQuantCandidateMode enum is a pure value type with no side effects and no I/O.

The comment added to pendingModelLoads explicitly documents that it must not be read from routing paths — this is a positive defensive annotation and introduces no new surface.


Open Findings Resolved

None. All previously open findings (SEC-007, SEC-016, SEC-017, SEC-021, SEC-034, SEC-012, etc.) remain unaddressed by this PR.


Reviewer Notes

  1. registry.go deletion of ScoreProvider/FindProvider: confirm in the non-diffed files that the new routing hot path (snapshotProviderLockedEx / buildCandidateWithReason) correctly enforces the CodeAttested flag — the deleted ScoreProvider checked RuntimeVerified and returned 0 if false; the replacement must preserve an equivalent fail-closed gate. This is the most security-sensitive consequence of the refactor and is not visible in the focused diff.

  2. PersistentEnclaveKey.swift guard: the result as! SecKey force-cast immediately after the guard is still a force-cast — it is safe given the errSecSuccess contract, but worth noting if the error path is ever extended.


🔐 Threat model: docs/threat-model.yaml · Updates on each push to this PR

@ethenotethan ethenotethan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review — Layr-Labs/d-inference#

Verdict: REQUEST_CHANGES

Security — ⚠️ SKIPPED

Error: Expecting property name enclosed in double quotes: line 2 column 5 (char 6)

Performance — 3 finding(s) (1 blocking)

  • 🔵 [INFO] coordinator/api/billing_handlers.go:176-188 — Synchronous billing operations in webhook handler could block request processing
    • Suggestion: Consider making CompleteBillingSession and Apply operations asynchronous or use a background worker queue to avoid blocking the webhook response
  • 🟡 [MEDIUM] coordinator/api/consumer.go:388-450 — maybeFallbackAlias function duplicates capacity checking logic across two modes
    • Suggestion: Extract common capacity checking logic to avoid redundant QuickCapacityCheckWithTTFTForRequest calls and reduce code duplication
  • 🔵 [INFO] coordinator/api/apikey_handlers.go:285-320 — Multiple synchronous database operations in handleUpdateAPIKey without batching
    • Suggestion: Consider batching the invalidateAllAPIKeyCache calls or making them asynchronous to reduce blocking time during key updates

Type_diligence — ✅ No issues found

Additive_complexity — 5 finding(s) (1 blocking)

  • 🔵 [INFO] coordinator/api/apikey_handlers.go:1-447 — New 447-line file extracted from consumer.go may be premature separation
    • Suggestion: Consider if API key handlers truly need their own file or if they could remain in consumer.go with better organization
  • 🔵 [INFO] coordinator/api/request_introspection.go:1-561 — New 561-line file for request introspection helpers may be over-modularized
    • Suggestion: Evaluate if these pure helper functions need their own file or could be organized differently within existing files
  • 🟡 [MEDIUM] coordinator/api/consumer.go:388-445 — Complex aliasFallbackMode enum and maybeFallbackAlias function with multiple similar code paths
    • Suggestion: Consider simplifying the fallback logic or extracting common patterns to reduce the branching complexity
  • 🔵 [INFO] provider-swift/Sources/ProviderBenchmark/KVQuant/ProtocolSafeQuantizedKVCache.swift:101-108 — Added preconditionFailure with detailed error message for type casting that should never fail
    • Suggestion: Consider if the guard+preconditionFailure is necessary or if the original force-cast with a comment would be clearer
  • 🔵 [INFO] coordinator/api/billing_handlers.go:175-191 — Similar error handling pattern repeated for billing session completion and referral application
    • Suggestion: Consider extracting a helper function for the common 'log error + increment metric' pattern

8 finding(s) total, 2 blocking. Verdict: REQUEST_CHANGES.

🤖 Automated review by Centaur · DAR-186

Comment on lines +176 to +188
// Best-effort: the deposit is already credited above, but a failure here
// leaves the session marked incomplete (and replayable). Surface it.
if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil {
s.logger.Error("stripe: failed to mark billing session complete",
"billing_session_id", billingSessionID, "error", err)
s.ddIncr("billing.session_complete_failed", nil)
}
}
if referralCode != "" {
_ = s.billing.Referral().Apply(consumerKey, referralCode)
// Best-effort: a failure here means the referrer is not credited for this
// deposit; never silently swallow it.
if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil {
s.logger.Error("stripe: failed to apply referral credit", "error", err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] ⚡ Synchronous billing operations in webhook handler could block request processing

💡 Suggestion: Consider making CompleteBillingSession and Apply operations asynchronous or use a background worker queue to avoid blocking the webhook response

📊 Score: 2×3 = 6 · Category: blocking_io

Comment on lines +285 to +320
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}

// Decode into a presence map so we can distinguish "field omitted" (leave
// unchanged) from "field set to null" (clear the limit).
var patch map[string]json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body"))
return
}
if msg := applyKeyPatch(existing, patch); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}
if msg := validateKeyLimitInputs(existing.LimitReset, nil, existing.RPMLimit, existing.ITPMLimit, existing.OTPMLimit, existing.ExpiresAt); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}

// Bump the auth-cache generation before AND after the mutation so a
// concurrent request cannot keep authenticating with a stale (e.g.
// just-disabled) cached record.
s.invalidateAllAPIKeyCache()
updated, err := s.store.UpdateAPIKey(user.AccountID, id, *existing)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to update key"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, s.apiKeyToResponse(updated))
}

// applyKeyPatch merges a presence-aware PATCH body into an existing key record.
// Returns a human-readable error string on invalid input (empty when ok).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] ⚡ Multiple synchronous database operations in handleUpdateAPIKey without batching

💡 Suggestion: Consider batching the invalidateAllAPIKeyCache calls or making them asynchronous to reduce blocking time during key updates

📊 Score: 2×3 = 6 · Category: blocking_io

Comment on lines +1 to +447
package api

// apikey_handlers.go holds the consumer-facing API key management endpoints
// (create/list/get/update/rotate/delete) plus their request validation and
// response-shaping helpers. Split out of consumer.go to keep key management
// separate from the inference request path.

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/eigeninference/d-inference/coordinator/auth"
"github.com/eigeninference/d-inference/coordinator/store"

"github.com/eigeninference/d-inference/coordinator/api/types"
)

// handleCreateKey handles POST /v1/auth/keys — creates a new consumer API key.
// Requires Privy authentication. The key is linked to the user's account so
// requests made with the key are billed to the same account.
func (s *Server) handleCreateKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error",
"API key creation requires a Privy account — authenticate with a Privy access token"))
return
}

key, err := s.store.CreateKeyForAccount(user.AccountID)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key"))
return
}
writeJSON(w, http.StatusOK, types.CreateKeyResponse{
APIKey: key,
AccountID: user.AccountID,
})
}

// handleRevokeKey handles DELETE /v1/auth/keys — revokes an API key.
// The caller must own the key (same account). Requires Privy auth so a
// compromised API key cannot revoke legitimate keys.
func (s *Server) handleRevokeKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}

var body struct {
Key string `json:"key"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Key == "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "provide {\"key\": \"sk-db-...\"}"))
return
}

owner := s.store.GetKeyAccount(body.Key)
if owner != user.AccountID {
writeJSON(w, http.StatusForbidden, errorResponse("forbidden", "you can only revoke your own keys"))
return
}

if !s.store.RevokeKey(body.Key) {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found or already revoked"))
return
}
s.invalidateAPIKeyCache(body.Key)

writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"})
}

// apiKeyToResponse projects a stored key into its masked API representation,
// computing the current-window spend and remaining budget.
func (s *Server) apiKeyToResponse(k *store.APIKey) types.APIKeyResponse {
resp := types.APIKeyResponse{
ID: k.ID,
Name: k.Name,
Label: k.Label,
Disabled: k.Disabled,
LimitReset: store.NormalizeResetWindow(k.LimitReset),
RPMLimit: k.RPMLimit,
ITPMLimit: k.ITPMLimit,
OTPMLimit: k.OTPMLimit,
AllowedModels: k.AllowedModels,
SelfRouteOnly: k.SelfRouteOnly,
ExpiresAt: k.ExpiresAt,
CreatedAt: k.CreatedAt,
LastUsedAt: k.LastUsedAt,
}
since := store.KeySpendWindowStart(resp.LimitReset, time.Now())
spent := s.store.KeySpendSince(k.ID, since)
resp.UsageUSD = microToUSD(spent)
if k.LimitMicroUSD != nil {
limitUSD := microToUSD(*k.LimitMicroUSD)
resp.LimitUSD = &limitUSD
remaining := *k.LimitMicroUSD - spent
if remaining < 0 {
remaining = 0
}
remUSD := microToUSD(remaining)
resp.RemainingUSD = &remUSD
}
return resp
}

// validateKeyLimitInputs sanity-checks user-supplied limit values. Returns a
// human-readable error string (empty when valid).
func validateKeyLimitInputs(reset string, limitUSD *float64, rpm, itpm, otpm *int64, expiresAt *time.Time) string {
switch reset {
case "", store.KeyResetNone, store.KeyResetDaily, store.KeyResetWeekly, store.KeyResetMonthly:
default:
return "limit_reset must be one of: none, daily, weekly, monthly"
}
if limitUSD != nil && *limitUSD < 0 {
return "limit_usd must be >= 0"
}
if rpm != nil && *rpm < 0 {
return "rpm_limit must be >= 0"
}
if itpm != nil && *itpm < 0 {
return "itpm_limit must be >= 0"
}
if otpm != nil && *otpm < 0 {
return "otpm_limit must be >= 0"
}
if expiresAt != nil && !expiresAt.IsZero() && expiresAt.Before(time.Now()) {
return "expires_at must be in the future"
}
return ""
}

// keyModelAllowed returns false when the calling key restricts models via an
// allow-list that does not include the requested model. Account-scoped/legacy
// keys (no key in context) and keys without an allow-list always pass.
func (s *Server) keyModelAllowed(ctx context.Context, model string) bool {
k := apiKeyFromContext(ctx)
if k == nil || len(k.AllowedModels) == 0 {
return true
}
for _, m := range k.AllowedModels {
if m == model {
return true
}
}
return false
}

// checkKeySpendCap reports whether charging additionalMicroUSD to the calling
// key would exceed its per-key spend cap in the current window. It returns
// (message, ok); ok=false means the request must be rejected with 402. The
// per-account balance ledger is still the hard atomic ceiling — this is the
// soft, per-key sub-cap, enforced against settled usage (so concurrent
// in-flight requests are eventually-consistent, never over the account balance).
func (s *Server) checkKeySpendCap(ctx context.Context, additionalMicroUSD int64) (string, bool) {
k := apiKeyFromContext(ctx)
if k == nil || k.ID == "" || k.LimitMicroUSD == nil {
return "", true
}
since := store.KeySpendWindowStart(k.LimitReset, time.Now())
spent := s.store.KeySpendSince(k.ID, since)
if spent+additionalMicroUSD > *k.LimitMicroUSD {
window := store.NormalizeResetWindow(k.LimitReset)
if window == store.KeyResetNone {
window = "total"
}
return fmt.Sprintf("API key spend limit reached (%s cap $%.2f, used $%.2f) — raise this key's limit or use another key",
window, microToUSD(*k.LimitMicroUSD), microToUSD(spent)), false
}
return "", true
}

// handleListAPIKeys handles GET /v1/keys — lists the caller's keys (masked).
func (s *Server) handleListAPIKeys(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
keys, err := s.store.ListAPIKeys(user.AccountID)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to list keys"))
return
}
out := make([]types.APIKeyResponse, 0, len(keys))
for i := range keys {
out = append(out, s.apiKeyToResponse(&keys[i]))
}
writeJSON(w, http.StatusOK, types.APIKeyListResponse{Object: "list", Data: out})
}

// handleCreateAPIKey handles POST /v1/keys — mints a new named, optionally
// limited key. The raw secret is returned exactly once.
func (s *Server) handleCreateAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error",
"API key creation requires a Privy account — authenticate with a Privy access token"))
return
}

var req createAPIKeyRequest
if r.Body != nil {
// A missing/empty body is allowed (creates a default unnamed key).
if err := json.NewDecoder(r.Body).Decode(&req); err != nil && !errors.Is(err, io.EOF) {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body"))
return
}
}
if msg := validateKeyLimitInputs(req.LimitReset, req.LimitUSD, req.RPMLimit, req.ITPMLimit, req.OTPMLimit, req.ExpiresAt); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}

opts := store.APIKeyCreate{
Name: strings.TrimSpace(req.Name),
LimitReset: store.NormalizeResetWindow(req.LimitReset),
RPMLimit: req.RPMLimit,
ITPMLimit: req.ITPMLimit,
OTPMLimit: req.OTPMLimit,
AllowedModels: req.AllowedModels,
SelfRouteOnly: req.SelfRouteOnly,
ExpiresAt: req.ExpiresAt,
}
if req.LimitUSD != nil {
m := usdToMicro(*req.LimitUSD)
opts.LimitMicroUSD = &m
}

raw, rec, err := s.store.CreateAPIKey(user.AccountID, opts)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key"))
return
}
writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{
Key: raw,
Data: s.apiKeyToResponse(rec),
})
}

// handleGetAPIKey handles GET /v1/keys/{id}.
func (s *Server) handleGetAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
k, err := s.store.GetAPIKeyByID(user.AccountID, id)
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}
writeJSON(w, http.StatusOK, s.apiKeyToResponse(k))
}

// handleGetCallingKey handles GET /v1/key — returns the metadata for the API
// key used to authenticate the request (OpenRouter parity).
func (s *Server) handleGetCallingKey(w http.ResponseWriter, r *http.Request) {
k := apiKeyFromContext(r.Context())
if k == nil || k.ID == "" {
writeJSON(w, http.StatusNotFound, errorResponse("not_found",
"no key metadata — this endpoint requires API key authentication"))
return
}
writeJSON(w, http.StatusOK, s.apiKeyToResponse(k))
}

// handleUpdateAPIKey handles PATCH /v1/keys/{id} — sparse update of a key's
// name, disabled flag, limits, reset window, expiry, and model allow-list.
func (s *Server) handleUpdateAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
existing, err := s.store.GetAPIKeyByID(user.AccountID, id)
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}

// Decode into a presence map so we can distinguish "field omitted" (leave
// unchanged) from "field set to null" (clear the limit).
var patch map[string]json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body"))
return
}
if msg := applyKeyPatch(existing, patch); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}
if msg := validateKeyLimitInputs(existing.LimitReset, nil, existing.RPMLimit, existing.ITPMLimit, existing.OTPMLimit, existing.ExpiresAt); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}

// Bump the auth-cache generation before AND after the mutation so a
// concurrent request cannot keep authenticating with a stale (e.g.
// just-disabled) cached record.
s.invalidateAllAPIKeyCache()
updated, err := s.store.UpdateAPIKey(user.AccountID, id, *existing)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to update key"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, s.apiKeyToResponse(updated))
}

// applyKeyPatch merges a presence-aware PATCH body into an existing key record.
// Returns a human-readable error string on invalid input (empty when ok).
func applyKeyPatch(k *store.APIKey, patch map[string]json.RawMessage) string {
if raw, ok := patch["name"]; ok {
var name string
if err := json.Unmarshal(raw, &name); err != nil {
return "invalid value for name"
}
k.Name = strings.TrimSpace(name)
}
if raw, ok := patch["disabled"]; ok {
var disabled bool
if err := json.Unmarshal(raw, &disabled); err != nil {
return "invalid value for disabled"
}
k.Disabled = disabled
}
if raw, ok := patch["limit_reset"]; ok {
var reset string
if err := json.Unmarshal(raw, &reset); err != nil {
return "invalid value for limit_reset"
}
k.LimitReset = store.NormalizeResetWindow(reset)
}
if raw, ok := patch["limit_usd"]; ok {
if string(raw) == "null" {
k.LimitMicroUSD = nil
} else {
var usd float64
if err := json.Unmarshal(raw, &usd); err != nil {
return "invalid value for limit_usd"
}
if usd < 0 {
return "limit_usd must be >= 0"
}
m := usdToMicro(usd)
k.LimitMicroUSD = &m
}
}
if raw, ok := patch["allowed_models"]; ok {
if string(raw) == "null" {
k.AllowedModels = nil
} else {
var models []string
if err := json.Unmarshal(raw, &models); err != nil {
return "invalid value for allowed_models"
}
k.AllowedModels = models
}
}
if raw, ok := patch["self_route_only"]; ok {
var v bool
if err := json.Unmarshal(raw, &v); err != nil {
return "invalid value for self_route_only"
}
k.SelfRouteOnly = v
}
for field, dst := range map[string]**int64{
"rpm_limit": &k.RPMLimit,
"itpm_limit": &k.ITPMLimit,
"otpm_limit": &k.OTPMLimit,
} {
if raw, ok := patch[field]; ok {
if string(raw) == "null" {
*dst = nil
} else {
var v int64
if err := json.Unmarshal(raw, &v); err != nil {
return "invalid value for " + field
}
*dst = &v
}
}
}
if raw, ok := patch["expires_at"]; ok {
if string(raw) == "null" {
k.ExpiresAt = nil
} else {
var t time.Time
if err := json.Unmarshal(raw, &t); err != nil {
return "invalid value for expires_at (use RFC 3339)"
}
k.ExpiresAt = &t
}
}
return ""
}

// handleDeleteAPIKey handles DELETE /v1/keys/{id} — permanently deletes a key.
func (s *Server) handleDeleteAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
s.invalidateAllAPIKeyCache()
if err := s.store.RevokeAPIKeyByID(user.AccountID, id); err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"})
}

// handleRotateAPIKey handles POST /v1/keys/{id}/rotate — mints a fresh secret
// carrying the same limits and metadata, then deletes the old key. The new
// secret is returned exactly once.
func (s *Server) handleRotateAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
// Bump the auth-cache generation before AND after the mutation so the old
// secret stops authenticating the instant rotation commits.
s.invalidateAllAPIKeyCache()
raw, rec, err := s.store.RotateAPIKey(user.AccountID, id)
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{
Key: raw,
Data: s.apiKeyToResponse(rec),
})
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 New 447-line file extracted from consumer.go may be premature separation

💡 Suggestion: Consider if API key handlers truly need their own file or if they could remain in consumer.go with better organization

📊 Score: 2×3 = 6 · Category: over-abstraction

Comment on lines +1 to +561
package api

// request_introspection.go holds helpers for introspecting and lightly
// reshaping inbound inference request bodies before routing/dispatch:
// token and cost estimation (routing vs billing), media/tool detection,
// remote media-URL rejection, cache-affinity key derivation, and
// provider-serial allowlist parsing. Most are pure helpers with no Server
// state; the pre-dispatch media-URL rejection (rejectRemoteMediaURLs) hangs
// off *Server only to record rejection telemetry. Split out of consumer.go
// to keep the request-handling orchestrator thin.

import (
"crypto/sha256"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"

"github.com/eigeninference/d-inference/coordinator/store"
)

// Media prompt-token costs. A vision encoder turns each image/video into a
// bounded number of soft tokens (Gemma 4 caps around a few hundred per image)
// regardless of the base64 byte length, so counting a `data:` URI as text
// inflates the estimate by orders of magnitude — distorting routing admission and
// over-reserving balance. These flat per-media costs keep both sane.
const (
imagePromptTokenCost = 300
videoPromptTokenCost = 1500
)

func intFromRequestValue(v any) (int, bool) {
switch x := v.(type) {
case int:
return x, true
case int32:
return int(x), true
case int64:
return int(x), true
case float64:
return int(x), true
case json.Number:
n, err := x.Int64()
if err != nil {
return 0, false
}
return int(n), true
default:
return 0, false
}
}

// approximateTokenCount returns a rough token estimate for routing and queue
// admission. The len/4 heuristic is a reasonable average for English text
// with GPT-style BPE tokenizers. This value feeds into the scheduler's
// capacity checks (pendingTokenBudget, freeMemoryAdmits) where a tighter
// estimate produces better routing decisions.
//
// For billing reservation (where underestimation causes provider shortfall),
// use approximateTokenCountUpperBound instead.
func approximateTokenCount(v any) int {
if v == nil {
return 0
}
switch x := v.(type) {
case string:
if x == "" {
return 0
}
tokens := len(x) / 4
if tokens < 1 {
tokens = 1
}
return tokens
default:
b, err := json.Marshal(v)
if err != nil {
return 0
}
tokens := len(b) / 4
if tokens < 1 {
tokens = 1
}
return tokens
}
}

// approximateTokenCountUpperBound returns a guaranteed upper bound on the
// number of tokens a BPE tokenizer would produce for v. Every BPE vocabulary
// starts with one token per byte and can only merge, so len(text) >= tokens
// for any model family, any language, forever. This is used only for billing
// reservation to ensure the pre-flight debit always covers the actual cost.
//
// Using len(text) over-reserves by ~3-4x on average for English prose, but
// the difference is refunded immediately after inference completes, so
// consumers are never overcharged — they only need sufficient balance to
// cover the reservation hold.
func approximateTokenCountUpperBound(v any) int {
if v == nil {
return 0
}
switch x := v.(type) {
case string:
return len(x)
default:
b, err := json.Marshal(v)
if err != nil {
return 0
}
return len(b)
}
}

func estimatePromptTokens(parsed map[string]any) int {
total := 0
if v, ok := parsed["messages"]; ok {
total += messagesPromptTokens(v)
}
if v, ok := parsed["input"]; ok {
total += inputPromptTokens(v)
}
if v, ok := parsed["prompt"]; ok {
total += approximateTokenCount(v)
}
if total == 0 {
total = approximateTokenCount(parsed)
}
return total
}

// estimateBillingPromptTokens returns a guaranteed upper bound on prompt
// tokens for billing reservation. Uses byte-length (not len/4) so the
// pre-flight reservation always covers actual cost. This value must NOT
// be used for routing — see estimatePromptTokens for that.
func estimateBillingPromptTokens(parsed map[string]any) int {
total := 0
if v, ok := parsed["messages"]; ok {
// Billing MUST stay a guaranteed upper bound (len(bytes) >= tokens for any
// BPE tokenizer), so it keeps counting full message bytes — including a
// base64 image's bytes and every non-content field (role, tool_calls,
// name). Switching to the media-aware flat count here would DROP those
// fields and under-reserve for tool-calling requests. Over-reservation on a
// large image is safe (it is refunded after inference); the routing/ITPM
// estimate (estimatePromptTokens) is the media-aware one.
total += approximateTokenCountUpperBound(v)
}
if v, ok := parsed["input"]; ok {
total += approximateTokenCountUpperBound(v)
}
if v, ok := parsed["prompt"]; ok {
total += approximateTokenCountUpperBound(v)
}
if total == 0 {
total = approximateTokenCountUpperBound(parsed)
}
return total
}

// isMediaPartType reports whether an OpenAI/OpenRouter content-part type denotes
// image or video input.
func isMediaPartType(t string) bool {
switch t {
// OpenAI chat (image_url/video_url), OpenAI Responses (input_image/input_video),
// and Anthropic /v1/messages content blocks ({"type":"image"|"video","source":…}).
case "image_url", "input_image", "image", "video_url", "input_video", "video":
return true
}
return false
}

// messageContentTokens estimates ROUTING prompt tokens for one message's
// `content`, counting text parts as text (len/4) and each image/video part as a
// flat media cost (never the base64 length). Used only for the routing/ITPM
// estimate; billing uses approximateTokenCountUpperBound (a guaranteed upper
// bound that intentionally still counts the base64 bytes).
func messageContentTokens(content any) int {
textTokens := func(s string) int {
if s == "" {
return 0
}
if t := len(s) / 4; t > 0 {
return t
}
return 1
}
switch c := content.(type) {
case string:
return textTokens(c)
case []any:
total := 0
for _, part := range c {
pm, ok := part.(map[string]any)
if !ok {
continue
}
typ, _ := pm["type"].(string)
switch {
case typ == "text" || typ == "input_text":
if s, ok := pm["text"].(string); ok {
total += textTokens(s)
}
case typ == "image_url" || typ == "input_image" || typ == "image":
total += imagePromptTokenCost
case typ == "video_url" || typ == "input_video" || typ == "video":
total += videoPromptTokenCost
default:
if b, err := json.Marshal(pm); err == nil {
total += len(b) / 4
}
}
}
return total
default:
return approximateTokenCount(content)
}
}

// messagesPromptTokens sums media-aware routing content tokens across a messages
// array. Falls back to the len/4 heuristic when messages isn't the standard
// array shape.
func messagesPromptTokens(messages any) int {
arr, ok := messages.([]any)
if !ok {
return approximateTokenCount(messages)
}
total := 0
for _, m := range arr {
mm, ok := m.(map[string]any)
if !ok {
total += approximateTokenCount(m)
continue
}
total += 4 // small per-message framing (role + delimiters)
total += messageContentTokens(mm["content"])
}
return total
}

// inputPromptTokens estimates the Responses API `input` field. A string input
// is plain text (len/4). Structured input is an array of message-like items with
// `content` parts, so reuse the same media-aware content estimator as chat
// messages instead of counting JSON wrapper bytes.
func inputPromptTokens(input any) int {
switch x := input.(type) {
case string:
return approximateTokenCount(x)
case []any:
total := 0
for _, item := range x {
switch m := item.(type) {
case string:
total += approximateTokenCount(m)
case map[string]any:
content, ok := m["content"]
if !ok {
total += approximateTokenCount(m)
continue
}
total += 4 // role/type framing, matching messagesPromptTokens.
total += messageContentTokens(content)
default:
total += approximateTokenCount(item)
}
}
return total
default:
return approximateTokenCount(input)
}
}

// contentPartsHaveMedia reports whether a `content` value (a content-part array)
// carries any image/video part.
func contentPartsHaveMedia(content any) bool {
parts, ok := content.([]any)
if !ok {
return false
}
for _, part := range parts {
pm, ok := part.(map[string]any)
if !ok {
continue
}
if typ, _ := pm["type"].(string); isMediaPartType(typ) {
return true
}
}
return false
}

// detectMediaRequirement reports whether the request carries image/video input.
// The coordinator sees plaintext at this point (sealedTransport decrypts before
// the handler), so this drives the vision routing gate and the fail-fast "no
// vision-capable provider" response. It scans both the Chat Completions
// `messages[].content` parts and the Responses API `input[].content` parts so a
// media request on either surface is gated (never silently routed text-blind).
func detectMediaRequirement(parsed map[string]any) bool {
if messages, ok := parsed["messages"].([]any); ok {
for _, m := range messages {
if mm, ok := m.(map[string]any); ok && contentPartsHaveMedia(mm["content"]) {
return true
}
}
}
// Responses API: `input` may be a string (no media) or an array of items,
// each carrying `content` parts in the same image_url/input_image shape.
if input, ok := parsed["input"].([]any); ok {
for _, item := range input {
if im, ok := item.(map[string]any); ok && contentPartsHaveMedia(im["content"]) {
return true
}
}
}
return false
}

// isInlineDataURI reports whether a media reference is an inline base64 data: URI
// — the ONLY form the provider's E2E-encrypted VLM path accepts (see
// VLMRequestInference.MediaError.invalidURL, which 400s anything else).
func isInlineDataURI(s string) bool {
return strings.HasPrefix(strings.TrimSpace(s), "data:")
}

// mediaPartURLString returns the URL/inline reference carried by a media content
// part and whether the part IS a media part. Covers OpenAI chat (image_url/
// video_url objects or bare strings), OpenAI Responses (input_image/input_video),
// and Anthropic source blocks ({type:"image"|"video", source:{type,…}}). A media
// part whose reference can't be read returns ("", true) so the caller fails OPEN.
func mediaPartURLString(pm map[string]any) (ref string, isMedia bool) {
typ, _ := pm["type"].(string)
if !isMediaPartType(typ) {
return "", false
}
switch typ {
case "image_url", "input_image", "video_url", "input_video":
field := "image_url"
if typ == "video_url" || typ == "input_video" {
field = "video_url"
}
switch v := pm[field].(type) {
case string:
return v, true
case map[string]any:
if u, ok := v["url"].(string); ok {
return u, true
}
}
return "", true
case "image", "video": // Anthropic source block
if src, ok := pm["source"].(map[string]any); ok {
switch st, _ := src["type"].(string); st {
case "url":
u, _ := src["url"].(string)
return u, true // remote reference
case "base64":
// Inline raw base64 (not a data: URI). Treated as inline/OK in v1 —
// the provider's Anthropic path accepts it; only remote refs are the
// production storm. Marked inline so it is never rejected here.
return "data:anthropic-inline-base64", true
}
}
return "", true
}
return "", true
}

// validateMediaParts walks every media part in a chat (messages[]) or Responses
// (input[]) body and returns the first REMOTE / non-inline media reference. The
// provider VLM path accepts ONLY inline data: URIs, so a remote http(s)://,
// file://, or otherwise non-data: reference is the production gemma-vision 400
// source. Returns ok=false with the offending reference so the caller rejects it
// pre-dispatch (one clean 400) instead of dispatching and 400ing across the fleet.
// Unknown/unreadable part shapes fall through (fail-OPEN) — never wrongly 400 a
// body we don't model.
func validateMediaParts(parsed map[string]any) (badRef string, ok bool) {
check := func(content any) (string, bool) {
parts, ok := content.([]any)
if !ok {
return "", true
}
for _, p := range parts {
pm, ok := p.(map[string]any)
if !ok {
continue
}
ref, isMedia := mediaPartURLString(pm)
if !isMedia || ref == "" {
continue
}
if !isInlineDataURI(ref) {
return ref, false
}
}
return "", true
}
if msgs, ok := parsed["messages"].([]any); ok {
for _, m := range msgs {
if mm, ok := m.(map[string]any); ok {
if ref, good := check(mm["content"]); !good {
return ref, false
}
}
}
}
if input, ok := parsed["input"].([]any); ok {
for _, it := range input {
if im, ok := it.(map[string]any); ok {
if ref, good := check(im["content"]); !good {
return ref, false
}
}
}
}
return "", true
}

// visionRejectRemoteEnabled gates the C4 pre-dispatch remote-media-URL rejection.
// Default ON; DARKBLOOM_VISION_REJECT_REMOTE_URLS=false (or 0) restores the prior
// dispatch-then-provider-400 behavior (still bounded by C1). Read live so the
// kill switch toggles without a redeploy.
func visionRejectRemoteEnabled() bool {
v := strings.TrimSpace(os.Getenv("DARKBLOOM_VISION_REJECT_REMOTE_URLS"))
if v == "" {
return true
}
on, err := strconv.ParseBool(v)
return err != nil || on
}

// rejectRemoteMediaURLs fails a vision request fast (one terminal 400) when any
// media part carries a remote/non-inline URL, mirroring the provider's data:-only
// contract. Pre-dispatch — no provider is contacted. handled=true => caller returns.
func (s *Server) rejectRemoteMediaURLs(w http.ResponseWriter, r *http.Request, parsed map[string]any, model, publicModel string, requiresVision, hasTools bool) (handled bool) {
if !requiresVision || !visionRejectRemoteEnabled() {
return false
}
badRef, ok := validateMediaParts(parsed)
if ok {
return false
}
shown := badRef
if len(shown) > 200 {
shown = shown[:200] + "…"
}
stream, _ := parsed["stream"].(bool)
s.recordRejection(rejectionInfo{
r: r,
stage: "validation",
reasonCode: "bad_param",
httpStatus: http.StatusBadRequest,
keyID: keyIDFromContext(r.Context()),
consumerKeyHash: store.HashKey(consumerKeyFromContext(r.Context())),
requestedModel: publicModel,
resolvedModel: model,
stream: stream,
requiresVision: true,
hasTools: hasTools,
params: rejectionSamplingParams(parsed),
})
s.ddIncr("inference.media_remote_url_rejected", []string{"model:" + model})
writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error",
"image/video input must be an inline base64 data: URI (e.g. \"data:image/jpeg;base64,…\"); "+
"remote http(s):// and file:// media URLs are not supported on this end-to-end-encrypted endpoint. Got: "+shown,
withParam("messages")))
return true
}

// requestHasTools reports whether the request carries a non-empty top-level
// "tools" array (Chat Completions and Responses API share the field name).
// Drives Traits.HasTools so tool-bearing requests only route to providers whose
// binaries survive tool-schema template rendering (version floor + per-model
// template_render_ok gate in the scheduler).
func requestHasTools(parsed map[string]any) bool {
tools, ok := parsed["tools"].([]any)
return ok && len(tools) > 0
}

func requestCacheAffinityKey(parsed map[string]any) string {
raw, ok := parsed["prompt_cache_key"].(string)
if !ok || raw == "" {
return ""
}
const maxPromptCacheKeyBytes = 512
if len(raw) > maxPromptCacheKeyBytes {
return ""
}
sum := sha256.Sum256([]byte(raw))
return fmt.Sprintf("%x", sum[:])
}

func estimateRequestedMaxTokens(parsed map[string]any) int {
for _, key := range []string{"max_tokens", "max_completion_tokens", "max_output_tokens"} {
if n, ok := intFromRequestValue(parsed[key]); ok && n > 0 {
if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 {
return n * copies
}
return n
}
}
if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 {
return 256 * copies
}
return 256
}

func parseProviderSerialAllowlist(parsed map[string]any) ([]string, bool, error) {
var rawValues []any
provided := false
for _, key := range []string{"provider_serial", "provider_serials"} {
v, ok := parsed[key]
if !ok {
continue
}
provided = true
switch x := v.(type) {
case string:
rawValues = append(rawValues, x)
case []any:
rawValues = append(rawValues, x...)
default:
return nil, true, fmt.Errorf("%s must be a string or array of strings", key)
}
}
if !provided {
return nil, false, nil
}

seen := make(map[string]struct{}, len(rawValues))
ids := make([]string, 0, len(rawValues))
for _, raw := range rawValues {
id, ok := raw.(string)
if !ok {
return nil, true, fmt.Errorf("provider_serials must contain only strings")
}
id = strings.TrimSpace(id)
if id == "" {
continue
}
if _, exists := seen[id]; exists {
continue
}
seen[id] = struct{}{}
ids = append(ids, id)
}
if len(ids) == 0 {
return nil, true, fmt.Errorf("provider_serials must include at least one provider serial")
}
return ids, true, nil
}

func stripProviderRoutingFields(parsed map[string]any) bool {
changed := false
for _, key := range []string{"provider_serial", "provider_serials"} {
if _, ok := parsed[key]; ok {
delete(parsed, key)
changed = true
}
}
return changed
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 New 561-line file for request introspection helpers may be over-modularized

💡 Suggestion: Evaluate if these pure helper functions need their own file or could be organized differently within existing files

📊 Score: 2×3 = 6 · Category: over-abstraction

Comment on lines +101 to 108
// `inner` is a QuantizedKVCache, so its `copy()` must return one too.
// Assert that invariant explicitly rather than force-casting blind.
guard let copiedInner = inner.copy() as? QuantizedKVCache else {
preconditionFailure(
"ProtocolSafeQuantizedKVCache.copy(): inner.copy() did not return a QuantizedKVCache")
}
let new = ProtocolSafeQuantizedKVCache(
groupSize: groupSize,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 Added preconditionFailure with detailed error message for type casting that should never fail

💡 Suggestion: Consider if the guard+preconditionFailure is necessary or if the original force-cast with a comment would be clearer

📊 Score: 2×2 = 4 · Category: over-abstraction

Comment on lines 175 to 191
if billingSessionID != "" {
_ = s.billing.Store().CompleteBillingSession(billingSessionID)
// Best-effort: the deposit is already credited above, but a failure here
// leaves the session marked incomplete (and replayable). Surface it.
if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil {
s.logger.Error("stripe: failed to mark billing session complete",
"billing_session_id", billingSessionID, "error", err)
s.ddIncr("billing.session_complete_failed", nil)
}
}
if referralCode != "" {
_ = s.billing.Referral().Apply(consumerKey, referralCode)
// Best-effort: a failure here means the referrer is not credited for this
// deposit; never silently swallow it.
if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil {
s.logger.Error("stripe: failed to apply referral credit", "error", err)
s.ddIncr("billing.referral_apply_failed", nil)
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 Similar error handling pattern repeated for billing session completion and referral application

💡 Suggestion: Consider extracting a helper function for the common 'log error + increment metric' pattern

📊 Score: 1×2 = 2 · Category: duplicate logic

@blacksmith-sh

blacksmith-sh Bot commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Found 1 test failure on Blacksmith runners:

Failure

Test View Logs
github.com/eigeninference/d-inference/e2e/TestIntegration_ConcurrentRequests View Logs

Fix with Codesmith
Need help on this PR? Tag /codesmith with what you need.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b4549127f3

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".


// Both should be findable for the same model
p1 := reg.FindProvider(model)
p1 := findRoutableProvider(reg, model)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Update remaining FindProvider callers

When running the root E2E package, e2e/integration_test.go:607 still calls s.Coordinator.Registry.FindProvider(model), but this commit removes that registry method. The in-package tests are migrated to findRoutableProvider here, so go test ./e2e/... or an all-packages test run no longer compiles until that caller is migrated or a compatibility shim is kept.

Useful? React with 👍 / 👎.

name: "darkbloom",
dependencies: [
"ProviderCore",
"ProviderBenchmark",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Split benchmark code out of the shipped CLI

scripts/install.sh:11 installs the darkbloom binary as the provider bundle entrypoint, so adding ProviderBenchmark to the darkbloom target means every installed provider still links the benchmark-only target, including the fatal-error cache stubs this refactor was trying to keep out of production binaries. If serving installs are meant to stay lean, move the benchmark command to a separate executable or otherwise avoid depending on ProviderBenchmark from darkbloom.

Useful? React with 👍 / 👎.

Gajesh2007 and others added 3 commits June 21, 2026 23:00
Round-1 cleanup deleted registry.FindProvider but only re-ran go test
inside coordinator/, missing the e2e package. The lone surviving caller
in TestIntegration_SwiftProviderRealRoutingGates broke the e2e build
(Blacksmith e2e/Build).

Mirror the in-coordinator migration: add a local findRoutableProvider
helper that probes routability through the production ReserveProviderEx
path (same structural/privacy/trust/challenge/capacity gates), reserves
then releases capacity, and returns the selected provider. No deleted
methods or shims reintroduced.

Verified: go build ./... and go vet ./... (compiles e2e/) green from
repo root.

Co-authored-by: Cursor <cursoragent@cursor.com>
scripts/install.sh ships the `darkbloom` binary, but the `darkbloom`
target depended on ProviderBenchmark, which carried the KV-quant
gate/eval code AND the fatalError protocol-conformance KV-cache stubs
(ProtocolSafeQuantizedKVCache, the V-only/BFloat16 caches). Those
benchmark-only stubs were linking into every installed provider,
defeating round-1's goal.

Split ProviderBenchmark into two targets along the existing clean
boundary (verified: the lightweight runners and the KVQuant/ dir do not
reference each other):

  - ProviderBenchmark (lean): ModelBenchmark, ThroughputSweep(+report),
    DecodeBandwidthModel — the only benchmark code `darkbloom benchmark`
    needs. Still linked by `darkbloom`, so the command keeps working.
  - ProviderBenchmarkKVQuant (heavy, has the fatalError stubs): the
    KVQuant/ dir, linked ONLY by kv-quant-gate, kv-attn-selftest, and
    their tests — never by `darkbloom`.

kv-quant-gate / kv-attn-selftest / the 3 KVQuant test files now import
ProviderBenchmarkKVQuant; the darkbloom benchmark command and
ThroughputSweepTests keep importing the lean ProviderBenchmark.

Bundle-safe: release builds by product (`swift build -c release
--product darkbloom`); product names, install paths, and
LatestProviderVersion are unchanged.

Verified: `swift build --build-tests` green (all targets + tests).
nm proof: darkbloom links 0 ProviderBenchmarkKVQuant symbols (939 lean
ProviderBenchmark symbols intact); kv-quant-gate links 3694.

Co-authored-by: Cursor <cursoragent@cursor.com>
Behavior-preserving cleanup of the alias capacity/TTFT fallback helper.
Both fallback modes already share a SINGLE Previous-build capacity probe
(QuickCapacityCheckWithTTFTForRequest) — the desired build is probed once
at the call site and the previous build once here, on different models, so
there is no redundant capacity check to remove. This reduces the residual
branching instead:

  - merge the two identical modelShed / !IsModelInCatalog early-return
    guards (both pure, both return the same tuple) into one
  - hoist the duplicated `mode == aliasFallbackTTFT` test into a single
    `enforceTTFT` bool and drop the `tooSlow` temp

The failure-path model selection is unchanged (TTFT mode reports the
probed Previous build the caller uses as the alternate TTFT estimate;
capacity mode keeps the current build it discards), so all returns are
byte-identical to before.

Verified: coordinator/api alias fallback tests + full `go test ./...`
green.

Co-authored-by: Cursor <cursoragent@cursor.com>
@Gajesh2007

Copy link
Copy Markdown
Member Author

Thanks for the reviews — addressed in da789aec..0497c003:

Fixed

  • CI / e2e build (Codex P1): the e2e/ package still called the removed Registry.FindProvider; round-1 only ran go test inside coordinator/, so it slipped through. Migrated to a ReserveProviderEx-based findRoutableProvider helper. go build ./... && go vet ./... from the repo root is now green (e2e compiles).
  • Benchmark in shipped binary (Codex P2): split ProviderBenchmark into a lean target (kept by darkbloom) + ProviderBenchmarkKVQuant (the fatalError KV-cache stubs, now linked only by kv-quant-gate/kv-attn-selftest). nm confirms the shipped darkbloom links 0 KVQuant stub symbols; darkbloom benchmark still works.
  • maybeFallbackAlias complexity: reduced branching (merged the duplicate shed/catalog guards, single TTFT bool), behavior preserved. The two capacity probes are for different models (Desired vs Previous), so there was no redundant check to remove.

Verified (threat-model follow-ups): the ProviderLogger move was verbatim (no privacy annotation lost), usage token counts intact, and the RuntimeVerified/CodeAttested routing gates are unchanged (only dead code was removed).

Respectfully declined:

  • Folding apikey_handlers.go / request_introspection.go back into consumer.go — the split is intentional per the repo's split-by-concern modularity guidance, and the test files are already named to match.
  • Making the billing webhook ops async — deliberate log+metric design; async settlement is a separate architectural change.
  • Reverting the Swift guard/preconditionFailure to a force-cast — the guard is strictly safer.

@ethenotethan ethenotethan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated Code Review — Layr-Labs/d-inference#

Verdict: REQUEST_CHANGES

Security — ⚠️ SKIPPED

Error: Expecting property name enclosed in double quotes: line 2 column 5 (char 6)

Performance — 4 finding(s) (2 blocking)

  • 🔵 [INFO] coordinator/api/billing_handlers.go:176-181 — Silent error swallowing replaced with logging but no error propagation
    • Suggestion: Consider whether billing session completion failures should propagate to the caller or affect the response, as financial operations typically require strict error handling
  • 🔵 [INFO] coordinator/api/billing_handlers.go:183-188 — Referral credit failures now logged but still best-effort
    • Suggestion: Consider whether referral credit failures should be retried or queued for later processing to ensure referrers receive proper credit
  • 🟡 [MEDIUM] coordinator/api/provider.go:2247-2252 — Financial refund failures now logged but could still result in overcharging consumers
    • Suggestion: Consider implementing retry logic or dead letter queue for failed refunds to prevent consumer overcharging
  • 🟡 [MEDIUM] coordinator/api/provider.go:2437-2442 — Platform fee credit failures could drop revenue accounting
    • Suggestion: Consider implementing retry logic or persistent queue for platform fee credits to ensure accurate revenue tracking

Type_diligence — ✅ No issues found

Additive_complexity — 6 finding(s) (1 blocking)

  • 🔵 [INFO] coordinator/api/apikey_handlers.go:1-447 — New 447-line file extracted from consumer.go but still tightly coupled to Server
    • Suggestion: Consider making this a separate package with its own interface to reduce coupling to the main Server struct
  • 🔵 [INFO] coordinator/api/billing_handlers.go:176-190 — Error handling pattern repeated for billing session completion and referral application
    • Suggestion: Extract a helper function for the common 'log error + emit telemetry' pattern used in both error cases
  • 🔵 [INFO] coordinator/api/consumer.go:388-444 — aliasFallbackMode enum with only two values and complex parameter passing
    • Suggestion: Consider using a boolean parameter instead of an enum with just two cases, or inline the logic if the abstraction doesn't add clarity
  • 🟡 [MEDIUM] coordinator/api/consumer.go:416-444 — maybeFallbackAlias function has 9 parameters and complex conditional logic
    • Suggestion: Break this into smaller functions or use a struct to group related parameters. The function is doing too many things at once
  • 🔵 [INFO] coordinator/api/request_introspection.go:1-561 — Large 561-line file mixing pure functions with Server-dependent methods
    • Suggestion: Split into pure utility functions (token counting, media detection) and Server-dependent methods (rejection handling with telemetry)
  • 🔵 [INFO] provider-swift/Sources/ProviderCore/ProviderLogger.swift:1-46 — Custom logger wrapper around os.Logger with conditional compilation
    • Suggestion: Consider using Swift's built-in Logger from the Logging library or a more established logging framework instead of a custom wrapper

10 finding(s) total, 3 blocking. Verdict: REQUEST_CHANGES.

🤖 Automated review by Centaur · DAR-186

Comment on lines +176 to +181
// Best-effort: the deposit is already credited above, but a failure here
// leaves the session marked incomplete (and replayable). Surface it.
if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil {
s.logger.Error("stripe: failed to mark billing session complete",
"billing_session_id", billingSessionID, "error", err)
s.ddIncr("billing.session_complete_failed", nil)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] ⚡ Silent error swallowing replaced with logging but no error propagation

💡 Suggestion: Consider whether billing session completion failures should propagate to the caller or affect the response, as financial operations typically require strict error handling

📊 Score: 2×3 = 6 · Category: error_handling

Comment on lines 183 to +188
}
if referralCode != "" {
_ = s.billing.Referral().Apply(consumerKey, referralCode)
// Best-effort: a failure here means the referrer is not credited for this
// deposit; never silently swallow it.
if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil {
s.logger.Error("stripe: failed to apply referral credit", "error", err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] ⚡ Referral credit failures now logged but still best-effort

💡 Suggestion: Consider whether referral credit failures should be retried or queued for later processing to ensure referrers receive proper credit

📊 Score: 2×3 = 6 · Category: error_handling

Comment on lines +2247 to +2252
// Financial: a failed refund over-charges the consumer. Never swallow it.
if err := s.store.Credit(pr.ConsumerKey, refund, store.LedgerRefund, msg.RequestID); err != nil {
s.logger.Error("failed to credit settlement refund to consumer",
"request_id", msg.RequestID, "refund_micro_usd", refund, "error", err)
s.ddIncr("billing.credit_failed", []string{"op:settlement_refund"})
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [MEDIUM] ⚡ Financial refund failures now logged but could still result in overcharging consumers

💡 Suggestion: Consider implementing retry logic or dead letter queue for failed refunds to prevent consumer overcharging

📊 Score: 3×4 = 12 · Category: error_handling

Comment on lines +2437 to +2442
// Financial: a failed platform-fee credit drops revenue accounting. Never swallow it.
if err := s.store.Credit("platform", platformFee, store.LedgerPlatformFee, msg.RequestID); err != nil {
s.logger.Error("failed to credit platform fee",
"request_id", msg.RequestID, "platform_fee_micro_usd", platformFee, "error", err)
s.ddIncr("billing.credit_failed", []string{"op:platform_fee"})
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [MEDIUM] ⚡ Platform fee credit failures could drop revenue accounting

💡 Suggestion: Consider implementing retry logic or persistent queue for platform fee credits to ensure accurate revenue tracking

📊 Score: 3×4 = 12 · Category: error_handling

Comment on lines +1 to +447
package api

// apikey_handlers.go holds the consumer-facing API key management endpoints
// (create/list/get/update/rotate/delete) plus their request validation and
// response-shaping helpers. Split out of consumer.go to keep key management
// separate from the inference request path.

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/eigeninference/d-inference/coordinator/auth"
"github.com/eigeninference/d-inference/coordinator/store"

"github.com/eigeninference/d-inference/coordinator/api/types"
)

// handleCreateKey handles POST /v1/auth/keys — creates a new consumer API key.
// Requires Privy authentication. The key is linked to the user's account so
// requests made with the key are billed to the same account.
func (s *Server) handleCreateKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error",
"API key creation requires a Privy account — authenticate with a Privy access token"))
return
}

key, err := s.store.CreateKeyForAccount(user.AccountID)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key"))
return
}
writeJSON(w, http.StatusOK, types.CreateKeyResponse{
APIKey: key,
AccountID: user.AccountID,
})
}

// handleRevokeKey handles DELETE /v1/auth/keys — revokes an API key.
// The caller must own the key (same account). Requires Privy auth so a
// compromised API key cannot revoke legitimate keys.
func (s *Server) handleRevokeKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}

var body struct {
Key string `json:"key"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Key == "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "provide {\"key\": \"sk-db-...\"}"))
return
}

owner := s.store.GetKeyAccount(body.Key)
if owner != user.AccountID {
writeJSON(w, http.StatusForbidden, errorResponse("forbidden", "you can only revoke your own keys"))
return
}

if !s.store.RevokeKey(body.Key) {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found or already revoked"))
return
}
s.invalidateAPIKeyCache(body.Key)

writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"})
}

// apiKeyToResponse projects a stored key into its masked API representation,
// computing the current-window spend and remaining budget.
func (s *Server) apiKeyToResponse(k *store.APIKey) types.APIKeyResponse {
resp := types.APIKeyResponse{
ID: k.ID,
Name: k.Name,
Label: k.Label,
Disabled: k.Disabled,
LimitReset: store.NormalizeResetWindow(k.LimitReset),
RPMLimit: k.RPMLimit,
ITPMLimit: k.ITPMLimit,
OTPMLimit: k.OTPMLimit,
AllowedModels: k.AllowedModels,
SelfRouteOnly: k.SelfRouteOnly,
ExpiresAt: k.ExpiresAt,
CreatedAt: k.CreatedAt,
LastUsedAt: k.LastUsedAt,
}
since := store.KeySpendWindowStart(resp.LimitReset, time.Now())
spent := s.store.KeySpendSince(k.ID, since)
resp.UsageUSD = microToUSD(spent)
if k.LimitMicroUSD != nil {
limitUSD := microToUSD(*k.LimitMicroUSD)
resp.LimitUSD = &limitUSD
remaining := *k.LimitMicroUSD - spent
if remaining < 0 {
remaining = 0
}
remUSD := microToUSD(remaining)
resp.RemainingUSD = &remUSD
}
return resp
}

// validateKeyLimitInputs sanity-checks user-supplied limit values. Returns a
// human-readable error string (empty when valid).
func validateKeyLimitInputs(reset string, limitUSD *float64, rpm, itpm, otpm *int64, expiresAt *time.Time) string {
switch reset {
case "", store.KeyResetNone, store.KeyResetDaily, store.KeyResetWeekly, store.KeyResetMonthly:
default:
return "limit_reset must be one of: none, daily, weekly, monthly"
}
if limitUSD != nil && *limitUSD < 0 {
return "limit_usd must be >= 0"
}
if rpm != nil && *rpm < 0 {
return "rpm_limit must be >= 0"
}
if itpm != nil && *itpm < 0 {
return "itpm_limit must be >= 0"
}
if otpm != nil && *otpm < 0 {
return "otpm_limit must be >= 0"
}
if expiresAt != nil && !expiresAt.IsZero() && expiresAt.Before(time.Now()) {
return "expires_at must be in the future"
}
return ""
}

// keyModelAllowed returns false when the calling key restricts models via an
// allow-list that does not include the requested model. Account-scoped/legacy
// keys (no key in context) and keys without an allow-list always pass.
func (s *Server) keyModelAllowed(ctx context.Context, model string) bool {
k := apiKeyFromContext(ctx)
if k == nil || len(k.AllowedModels) == 0 {
return true
}
for _, m := range k.AllowedModels {
if m == model {
return true
}
}
return false
}

// checkKeySpendCap reports whether charging additionalMicroUSD to the calling
// key would exceed its per-key spend cap in the current window. It returns
// (message, ok); ok=false means the request must be rejected with 402. The
// per-account balance ledger is still the hard atomic ceiling — this is the
// soft, per-key sub-cap, enforced against settled usage (so concurrent
// in-flight requests are eventually-consistent, never over the account balance).
func (s *Server) checkKeySpendCap(ctx context.Context, additionalMicroUSD int64) (string, bool) {
k := apiKeyFromContext(ctx)
if k == nil || k.ID == "" || k.LimitMicroUSD == nil {
return "", true
}
since := store.KeySpendWindowStart(k.LimitReset, time.Now())
spent := s.store.KeySpendSince(k.ID, since)
if spent+additionalMicroUSD > *k.LimitMicroUSD {
window := store.NormalizeResetWindow(k.LimitReset)
if window == store.KeyResetNone {
window = "total"
}
return fmt.Sprintf("API key spend limit reached (%s cap $%.2f, used $%.2f) — raise this key's limit or use another key",
window, microToUSD(*k.LimitMicroUSD), microToUSD(spent)), false
}
return "", true
}

// handleListAPIKeys handles GET /v1/keys — lists the caller's keys (masked).
func (s *Server) handleListAPIKeys(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
keys, err := s.store.ListAPIKeys(user.AccountID)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to list keys"))
return
}
out := make([]types.APIKeyResponse, 0, len(keys))
for i := range keys {
out = append(out, s.apiKeyToResponse(&keys[i]))
}
writeJSON(w, http.StatusOK, types.APIKeyListResponse{Object: "list", Data: out})
}

// handleCreateAPIKey handles POST /v1/keys — mints a new named, optionally
// limited key. The raw secret is returned exactly once.
func (s *Server) handleCreateAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error",
"API key creation requires a Privy account — authenticate with a Privy access token"))
return
}

var req createAPIKeyRequest
if r.Body != nil {
// A missing/empty body is allowed (creates a default unnamed key).
if err := json.NewDecoder(r.Body).Decode(&req); err != nil && !errors.Is(err, io.EOF) {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body"))
return
}
}
if msg := validateKeyLimitInputs(req.LimitReset, req.LimitUSD, req.RPMLimit, req.ITPMLimit, req.OTPMLimit, req.ExpiresAt); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}

opts := store.APIKeyCreate{
Name: strings.TrimSpace(req.Name),
LimitReset: store.NormalizeResetWindow(req.LimitReset),
RPMLimit: req.RPMLimit,
ITPMLimit: req.ITPMLimit,
OTPMLimit: req.OTPMLimit,
AllowedModels: req.AllowedModels,
SelfRouteOnly: req.SelfRouteOnly,
ExpiresAt: req.ExpiresAt,
}
if req.LimitUSD != nil {
m := usdToMicro(*req.LimitUSD)
opts.LimitMicroUSD = &m
}

raw, rec, err := s.store.CreateAPIKey(user.AccountID, opts)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key"))
return
}
writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{
Key: raw,
Data: s.apiKeyToResponse(rec),
})
}

// handleGetAPIKey handles GET /v1/keys/{id}.
func (s *Server) handleGetAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
k, err := s.store.GetAPIKeyByID(user.AccountID, id)
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}
writeJSON(w, http.StatusOK, s.apiKeyToResponse(k))
}

// handleGetCallingKey handles GET /v1/key — returns the metadata for the API
// key used to authenticate the request (OpenRouter parity).
func (s *Server) handleGetCallingKey(w http.ResponseWriter, r *http.Request) {
k := apiKeyFromContext(r.Context())
if k == nil || k.ID == "" {
writeJSON(w, http.StatusNotFound, errorResponse("not_found",
"no key metadata — this endpoint requires API key authentication"))
return
}
writeJSON(w, http.StatusOK, s.apiKeyToResponse(k))
}

// handleUpdateAPIKey handles PATCH /v1/keys/{id} — sparse update of a key's
// name, disabled flag, limits, reset window, expiry, and model allow-list.
func (s *Server) handleUpdateAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
existing, err := s.store.GetAPIKeyByID(user.AccountID, id)
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}

// Decode into a presence map so we can distinguish "field omitted" (leave
// unchanged) from "field set to null" (clear the limit).
var patch map[string]json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body"))
return
}
if msg := applyKeyPatch(existing, patch); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}
if msg := validateKeyLimitInputs(existing.LimitReset, nil, existing.RPMLimit, existing.ITPMLimit, existing.OTPMLimit, existing.ExpiresAt); msg != "" {
writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg))
return
}

// Bump the auth-cache generation before AND after the mutation so a
// concurrent request cannot keep authenticating with a stale (e.g.
// just-disabled) cached record.
s.invalidateAllAPIKeyCache()
updated, err := s.store.UpdateAPIKey(user.AccountID, id, *existing)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to update key"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, s.apiKeyToResponse(updated))
}

// applyKeyPatch merges a presence-aware PATCH body into an existing key record.
// Returns a human-readable error string on invalid input (empty when ok).
func applyKeyPatch(k *store.APIKey, patch map[string]json.RawMessage) string {
if raw, ok := patch["name"]; ok {
var name string
if err := json.Unmarshal(raw, &name); err != nil {
return "invalid value for name"
}
k.Name = strings.TrimSpace(name)
}
if raw, ok := patch["disabled"]; ok {
var disabled bool
if err := json.Unmarshal(raw, &disabled); err != nil {
return "invalid value for disabled"
}
k.Disabled = disabled
}
if raw, ok := patch["limit_reset"]; ok {
var reset string
if err := json.Unmarshal(raw, &reset); err != nil {
return "invalid value for limit_reset"
}
k.LimitReset = store.NormalizeResetWindow(reset)
}
if raw, ok := patch["limit_usd"]; ok {
if string(raw) == "null" {
k.LimitMicroUSD = nil
} else {
var usd float64
if err := json.Unmarshal(raw, &usd); err != nil {
return "invalid value for limit_usd"
}
if usd < 0 {
return "limit_usd must be >= 0"
}
m := usdToMicro(usd)
k.LimitMicroUSD = &m
}
}
if raw, ok := patch["allowed_models"]; ok {
if string(raw) == "null" {
k.AllowedModels = nil
} else {
var models []string
if err := json.Unmarshal(raw, &models); err != nil {
return "invalid value for allowed_models"
}
k.AllowedModels = models
}
}
if raw, ok := patch["self_route_only"]; ok {
var v bool
if err := json.Unmarshal(raw, &v); err != nil {
return "invalid value for self_route_only"
}
k.SelfRouteOnly = v
}
for field, dst := range map[string]**int64{
"rpm_limit": &k.RPMLimit,
"itpm_limit": &k.ITPMLimit,
"otpm_limit": &k.OTPMLimit,
} {
if raw, ok := patch[field]; ok {
if string(raw) == "null" {
*dst = nil
} else {
var v int64
if err := json.Unmarshal(raw, &v); err != nil {
return "invalid value for " + field
}
*dst = &v
}
}
}
if raw, ok := patch["expires_at"]; ok {
if string(raw) == "null" {
k.ExpiresAt = nil
} else {
var t time.Time
if err := json.Unmarshal(raw, &t); err != nil {
return "invalid value for expires_at (use RFC 3339)"
}
k.ExpiresAt = &t
}
}
return ""
}

// handleDeleteAPIKey handles DELETE /v1/keys/{id} — permanently deletes a key.
func (s *Server) handleDeleteAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
s.invalidateAllAPIKeyCache()
if err := s.store.RevokeAPIKeyByID(user.AccountID, id); err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"})
}

// handleRotateAPIKey handles POST /v1/keys/{id}/rotate — mints a fresh secret
// carrying the same limits and metadata, then deletes the old key. The new
// secret is returned exactly once.
func (s *Server) handleRotateAPIKey(w http.ResponseWriter, r *http.Request) {
user := auth.UserFromContext(r.Context())
if user == nil {
writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required"))
return
}
id := r.PathValue("id")
// Bump the auth-cache generation before AND after the mutation so the old
// secret stops authenticating the instant rotation commits.
s.invalidateAllAPIKeyCache()
raw, rec, err := s.store.RotateAPIKey(user.AccountID, id)
if err != nil {
writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found"))
return
}
s.invalidateAllAPIKeyCache()
writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{
Key: raw,
Data: s.apiKeyToResponse(rec),
})
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 New 447-line file extracted from consumer.go but still tightly coupled to Server

💡 Suggestion: Consider making this a separate package with its own interface to reduce coupling to the main Server struct

📊 Score: 2×3 = 6 · Category: misplaced responsibility

Comment on lines +176 to +190
// Best-effort: the deposit is already credited above, but a failure here
// leaves the session marked incomplete (and replayable). Surface it.
if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil {
s.logger.Error("stripe: failed to mark billing session complete",
"billing_session_id", billingSessionID, "error", err)
s.ddIncr("billing.session_complete_failed", nil)
}
}
if referralCode != "" {
_ = s.billing.Referral().Apply(consumerKey, referralCode)
// Best-effort: a failure here means the referrer is not credited for this
// deposit; never silently swallow it.
if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil {
s.logger.Error("stripe: failed to apply referral credit", "error", err)
s.ddIncr("billing.referral_apply_failed", nil)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 Error handling pattern repeated for billing session completion and referral application

💡 Suggestion: Extract a helper function for the common 'log error + emit telemetry' pattern used in both error cases

📊 Score: 1×2 = 2 · Category: duplicate logic

Comment on lines +1 to +561
package api

// request_introspection.go holds helpers for introspecting and lightly
// reshaping inbound inference request bodies before routing/dispatch:
// token and cost estimation (routing vs billing), media/tool detection,
// remote media-URL rejection, cache-affinity key derivation, and
// provider-serial allowlist parsing. Most are pure helpers with no Server
// state; the pre-dispatch media-URL rejection (rejectRemoteMediaURLs) hangs
// off *Server only to record rejection telemetry. Split out of consumer.go
// to keep the request-handling orchestrator thin.

import (
"crypto/sha256"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"

"github.com/eigeninference/d-inference/coordinator/store"
)

// Media prompt-token costs. A vision encoder turns each image/video into a
// bounded number of soft tokens (Gemma 4 caps around a few hundred per image)
// regardless of the base64 byte length, so counting a `data:` URI as text
// inflates the estimate by orders of magnitude — distorting routing admission and
// over-reserving balance. These flat per-media costs keep both sane.
const (
imagePromptTokenCost = 300
videoPromptTokenCost = 1500
)

func intFromRequestValue(v any) (int, bool) {
switch x := v.(type) {
case int:
return x, true
case int32:
return int(x), true
case int64:
return int(x), true
case float64:
return int(x), true
case json.Number:
n, err := x.Int64()
if err != nil {
return 0, false
}
return int(n), true
default:
return 0, false
}
}

// approximateTokenCount returns a rough token estimate for routing and queue
// admission. The len/4 heuristic is a reasonable average for English text
// with GPT-style BPE tokenizers. This value feeds into the scheduler's
// capacity checks (pendingTokenBudget, freeMemoryAdmits) where a tighter
// estimate produces better routing decisions.
//
// For billing reservation (where underestimation causes provider shortfall),
// use approximateTokenCountUpperBound instead.
func approximateTokenCount(v any) int {
if v == nil {
return 0
}
switch x := v.(type) {
case string:
if x == "" {
return 0
}
tokens := len(x) / 4
if tokens < 1 {
tokens = 1
}
return tokens
default:
b, err := json.Marshal(v)
if err != nil {
return 0
}
tokens := len(b) / 4
if tokens < 1 {
tokens = 1
}
return tokens
}
}

// approximateTokenCountUpperBound returns a guaranteed upper bound on the
// number of tokens a BPE tokenizer would produce for v. Every BPE vocabulary
// starts with one token per byte and can only merge, so len(text) >= tokens
// for any model family, any language, forever. This is used only for billing
// reservation to ensure the pre-flight debit always covers the actual cost.
//
// Using len(text) over-reserves by ~3-4x on average for English prose, but
// the difference is refunded immediately after inference completes, so
// consumers are never overcharged — they only need sufficient balance to
// cover the reservation hold.
func approximateTokenCountUpperBound(v any) int {
if v == nil {
return 0
}
switch x := v.(type) {
case string:
return len(x)
default:
b, err := json.Marshal(v)
if err != nil {
return 0
}
return len(b)
}
}

func estimatePromptTokens(parsed map[string]any) int {
total := 0
if v, ok := parsed["messages"]; ok {
total += messagesPromptTokens(v)
}
if v, ok := parsed["input"]; ok {
total += inputPromptTokens(v)
}
if v, ok := parsed["prompt"]; ok {
total += approximateTokenCount(v)
}
if total == 0 {
total = approximateTokenCount(parsed)
}
return total
}

// estimateBillingPromptTokens returns a guaranteed upper bound on prompt
// tokens for billing reservation. Uses byte-length (not len/4) so the
// pre-flight reservation always covers actual cost. This value must NOT
// be used for routing — see estimatePromptTokens for that.
func estimateBillingPromptTokens(parsed map[string]any) int {
total := 0
if v, ok := parsed["messages"]; ok {
// Billing MUST stay a guaranteed upper bound (len(bytes) >= tokens for any
// BPE tokenizer), so it keeps counting full message bytes — including a
// base64 image's bytes and every non-content field (role, tool_calls,
// name). Switching to the media-aware flat count here would DROP those
// fields and under-reserve for tool-calling requests. Over-reservation on a
// large image is safe (it is refunded after inference); the routing/ITPM
// estimate (estimatePromptTokens) is the media-aware one.
total += approximateTokenCountUpperBound(v)
}
if v, ok := parsed["input"]; ok {
total += approximateTokenCountUpperBound(v)
}
if v, ok := parsed["prompt"]; ok {
total += approximateTokenCountUpperBound(v)
}
if total == 0 {
total = approximateTokenCountUpperBound(parsed)
}
return total
}

// isMediaPartType reports whether an OpenAI/OpenRouter content-part type denotes
// image or video input.
func isMediaPartType(t string) bool {
switch t {
// OpenAI chat (image_url/video_url), OpenAI Responses (input_image/input_video),
// and Anthropic /v1/messages content blocks ({"type":"image"|"video","source":…}).
case "image_url", "input_image", "image", "video_url", "input_video", "video":
return true
}
return false
}

// messageContentTokens estimates ROUTING prompt tokens for one message's
// `content`, counting text parts as text (len/4) and each image/video part as a
// flat media cost (never the base64 length). Used only for the routing/ITPM
// estimate; billing uses approximateTokenCountUpperBound (a guaranteed upper
// bound that intentionally still counts the base64 bytes).
func messageContentTokens(content any) int {
textTokens := func(s string) int {
if s == "" {
return 0
}
if t := len(s) / 4; t > 0 {
return t
}
return 1
}
switch c := content.(type) {
case string:
return textTokens(c)
case []any:
total := 0
for _, part := range c {
pm, ok := part.(map[string]any)
if !ok {
continue
}
typ, _ := pm["type"].(string)
switch {
case typ == "text" || typ == "input_text":
if s, ok := pm["text"].(string); ok {
total += textTokens(s)
}
case typ == "image_url" || typ == "input_image" || typ == "image":
total += imagePromptTokenCost
case typ == "video_url" || typ == "input_video" || typ == "video":
total += videoPromptTokenCost
default:
if b, err := json.Marshal(pm); err == nil {
total += len(b) / 4
}
}
}
return total
default:
return approximateTokenCount(content)
}
}

// messagesPromptTokens sums media-aware routing content tokens across a messages
// array. Falls back to the len/4 heuristic when messages isn't the standard
// array shape.
func messagesPromptTokens(messages any) int {
arr, ok := messages.([]any)
if !ok {
return approximateTokenCount(messages)
}
total := 0
for _, m := range arr {
mm, ok := m.(map[string]any)
if !ok {
total += approximateTokenCount(m)
continue
}
total += 4 // small per-message framing (role + delimiters)
total += messageContentTokens(mm["content"])
}
return total
}

// inputPromptTokens estimates the Responses API `input` field. A string input
// is plain text (len/4). Structured input is an array of message-like items with
// `content` parts, so reuse the same media-aware content estimator as chat
// messages instead of counting JSON wrapper bytes.
func inputPromptTokens(input any) int {
switch x := input.(type) {
case string:
return approximateTokenCount(x)
case []any:
total := 0
for _, item := range x {
switch m := item.(type) {
case string:
total += approximateTokenCount(m)
case map[string]any:
content, ok := m["content"]
if !ok {
total += approximateTokenCount(m)
continue
}
total += 4 // role/type framing, matching messagesPromptTokens.
total += messageContentTokens(content)
default:
total += approximateTokenCount(item)
}
}
return total
default:
return approximateTokenCount(input)
}
}

// contentPartsHaveMedia reports whether a `content` value (a content-part array)
// carries any image/video part.
func contentPartsHaveMedia(content any) bool {
parts, ok := content.([]any)
if !ok {
return false
}
for _, part := range parts {
pm, ok := part.(map[string]any)
if !ok {
continue
}
if typ, _ := pm["type"].(string); isMediaPartType(typ) {
return true
}
}
return false
}

// detectMediaRequirement reports whether the request carries image/video input.
// The coordinator sees plaintext at this point (sealedTransport decrypts before
// the handler), so this drives the vision routing gate and the fail-fast "no
// vision-capable provider" response. It scans both the Chat Completions
// `messages[].content` parts and the Responses API `input[].content` parts so a
// media request on either surface is gated (never silently routed text-blind).
func detectMediaRequirement(parsed map[string]any) bool {
if messages, ok := parsed["messages"].([]any); ok {
for _, m := range messages {
if mm, ok := m.(map[string]any); ok && contentPartsHaveMedia(mm["content"]) {
return true
}
}
}
// Responses API: `input` may be a string (no media) or an array of items,
// each carrying `content` parts in the same image_url/input_image shape.
if input, ok := parsed["input"].([]any); ok {
for _, item := range input {
if im, ok := item.(map[string]any); ok && contentPartsHaveMedia(im["content"]) {
return true
}
}
}
return false
}

// isInlineDataURI reports whether a media reference is an inline base64 data: URI
// — the ONLY form the provider's E2E-encrypted VLM path accepts (see
// VLMRequestInference.MediaError.invalidURL, which 400s anything else).
func isInlineDataURI(s string) bool {
return strings.HasPrefix(strings.TrimSpace(s), "data:")
}

// mediaPartURLString returns the URL/inline reference carried by a media content
// part and whether the part IS a media part. Covers OpenAI chat (image_url/
// video_url objects or bare strings), OpenAI Responses (input_image/input_video),
// and Anthropic source blocks ({type:"image"|"video", source:{type,…}}). A media
// part whose reference can't be read returns ("", true) so the caller fails OPEN.
func mediaPartURLString(pm map[string]any) (ref string, isMedia bool) {
typ, _ := pm["type"].(string)
if !isMediaPartType(typ) {
return "", false
}
switch typ {
case "image_url", "input_image", "video_url", "input_video":
field := "image_url"
if typ == "video_url" || typ == "input_video" {
field = "video_url"
}
switch v := pm[field].(type) {
case string:
return v, true
case map[string]any:
if u, ok := v["url"].(string); ok {
return u, true
}
}
return "", true
case "image", "video": // Anthropic source block
if src, ok := pm["source"].(map[string]any); ok {
switch st, _ := src["type"].(string); st {
case "url":
u, _ := src["url"].(string)
return u, true // remote reference
case "base64":
// Inline raw base64 (not a data: URI). Treated as inline/OK in v1 —
// the provider's Anthropic path accepts it; only remote refs are the
// production storm. Marked inline so it is never rejected here.
return "data:anthropic-inline-base64", true
}
}
return "", true
}
return "", true
}

// validateMediaParts walks every media part in a chat (messages[]) or Responses
// (input[]) body and returns the first REMOTE / non-inline media reference. The
// provider VLM path accepts ONLY inline data: URIs, so a remote http(s)://,
// file://, or otherwise non-data: reference is the production gemma-vision 400
// source. Returns ok=false with the offending reference so the caller rejects it
// pre-dispatch (one clean 400) instead of dispatching and 400ing across the fleet.
// Unknown/unreadable part shapes fall through (fail-OPEN) — never wrongly 400 a
// body we don't model.
func validateMediaParts(parsed map[string]any) (badRef string, ok bool) {
check := func(content any) (string, bool) {
parts, ok := content.([]any)
if !ok {
return "", true
}
for _, p := range parts {
pm, ok := p.(map[string]any)
if !ok {
continue
}
ref, isMedia := mediaPartURLString(pm)
if !isMedia || ref == "" {
continue
}
if !isInlineDataURI(ref) {
return ref, false
}
}
return "", true
}
if msgs, ok := parsed["messages"].([]any); ok {
for _, m := range msgs {
if mm, ok := m.(map[string]any); ok {
if ref, good := check(mm["content"]); !good {
return ref, false
}
}
}
}
if input, ok := parsed["input"].([]any); ok {
for _, it := range input {
if im, ok := it.(map[string]any); ok {
if ref, good := check(im["content"]); !good {
return ref, false
}
}
}
}
return "", true
}

// visionRejectRemoteEnabled gates the C4 pre-dispatch remote-media-URL rejection.
// Default ON; DARKBLOOM_VISION_REJECT_REMOTE_URLS=false (or 0) restores the prior
// dispatch-then-provider-400 behavior (still bounded by C1). Read live so the
// kill switch toggles without a redeploy.
func visionRejectRemoteEnabled() bool {
v := strings.TrimSpace(os.Getenv("DARKBLOOM_VISION_REJECT_REMOTE_URLS"))
if v == "" {
return true
}
on, err := strconv.ParseBool(v)
return err != nil || on
}

// rejectRemoteMediaURLs fails a vision request fast (one terminal 400) when any
// media part carries a remote/non-inline URL, mirroring the provider's data:-only
// contract. Pre-dispatch — no provider is contacted. handled=true => caller returns.
func (s *Server) rejectRemoteMediaURLs(w http.ResponseWriter, r *http.Request, parsed map[string]any, model, publicModel string, requiresVision, hasTools bool) (handled bool) {
if !requiresVision || !visionRejectRemoteEnabled() {
return false
}
badRef, ok := validateMediaParts(parsed)
if ok {
return false
}
shown := badRef
if len(shown) > 200 {
shown = shown[:200] + "…"
}
stream, _ := parsed["stream"].(bool)
s.recordRejection(rejectionInfo{
r: r,
stage: "validation",
reasonCode: "bad_param",
httpStatus: http.StatusBadRequest,
keyID: keyIDFromContext(r.Context()),
consumerKeyHash: store.HashKey(consumerKeyFromContext(r.Context())),
requestedModel: publicModel,
resolvedModel: model,
stream: stream,
requiresVision: true,
hasTools: hasTools,
params: rejectionSamplingParams(parsed),
})
s.ddIncr("inference.media_remote_url_rejected", []string{"model:" + model})
writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error",
"image/video input must be an inline base64 data: URI (e.g. \"data:image/jpeg;base64,…\"); "+
"remote http(s):// and file:// media URLs are not supported on this end-to-end-encrypted endpoint. Got: "+shown,
withParam("messages")))
return true
}

// requestHasTools reports whether the request carries a non-empty top-level
// "tools" array (Chat Completions and Responses API share the field name).
// Drives Traits.HasTools so tool-bearing requests only route to providers whose
// binaries survive tool-schema template rendering (version floor + per-model
// template_render_ok gate in the scheduler).
func requestHasTools(parsed map[string]any) bool {
tools, ok := parsed["tools"].([]any)
return ok && len(tools) > 0
}

func requestCacheAffinityKey(parsed map[string]any) string {
raw, ok := parsed["prompt_cache_key"].(string)
if !ok || raw == "" {
return ""
}
const maxPromptCacheKeyBytes = 512
if len(raw) > maxPromptCacheKeyBytes {
return ""
}
sum := sha256.Sum256([]byte(raw))
return fmt.Sprintf("%x", sum[:])
}

func estimateRequestedMaxTokens(parsed map[string]any) int {
for _, key := range []string{"max_tokens", "max_completion_tokens", "max_output_tokens"} {
if n, ok := intFromRequestValue(parsed[key]); ok && n > 0 {
if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 {
return n * copies
}
return n
}
}
if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 {
return 256 * copies
}
return 256
}

func parseProviderSerialAllowlist(parsed map[string]any) ([]string, bool, error) {
var rawValues []any
provided := false
for _, key := range []string{"provider_serial", "provider_serials"} {
v, ok := parsed[key]
if !ok {
continue
}
provided = true
switch x := v.(type) {
case string:
rawValues = append(rawValues, x)
case []any:
rawValues = append(rawValues, x...)
default:
return nil, true, fmt.Errorf("%s must be a string or array of strings", key)
}
}
if !provided {
return nil, false, nil
}

seen := make(map[string]struct{}, len(rawValues))
ids := make([]string, 0, len(rawValues))
for _, raw := range rawValues {
id, ok := raw.(string)
if !ok {
return nil, true, fmt.Errorf("provider_serials must contain only strings")
}
id = strings.TrimSpace(id)
if id == "" {
continue
}
if _, exists := seen[id]; exists {
continue
}
seen[id] = struct{}{}
ids = append(ids, id)
}
if len(ids) == 0 {
return nil, true, fmt.Errorf("provider_serials must include at least one provider serial")
}
return ids, true, nil
}

func stripProviderRoutingFields(parsed map[string]any) bool {
changed := false
for _, key := range []string{"provider_serial", "provider_serials"} {
if _, ok := parsed[key]; ok {
delete(parsed, key)
changed = true
}
}
return changed
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 Large 561-line file mixing pure functions with Server-dependent methods

💡 Suggestion: Split into pure utility functions (token counting, media detection) and Server-dependent methods (rejection handling with telemetry)

📊 Score: 2×3 = 6 · Category: misplaced responsibility

Comment on lines +1 to +46
import Foundation
#if canImport(os)
import os
#endif

/// Unified logger that uses os.Logger on macOS. Internal access so the
/// `ProviderLoop` companion extension files (e.g. `+SSEParser.swift`) can re-use
/// it for their file-scope loggers — `parseStreamChunk` is a `static` method and
/// can't reach the per-instance logger on the actor.
struct ProviderLogger: Sendable {
#if canImport(os)
private let osLogger: os.Logger
#endif
private let category: String

init(subsystem: String, category: String) {
self.category = category
#if canImport(os)
self.osLogger = os.Logger(subsystem: subsystem, category: category)
#endif
}

func info(_ message: String) {
#if canImport(os)
osLogger.info("\(message, privacy: .public)")
#else
print("[\(category)] INFO: \(message)")
#endif
}

func warning(_ message: String) {
#if canImport(os)
osLogger.warning("\(message, privacy: .public)")
#else
print("[\(category)] WARN: \(message)")
#endif
}

func error(_ message: String) {
#if canImport(os)
osLogger.error("\(message, privacy: .public)")
#else
print("[\(category)] ERROR: \(message)")
#endif
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [INFO] 🧩 Custom logger wrapper around os.Logger with conditional compilation

💡 Suggestion: Consider using Swift's built-in Logger from the Logging library or a more established logging framework instead of a custom wrapper

📊 Score: 2×3 = 6 · Category: reinventing stdlib

Gajesh2007 added a commit that referenced this pull request Jun 22, 2026
… chore/tech-debt-godfiles

Reconciles the stacked god-file branch against the advanced base, keeping
BOTH this branch's god-file split AND the base's PR-#444 review fixes:

- e2e: route via ReserveProviderEx through a local findRoutableProvider
  helper after Registry.FindProvider removal (1b69036)
- provider: split KV-quant benchmark out of the shipped darkbloom into a
  new ProviderBenchmarkKVQuant target; kv-quant-gate/kv-attn-selftest and
  the KVQuant tests re-pointed to it (9cc1a60)
- coordinator: tighten maybeFallbackAlias branching, behavior-preserving (0497c00)

Auto-merged with no conflicts: the consumer.go god-file split and the
base's maybeFallbackAlias edit touch disjoint regions. Verified green:
repo-root go build/vet ./..., coordinator go test ./..., swift build (all
targets), swift test (1059 tests, 74 suites). Shipped darkbloom links 0
ProviderBenchmarkKVQuant symbols; kv-quant-gate links them as expected.

Co-authored-by: Cursor <cursoragent@cursor.com>
@Gajesh2007 Gajesh2007 merged commit 2ffc858 into master Jun 22, 2026
39 of 41 checks passed
@Gajesh2007 Gajesh2007 deleted the chore/tech-debt-modularize branch June 22, 2026 06:23
Gajesh2007 added a commit that referenced this pull request Jun 22, 2026
…lready contains all #444 content; resolve overlaps to ours
Gajesh2007 added a commit that referenced this pull request Jun 22, 2026
… (stacked on #444) (#445)

* refactor(provider): delete dead single-request inference/SSE/formatter stack

The production inference path is MultiModelBatchSchedulerEngine+Translation
+ BatchScheduler. The single-request driver, its SSE formatter, and the
chat-prompt formatter were only referenced by tests. Remove them and the
types they orphaned:

- delete SingleRequestInference.swift, SSEChunkFormatting.swift,
  ChatPromptFormatting.swift
- drop orphaned InferenceUsage + UsageAccumulator (UsageAccounting.swift),
  keeping the live StreamedGenerationUsage / CancelledGenerationTerminal
- drop orphaned SSEChunk + streaming ChatCompletionChunk/ChunkChoice/
  ChunkDelta (ChatRequest.swift); keep ChunkUsage + ChatCompletionResponse
  (still used by the live response path / InferenceLiveTests)
- trim InferenceTests.swift to the live cancellation-registry test
- fix the now-dangling ChatPromptFormatter mention in the translate() doc

Behavior-preserving: swift build green; swift test 1049 pass / 0 fail
(baseline 1054 minus the 5 removed dead tests).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): delete dead parallel routing implementation

Remove the score-based FindProvider / FindProviderWithTrust / ScoreProvider
cluster (plus the now-unused TrustMultiplier and the MaxConcurrentRequests
alias) from the registry. Production dispatch routes through the cost-based
ReserveProviderEx -> selectBestCandidate path; these functions were a second
routing brain reachable only from tests, giving false confidence by testing
behavior that diverged from production (6m vs 16m challenge freshness window,
score-vs-cost selection, crashed-provider routing).

Tests that exercised only the dead path (scoring math, score-based selection,
dead-specific challenge boundaries, crashed-only routing) are deleted. Tests
that used FindProvider as a routability probe for gates SHARED with production
(privacy caps, manifest/SIP checks, trust, catalog, eviction, challenge
freshness, concurrency cap) are repointed to a findRoutableProvider helper that
drives the real ReserveProviderEx path; stale challenge times were bumped past
the 16m production freshness window to preserve staleness coverage.

No production behavior change. build/vet/test/gofmt all green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* chore(coordinator): untrack built binary and drop dead R2 site-packages config

- git rm --cached coordinator/coordinator: the 40MB built binary was tracked
  despite already being in .gitignore. Stop committing the artifact.
- Remove Server.r2SitePackagesCDNURL plus its ServerConfig field and env wiring
  (EIGENINFERENCE_R2_SITE_PACKAGES_CDN_URL). It was written from env but never
  read: install.sh stopped substituting __DARKBLOOM_R2_SITE_PACKAGES_CDN_URL__
  post-Swift-cutover (v0.5.0+), so the field was pure dead config.

Co-authored-by: Cursor <cursoragent@cursor.com>

* chore(coordinator): stop tracking the built coordinator binary

The 40MB coordinator/coordinator artifact was tracked despite already being
listed in .gitignore. Untrack it so builds don't keep committing the binary.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): collapse dead config->env re-export shim

config/config.go only re-exported env helpers (EnvOr/FirstNonEmpty/EnvFloat/
EnvInt). FirstNonEmpty/EnvFloat/EnvInt had zero callers, and EnvOr was used only
by app_config.go in the same package. Point app_config.go at env.EnvOr directly,
move the package doc onto app_config.go, and delete the shim so there is a single
env-helper source of truth.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): merge alias capacity/TTFT fallbacks into one helper

maybeFallbackAliasCapacity and maybeFallbackAliasTTFT were near-identical
copies differing only in the TTFT ceiling check and the failure-path return
model. Merge into maybeFallbackAlias(parsed, mode, ...) with an aliasFallbackMode
(capacity vs ttft) flag; capacity callers pass ttftThreshold=0. Failure-path
return semantics are preserved exactly (capacity reports currentModel, TTFT
reports the probed previous build). No behavior change.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): extract Benchmark into its own ProviderBenchmark target

Benchmark/ (~5.2k LOC, incl. fatalError protocol-conformance stub caches)
was compiled into the ProviderCore library and thus linked into every
provider/enclave binary. Move it into a dedicated `ProviderBenchmark`
SwiftPM target that depends on ProviderCore, so only the benchmark-bearing
executables link it (darkbloom `benchmark`, kv-quant-gate, kv-attn-selftest,
kv-engine-demo) and the shipped enclave/publish/core surface no longer
carries benchmark code.

- new target `ProviderBenchmark` (Sources/ProviderBenchmark), 18 files moved
- the two engine-facing types the live scheduler needs stay in ProviderCore:
  KVQuantCandidateMode (split out of KVQuantTypes.swift) and the
  self-contained KVQuantPolicy.swift, now under ProviderCore/Inference/KVQuant
- widen KVEstimation / parseModelArchitecture / resolvedKVBytesPerToken /
  ModelArchitecture to public (already used by the live scheduler) so the
  benchmark target can reach them across the module boundary
- wire ProviderBenchmark into the four executables + the four benchmark
  test files; add `import ProviderCore` to the moved files

Behavior-preserving: swift build green; swift test 1049 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>

* chore(coordinator): drop dead vllm-mlx full-stack test and fix stale comments

- Delete api/fullstack_integration_test.go (~1155 LOC): it spawned real vllm-mlx
  subprocesses (the retired subprocess backend), was gated behind
  LIVE_FULLSTACK_TEST=1 (never run in CI), and the repo-root e2e/ harness covers
  the current MLX-Swift provider over the WebSocket relay.
- Refresh stale vllm-mlx comments to MLX-Swift / E2E-relay reality in consumer.go,
  protocol/messages.go, and registry.go (comments only; no wire-type changes).
- Fix install.sh path comment (internal/api -> api) in server.go.
- Reword the telemetry symmetry test comment so it states the Go test pins the
  shape and the Swift/TS mirrors must match, without asserting mirror tests this
  package cannot see.

Co-authored-by: Cursor <cursoragent@cursor.com>

* test(provider): add telemetry wire-symmetry tests mirroring the Go canonical

coordinator/protocol/telemetry_symmetry_test.go references a Swift mirror that
did not exist. Add Tests/ProviderCoreTests/TelemetrySymmetryTests.swift to pin:
- enum casing (source=provider, severity=error, kind=backend_crash)
- snake_case keys + nil-optional omission (machine_id/account_id/request_id/
  stack/fields), matching Go omitempty
- the TelemetryKind set + count (mirror of Go KnownKinds())
- TelemetrySource/TelemetrySeverity raw values

Add CaseIterable to TelemetryKind (the Swift equivalent of KnownKinds()) and
fix two stale `coordinator/internal/protocol|api/...` comment paths to the
current `coordinator/protocol|api/...` locations.

swift build green; swift test 1052 pass / 0 fail (+3 new).

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(coordinator): stop silently swallowing billing/credit and WS-send errors

Replace `_ = ...` discards with explicit error handling + a DogStatsD counter:

- Stripe webhook: CompleteBillingSession / Referral.Apply failures now log an
  error and increment billing.{session_complete,referral_apply}_failed instead
  of vanishing after a successful deposit.
- Settlement: failed consumer refund and platform-fee Credit calls now log an
  error and increment billing.credit_failed{op} — these are financial and must
  never be silent (a dropped refund over-charges the consumer).
- Provider WS: best-effort EnqueueText sends (runtime/trust status) now log at
  debug and increment provider.enqueue_failed{msg} so a wedged send is visible.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): centralize Harmony assistant-message stripping

The assistant-message Harmony wrapper (role guard + content/thinking/
reasoning_content field iteration) was duplicated in JinjaSanitization and
TemplateRenderCheck. Centralize it in ProviderCoreFoundation alongside the
existing string transform:

- add `harmonyAssistantTextFields` + `stripHarmonyFramingFromAssistantMessage`
  in HarmonySanitization.swift (Linux-buildable, no Apple deps)
- TemplateRenderCheck now calls the shared helper (drops its private copy)
- JinjaSanitization iterates the shared field-list constant
- BatchScheduler already routes through the shared `stripHarmonyChannelFraming`

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>

* test(coordinator): cover auth/env/telemetry; doc pendingModelLoads routing scope

- auth: first unit tests for the previously-untested package — PEM/key parsing
  (NewPrivyAuth), ES256 JWT verification incl. expiry/issuer/audience/wrong-key/
  alg-confusion rejection (VerifyToken), user provisioning (GetOrCreateUser), and
  Config.Check. Uses a real in-memory store and real generated keys; no mocks of
  the unit under test.
- env: 100% coverage of EnvOr/FirstNonEmpty/EnvFloat/EnvInt/EnvBool.
- telemetry: Emitter default-fill, metric counting (capturing fake sink),
  nil-emitter/nil-sink safety, and version defaulting.
- registry: document that pendingModelLoads is consulted ONLY by warm-pool/swap
  planning and never participates in routing/admission (see AGENTS.md state model).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): consolidate formatDuration into DurationFormatting

Four near-identical duration formatters had diverged in user-visible ways
(download progress shows seconds; idle/status logs use compact "2h30m";
the scheduler uses spaced "2h 30m"; trailing zero-minute elision differed).
Add one `DurationFormatting.compact` helper whose flags reproduce each style
exactly, and delegate the four call-site helpers to it. Output is unchanged
at every call site; the logic now lives in one place.

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): extract request introspection helpers from consumer.go

Move the pure request-introspection / token-estimation cluster out of the 5.7k-LOC
consumer.go into a focused request_introspection.go (token + billing cost
estimation, media/tool detection, cache-affinity keys, provider-serial allowlist
parsing, and the media-cost constants). Same package, no behavior change — these
functions have no Server dependency. consumer.go drops from ~5700 to ~5311 LOC.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(provider): surface CoordinatorClient encode failures instead of silent {}

Heartbeat / outbound / inference-error encoding used `try?` and degraded to
hardcoded `{}` or minimal JSON on failure — silent protocol corruption the
coordinator can't attribute. Replace with do/catch that logs at error and
emits a `protocol_error` telemetry event, and make the inference-error path
fall back to a parseable, correctly-typed payload (carrying the real
request_id/error/status_code) rather than `{}`.

Behavior-preserving on the success path: swift build green; swift test
1052 pass / 0 fail.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): replace force casts with guards; modernize a GCD timeout

- PersistentEnclaveKey: guard the Keychain `result` optional and throw a typed
  keyLookupFailed instead of force-unwrapping nil (the CFTypeRef->SecKey cast
  is compiler-guaranteed).
- ProtocolSafeQuantizedKVCache.copy(): assert the inner-copy type via guard +
  preconditionFailure with a clear message rather than a blind `as!`.
- ProviderLoop.waitForPreloads: replace `DispatchQueue.global().asyncAfter`
  with a structured `Task.sleep(for:)` timeout (OneShotBoolContinuation still
  dedupes, so the first-resume-wins race is unchanged).

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): extract API key handlers from consumer.go

Move the consumer-facing API key management endpoints and their
validation/response helpers (create/list/get/update/rotate/delete,
apiKeyToResponse, validateKeyLimitInputs, keyModelAllowed, checkKeySpendCap,
applyKeyPatch) into apikey_handlers.go, matching the existing
apikey_handlers_test.go. Same package, no behavior change. consumer.go drops
from ~5311 to ~4884 LOC.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): extract ProviderLoopConfig and ProviderLogger from ProviderLoop

First, low-risk step of breaking up the ProviderLoop god file: move the two
standalone top-level value types out of the 3.6k-line ProviderLoop.swift into
their own single-responsibility files (no actor internals touched, no access
loosened). The loop file now holds the actor + its behavior, not its inputs
or its logger type.

Note: the actor's instance-method sections are intentionally NOT split across
files here — ProviderLoop deliberately keeps method extensions in-file to reach
its `private` model registry without loosening ~40 members to `internal` (see
the documented rationale at the local-endpoint extension). That deeper split is
deferred pending an explicit encapsulation decision.

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>

* test(provider): cover the darkbloom-publish hash CLI entrypoint

darkbloom-publish (the security-critical model-manifest hasher used by the
registry publish flow) had zero tests on its CLI surface. Add a
DarkbloomPublishTests target exercising the `hash` subcommand end-to-end
against a temp snapshot dir:
- writes a manifest.json that decodes back to the expected id/version/files/
  64-hex aggregate
- hashing is deterministic across runs on identical bytes
- rejects unsafe model ids and version tags (spaces, '..', '/', empty) before
  any hashing happens

(The ManifestBuilder library itself remains covered by ProviderCoreFoundationTests.)

swift build green; swift test 1052 swift-testing + 4 new XCTest pass / 0 fail.

Co-authored-by: Cursor <cursoragent@cursor.com>

* chore(provider): remove unused dev-only harness targets

vlm-smoke (explicitly "Safe to delete") and kv-engine-demo (DAR-318 capacity
demo, "NOT a product") are dev-only executables referenced nowhere outside
Package.swift — no CI, scripts, Makefile, or docs build them. Delete both
(~1040 LOC) to shrink the default build surface.

Kept kv-se-harness: it is the only way to exercise the Secure-Enclave KEK +
Keychain round-trip on real SE hardware (a path `swift test` cannot reach), so
it retains test value despite not being a product.

Behavior-preserving: swift build green; swift test 1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>

* test(coordinator): behavior-lock the five routing-eligibility gate fns

Characterization suite pinning the exact current decisions of
providerPassesRoutingGatesLockedEx, providerCanRouteBuildLocked,
providerHasWarmModelLocked, publiclyRoutableLocked,
warmPoolCandidateReasonLocked (+ modelLoadCandidatePendingLocked) across a
matrix of provider states, including the intentional differences (owner
self-route relaxation, breaker-ignoring preflight bypass, warm-only loaded
check, model-agnostic public check, granular warm-pool reason labels).

Pre-refactor safety net for the routing-gate pipeline unification.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): split ProviderLoop god-file by concern (3577->525 LOC)

Split the 3,577-LOC ProviderLoop actor into focused same-module extension
files mirroring the existing ProviderLoop+SSEParser/+ErrorMapping pattern:
Serve, InferenceHandler, Preload, Prefetch, Testing, Trust, MemoryProtection,
IdleTimeout, Capacity, AutoUpdate, ModelLoading, Cancellation,
AttestationChallenge, LocalEndpoint.

Method bodies are byte-identical; only access control changed. Stored state
and methods reached across the split were widened private->internal (Swift
private is file-scoped); purely-local members stay private. Behavior unchanged.

Build green; swift test 1052/73 pass (unchanged from baseline).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): unify the routing-eligibility gate pipeline

Five functions re-implemented overlapping subsets of the provider
eligibility gates. Extract the two exactly-shared sub-pipelines into
registry/routing_eligibility.go:

  - providerLivenessGateLocked: status/private-only/trust/runtime/
    private-text/challenge-freshness, with minTrust+allowPrivate relaxation
  - providerServesRoutableModelLocked: catalog membership + dedicated-box rule

Repoint providerPassesRoutingGatesLockedEx, providerCanRouteBuildLocked,
providerHasWarmModelLocked, publiclyRoutableLocked, and
modelLoadCandidatePendingLocked onto them — exact semantics preserved
(verified by the characterization suite). warmPoolCandidateReasonLocked is
kept separate (documented) because it emits granular per-gate reason labels
that a boolean helper cannot preserve, and interleaves warm-pool-specific
gates whose order determines the reported reason.

Behavior-preserving: routing-gate characterization suite + full coordinator
go test ./... stay green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): split BatchScheduler god-file by concern (2503->538 LOC)

Split the BatchScheduler actor core into focused same-module extension files
alongside the existing +EngineBridge/+KVEstimation/+Telemetry/+Liveness/
+KVQuantScheme: ModelLifecycle, EngineFactory, CheckpointRestore, Admission,
Submit, PrefixCacheSizing, Testing.

Method bodies are byte-identical; only access control changed. 13 members
reached across the split were widened private->internal (incl. resolving a
checkpointLayerSignatures static-vs-instance name shadow); purely-local
members stay private. Behavior unchanged.

Build green; swift test 1052/73 pass (unchanged from baseline).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): extract shared inference admission preflight

handleChatCompletions and handleGenericInference carried byte-identical
copies (~290 lines each) of the self-route / prefer / public
capacity-and-TTFT preflight: QuickCapacityCheck -> alias-capacity fallback
-> unservable shed -> model-too-large -> capacity 429 / queue-spill ->
no-eligible-provider shed -> TTFT gate, with identical rejections and
routing.decisions metrics.

Extract runInferenceAdmission (api/inference_admission.go). The only
divergence (verified by diffing the two blocks) is the forward-body refresh
after an alias fallback rewrites parsed["model"]: chat re-marshals its
threaded rawBody (re-lowering Responses input->chat, which can 400), generic
rebuilds from parsed at dispatch. That single difference is threaded as the
onModelFallback callback (nil for generic).

Behavior-preserving: full api go test ./api/... (incl. failover, shed,
servability, dedicated, cold-dispatch, route-outcome suites) stays green.
consumer.go 4884 -> 4407 LOC.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): split ModelCatalog god-file by concern (1202->171 LOC)

Split ModelCatalog.swift into focused files: catalog data types stay in
ModelCatalog.swift; ModelCatalogClient (coordinator catalog HTTP client),
ModelDownloader core, and the ModelPrefetcher abstraction move to their own
files. ModelDownloader's interleaved methods split into +Download (manifest/
legacy orchestration), +Prefetch (resume-aware prefetch), and +HTTP (file
fetch/stream/hash/publish) extensions.

Bodies byte-identical; ModelDownloader's 4 stored props + cross-file private
methods and CatalogResponse widened private->internal. Behavior unchanged.

Build green; swift test 1052/73 pass (unchanged from baseline).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): extract shared inference balance reservation

handleChatCompletions and handleGenericInference held identical pre-flight
balance reservation + per-key spend-cap blocks (reservationCost ->
checkKeySpendCap -> reserveInitialBalance with the same insufficient_quota /
insufficient_funds / DB-error rejections). Extract reserveInferenceBalance
(api/inference_admission.go), returning (reservedMicroUSD, serviceReservation,
handled). Behavior-preserving: full api go test ./api/... stays green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): split StartCommand god-file by concern (1133->121 LOC)

Split the Start command struct into same-target extension files: core keeps
the command declaration, options, and run() orchestrator; serving modes
(standalone/foreground/scheduled), preflight + inline login, launchd daemon
install, interactive catalog picker, and the raw-mode TUI picker move to
StartCommand+Modes/+Preflight/+Daemon/+Picker/+TUIPicker.

Bodies byte-identical; 7 private methods reached across the split widened to
internal. Behavior unchanged.

Build green; swift test 1052/73 pass (unchanged from baseline).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): segregate store.Store into domain sub-interfaces

The ~150-method Store god-interface is split into 12 cohesive domain
sub-interfaces (APIKeyStore, UsageStore, TelemetryStore, LedgerStore,
BillingStore, ModelRegistryStore, ReleaseStore, UserStore, DeviceAuthStore,
InviteStore, ProviderEarningsStore, ProviderStore) in store/interface_domains.go;
Store now embeds all 12. Pure interface refactor: every method keeps its exact
signature, both MemoryStore and PostgresStore are unchanged and still satisfy
the composed Store (compile-time assertions), and a reflect-based method-set
dump confirms the flattened Store exposes the identical 150 methods before and
after. store go test ./store/... stays green (Postgres tests run only when
DATABASE_URL is set).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): split CoordinatorClient god-file by concern (1173->213 LOC)

Extract self-contained top-level types into their own files (CoordinatorClient
Types: events/config/messages/errors; State: atomic stats + provider state +
concurrency primitives; OutboundRouter; ReachabilityMonitor) and split the
actor into +Connection (reconnect/session/receive + telemetry), +Inbound
(frame->event dispatch), +Registration (register + heartbeat), and +Outbound
(wire encoding).

Bodies byte-identical; 21 members reached across the split widened
private->internal. The file-private `Logger` typealias was renamed to a unique
`CoordinatorWSLogger` (kept internal) so widening the actor's logger property
doesn't shadow `Logging.Logger`/`os.Logger` elsewhere in the module.

Build green; swift test 1052/73 pass (unchanged from baseline).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): split api/consumer.go into focused files

Same-package modularization of the consumer god-file (no behavior change):
  - httputil.go: writeJSON + the OpenAI-compatible errorResponse envelope
  - responses_translate.go: Responses-API request lowering (input/tools/
    tool_choice/text.format -> chat-completions shape)
  - models_endpoints.go: GET /v1/models and /v1/models/{id} + alias listing

consumer.go 4884 -> 3794 LOC. Full api go test ./api/... stays green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* test(provider): cover doctor command pure logic (CLI test seams)

Add DoctorChecksTests for buildDoctorChecks (snapshot-driven hardware/config/
model-count checks + the deterministic check backbone), describeMDMEnrollment,
and CheckStatus markers. Environment-independent: only asserts snapshot-driven
behavior, so host Metal/SIP/codesign state can't flake the suite.

swift test 1059/74 pass (baseline 1052/73 + 7 new; no production changes).

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): extract APNs code-attest from api/provider.go

Move the v0.6.0 APNs code-identity attestation flow (codeAttestMetric,
codeAttestLoop, tryCrossVersionReuse, maybeRearmCodeAttest,
sendCodeIdentityChallenge, handleCodeAttestationResponse) into
api/provider_codeattest.go. Same-package move, no behavior change;
provider.go 3563 -> 3150 LOC. Full api go test ./api/... stays green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): extract provider persistence from registry.go

Move the registry<->store persistence bridge (SetStore, LoadStoredProviders,
RestoreProviderState, providerRecordStats, PersistProvider[Throttled],
persistReputation[Throttled], persistProviderNow) into registry/persistence.go.
Same-package move, no behavior change; registry.go 4693 -> 4365 LOC.
registry go test ./registry/... stays green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(e2e): route via ReserveProviderEx after FindProvider removal

Round-1 cleanup deleted registry.FindProvider but only re-ran go test
inside coordinator/, missing the e2e package. The lone surviving caller
in TestIntegration_SwiftProviderRealRoutingGates broke the e2e build
(Blacksmith e2e/Build).

Mirror the in-coordinator migration: add a local findRoutableProvider
helper that probes routability through the production ReserveProviderEx
path (same structural/privacy/trust/challenge/capacity gates), reserves
then releases capacity, and returns the selected provider. No deleted
methods or shims reintroduced.

Verified: go build ./... and go vet ./... (compiles e2e/) green from
repo root.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(provider): split KV-quant benchmark out of shipped darkbloom

scripts/install.sh ships the `darkbloom` binary, but the `darkbloom`
target depended on ProviderBenchmark, which carried the KV-quant
gate/eval code AND the fatalError protocol-conformance KV-cache stubs
(ProtocolSafeQuantizedKVCache, the V-only/BFloat16 caches). Those
benchmark-only stubs were linking into every installed provider,
defeating round-1's goal.

Split ProviderBenchmark into two targets along the existing clean
boundary (verified: the lightweight runners and the KVQuant/ dir do not
reference each other):

  - ProviderBenchmark (lean): ModelBenchmark, ThroughputSweep(+report),
    DecodeBandwidthModel — the only benchmark code `darkbloom benchmark`
    needs. Still linked by `darkbloom`, so the command keeps working.
  - ProviderBenchmarkKVQuant (heavy, has the fatalError stubs): the
    KVQuant/ dir, linked ONLY by kv-quant-gate, kv-attn-selftest, and
    their tests — never by `darkbloom`.

kv-quant-gate / kv-attn-selftest / the 3 KVQuant test files now import
ProviderBenchmarkKVQuant; the darkbloom benchmark command and
ThroughputSweepTests keep importing the lean ProviderBenchmark.

Bundle-safe: release builds by product (`swift build -c release
--product darkbloom`); product names, install paths, and
LatestProviderVersion are unchanged.

Verified: `swift build --build-tests` green (all targets + tests).
nm proof: darkbloom links 0 ProviderBenchmarkKVQuant symbols (939 lean
ProviderBenchmark symbols intact); kv-quant-gate links 3694.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(coordinator): tighten maybeFallbackAlias branching

Behavior-preserving cleanup of the alias capacity/TTFT fallback helper.
Both fallback modes already share a SINGLE Previous-build capacity probe
(QuickCapacityCheckWithTTFTForRequest) — the desired build is probed once
at the call site and the previous build once here, on different models, so
there is no redundant capacity check to remove. This reduces the residual
branching instead:

  - merge the two identical modelShed / !IsModelInCatalog early-return
    guards (both pure, both return the same tuple) into one
  - hoist the duplicated `mode == aliasFallbackTTFT` test into a single
    `enforceTTFT` bool and drop the `tooSlow` temp

The failure-path model selection is unchanged (TTFT mode reports the
probed Previous build the caller uses as the alternate TTFT estimate;
capacity mode keeps the current build it discards), so all returns are
byte-identical to before.

Verified: coordinator/api alias fallback tests + full `go test ./...`
green.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(provider): widen split-file symbols to internal (ProviderLoopError, OSAllocatedUnfairLock) — fixes #445 clean-build access errors

The god-file split moved cross-file references that the warm incremental
.build cache never recompiled, hiding two access violations:

- ProviderLoopError (ProviderLoop.swift) is thrown from ProviderLoop+Serve.swift.
  The throw is inside `#if !DEBUG`, so `swift build` (debug) compiled it out and
  masked the error; a release build fails with "'ProviderLoopError' is inaccessible
  due to 'private' protection level". Widen private -> internal.

- OSAllocatedUnfairLock shim (CoordinatorClientState.swift) is instantiated in
  OutboundRouter.swift. While `private` it silently resolved to Apple's
  os.OSAllocatedUnfairLock (OutboundRouter imports os) rather than the project shim.
  Widen private -> internal so OutboundRouter uses the intended shim
  (matching PongTracker/ManagedAtomic).

Validated: `swift build -c release --product darkbloom` (whole-module, #if !DEBUG
active) reaches "Build complete"; `swift test` -> 1059/1059 pass.

Co-authored-by: Cursor <cursoragent@cursor.com>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants