refactor: tech-debt cleanup + partial modularization (coordinator + provider)#444
Conversation
…r stack The production inference path is MultiModelBatchSchedulerEngine+Translation + BatchScheduler. The single-request driver, its SSE formatter, and the chat-prompt formatter were only referenced by tests. Remove them and the types they orphaned: - delete SingleRequestInference.swift, SSEChunkFormatting.swift, ChatPromptFormatting.swift - drop orphaned InferenceUsage + UsageAccumulator (UsageAccounting.swift), keeping the live StreamedGenerationUsage / CancelledGenerationTerminal - drop orphaned SSEChunk + streaming ChatCompletionChunk/ChunkChoice/ ChunkDelta (ChatRequest.swift); keep ChunkUsage + ChatCompletionResponse (still used by the live response path / InferenceLiveTests) - trim InferenceTests.swift to the live cancellation-registry test - fix the now-dangling ChatPromptFormatter mention in the translate() doc Behavior-preserving: swift build green; swift test 1049 pass / 0 fail (baseline 1054 minus the 5 removed dead tests). Co-authored-by: Cursor <cursoragent@cursor.com>
Remove the score-based FindProvider / FindProviderWithTrust / ScoreProvider cluster (plus the now-unused TrustMultiplier and the MaxConcurrentRequests alias) from the registry. Production dispatch routes through the cost-based ReserveProviderEx -> selectBestCandidate path; these functions were a second routing brain reachable only from tests, giving false confidence by testing behavior that diverged from production (6m vs 16m challenge freshness window, score-vs-cost selection, crashed-provider routing). Tests that exercised only the dead path (scoring math, score-based selection, dead-specific challenge boundaries, crashed-only routing) are deleted. Tests that used FindProvider as a routability probe for gates SHARED with production (privacy caps, manifest/SIP checks, trust, catalog, eviction, challenge freshness, concurrency cap) are repointed to a findRoutableProvider helper that drives the real ReserveProviderEx path; stale challenge times were bumped past the 16m production freshness window to preserve staleness coverage. No production behavior change. build/vet/test/gofmt all green. Co-authored-by: Cursor <cursoragent@cursor.com>
…es config - git rm --cached coordinator/coordinator: the 40MB built binary was tracked despite already being in .gitignore. Stop committing the artifact. - Remove Server.r2SitePackagesCDNURL plus its ServerConfig field and env wiring (EIGENINFERENCE_R2_SITE_PACKAGES_CDN_URL). It was written from env but never read: install.sh stopped substituting __DARKBLOOM_R2_SITE_PACKAGES_CDN_URL__ post-Swift-cutover (v0.5.0+), so the field was pure dead config. Co-authored-by: Cursor <cursoragent@cursor.com>
The 40MB coordinator/coordinator artifact was tracked despite already being listed in .gitignore. Untrack it so builds don't keep committing the binary. Co-authored-by: Cursor <cursoragent@cursor.com>
config/config.go only re-exported env helpers (EnvOr/FirstNonEmpty/EnvFloat/ EnvInt). FirstNonEmpty/EnvFloat/EnvInt had zero callers, and EnvOr was used only by app_config.go in the same package. Point app_config.go at env.EnvOr directly, move the package doc onto app_config.go, and delete the shim so there is a single env-helper source of truth. Co-authored-by: Cursor <cursoragent@cursor.com>
…elper maybeFallbackAliasCapacity and maybeFallbackAliasTTFT were near-identical copies differing only in the TTFT ceiling check and the failure-path return model. Merge into maybeFallbackAlias(parsed, mode, ...) with an aliasFallbackMode (capacity vs ttft) flag; capacity callers pass ttftThreshold=0. Failure-path return semantics are preserved exactly (capacity reports currentModel, TTFT reports the probed previous build). No behavior change. Co-authored-by: Cursor <cursoragent@cursor.com>
…target Benchmark/ (~5.2k LOC, incl. fatalError protocol-conformance stub caches) was compiled into the ProviderCore library and thus linked into every provider/enclave binary. Move it into a dedicated `ProviderBenchmark` SwiftPM target that depends on ProviderCore, so only the benchmark-bearing executables link it (darkbloom `benchmark`, kv-quant-gate, kv-attn-selftest, kv-engine-demo) and the shipped enclave/publish/core surface no longer carries benchmark code. - new target `ProviderBenchmark` (Sources/ProviderBenchmark), 18 files moved - the two engine-facing types the live scheduler needs stay in ProviderCore: KVQuantCandidateMode (split out of KVQuantTypes.swift) and the self-contained KVQuantPolicy.swift, now under ProviderCore/Inference/KVQuant - widen KVEstimation / parseModelArchitecture / resolvedKVBytesPerToken / ModelArchitecture to public (already used by the live scheduler) so the benchmark target can reach them across the module boundary - wire ProviderBenchmark into the four executables + the four benchmark test files; add `import ProviderCore` to the moved files Behavior-preserving: swift build green; swift test 1049 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
…comments - Delete api/fullstack_integration_test.go (~1155 LOC): it spawned real vllm-mlx subprocesses (the retired subprocess backend), was gated behind LIVE_FULLSTACK_TEST=1 (never run in CI), and the repo-root e2e/ harness covers the current MLX-Swift provider over the WebSocket relay. - Refresh stale vllm-mlx comments to MLX-Swift / E2E-relay reality in consumer.go, protocol/messages.go, and registry.go (comments only; no wire-type changes). - Fix install.sh path comment (internal/api -> api) in server.go. - Reword the telemetry symmetry test comment so it states the Go test pins the shape and the Swift/TS mirrors must match, without asserting mirror tests this package cannot see. Co-authored-by: Cursor <cursoragent@cursor.com>
…nonical coordinator/protocol/telemetry_symmetry_test.go references a Swift mirror that did not exist. Add Tests/ProviderCoreTests/TelemetrySymmetryTests.swift to pin: - enum casing (source=provider, severity=error, kind=backend_crash) - snake_case keys + nil-optional omission (machine_id/account_id/request_id/ stack/fields), matching Go omitempty - the TelemetryKind set + count (mirror of Go KnownKinds()) - TelemetrySource/TelemetrySeverity raw values Add CaseIterable to TelemetryKind (the Swift equivalent of KnownKinds()) and fix two stale `coordinator/internal/protocol|api/...` comment paths to the current `coordinator/protocol|api/...` locations. swift build green; swift test 1052 pass / 0 fail (+3 new). Co-authored-by: Cursor <cursoragent@cursor.com>
… errors
Replace `_ = ...` discards with explicit error handling + a DogStatsD counter:
- Stripe webhook: CompleteBillingSession / Referral.Apply failures now log an
error and increment billing.{session_complete,referral_apply}_failed instead
of vanishing after a successful deposit.
- Settlement: failed consumer refund and platform-fee Credit calls now log an
error and increment billing.credit_failed{op} — these are financial and must
never be silent (a dropped refund over-charges the consumer).
- Provider WS: best-effort EnqueueText sends (runtime/trust status) now log at
debug and increment provider.enqueue_failed{msg} so a wedged send is visible.
Co-authored-by: Cursor <cursoragent@cursor.com>
The assistant-message Harmony wrapper (role guard + content/thinking/ reasoning_content field iteration) was duplicated in JinjaSanitization and TemplateRenderCheck. Centralize it in ProviderCoreFoundation alongside the existing string transform: - add `harmonyAssistantTextFields` + `stripHarmonyFramingFromAssistantMessage` in HarmonySanitization.swift (Linux-buildable, no Apple deps) - TemplateRenderCheck now calls the shared helper (drops its private copy) - JinjaSanitization iterates the shared field-list constant - BatchScheduler already routes through the shared `stripHarmonyChannelFraming` Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
…uting scope - auth: first unit tests for the previously-untested package — PEM/key parsing (NewPrivyAuth), ES256 JWT verification incl. expiry/issuer/audience/wrong-key/ alg-confusion rejection (VerifyToken), user provisioning (GetOrCreateUser), and Config.Check. Uses a real in-memory store and real generated keys; no mocks of the unit under test. - env: 100% coverage of EnvOr/FirstNonEmpty/EnvFloat/EnvInt/EnvBool. - telemetry: Emitter default-fill, metric counting (capturing fake sink), nil-emitter/nil-sink safety, and version defaulting. - registry: document that pendingModelLoads is consulted ONLY by warm-pool/swap planning and never participates in routing/admission (see AGENTS.md state model). Co-authored-by: Cursor <cursoragent@cursor.com>
Four near-identical duration formatters had diverged in user-visible ways (download progress shows seconds; idle/status logs use compact "2h30m"; the scheduler uses spaced "2h 30m"; trailing zero-minute elision differed). Add one `DurationFormatting.compact` helper whose flags reproduce each style exactly, and delegate the four call-site helpers to it. Output is unchanged at every call site; the logic now lives in one place. Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
…sumer.go Move the pure request-introspection / token-estimation cluster out of the 5.7k-LOC consumer.go into a focused request_introspection.go (token + billing cost estimation, media/tool detection, cache-affinity keys, provider-serial allowlist parsing, and the media-cost constants). Same package, no behavior change — these functions have no Server dependency. consumer.go drops from ~5700 to ~5311 LOC. Co-authored-by: Cursor <cursoragent@cursor.com>
…ilent {}
Heartbeat / outbound / inference-error encoding used `try?` and degraded to
hardcoded `{}` or minimal JSON on failure — silent protocol corruption the
coordinator can't attribute. Replace with do/catch that logs at error and
emits a `protocol_error` telemetry event, and make the inference-error path
fall back to a parseable, correctly-typed payload (carrying the real
request_id/error/status_code) rather than `{}`.
Behavior-preserving on the success path: swift build green; swift test
1052 pass / 0 fail.
Co-authored-by: Cursor <cursoragent@cursor.com>
…timeout - PersistentEnclaveKey: guard the Keychain `result` optional and throw a typed keyLookupFailed instead of force-unwrapping nil (the CFTypeRef->SecKey cast is compiler-guaranteed). - ProtocolSafeQuantizedKVCache.copy(): assert the inner-copy type via guard + preconditionFailure with a clear message rather than a blind `as!`. - ProviderLoop.waitForPreloads: replace `DispatchQueue.global().asyncAfter` with a structured `Task.sleep(for:)` timeout (OneShotBoolContinuation still dedupes, so the first-resume-wins race is unchanged). Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
Move the consumer-facing API key management endpoints and their validation/response helpers (create/list/get/update/rotate/delete, apiKeyToResponse, validateKeyLimitInputs, keyModelAllowed, checkKeySpendCap, applyKeyPatch) into apikey_handlers.go, matching the existing apikey_handlers_test.go. Same package, no behavior change. consumer.go drops from ~5311 to ~4884 LOC. Co-authored-by: Cursor <cursoragent@cursor.com>
…m ProviderLoop First, low-risk step of breaking up the ProviderLoop god file: move the two standalone top-level value types out of the 3.6k-line ProviderLoop.swift into their own single-responsibility files (no actor internals touched, no access loosened). The loop file now holds the actor + its behavior, not its inputs or its logger type. Note: the actor's instance-method sections are intentionally NOT split across files here — ProviderLoop deliberately keeps method extensions in-file to reach its `private` model registry without loosening ~40 members to `internal` (see the documented rationale at the local-endpoint extension). That deeper split is deferred pending an explicit encapsulation decision. Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
darkbloom-publish (the security-critical model-manifest hasher used by the registry publish flow) had zero tests on its CLI surface. Add a DarkbloomPublishTests target exercising the `hash` subcommand end-to-end against a temp snapshot dir: - writes a manifest.json that decodes back to the expected id/version/files/ 64-hex aggregate - hashing is deterministic across runs on identical bytes - rejects unsafe model ids and version tags (spaces, '..', '/', empty) before any hashing happens (The ManifestBuilder library itself remains covered by ProviderCoreFoundationTests.) swift build green; swift test 1052 swift-testing + 4 new XCTest pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
vlm-smoke (explicitly "Safe to delete") and kv-engine-demo (DAR-318 capacity demo, "NOT a product") are dev-only executables referenced nowhere outside Package.swift — no CI, scripts, Makefile, or docs build them. Delete both (~1040 LOC) to shrink the default build surface. Kept kv-se-harness: it is the only way to exercise the Secure-Enclave KEK + Keychain round-trip on real SE hardware (a path `swift test` cannot reach), so it retains test value despite not being a product. Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com>
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Resolve coordinator/api/consumer.go conflict: master (#443 inference reliability) inserted the new remote-media-URL rejection helpers into the request-introspection region that this branch had already relocated out of consumer.go. Keep the branch's modular structure and move master's five new funcs (isInlineDataURI, mediaPartURLString, validateMediaParts, visionRejectRemoteEnabled, and the *Server method rejectRemoteMediaURLs) into coordinator/api/request_introspection.go next to detectMediaRequirement/ isMediaPartType. The os import moves with them; the strconv import stays in consumer.go (used elsewhere). Behavior-preserving: master's stable-identity ejection breaker calls and both rejectRemoteMediaURLs call sites (handleChatCompletions, handleGenericInference) auto-merged unchanged and are retained. go build/vet/test all green. Co-authored-by: Cursor <cursoragent@cursor.com>
|
This PR is a large refactoring / dead-code removal pass with several correctness fixes; it does not introduce new attack surface and makes minor positive changes to billing error visibility and SE key resilience, but leaves all previously-open threat mitigations unchanged. Trust Boundaries Touched
Per-Threat AssessmentT-034 — Provider runs modified code while advertising a trusted identity (
|
ethenotethan
left a comment
There was a problem hiding this comment.
Automated Code Review — Layr-Labs/d-inference#
Verdict: REQUEST_CHANGES
Security — ⚠️ SKIPPED
Error: Expecting property name enclosed in double quotes: line 2 column 5 (char 6)
Performance — 3 finding(s) (1 blocking)
- 🔵 [INFO]
coordinator/api/billing_handlers.go:176-188— Synchronous billing operations in webhook handler could block request processing- Suggestion: Consider making CompleteBillingSession and Apply operations asynchronous or use a background worker queue to avoid blocking the webhook response
- 🟡 [MEDIUM]
coordinator/api/consumer.go:388-450— maybeFallbackAlias function duplicates capacity checking logic across two modes- Suggestion: Extract common capacity checking logic to avoid redundant QuickCapacityCheckWithTTFTForRequest calls and reduce code duplication
- 🔵 [INFO]
coordinator/api/apikey_handlers.go:285-320— Multiple synchronous database operations in handleUpdateAPIKey without batching- Suggestion: Consider batching the invalidateAllAPIKeyCache calls or making them asynchronous to reduce blocking time during key updates
Type_diligence — ✅ No issues found
Additive_complexity — 5 finding(s) (1 blocking)
- 🔵 [INFO]
coordinator/api/apikey_handlers.go:1-447— New 447-line file extracted from consumer.go may be premature separation- Suggestion: Consider if API key handlers truly need their own file or if they could remain in consumer.go with better organization
- 🔵 [INFO]
coordinator/api/request_introspection.go:1-561— New 561-line file for request introspection helpers may be over-modularized- Suggestion: Evaluate if these pure helper functions need their own file or could be organized differently within existing files
- 🟡 [MEDIUM]
coordinator/api/consumer.go:388-445— Complex aliasFallbackMode enum and maybeFallbackAlias function with multiple similar code paths- Suggestion: Consider simplifying the fallback logic or extracting common patterns to reduce the branching complexity
- 🔵 [INFO]
provider-swift/Sources/ProviderBenchmark/KVQuant/ProtocolSafeQuantizedKVCache.swift:101-108— Added preconditionFailure with detailed error message for type casting that should never fail- Suggestion: Consider if the guard+preconditionFailure is necessary or if the original force-cast with a comment would be clearer
- 🔵 [INFO]
coordinator/api/billing_handlers.go:175-191— Similar error handling pattern repeated for billing session completion and referral application- Suggestion: Consider extracting a helper function for the common 'log error + increment metric' pattern
8 finding(s) total, 2 blocking. Verdict: REQUEST_CHANGES.
🤖 Automated review by Centaur · DAR-186
| // Best-effort: the deposit is already credited above, but a failure here | ||
| // leaves the session marked incomplete (and replayable). Surface it. | ||
| if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil { | ||
| s.logger.Error("stripe: failed to mark billing session complete", | ||
| "billing_session_id", billingSessionID, "error", err) | ||
| s.ddIncr("billing.session_complete_failed", nil) | ||
| } | ||
| } | ||
| if referralCode != "" { | ||
| _ = s.billing.Referral().Apply(consumerKey, referralCode) | ||
| // Best-effort: a failure here means the referrer is not credited for this | ||
| // deposit; never silently swallow it. | ||
| if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil { | ||
| s.logger.Error("stripe: failed to apply referral credit", "error", err) |
There was a problem hiding this comment.
🔵 [INFO] ⚡ Synchronous billing operations in webhook handler could block request processing
💡 Suggestion: Consider making CompleteBillingSession and Apply operations asynchronous or use a background worker queue to avoid blocking the webhook response
📊 Score: 2×3 = 6 · Category: blocking_io
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
|
|
||
| // Decode into a presence map so we can distinguish "field omitted" (leave | ||
| // unchanged) from "field set to null" (clear the limit). | ||
| var patch map[string]json.RawMessage | ||
| if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body")) | ||
| return | ||
| } | ||
| if msg := applyKeyPatch(existing, patch); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
| if msg := validateKeyLimitInputs(existing.LimitReset, nil, existing.RPMLimit, existing.ITPMLimit, existing.OTPMLimit, existing.ExpiresAt); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
|
|
||
| // Bump the auth-cache generation before AND after the mutation so a | ||
| // concurrent request cannot keep authenticating with a stale (e.g. | ||
| // just-disabled) cached record. | ||
| s.invalidateAllAPIKeyCache() | ||
| updated, err := s.store.UpdateAPIKey(user.AccountID, id, *existing) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to update key")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(updated)) | ||
| } | ||
|
|
||
| // applyKeyPatch merges a presence-aware PATCH body into an existing key record. | ||
| // Returns a human-readable error string on invalid input (empty when ok). |
There was a problem hiding this comment.
🔵 [INFO] ⚡ Multiple synchronous database operations in handleUpdateAPIKey without batching
💡 Suggestion: Consider batching the invalidateAllAPIKeyCache calls or making them asynchronous to reduce blocking time during key updates
📊 Score: 2×3 = 6 · Category: blocking_io
| package api | ||
|
|
||
| // apikey_handlers.go holds the consumer-facing API key management endpoints | ||
| // (create/list/get/update/rotate/delete) plus their request validation and | ||
| // response-shaping helpers. Split out of consumer.go to keep key management | ||
| // separate from the inference request path. | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/eigeninference/d-inference/coordinator/auth" | ||
| "github.com/eigeninference/d-inference/coordinator/store" | ||
|
|
||
| "github.com/eigeninference/d-inference/coordinator/api/types" | ||
| ) | ||
|
|
||
| // handleCreateKey handles POST /v1/auth/keys — creates a new consumer API key. | ||
| // Requires Privy authentication. The key is linked to the user's account so | ||
| // requests made with the key are billed to the same account. | ||
| func (s *Server) handleCreateKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", | ||
| "API key creation requires a Privy account — authenticate with a Privy access token")) | ||
| return | ||
| } | ||
|
|
||
| key, err := s.store.CreateKeyForAccount(user.AccountID) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, types.CreateKeyResponse{ | ||
| APIKey: key, | ||
| AccountID: user.AccountID, | ||
| }) | ||
| } | ||
|
|
||
| // handleRevokeKey handles DELETE /v1/auth/keys — revokes an API key. | ||
| // The caller must own the key (same account). Requires Privy auth so a | ||
| // compromised API key cannot revoke legitimate keys. | ||
| func (s *Server) handleRevokeKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
|
|
||
| var body struct { | ||
| Key string `json:"key"` | ||
| } | ||
| if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Key == "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "provide {\"key\": \"sk-db-...\"}")) | ||
| return | ||
| } | ||
|
|
||
| owner := s.store.GetKeyAccount(body.Key) | ||
| if owner != user.AccountID { | ||
| writeJSON(w, http.StatusForbidden, errorResponse("forbidden", "you can only revoke your own keys")) | ||
| return | ||
| } | ||
|
|
||
| if !s.store.RevokeKey(body.Key) { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found or already revoked")) | ||
| return | ||
| } | ||
| s.invalidateAPIKeyCache(body.Key) | ||
|
|
||
| writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"}) | ||
| } | ||
|
|
||
| // apiKeyToResponse projects a stored key into its masked API representation, | ||
| // computing the current-window spend and remaining budget. | ||
| func (s *Server) apiKeyToResponse(k *store.APIKey) types.APIKeyResponse { | ||
| resp := types.APIKeyResponse{ | ||
| ID: k.ID, | ||
| Name: k.Name, | ||
| Label: k.Label, | ||
| Disabled: k.Disabled, | ||
| LimitReset: store.NormalizeResetWindow(k.LimitReset), | ||
| RPMLimit: k.RPMLimit, | ||
| ITPMLimit: k.ITPMLimit, | ||
| OTPMLimit: k.OTPMLimit, | ||
| AllowedModels: k.AllowedModels, | ||
| SelfRouteOnly: k.SelfRouteOnly, | ||
| ExpiresAt: k.ExpiresAt, | ||
| CreatedAt: k.CreatedAt, | ||
| LastUsedAt: k.LastUsedAt, | ||
| } | ||
| since := store.KeySpendWindowStart(resp.LimitReset, time.Now()) | ||
| spent := s.store.KeySpendSince(k.ID, since) | ||
| resp.UsageUSD = microToUSD(spent) | ||
| if k.LimitMicroUSD != nil { | ||
| limitUSD := microToUSD(*k.LimitMicroUSD) | ||
| resp.LimitUSD = &limitUSD | ||
| remaining := *k.LimitMicroUSD - spent | ||
| if remaining < 0 { | ||
| remaining = 0 | ||
| } | ||
| remUSD := microToUSD(remaining) | ||
| resp.RemainingUSD = &remUSD | ||
| } | ||
| return resp | ||
| } | ||
|
|
||
| // validateKeyLimitInputs sanity-checks user-supplied limit values. Returns a | ||
| // human-readable error string (empty when valid). | ||
| func validateKeyLimitInputs(reset string, limitUSD *float64, rpm, itpm, otpm *int64, expiresAt *time.Time) string { | ||
| switch reset { | ||
| case "", store.KeyResetNone, store.KeyResetDaily, store.KeyResetWeekly, store.KeyResetMonthly: | ||
| default: | ||
| return "limit_reset must be one of: none, daily, weekly, monthly" | ||
| } | ||
| if limitUSD != nil && *limitUSD < 0 { | ||
| return "limit_usd must be >= 0" | ||
| } | ||
| if rpm != nil && *rpm < 0 { | ||
| return "rpm_limit must be >= 0" | ||
| } | ||
| if itpm != nil && *itpm < 0 { | ||
| return "itpm_limit must be >= 0" | ||
| } | ||
| if otpm != nil && *otpm < 0 { | ||
| return "otpm_limit must be >= 0" | ||
| } | ||
| if expiresAt != nil && !expiresAt.IsZero() && expiresAt.Before(time.Now()) { | ||
| return "expires_at must be in the future" | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| // keyModelAllowed returns false when the calling key restricts models via an | ||
| // allow-list that does not include the requested model. Account-scoped/legacy | ||
| // keys (no key in context) and keys without an allow-list always pass. | ||
| func (s *Server) keyModelAllowed(ctx context.Context, model string) bool { | ||
| k := apiKeyFromContext(ctx) | ||
| if k == nil || len(k.AllowedModels) == 0 { | ||
| return true | ||
| } | ||
| for _, m := range k.AllowedModels { | ||
| if m == model { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // checkKeySpendCap reports whether charging additionalMicroUSD to the calling | ||
| // key would exceed its per-key spend cap in the current window. It returns | ||
| // (message, ok); ok=false means the request must be rejected with 402. The | ||
| // per-account balance ledger is still the hard atomic ceiling — this is the | ||
| // soft, per-key sub-cap, enforced against settled usage (so concurrent | ||
| // in-flight requests are eventually-consistent, never over the account balance). | ||
| func (s *Server) checkKeySpendCap(ctx context.Context, additionalMicroUSD int64) (string, bool) { | ||
| k := apiKeyFromContext(ctx) | ||
| if k == nil || k.ID == "" || k.LimitMicroUSD == nil { | ||
| return "", true | ||
| } | ||
| since := store.KeySpendWindowStart(k.LimitReset, time.Now()) | ||
| spent := s.store.KeySpendSince(k.ID, since) | ||
| if spent+additionalMicroUSD > *k.LimitMicroUSD { | ||
| window := store.NormalizeResetWindow(k.LimitReset) | ||
| if window == store.KeyResetNone { | ||
| window = "total" | ||
| } | ||
| return fmt.Sprintf("API key spend limit reached (%s cap $%.2f, used $%.2f) — raise this key's limit or use another key", | ||
| window, microToUSD(*k.LimitMicroUSD), microToUSD(spent)), false | ||
| } | ||
| return "", true | ||
| } | ||
|
|
||
| // handleListAPIKeys handles GET /v1/keys — lists the caller's keys (masked). | ||
| func (s *Server) handleListAPIKeys(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| keys, err := s.store.ListAPIKeys(user.AccountID) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to list keys")) | ||
| return | ||
| } | ||
| out := make([]types.APIKeyResponse, 0, len(keys)) | ||
| for i := range keys { | ||
| out = append(out, s.apiKeyToResponse(&keys[i])) | ||
| } | ||
| writeJSON(w, http.StatusOK, types.APIKeyListResponse{Object: "list", Data: out}) | ||
| } | ||
|
|
||
| // handleCreateAPIKey handles POST /v1/keys — mints a new named, optionally | ||
| // limited key. The raw secret is returned exactly once. | ||
| func (s *Server) handleCreateAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", | ||
| "API key creation requires a Privy account — authenticate with a Privy access token")) | ||
| return | ||
| } | ||
|
|
||
| var req createAPIKeyRequest | ||
| if r.Body != nil { | ||
| // A missing/empty body is allowed (creates a default unnamed key). | ||
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil && !errors.Is(err, io.EOF) { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body")) | ||
| return | ||
| } | ||
| } | ||
| if msg := validateKeyLimitInputs(req.LimitReset, req.LimitUSD, req.RPMLimit, req.ITPMLimit, req.OTPMLimit, req.ExpiresAt); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
|
|
||
| opts := store.APIKeyCreate{ | ||
| Name: strings.TrimSpace(req.Name), | ||
| LimitReset: store.NormalizeResetWindow(req.LimitReset), | ||
| RPMLimit: req.RPMLimit, | ||
| ITPMLimit: req.ITPMLimit, | ||
| OTPMLimit: req.OTPMLimit, | ||
| AllowedModels: req.AllowedModels, | ||
| SelfRouteOnly: req.SelfRouteOnly, | ||
| ExpiresAt: req.ExpiresAt, | ||
| } | ||
| if req.LimitUSD != nil { | ||
| m := usdToMicro(*req.LimitUSD) | ||
| opts.LimitMicroUSD = &m | ||
| } | ||
|
|
||
| raw, rec, err := s.store.CreateAPIKey(user.AccountID, opts) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{ | ||
| Key: raw, | ||
| Data: s.apiKeyToResponse(rec), | ||
| }) | ||
| } | ||
|
|
||
| // handleGetAPIKey handles GET /v1/keys/{id}. | ||
| func (s *Server) handleGetAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| k, err := s.store.GetAPIKeyByID(user.AccountID, id) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(k)) | ||
| } | ||
|
|
||
| // handleGetCallingKey handles GET /v1/key — returns the metadata for the API | ||
| // key used to authenticate the request (OpenRouter parity). | ||
| func (s *Server) handleGetCallingKey(w http.ResponseWriter, r *http.Request) { | ||
| k := apiKeyFromContext(r.Context()) | ||
| if k == nil || k.ID == "" { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", | ||
| "no key metadata — this endpoint requires API key authentication")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(k)) | ||
| } | ||
|
|
||
| // handleUpdateAPIKey handles PATCH /v1/keys/{id} — sparse update of a key's | ||
| // name, disabled flag, limits, reset window, expiry, and model allow-list. | ||
| func (s *Server) handleUpdateAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| existing, err := s.store.GetAPIKeyByID(user.AccountID, id) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
|
|
||
| // Decode into a presence map so we can distinguish "field omitted" (leave | ||
| // unchanged) from "field set to null" (clear the limit). | ||
| var patch map[string]json.RawMessage | ||
| if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body")) | ||
| return | ||
| } | ||
| if msg := applyKeyPatch(existing, patch); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
| if msg := validateKeyLimitInputs(existing.LimitReset, nil, existing.RPMLimit, existing.ITPMLimit, existing.OTPMLimit, existing.ExpiresAt); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
|
|
||
| // Bump the auth-cache generation before AND after the mutation so a | ||
| // concurrent request cannot keep authenticating with a stale (e.g. | ||
| // just-disabled) cached record. | ||
| s.invalidateAllAPIKeyCache() | ||
| updated, err := s.store.UpdateAPIKey(user.AccountID, id, *existing) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to update key")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(updated)) | ||
| } | ||
|
|
||
| // applyKeyPatch merges a presence-aware PATCH body into an existing key record. | ||
| // Returns a human-readable error string on invalid input (empty when ok). | ||
| func applyKeyPatch(k *store.APIKey, patch map[string]json.RawMessage) string { | ||
| if raw, ok := patch["name"]; ok { | ||
| var name string | ||
| if err := json.Unmarshal(raw, &name); err != nil { | ||
| return "invalid value for name" | ||
| } | ||
| k.Name = strings.TrimSpace(name) | ||
| } | ||
| if raw, ok := patch["disabled"]; ok { | ||
| var disabled bool | ||
| if err := json.Unmarshal(raw, &disabled); err != nil { | ||
| return "invalid value for disabled" | ||
| } | ||
| k.Disabled = disabled | ||
| } | ||
| if raw, ok := patch["limit_reset"]; ok { | ||
| var reset string | ||
| if err := json.Unmarshal(raw, &reset); err != nil { | ||
| return "invalid value for limit_reset" | ||
| } | ||
| k.LimitReset = store.NormalizeResetWindow(reset) | ||
| } | ||
| if raw, ok := patch["limit_usd"]; ok { | ||
| if string(raw) == "null" { | ||
| k.LimitMicroUSD = nil | ||
| } else { | ||
| var usd float64 | ||
| if err := json.Unmarshal(raw, &usd); err != nil { | ||
| return "invalid value for limit_usd" | ||
| } | ||
| if usd < 0 { | ||
| return "limit_usd must be >= 0" | ||
| } | ||
| m := usdToMicro(usd) | ||
| k.LimitMicroUSD = &m | ||
| } | ||
| } | ||
| if raw, ok := patch["allowed_models"]; ok { | ||
| if string(raw) == "null" { | ||
| k.AllowedModels = nil | ||
| } else { | ||
| var models []string | ||
| if err := json.Unmarshal(raw, &models); err != nil { | ||
| return "invalid value for allowed_models" | ||
| } | ||
| k.AllowedModels = models | ||
| } | ||
| } | ||
| if raw, ok := patch["self_route_only"]; ok { | ||
| var v bool | ||
| if err := json.Unmarshal(raw, &v); err != nil { | ||
| return "invalid value for self_route_only" | ||
| } | ||
| k.SelfRouteOnly = v | ||
| } | ||
| for field, dst := range map[string]**int64{ | ||
| "rpm_limit": &k.RPMLimit, | ||
| "itpm_limit": &k.ITPMLimit, | ||
| "otpm_limit": &k.OTPMLimit, | ||
| } { | ||
| if raw, ok := patch[field]; ok { | ||
| if string(raw) == "null" { | ||
| *dst = nil | ||
| } else { | ||
| var v int64 | ||
| if err := json.Unmarshal(raw, &v); err != nil { | ||
| return "invalid value for " + field | ||
| } | ||
| *dst = &v | ||
| } | ||
| } | ||
| } | ||
| if raw, ok := patch["expires_at"]; ok { | ||
| if string(raw) == "null" { | ||
| k.ExpiresAt = nil | ||
| } else { | ||
| var t time.Time | ||
| if err := json.Unmarshal(raw, &t); err != nil { | ||
| return "invalid value for expires_at (use RFC 3339)" | ||
| } | ||
| k.ExpiresAt = &t | ||
| } | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| // handleDeleteAPIKey handles DELETE /v1/keys/{id} — permanently deletes a key. | ||
| func (s *Server) handleDeleteAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| s.invalidateAllAPIKeyCache() | ||
| if err := s.store.RevokeAPIKeyByID(user.AccountID, id); err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"}) | ||
| } | ||
|
|
||
| // handleRotateAPIKey handles POST /v1/keys/{id}/rotate — mints a fresh secret | ||
| // carrying the same limits and metadata, then deletes the old key. The new | ||
| // secret is returned exactly once. | ||
| func (s *Server) handleRotateAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| // Bump the auth-cache generation before AND after the mutation so the old | ||
| // secret stops authenticating the instant rotation commits. | ||
| s.invalidateAllAPIKeyCache() | ||
| raw, rec, err := s.store.RotateAPIKey(user.AccountID, id) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{ | ||
| Key: raw, | ||
| Data: s.apiKeyToResponse(rec), | ||
| }) | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 New 447-line file extracted from consumer.go may be premature separation
💡 Suggestion: Consider if API key handlers truly need their own file or if they could remain in consumer.go with better organization
📊 Score: 2×3 = 6 · Category: over-abstraction
| package api | ||
|
|
||
| // request_introspection.go holds helpers for introspecting and lightly | ||
| // reshaping inbound inference request bodies before routing/dispatch: | ||
| // token and cost estimation (routing vs billing), media/tool detection, | ||
| // remote media-URL rejection, cache-affinity key derivation, and | ||
| // provider-serial allowlist parsing. Most are pure helpers with no Server | ||
| // state; the pre-dispatch media-URL rejection (rejectRemoteMediaURLs) hangs | ||
| // off *Server only to record rejection telemetry. Split out of consumer.go | ||
| // to keep the request-handling orchestrator thin. | ||
|
|
||
| import ( | ||
| "crypto/sha256" | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/http" | ||
| "os" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/eigeninference/d-inference/coordinator/store" | ||
| ) | ||
|
|
||
| // Media prompt-token costs. A vision encoder turns each image/video into a | ||
| // bounded number of soft tokens (Gemma 4 caps around a few hundred per image) | ||
| // regardless of the base64 byte length, so counting a `data:` URI as text | ||
| // inflates the estimate by orders of magnitude — distorting routing admission and | ||
| // over-reserving balance. These flat per-media costs keep both sane. | ||
| const ( | ||
| imagePromptTokenCost = 300 | ||
| videoPromptTokenCost = 1500 | ||
| ) | ||
|
|
||
| func intFromRequestValue(v any) (int, bool) { | ||
| switch x := v.(type) { | ||
| case int: | ||
| return x, true | ||
| case int32: | ||
| return int(x), true | ||
| case int64: | ||
| return int(x), true | ||
| case float64: | ||
| return int(x), true | ||
| case json.Number: | ||
| n, err := x.Int64() | ||
| if err != nil { | ||
| return 0, false | ||
| } | ||
| return int(n), true | ||
| default: | ||
| return 0, false | ||
| } | ||
| } | ||
|
|
||
| // approximateTokenCount returns a rough token estimate for routing and queue | ||
| // admission. The len/4 heuristic is a reasonable average for English text | ||
| // with GPT-style BPE tokenizers. This value feeds into the scheduler's | ||
| // capacity checks (pendingTokenBudget, freeMemoryAdmits) where a tighter | ||
| // estimate produces better routing decisions. | ||
| // | ||
| // For billing reservation (where underestimation causes provider shortfall), | ||
| // use approximateTokenCountUpperBound instead. | ||
| func approximateTokenCount(v any) int { | ||
| if v == nil { | ||
| return 0 | ||
| } | ||
| switch x := v.(type) { | ||
| case string: | ||
| if x == "" { | ||
| return 0 | ||
| } | ||
| tokens := len(x) / 4 | ||
| if tokens < 1 { | ||
| tokens = 1 | ||
| } | ||
| return tokens | ||
| default: | ||
| b, err := json.Marshal(v) | ||
| if err != nil { | ||
| return 0 | ||
| } | ||
| tokens := len(b) / 4 | ||
| if tokens < 1 { | ||
| tokens = 1 | ||
| } | ||
| return tokens | ||
| } | ||
| } | ||
|
|
||
| // approximateTokenCountUpperBound returns a guaranteed upper bound on the | ||
| // number of tokens a BPE tokenizer would produce for v. Every BPE vocabulary | ||
| // starts with one token per byte and can only merge, so len(text) >= tokens | ||
| // for any model family, any language, forever. This is used only for billing | ||
| // reservation to ensure the pre-flight debit always covers the actual cost. | ||
| // | ||
| // Using len(text) over-reserves by ~3-4x on average for English prose, but | ||
| // the difference is refunded immediately after inference completes, so | ||
| // consumers are never overcharged — they only need sufficient balance to | ||
| // cover the reservation hold. | ||
| func approximateTokenCountUpperBound(v any) int { | ||
| if v == nil { | ||
| return 0 | ||
| } | ||
| switch x := v.(type) { | ||
| case string: | ||
| return len(x) | ||
| default: | ||
| b, err := json.Marshal(v) | ||
| if err != nil { | ||
| return 0 | ||
| } | ||
| return len(b) | ||
| } | ||
| } | ||
|
|
||
| func estimatePromptTokens(parsed map[string]any) int { | ||
| total := 0 | ||
| if v, ok := parsed["messages"]; ok { | ||
| total += messagesPromptTokens(v) | ||
| } | ||
| if v, ok := parsed["input"]; ok { | ||
| total += inputPromptTokens(v) | ||
| } | ||
| if v, ok := parsed["prompt"]; ok { | ||
| total += approximateTokenCount(v) | ||
| } | ||
| if total == 0 { | ||
| total = approximateTokenCount(parsed) | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| // estimateBillingPromptTokens returns a guaranteed upper bound on prompt | ||
| // tokens for billing reservation. Uses byte-length (not len/4) so the | ||
| // pre-flight reservation always covers actual cost. This value must NOT | ||
| // be used for routing — see estimatePromptTokens for that. | ||
| func estimateBillingPromptTokens(parsed map[string]any) int { | ||
| total := 0 | ||
| if v, ok := parsed["messages"]; ok { | ||
| // Billing MUST stay a guaranteed upper bound (len(bytes) >= tokens for any | ||
| // BPE tokenizer), so it keeps counting full message bytes — including a | ||
| // base64 image's bytes and every non-content field (role, tool_calls, | ||
| // name). Switching to the media-aware flat count here would DROP those | ||
| // fields and under-reserve for tool-calling requests. Over-reservation on a | ||
| // large image is safe (it is refunded after inference); the routing/ITPM | ||
| // estimate (estimatePromptTokens) is the media-aware one. | ||
| total += approximateTokenCountUpperBound(v) | ||
| } | ||
| if v, ok := parsed["input"]; ok { | ||
| total += approximateTokenCountUpperBound(v) | ||
| } | ||
| if v, ok := parsed["prompt"]; ok { | ||
| total += approximateTokenCountUpperBound(v) | ||
| } | ||
| if total == 0 { | ||
| total = approximateTokenCountUpperBound(parsed) | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| // isMediaPartType reports whether an OpenAI/OpenRouter content-part type denotes | ||
| // image or video input. | ||
| func isMediaPartType(t string) bool { | ||
| switch t { | ||
| // OpenAI chat (image_url/video_url), OpenAI Responses (input_image/input_video), | ||
| // and Anthropic /v1/messages content blocks ({"type":"image"|"video","source":…}). | ||
| case "image_url", "input_image", "image", "video_url", "input_video", "video": | ||
| return true | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // messageContentTokens estimates ROUTING prompt tokens for one message's | ||
| // `content`, counting text parts as text (len/4) and each image/video part as a | ||
| // flat media cost (never the base64 length). Used only for the routing/ITPM | ||
| // estimate; billing uses approximateTokenCountUpperBound (a guaranteed upper | ||
| // bound that intentionally still counts the base64 bytes). | ||
| func messageContentTokens(content any) int { | ||
| textTokens := func(s string) int { | ||
| if s == "" { | ||
| return 0 | ||
| } | ||
| if t := len(s) / 4; t > 0 { | ||
| return t | ||
| } | ||
| return 1 | ||
| } | ||
| switch c := content.(type) { | ||
| case string: | ||
| return textTokens(c) | ||
| case []any: | ||
| total := 0 | ||
| for _, part := range c { | ||
| pm, ok := part.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| typ, _ := pm["type"].(string) | ||
| switch { | ||
| case typ == "text" || typ == "input_text": | ||
| if s, ok := pm["text"].(string); ok { | ||
| total += textTokens(s) | ||
| } | ||
| case typ == "image_url" || typ == "input_image" || typ == "image": | ||
| total += imagePromptTokenCost | ||
| case typ == "video_url" || typ == "input_video" || typ == "video": | ||
| total += videoPromptTokenCost | ||
| default: | ||
| if b, err := json.Marshal(pm); err == nil { | ||
| total += len(b) / 4 | ||
| } | ||
| } | ||
| } | ||
| return total | ||
| default: | ||
| return approximateTokenCount(content) | ||
| } | ||
| } | ||
|
|
||
| // messagesPromptTokens sums media-aware routing content tokens across a messages | ||
| // array. Falls back to the len/4 heuristic when messages isn't the standard | ||
| // array shape. | ||
| func messagesPromptTokens(messages any) int { | ||
| arr, ok := messages.([]any) | ||
| if !ok { | ||
| return approximateTokenCount(messages) | ||
| } | ||
| total := 0 | ||
| for _, m := range arr { | ||
| mm, ok := m.(map[string]any) | ||
| if !ok { | ||
| total += approximateTokenCount(m) | ||
| continue | ||
| } | ||
| total += 4 // small per-message framing (role + delimiters) | ||
| total += messageContentTokens(mm["content"]) | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| // inputPromptTokens estimates the Responses API `input` field. A string input | ||
| // is plain text (len/4). Structured input is an array of message-like items with | ||
| // `content` parts, so reuse the same media-aware content estimator as chat | ||
| // messages instead of counting JSON wrapper bytes. | ||
| func inputPromptTokens(input any) int { | ||
| switch x := input.(type) { | ||
| case string: | ||
| return approximateTokenCount(x) | ||
| case []any: | ||
| total := 0 | ||
| for _, item := range x { | ||
| switch m := item.(type) { | ||
| case string: | ||
| total += approximateTokenCount(m) | ||
| case map[string]any: | ||
| content, ok := m["content"] | ||
| if !ok { | ||
| total += approximateTokenCount(m) | ||
| continue | ||
| } | ||
| total += 4 // role/type framing, matching messagesPromptTokens. | ||
| total += messageContentTokens(content) | ||
| default: | ||
| total += approximateTokenCount(item) | ||
| } | ||
| } | ||
| return total | ||
| default: | ||
| return approximateTokenCount(input) | ||
| } | ||
| } | ||
|
|
||
| // contentPartsHaveMedia reports whether a `content` value (a content-part array) | ||
| // carries any image/video part. | ||
| func contentPartsHaveMedia(content any) bool { | ||
| parts, ok := content.([]any) | ||
| if !ok { | ||
| return false | ||
| } | ||
| for _, part := range parts { | ||
| pm, ok := part.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| if typ, _ := pm["type"].(string); isMediaPartType(typ) { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // detectMediaRequirement reports whether the request carries image/video input. | ||
| // The coordinator sees plaintext at this point (sealedTransport decrypts before | ||
| // the handler), so this drives the vision routing gate and the fail-fast "no | ||
| // vision-capable provider" response. It scans both the Chat Completions | ||
| // `messages[].content` parts and the Responses API `input[].content` parts so a | ||
| // media request on either surface is gated (never silently routed text-blind). | ||
| func detectMediaRequirement(parsed map[string]any) bool { | ||
| if messages, ok := parsed["messages"].([]any); ok { | ||
| for _, m := range messages { | ||
| if mm, ok := m.(map[string]any); ok && contentPartsHaveMedia(mm["content"]) { | ||
| return true | ||
| } | ||
| } | ||
| } | ||
| // Responses API: `input` may be a string (no media) or an array of items, | ||
| // each carrying `content` parts in the same image_url/input_image shape. | ||
| if input, ok := parsed["input"].([]any); ok { | ||
| for _, item := range input { | ||
| if im, ok := item.(map[string]any); ok && contentPartsHaveMedia(im["content"]) { | ||
| return true | ||
| } | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // isInlineDataURI reports whether a media reference is an inline base64 data: URI | ||
| // — the ONLY form the provider's E2E-encrypted VLM path accepts (see | ||
| // VLMRequestInference.MediaError.invalidURL, which 400s anything else). | ||
| func isInlineDataURI(s string) bool { | ||
| return strings.HasPrefix(strings.TrimSpace(s), "data:") | ||
| } | ||
|
|
||
| // mediaPartURLString returns the URL/inline reference carried by a media content | ||
| // part and whether the part IS a media part. Covers OpenAI chat (image_url/ | ||
| // video_url objects or bare strings), OpenAI Responses (input_image/input_video), | ||
| // and Anthropic source blocks ({type:"image"|"video", source:{type,…}}). A media | ||
| // part whose reference can't be read returns ("", true) so the caller fails OPEN. | ||
| func mediaPartURLString(pm map[string]any) (ref string, isMedia bool) { | ||
| typ, _ := pm["type"].(string) | ||
| if !isMediaPartType(typ) { | ||
| return "", false | ||
| } | ||
| switch typ { | ||
| case "image_url", "input_image", "video_url", "input_video": | ||
| field := "image_url" | ||
| if typ == "video_url" || typ == "input_video" { | ||
| field = "video_url" | ||
| } | ||
| switch v := pm[field].(type) { | ||
| case string: | ||
| return v, true | ||
| case map[string]any: | ||
| if u, ok := v["url"].(string); ok { | ||
| return u, true | ||
| } | ||
| } | ||
| return "", true | ||
| case "image", "video": // Anthropic source block | ||
| if src, ok := pm["source"].(map[string]any); ok { | ||
| switch st, _ := src["type"].(string); st { | ||
| case "url": | ||
| u, _ := src["url"].(string) | ||
| return u, true // remote reference | ||
| case "base64": | ||
| // Inline raw base64 (not a data: URI). Treated as inline/OK in v1 — | ||
| // the provider's Anthropic path accepts it; only remote refs are the | ||
| // production storm. Marked inline so it is never rejected here. | ||
| return "data:anthropic-inline-base64", true | ||
| } | ||
| } | ||
| return "", true | ||
| } | ||
| return "", true | ||
| } | ||
|
|
||
| // validateMediaParts walks every media part in a chat (messages[]) or Responses | ||
| // (input[]) body and returns the first REMOTE / non-inline media reference. The | ||
| // provider VLM path accepts ONLY inline data: URIs, so a remote http(s)://, | ||
| // file://, or otherwise non-data: reference is the production gemma-vision 400 | ||
| // source. Returns ok=false with the offending reference so the caller rejects it | ||
| // pre-dispatch (one clean 400) instead of dispatching and 400ing across the fleet. | ||
| // Unknown/unreadable part shapes fall through (fail-OPEN) — never wrongly 400 a | ||
| // body we don't model. | ||
| func validateMediaParts(parsed map[string]any) (badRef string, ok bool) { | ||
| check := func(content any) (string, bool) { | ||
| parts, ok := content.([]any) | ||
| if !ok { | ||
| return "", true | ||
| } | ||
| for _, p := range parts { | ||
| pm, ok := p.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| ref, isMedia := mediaPartURLString(pm) | ||
| if !isMedia || ref == "" { | ||
| continue | ||
| } | ||
| if !isInlineDataURI(ref) { | ||
| return ref, false | ||
| } | ||
| } | ||
| return "", true | ||
| } | ||
| if msgs, ok := parsed["messages"].([]any); ok { | ||
| for _, m := range msgs { | ||
| if mm, ok := m.(map[string]any); ok { | ||
| if ref, good := check(mm["content"]); !good { | ||
| return ref, false | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if input, ok := parsed["input"].([]any); ok { | ||
| for _, it := range input { | ||
| if im, ok := it.(map[string]any); ok { | ||
| if ref, good := check(im["content"]); !good { | ||
| return ref, false | ||
| } | ||
| } | ||
| } | ||
| } | ||
| return "", true | ||
| } | ||
|
|
||
| // visionRejectRemoteEnabled gates the C4 pre-dispatch remote-media-URL rejection. | ||
| // Default ON; DARKBLOOM_VISION_REJECT_REMOTE_URLS=false (or 0) restores the prior | ||
| // dispatch-then-provider-400 behavior (still bounded by C1). Read live so the | ||
| // kill switch toggles without a redeploy. | ||
| func visionRejectRemoteEnabled() bool { | ||
| v := strings.TrimSpace(os.Getenv("DARKBLOOM_VISION_REJECT_REMOTE_URLS")) | ||
| if v == "" { | ||
| return true | ||
| } | ||
| on, err := strconv.ParseBool(v) | ||
| return err != nil || on | ||
| } | ||
|
|
||
| // rejectRemoteMediaURLs fails a vision request fast (one terminal 400) when any | ||
| // media part carries a remote/non-inline URL, mirroring the provider's data:-only | ||
| // contract. Pre-dispatch — no provider is contacted. handled=true => caller returns. | ||
| func (s *Server) rejectRemoteMediaURLs(w http.ResponseWriter, r *http.Request, parsed map[string]any, model, publicModel string, requiresVision, hasTools bool) (handled bool) { | ||
| if !requiresVision || !visionRejectRemoteEnabled() { | ||
| return false | ||
| } | ||
| badRef, ok := validateMediaParts(parsed) | ||
| if ok { | ||
| return false | ||
| } | ||
| shown := badRef | ||
| if len(shown) > 200 { | ||
| shown = shown[:200] + "…" | ||
| } | ||
| stream, _ := parsed["stream"].(bool) | ||
| s.recordRejection(rejectionInfo{ | ||
| r: r, | ||
| stage: "validation", | ||
| reasonCode: "bad_param", | ||
| httpStatus: http.StatusBadRequest, | ||
| keyID: keyIDFromContext(r.Context()), | ||
| consumerKeyHash: store.HashKey(consumerKeyFromContext(r.Context())), | ||
| requestedModel: publicModel, | ||
| resolvedModel: model, | ||
| stream: stream, | ||
| requiresVision: true, | ||
| hasTools: hasTools, | ||
| params: rejectionSamplingParams(parsed), | ||
| }) | ||
| s.ddIncr("inference.media_remote_url_rejected", []string{"model:" + model}) | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", | ||
| "image/video input must be an inline base64 data: URI (e.g. \"data:image/jpeg;base64,…\"); "+ | ||
| "remote http(s):// and file:// media URLs are not supported on this end-to-end-encrypted endpoint. Got: "+shown, | ||
| withParam("messages"))) | ||
| return true | ||
| } | ||
|
|
||
| // requestHasTools reports whether the request carries a non-empty top-level | ||
| // "tools" array (Chat Completions and Responses API share the field name). | ||
| // Drives Traits.HasTools so tool-bearing requests only route to providers whose | ||
| // binaries survive tool-schema template rendering (version floor + per-model | ||
| // template_render_ok gate in the scheduler). | ||
| func requestHasTools(parsed map[string]any) bool { | ||
| tools, ok := parsed["tools"].([]any) | ||
| return ok && len(tools) > 0 | ||
| } | ||
|
|
||
| func requestCacheAffinityKey(parsed map[string]any) string { | ||
| raw, ok := parsed["prompt_cache_key"].(string) | ||
| if !ok || raw == "" { | ||
| return "" | ||
| } | ||
| const maxPromptCacheKeyBytes = 512 | ||
| if len(raw) > maxPromptCacheKeyBytes { | ||
| return "" | ||
| } | ||
| sum := sha256.Sum256([]byte(raw)) | ||
| return fmt.Sprintf("%x", sum[:]) | ||
| } | ||
|
|
||
| func estimateRequestedMaxTokens(parsed map[string]any) int { | ||
| for _, key := range []string{"max_tokens", "max_completion_tokens", "max_output_tokens"} { | ||
| if n, ok := intFromRequestValue(parsed[key]); ok && n > 0 { | ||
| if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 { | ||
| return n * copies | ||
| } | ||
| return n | ||
| } | ||
| } | ||
| if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 { | ||
| return 256 * copies | ||
| } | ||
| return 256 | ||
| } | ||
|
|
||
| func parseProviderSerialAllowlist(parsed map[string]any) ([]string, bool, error) { | ||
| var rawValues []any | ||
| provided := false | ||
| for _, key := range []string{"provider_serial", "provider_serials"} { | ||
| v, ok := parsed[key] | ||
| if !ok { | ||
| continue | ||
| } | ||
| provided = true | ||
| switch x := v.(type) { | ||
| case string: | ||
| rawValues = append(rawValues, x) | ||
| case []any: | ||
| rawValues = append(rawValues, x...) | ||
| default: | ||
| return nil, true, fmt.Errorf("%s must be a string or array of strings", key) | ||
| } | ||
| } | ||
| if !provided { | ||
| return nil, false, nil | ||
| } | ||
|
|
||
| seen := make(map[string]struct{}, len(rawValues)) | ||
| ids := make([]string, 0, len(rawValues)) | ||
| for _, raw := range rawValues { | ||
| id, ok := raw.(string) | ||
| if !ok { | ||
| return nil, true, fmt.Errorf("provider_serials must contain only strings") | ||
| } | ||
| id = strings.TrimSpace(id) | ||
| if id == "" { | ||
| continue | ||
| } | ||
| if _, exists := seen[id]; exists { | ||
| continue | ||
| } | ||
| seen[id] = struct{}{} | ||
| ids = append(ids, id) | ||
| } | ||
| if len(ids) == 0 { | ||
| return nil, true, fmt.Errorf("provider_serials must include at least one provider serial") | ||
| } | ||
| return ids, true, nil | ||
| } | ||
|
|
||
| func stripProviderRoutingFields(parsed map[string]any) bool { | ||
| changed := false | ||
| for _, key := range []string{"provider_serial", "provider_serials"} { | ||
| if _, ok := parsed[key]; ok { | ||
| delete(parsed, key) | ||
| changed = true | ||
| } | ||
| } | ||
| return changed | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 New 561-line file for request introspection helpers may be over-modularized
💡 Suggestion: Evaluate if these pure helper functions need their own file or could be organized differently within existing files
📊 Score: 2×3 = 6 · Category: over-abstraction
| // `inner` is a QuantizedKVCache, so its `copy()` must return one too. | ||
| // Assert that invariant explicitly rather than force-casting blind. | ||
| guard let copiedInner = inner.copy() as? QuantizedKVCache else { | ||
| preconditionFailure( | ||
| "ProtocolSafeQuantizedKVCache.copy(): inner.copy() did not return a QuantizedKVCache") | ||
| } | ||
| let new = ProtocolSafeQuantizedKVCache( | ||
| groupSize: groupSize, |
There was a problem hiding this comment.
🔵 [INFO] 🧩 Added preconditionFailure with detailed error message for type casting that should never fail
💡 Suggestion: Consider if the guard+preconditionFailure is necessary or if the original force-cast with a comment would be clearer
📊 Score: 2×2 = 4 · Category: over-abstraction
| if billingSessionID != "" { | ||
| _ = s.billing.Store().CompleteBillingSession(billingSessionID) | ||
| // Best-effort: the deposit is already credited above, but a failure here | ||
| // leaves the session marked incomplete (and replayable). Surface it. | ||
| if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil { | ||
| s.logger.Error("stripe: failed to mark billing session complete", | ||
| "billing_session_id", billingSessionID, "error", err) | ||
| s.ddIncr("billing.session_complete_failed", nil) | ||
| } | ||
| } | ||
| if referralCode != "" { | ||
| _ = s.billing.Referral().Apply(consumerKey, referralCode) | ||
| // Best-effort: a failure here means the referrer is not credited for this | ||
| // deposit; never silently swallow it. | ||
| if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil { | ||
| s.logger.Error("stripe: failed to apply referral credit", "error", err) | ||
| s.ddIncr("billing.referral_apply_failed", nil) | ||
| } | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 Similar error handling pattern repeated for billing session completion and referral application
💡 Suggestion: Consider extracting a helper function for the common 'log error + increment metric' pattern
📊 Score: 1×2 = 2 · Category: duplicate logic
|
Found 1 test failure on Blacksmith runners: Failure
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b4549127f3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
|
||
| // Both should be findable for the same model | ||
| p1 := reg.FindProvider(model) | ||
| p1 := findRoutableProvider(reg, model) |
There was a problem hiding this comment.
Update remaining FindProvider callers
When running the root E2E package, e2e/integration_test.go:607 still calls s.Coordinator.Registry.FindProvider(model), but this commit removes that registry method. The in-package tests are migrated to findRoutableProvider here, so go test ./e2e/... or an all-packages test run no longer compiles until that caller is migrated or a compatibility shim is kept.
Useful? React with 👍 / 👎.
| name: "darkbloom", | ||
| dependencies: [ | ||
| "ProviderCore", | ||
| "ProviderBenchmark", |
There was a problem hiding this comment.
Split benchmark code out of the shipped CLI
scripts/install.sh:11 installs the darkbloom binary as the provider bundle entrypoint, so adding ProviderBenchmark to the darkbloom target means every installed provider still links the benchmark-only target, including the fatal-error cache stubs this refactor was trying to keep out of production binaries. If serving installs are meant to stay lean, move the benchmark command to a separate executable or otherwise avoid depending on ProviderBenchmark from darkbloom.
Useful? React with 👍 / 👎.
Round-1 cleanup deleted registry.FindProvider but only re-ran go test inside coordinator/, missing the e2e package. The lone surviving caller in TestIntegration_SwiftProviderRealRoutingGates broke the e2e build (Blacksmith e2e/Build). Mirror the in-coordinator migration: add a local findRoutableProvider helper that probes routability through the production ReserveProviderEx path (same structural/privacy/trust/challenge/capacity gates), reserves then releases capacity, and returns the selected provider. No deleted methods or shims reintroduced. Verified: go build ./... and go vet ./... (compiles e2e/) green from repo root. Co-authored-by: Cursor <cursoragent@cursor.com>
scripts/install.sh ships the `darkbloom` binary, but the `darkbloom`
target depended on ProviderBenchmark, which carried the KV-quant
gate/eval code AND the fatalError protocol-conformance KV-cache stubs
(ProtocolSafeQuantizedKVCache, the V-only/BFloat16 caches). Those
benchmark-only stubs were linking into every installed provider,
defeating round-1's goal.
Split ProviderBenchmark into two targets along the existing clean
boundary (verified: the lightweight runners and the KVQuant/ dir do not
reference each other):
- ProviderBenchmark (lean): ModelBenchmark, ThroughputSweep(+report),
DecodeBandwidthModel — the only benchmark code `darkbloom benchmark`
needs. Still linked by `darkbloom`, so the command keeps working.
- ProviderBenchmarkKVQuant (heavy, has the fatalError stubs): the
KVQuant/ dir, linked ONLY by kv-quant-gate, kv-attn-selftest, and
their tests — never by `darkbloom`.
kv-quant-gate / kv-attn-selftest / the 3 KVQuant test files now import
ProviderBenchmarkKVQuant; the darkbloom benchmark command and
ThroughputSweepTests keep importing the lean ProviderBenchmark.
Bundle-safe: release builds by product (`swift build -c release
--product darkbloom`); product names, install paths, and
LatestProviderVersion are unchanged.
Verified: `swift build --build-tests` green (all targets + tests).
nm proof: darkbloom links 0 ProviderBenchmarkKVQuant symbols (939 lean
ProviderBenchmark symbols intact); kv-quant-gate links 3694.
Co-authored-by: Cursor <cursoragent@cursor.com>
Behavior-preserving cleanup of the alias capacity/TTFT fallback helper.
Both fallback modes already share a SINGLE Previous-build capacity probe
(QuickCapacityCheckWithTTFTForRequest) — the desired build is probed once
at the call site and the previous build once here, on different models, so
there is no redundant capacity check to remove. This reduces the residual
branching instead:
- merge the two identical modelShed / !IsModelInCatalog early-return
guards (both pure, both return the same tuple) into one
- hoist the duplicated `mode == aliasFallbackTTFT` test into a single
`enforceTTFT` bool and drop the `tooSlow` temp
The failure-path model selection is unchanged (TTFT mode reports the
probed Previous build the caller uses as the alternate TTFT estimate;
capacity mode keeps the current build it discards), so all returns are
byte-identical to before.
Verified: coordinator/api alias fallback tests + full `go test ./...`
green.
Co-authored-by: Cursor <cursoragent@cursor.com>
|
Thanks for the reviews — addressed in Fixed
Verified (threat-model follow-ups): the Respectfully declined:
|
ethenotethan
left a comment
There was a problem hiding this comment.
Automated Code Review — Layr-Labs/d-inference#
Verdict: REQUEST_CHANGES
Security — ⚠️ SKIPPED
Error: Expecting property name enclosed in double quotes: line 2 column 5 (char 6)
Performance — 4 finding(s) (2 blocking)
- 🔵 [INFO]
coordinator/api/billing_handlers.go:176-181— Silent error swallowing replaced with logging but no error propagation- Suggestion: Consider whether billing session completion failures should propagate to the caller or affect the response, as financial operations typically require strict error handling
- 🔵 [INFO]
coordinator/api/billing_handlers.go:183-188— Referral credit failures now logged but still best-effort- Suggestion: Consider whether referral credit failures should be retried or queued for later processing to ensure referrers receive proper credit
- 🟡 [MEDIUM]
coordinator/api/provider.go:2247-2252— Financial refund failures now logged but could still result in overcharging consumers- Suggestion: Consider implementing retry logic or dead letter queue for failed refunds to prevent consumer overcharging
- 🟡 [MEDIUM]
coordinator/api/provider.go:2437-2442— Platform fee credit failures could drop revenue accounting- Suggestion: Consider implementing retry logic or persistent queue for platform fee credits to ensure accurate revenue tracking
Type_diligence — ✅ No issues found
Additive_complexity — 6 finding(s) (1 blocking)
- 🔵 [INFO]
coordinator/api/apikey_handlers.go:1-447— New 447-line file extracted from consumer.go but still tightly coupled to Server- Suggestion: Consider making this a separate package with its own interface to reduce coupling to the main Server struct
- 🔵 [INFO]
coordinator/api/billing_handlers.go:176-190— Error handling pattern repeated for billing session completion and referral application- Suggestion: Extract a helper function for the common 'log error + emit telemetry' pattern used in both error cases
- 🔵 [INFO]
coordinator/api/consumer.go:388-444— aliasFallbackMode enum with only two values and complex parameter passing- Suggestion: Consider using a boolean parameter instead of an enum with just two cases, or inline the logic if the abstraction doesn't add clarity
- 🟡 [MEDIUM]
coordinator/api/consumer.go:416-444— maybeFallbackAlias function has 9 parameters and complex conditional logic- Suggestion: Break this into smaller functions or use a struct to group related parameters. The function is doing too many things at once
- 🔵 [INFO]
coordinator/api/request_introspection.go:1-561— Large 561-line file mixing pure functions with Server-dependent methods- Suggestion: Split into pure utility functions (token counting, media detection) and Server-dependent methods (rejection handling with telemetry)
- 🔵 [INFO]
provider-swift/Sources/ProviderCore/ProviderLogger.swift:1-46— Custom logger wrapper around os.Logger with conditional compilation- Suggestion: Consider using Swift's built-in Logger from the Logging library or a more established logging framework instead of a custom wrapper
10 finding(s) total, 3 blocking. Verdict: REQUEST_CHANGES.
🤖 Automated review by Centaur · DAR-186
| // Best-effort: the deposit is already credited above, but a failure here | ||
| // leaves the session marked incomplete (and replayable). Surface it. | ||
| if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil { | ||
| s.logger.Error("stripe: failed to mark billing session complete", | ||
| "billing_session_id", billingSessionID, "error", err) | ||
| s.ddIncr("billing.session_complete_failed", nil) |
There was a problem hiding this comment.
🔵 [INFO] ⚡ Silent error swallowing replaced with logging but no error propagation
💡 Suggestion: Consider whether billing session completion failures should propagate to the caller or affect the response, as financial operations typically require strict error handling
📊 Score: 2×3 = 6 · Category: error_handling
| } | ||
| if referralCode != "" { | ||
| _ = s.billing.Referral().Apply(consumerKey, referralCode) | ||
| // Best-effort: a failure here means the referrer is not credited for this | ||
| // deposit; never silently swallow it. | ||
| if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil { | ||
| s.logger.Error("stripe: failed to apply referral credit", "error", err) |
There was a problem hiding this comment.
🔵 [INFO] ⚡ Referral credit failures now logged but still best-effort
💡 Suggestion: Consider whether referral credit failures should be retried or queued for later processing to ensure referrers receive proper credit
📊 Score: 2×3 = 6 · Category: error_handling
| // Financial: a failed refund over-charges the consumer. Never swallow it. | ||
| if err := s.store.Credit(pr.ConsumerKey, refund, store.LedgerRefund, msg.RequestID); err != nil { | ||
| s.logger.Error("failed to credit settlement refund to consumer", | ||
| "request_id", msg.RequestID, "refund_micro_usd", refund, "error", err) | ||
| s.ddIncr("billing.credit_failed", []string{"op:settlement_refund"}) | ||
| } |
There was a problem hiding this comment.
🟡 [MEDIUM] ⚡ Financial refund failures now logged but could still result in overcharging consumers
💡 Suggestion: Consider implementing retry logic or dead letter queue for failed refunds to prevent consumer overcharging
📊 Score: 3×4 = 12 · Category: error_handling
| // Financial: a failed platform-fee credit drops revenue accounting. Never swallow it. | ||
| if err := s.store.Credit("platform", platformFee, store.LedgerPlatformFee, msg.RequestID); err != nil { | ||
| s.logger.Error("failed to credit platform fee", | ||
| "request_id", msg.RequestID, "platform_fee_micro_usd", platformFee, "error", err) | ||
| s.ddIncr("billing.credit_failed", []string{"op:platform_fee"}) | ||
| } |
There was a problem hiding this comment.
🟡 [MEDIUM] ⚡ Platform fee credit failures could drop revenue accounting
💡 Suggestion: Consider implementing retry logic or persistent queue for platform fee credits to ensure accurate revenue tracking
📊 Score: 3×4 = 12 · Category: error_handling
| package api | ||
|
|
||
| // apikey_handlers.go holds the consumer-facing API key management endpoints | ||
| // (create/list/get/update/rotate/delete) plus their request validation and | ||
| // response-shaping helpers. Split out of consumer.go to keep key management | ||
| // separate from the inference request path. | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/eigeninference/d-inference/coordinator/auth" | ||
| "github.com/eigeninference/d-inference/coordinator/store" | ||
|
|
||
| "github.com/eigeninference/d-inference/coordinator/api/types" | ||
| ) | ||
|
|
||
| // handleCreateKey handles POST /v1/auth/keys — creates a new consumer API key. | ||
| // Requires Privy authentication. The key is linked to the user's account so | ||
| // requests made with the key are billed to the same account. | ||
| func (s *Server) handleCreateKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", | ||
| "API key creation requires a Privy account — authenticate with a Privy access token")) | ||
| return | ||
| } | ||
|
|
||
| key, err := s.store.CreateKeyForAccount(user.AccountID) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, types.CreateKeyResponse{ | ||
| APIKey: key, | ||
| AccountID: user.AccountID, | ||
| }) | ||
| } | ||
|
|
||
| // handleRevokeKey handles DELETE /v1/auth/keys — revokes an API key. | ||
| // The caller must own the key (same account). Requires Privy auth so a | ||
| // compromised API key cannot revoke legitimate keys. | ||
| func (s *Server) handleRevokeKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
|
|
||
| var body struct { | ||
| Key string `json:"key"` | ||
| } | ||
| if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Key == "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "provide {\"key\": \"sk-db-...\"}")) | ||
| return | ||
| } | ||
|
|
||
| owner := s.store.GetKeyAccount(body.Key) | ||
| if owner != user.AccountID { | ||
| writeJSON(w, http.StatusForbidden, errorResponse("forbidden", "you can only revoke your own keys")) | ||
| return | ||
| } | ||
|
|
||
| if !s.store.RevokeKey(body.Key) { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found or already revoked")) | ||
| return | ||
| } | ||
| s.invalidateAPIKeyCache(body.Key) | ||
|
|
||
| writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"}) | ||
| } | ||
|
|
||
| // apiKeyToResponse projects a stored key into its masked API representation, | ||
| // computing the current-window spend and remaining budget. | ||
| func (s *Server) apiKeyToResponse(k *store.APIKey) types.APIKeyResponse { | ||
| resp := types.APIKeyResponse{ | ||
| ID: k.ID, | ||
| Name: k.Name, | ||
| Label: k.Label, | ||
| Disabled: k.Disabled, | ||
| LimitReset: store.NormalizeResetWindow(k.LimitReset), | ||
| RPMLimit: k.RPMLimit, | ||
| ITPMLimit: k.ITPMLimit, | ||
| OTPMLimit: k.OTPMLimit, | ||
| AllowedModels: k.AllowedModels, | ||
| SelfRouteOnly: k.SelfRouteOnly, | ||
| ExpiresAt: k.ExpiresAt, | ||
| CreatedAt: k.CreatedAt, | ||
| LastUsedAt: k.LastUsedAt, | ||
| } | ||
| since := store.KeySpendWindowStart(resp.LimitReset, time.Now()) | ||
| spent := s.store.KeySpendSince(k.ID, since) | ||
| resp.UsageUSD = microToUSD(spent) | ||
| if k.LimitMicroUSD != nil { | ||
| limitUSD := microToUSD(*k.LimitMicroUSD) | ||
| resp.LimitUSD = &limitUSD | ||
| remaining := *k.LimitMicroUSD - spent | ||
| if remaining < 0 { | ||
| remaining = 0 | ||
| } | ||
| remUSD := microToUSD(remaining) | ||
| resp.RemainingUSD = &remUSD | ||
| } | ||
| return resp | ||
| } | ||
|
|
||
| // validateKeyLimitInputs sanity-checks user-supplied limit values. Returns a | ||
| // human-readable error string (empty when valid). | ||
| func validateKeyLimitInputs(reset string, limitUSD *float64, rpm, itpm, otpm *int64, expiresAt *time.Time) string { | ||
| switch reset { | ||
| case "", store.KeyResetNone, store.KeyResetDaily, store.KeyResetWeekly, store.KeyResetMonthly: | ||
| default: | ||
| return "limit_reset must be one of: none, daily, weekly, monthly" | ||
| } | ||
| if limitUSD != nil && *limitUSD < 0 { | ||
| return "limit_usd must be >= 0" | ||
| } | ||
| if rpm != nil && *rpm < 0 { | ||
| return "rpm_limit must be >= 0" | ||
| } | ||
| if itpm != nil && *itpm < 0 { | ||
| return "itpm_limit must be >= 0" | ||
| } | ||
| if otpm != nil && *otpm < 0 { | ||
| return "otpm_limit must be >= 0" | ||
| } | ||
| if expiresAt != nil && !expiresAt.IsZero() && expiresAt.Before(time.Now()) { | ||
| return "expires_at must be in the future" | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| // keyModelAllowed returns false when the calling key restricts models via an | ||
| // allow-list that does not include the requested model. Account-scoped/legacy | ||
| // keys (no key in context) and keys without an allow-list always pass. | ||
| func (s *Server) keyModelAllowed(ctx context.Context, model string) bool { | ||
| k := apiKeyFromContext(ctx) | ||
| if k == nil || len(k.AllowedModels) == 0 { | ||
| return true | ||
| } | ||
| for _, m := range k.AllowedModels { | ||
| if m == model { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // checkKeySpendCap reports whether charging additionalMicroUSD to the calling | ||
| // key would exceed its per-key spend cap in the current window. It returns | ||
| // (message, ok); ok=false means the request must be rejected with 402. The | ||
| // per-account balance ledger is still the hard atomic ceiling — this is the | ||
| // soft, per-key sub-cap, enforced against settled usage (so concurrent | ||
| // in-flight requests are eventually-consistent, never over the account balance). | ||
| func (s *Server) checkKeySpendCap(ctx context.Context, additionalMicroUSD int64) (string, bool) { | ||
| k := apiKeyFromContext(ctx) | ||
| if k == nil || k.ID == "" || k.LimitMicroUSD == nil { | ||
| return "", true | ||
| } | ||
| since := store.KeySpendWindowStart(k.LimitReset, time.Now()) | ||
| spent := s.store.KeySpendSince(k.ID, since) | ||
| if spent+additionalMicroUSD > *k.LimitMicroUSD { | ||
| window := store.NormalizeResetWindow(k.LimitReset) | ||
| if window == store.KeyResetNone { | ||
| window = "total" | ||
| } | ||
| return fmt.Sprintf("API key spend limit reached (%s cap $%.2f, used $%.2f) — raise this key's limit or use another key", | ||
| window, microToUSD(*k.LimitMicroUSD), microToUSD(spent)), false | ||
| } | ||
| return "", true | ||
| } | ||
|
|
||
| // handleListAPIKeys handles GET /v1/keys — lists the caller's keys (masked). | ||
| func (s *Server) handleListAPIKeys(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| keys, err := s.store.ListAPIKeys(user.AccountID) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to list keys")) | ||
| return | ||
| } | ||
| out := make([]types.APIKeyResponse, 0, len(keys)) | ||
| for i := range keys { | ||
| out = append(out, s.apiKeyToResponse(&keys[i])) | ||
| } | ||
| writeJSON(w, http.StatusOK, types.APIKeyListResponse{Object: "list", Data: out}) | ||
| } | ||
|
|
||
| // handleCreateAPIKey handles POST /v1/keys — mints a new named, optionally | ||
| // limited key. The raw secret is returned exactly once. | ||
| func (s *Server) handleCreateAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", | ||
| "API key creation requires a Privy account — authenticate with a Privy access token")) | ||
| return | ||
| } | ||
|
|
||
| var req createAPIKeyRequest | ||
| if r.Body != nil { | ||
| // A missing/empty body is allowed (creates a default unnamed key). | ||
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil && !errors.Is(err, io.EOF) { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body")) | ||
| return | ||
| } | ||
| } | ||
| if msg := validateKeyLimitInputs(req.LimitReset, req.LimitUSD, req.RPMLimit, req.ITPMLimit, req.OTPMLimit, req.ExpiresAt); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
|
|
||
| opts := store.APIKeyCreate{ | ||
| Name: strings.TrimSpace(req.Name), | ||
| LimitReset: store.NormalizeResetWindow(req.LimitReset), | ||
| RPMLimit: req.RPMLimit, | ||
| ITPMLimit: req.ITPMLimit, | ||
| OTPMLimit: req.OTPMLimit, | ||
| AllowedModels: req.AllowedModels, | ||
| SelfRouteOnly: req.SelfRouteOnly, | ||
| ExpiresAt: req.ExpiresAt, | ||
| } | ||
| if req.LimitUSD != nil { | ||
| m := usdToMicro(*req.LimitUSD) | ||
| opts.LimitMicroUSD = &m | ||
| } | ||
|
|
||
| raw, rec, err := s.store.CreateAPIKey(user.AccountID, opts) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to create key")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{ | ||
| Key: raw, | ||
| Data: s.apiKeyToResponse(rec), | ||
| }) | ||
| } | ||
|
|
||
| // handleGetAPIKey handles GET /v1/keys/{id}. | ||
| func (s *Server) handleGetAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| k, err := s.store.GetAPIKeyByID(user.AccountID, id) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(k)) | ||
| } | ||
|
|
||
| // handleGetCallingKey handles GET /v1/key — returns the metadata for the API | ||
| // key used to authenticate the request (OpenRouter parity). | ||
| func (s *Server) handleGetCallingKey(w http.ResponseWriter, r *http.Request) { | ||
| k := apiKeyFromContext(r.Context()) | ||
| if k == nil || k.ID == "" { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", | ||
| "no key metadata — this endpoint requires API key authentication")) | ||
| return | ||
| } | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(k)) | ||
| } | ||
|
|
||
| // handleUpdateAPIKey handles PATCH /v1/keys/{id} — sparse update of a key's | ||
| // name, disabled flag, limits, reset window, expiry, and model allow-list. | ||
| func (s *Server) handleUpdateAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| existing, err := s.store.GetAPIKeyByID(user.AccountID, id) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
|
|
||
| // Decode into a presence map so we can distinguish "field omitted" (leave | ||
| // unchanged) from "field set to null" (clear the limit). | ||
| var patch map[string]json.RawMessage | ||
| if err := json.NewDecoder(r.Body).Decode(&patch); err != nil { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", "invalid JSON body")) | ||
| return | ||
| } | ||
| if msg := applyKeyPatch(existing, patch); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
| if msg := validateKeyLimitInputs(existing.LimitReset, nil, existing.RPMLimit, existing.ITPMLimit, existing.OTPMLimit, existing.ExpiresAt); msg != "" { | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("bad_request", msg)) | ||
| return | ||
| } | ||
|
|
||
| // Bump the auth-cache generation before AND after the mutation so a | ||
| // concurrent request cannot keep authenticating with a stale (e.g. | ||
| // just-disabled) cached record. | ||
| s.invalidateAllAPIKeyCache() | ||
| updated, err := s.store.UpdateAPIKey(user.AccountID, id, *existing) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusInternalServerError, errorResponse("server_error", "failed to update key")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, s.apiKeyToResponse(updated)) | ||
| } | ||
|
|
||
| // applyKeyPatch merges a presence-aware PATCH body into an existing key record. | ||
| // Returns a human-readable error string on invalid input (empty when ok). | ||
| func applyKeyPatch(k *store.APIKey, patch map[string]json.RawMessage) string { | ||
| if raw, ok := patch["name"]; ok { | ||
| var name string | ||
| if err := json.Unmarshal(raw, &name); err != nil { | ||
| return "invalid value for name" | ||
| } | ||
| k.Name = strings.TrimSpace(name) | ||
| } | ||
| if raw, ok := patch["disabled"]; ok { | ||
| var disabled bool | ||
| if err := json.Unmarshal(raw, &disabled); err != nil { | ||
| return "invalid value for disabled" | ||
| } | ||
| k.Disabled = disabled | ||
| } | ||
| if raw, ok := patch["limit_reset"]; ok { | ||
| var reset string | ||
| if err := json.Unmarshal(raw, &reset); err != nil { | ||
| return "invalid value for limit_reset" | ||
| } | ||
| k.LimitReset = store.NormalizeResetWindow(reset) | ||
| } | ||
| if raw, ok := patch["limit_usd"]; ok { | ||
| if string(raw) == "null" { | ||
| k.LimitMicroUSD = nil | ||
| } else { | ||
| var usd float64 | ||
| if err := json.Unmarshal(raw, &usd); err != nil { | ||
| return "invalid value for limit_usd" | ||
| } | ||
| if usd < 0 { | ||
| return "limit_usd must be >= 0" | ||
| } | ||
| m := usdToMicro(usd) | ||
| k.LimitMicroUSD = &m | ||
| } | ||
| } | ||
| if raw, ok := patch["allowed_models"]; ok { | ||
| if string(raw) == "null" { | ||
| k.AllowedModels = nil | ||
| } else { | ||
| var models []string | ||
| if err := json.Unmarshal(raw, &models); err != nil { | ||
| return "invalid value for allowed_models" | ||
| } | ||
| k.AllowedModels = models | ||
| } | ||
| } | ||
| if raw, ok := patch["self_route_only"]; ok { | ||
| var v bool | ||
| if err := json.Unmarshal(raw, &v); err != nil { | ||
| return "invalid value for self_route_only" | ||
| } | ||
| k.SelfRouteOnly = v | ||
| } | ||
| for field, dst := range map[string]**int64{ | ||
| "rpm_limit": &k.RPMLimit, | ||
| "itpm_limit": &k.ITPMLimit, | ||
| "otpm_limit": &k.OTPMLimit, | ||
| } { | ||
| if raw, ok := patch[field]; ok { | ||
| if string(raw) == "null" { | ||
| *dst = nil | ||
| } else { | ||
| var v int64 | ||
| if err := json.Unmarshal(raw, &v); err != nil { | ||
| return "invalid value for " + field | ||
| } | ||
| *dst = &v | ||
| } | ||
| } | ||
| } | ||
| if raw, ok := patch["expires_at"]; ok { | ||
| if string(raw) == "null" { | ||
| k.ExpiresAt = nil | ||
| } else { | ||
| var t time.Time | ||
| if err := json.Unmarshal(raw, &t); err != nil { | ||
| return "invalid value for expires_at (use RFC 3339)" | ||
| } | ||
| k.ExpiresAt = &t | ||
| } | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| // handleDeleteAPIKey handles DELETE /v1/keys/{id} — permanently deletes a key. | ||
| func (s *Server) handleDeleteAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| s.invalidateAllAPIKeyCache() | ||
| if err := s.store.RevokeAPIKeyByID(user.AccountID, id); err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, types.RevokeKeyResponse{Status: "revoked"}) | ||
| } | ||
|
|
||
| // handleRotateAPIKey handles POST /v1/keys/{id}/rotate — mints a fresh secret | ||
| // carrying the same limits and metadata, then deletes the old key. The new | ||
| // secret is returned exactly once. | ||
| func (s *Server) handleRotateAPIKey(w http.ResponseWriter, r *http.Request) { | ||
| user := auth.UserFromContext(r.Context()) | ||
| if user == nil { | ||
| writeJSON(w, http.StatusUnauthorized, errorResponse("auth_error", "authentication required")) | ||
| return | ||
| } | ||
| id := r.PathValue("id") | ||
| // Bump the auth-cache generation before AND after the mutation so the old | ||
| // secret stops authenticating the instant rotation commits. | ||
| s.invalidateAllAPIKeyCache() | ||
| raw, rec, err := s.store.RotateAPIKey(user.AccountID, id) | ||
| if err != nil { | ||
| writeJSON(w, http.StatusNotFound, errorResponse("not_found", "key not found")) | ||
| return | ||
| } | ||
| s.invalidateAllAPIKeyCache() | ||
| writeJSON(w, http.StatusOK, types.CreateAPIKeyResponse{ | ||
| Key: raw, | ||
| Data: s.apiKeyToResponse(rec), | ||
| }) | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 New 447-line file extracted from consumer.go but still tightly coupled to Server
💡 Suggestion: Consider making this a separate package with its own interface to reduce coupling to the main Server struct
📊 Score: 2×3 = 6 · Category: misplaced responsibility
| // Best-effort: the deposit is already credited above, but a failure here | ||
| // leaves the session marked incomplete (and replayable). Surface it. | ||
| if err := s.billing.Store().CompleteBillingSession(billingSessionID); err != nil { | ||
| s.logger.Error("stripe: failed to mark billing session complete", | ||
| "billing_session_id", billingSessionID, "error", err) | ||
| s.ddIncr("billing.session_complete_failed", nil) | ||
| } | ||
| } | ||
| if referralCode != "" { | ||
| _ = s.billing.Referral().Apply(consumerKey, referralCode) | ||
| // Best-effort: a failure here means the referrer is not credited for this | ||
| // deposit; never silently swallow it. | ||
| if err := s.billing.Referral().Apply(consumerKey, referralCode); err != nil { | ||
| s.logger.Error("stripe: failed to apply referral credit", "error", err) | ||
| s.ddIncr("billing.referral_apply_failed", nil) | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 Error handling pattern repeated for billing session completion and referral application
💡 Suggestion: Extract a helper function for the common 'log error + emit telemetry' pattern used in both error cases
📊 Score: 1×2 = 2 · Category: duplicate logic
| package api | ||
|
|
||
| // request_introspection.go holds helpers for introspecting and lightly | ||
| // reshaping inbound inference request bodies before routing/dispatch: | ||
| // token and cost estimation (routing vs billing), media/tool detection, | ||
| // remote media-URL rejection, cache-affinity key derivation, and | ||
| // provider-serial allowlist parsing. Most are pure helpers with no Server | ||
| // state; the pre-dispatch media-URL rejection (rejectRemoteMediaURLs) hangs | ||
| // off *Server only to record rejection telemetry. Split out of consumer.go | ||
| // to keep the request-handling orchestrator thin. | ||
|
|
||
| import ( | ||
| "crypto/sha256" | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/http" | ||
| "os" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/eigeninference/d-inference/coordinator/store" | ||
| ) | ||
|
|
||
| // Media prompt-token costs. A vision encoder turns each image/video into a | ||
| // bounded number of soft tokens (Gemma 4 caps around a few hundred per image) | ||
| // regardless of the base64 byte length, so counting a `data:` URI as text | ||
| // inflates the estimate by orders of magnitude — distorting routing admission and | ||
| // over-reserving balance. These flat per-media costs keep both sane. | ||
| const ( | ||
| imagePromptTokenCost = 300 | ||
| videoPromptTokenCost = 1500 | ||
| ) | ||
|
|
||
| func intFromRequestValue(v any) (int, bool) { | ||
| switch x := v.(type) { | ||
| case int: | ||
| return x, true | ||
| case int32: | ||
| return int(x), true | ||
| case int64: | ||
| return int(x), true | ||
| case float64: | ||
| return int(x), true | ||
| case json.Number: | ||
| n, err := x.Int64() | ||
| if err != nil { | ||
| return 0, false | ||
| } | ||
| return int(n), true | ||
| default: | ||
| return 0, false | ||
| } | ||
| } | ||
|
|
||
| // approximateTokenCount returns a rough token estimate for routing and queue | ||
| // admission. The len/4 heuristic is a reasonable average for English text | ||
| // with GPT-style BPE tokenizers. This value feeds into the scheduler's | ||
| // capacity checks (pendingTokenBudget, freeMemoryAdmits) where a tighter | ||
| // estimate produces better routing decisions. | ||
| // | ||
| // For billing reservation (where underestimation causes provider shortfall), | ||
| // use approximateTokenCountUpperBound instead. | ||
| func approximateTokenCount(v any) int { | ||
| if v == nil { | ||
| return 0 | ||
| } | ||
| switch x := v.(type) { | ||
| case string: | ||
| if x == "" { | ||
| return 0 | ||
| } | ||
| tokens := len(x) / 4 | ||
| if tokens < 1 { | ||
| tokens = 1 | ||
| } | ||
| return tokens | ||
| default: | ||
| b, err := json.Marshal(v) | ||
| if err != nil { | ||
| return 0 | ||
| } | ||
| tokens := len(b) / 4 | ||
| if tokens < 1 { | ||
| tokens = 1 | ||
| } | ||
| return tokens | ||
| } | ||
| } | ||
|
|
||
| // approximateTokenCountUpperBound returns a guaranteed upper bound on the | ||
| // number of tokens a BPE tokenizer would produce for v. Every BPE vocabulary | ||
| // starts with one token per byte and can only merge, so len(text) >= tokens | ||
| // for any model family, any language, forever. This is used only for billing | ||
| // reservation to ensure the pre-flight debit always covers the actual cost. | ||
| // | ||
| // Using len(text) over-reserves by ~3-4x on average for English prose, but | ||
| // the difference is refunded immediately after inference completes, so | ||
| // consumers are never overcharged — they only need sufficient balance to | ||
| // cover the reservation hold. | ||
| func approximateTokenCountUpperBound(v any) int { | ||
| if v == nil { | ||
| return 0 | ||
| } | ||
| switch x := v.(type) { | ||
| case string: | ||
| return len(x) | ||
| default: | ||
| b, err := json.Marshal(v) | ||
| if err != nil { | ||
| return 0 | ||
| } | ||
| return len(b) | ||
| } | ||
| } | ||
|
|
||
| func estimatePromptTokens(parsed map[string]any) int { | ||
| total := 0 | ||
| if v, ok := parsed["messages"]; ok { | ||
| total += messagesPromptTokens(v) | ||
| } | ||
| if v, ok := parsed["input"]; ok { | ||
| total += inputPromptTokens(v) | ||
| } | ||
| if v, ok := parsed["prompt"]; ok { | ||
| total += approximateTokenCount(v) | ||
| } | ||
| if total == 0 { | ||
| total = approximateTokenCount(parsed) | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| // estimateBillingPromptTokens returns a guaranteed upper bound on prompt | ||
| // tokens for billing reservation. Uses byte-length (not len/4) so the | ||
| // pre-flight reservation always covers actual cost. This value must NOT | ||
| // be used for routing — see estimatePromptTokens for that. | ||
| func estimateBillingPromptTokens(parsed map[string]any) int { | ||
| total := 0 | ||
| if v, ok := parsed["messages"]; ok { | ||
| // Billing MUST stay a guaranteed upper bound (len(bytes) >= tokens for any | ||
| // BPE tokenizer), so it keeps counting full message bytes — including a | ||
| // base64 image's bytes and every non-content field (role, tool_calls, | ||
| // name). Switching to the media-aware flat count here would DROP those | ||
| // fields and under-reserve for tool-calling requests. Over-reservation on a | ||
| // large image is safe (it is refunded after inference); the routing/ITPM | ||
| // estimate (estimatePromptTokens) is the media-aware one. | ||
| total += approximateTokenCountUpperBound(v) | ||
| } | ||
| if v, ok := parsed["input"]; ok { | ||
| total += approximateTokenCountUpperBound(v) | ||
| } | ||
| if v, ok := parsed["prompt"]; ok { | ||
| total += approximateTokenCountUpperBound(v) | ||
| } | ||
| if total == 0 { | ||
| total = approximateTokenCountUpperBound(parsed) | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| // isMediaPartType reports whether an OpenAI/OpenRouter content-part type denotes | ||
| // image or video input. | ||
| func isMediaPartType(t string) bool { | ||
| switch t { | ||
| // OpenAI chat (image_url/video_url), OpenAI Responses (input_image/input_video), | ||
| // and Anthropic /v1/messages content blocks ({"type":"image"|"video","source":…}). | ||
| case "image_url", "input_image", "image", "video_url", "input_video", "video": | ||
| return true | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // messageContentTokens estimates ROUTING prompt tokens for one message's | ||
| // `content`, counting text parts as text (len/4) and each image/video part as a | ||
| // flat media cost (never the base64 length). Used only for the routing/ITPM | ||
| // estimate; billing uses approximateTokenCountUpperBound (a guaranteed upper | ||
| // bound that intentionally still counts the base64 bytes). | ||
| func messageContentTokens(content any) int { | ||
| textTokens := func(s string) int { | ||
| if s == "" { | ||
| return 0 | ||
| } | ||
| if t := len(s) / 4; t > 0 { | ||
| return t | ||
| } | ||
| return 1 | ||
| } | ||
| switch c := content.(type) { | ||
| case string: | ||
| return textTokens(c) | ||
| case []any: | ||
| total := 0 | ||
| for _, part := range c { | ||
| pm, ok := part.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| typ, _ := pm["type"].(string) | ||
| switch { | ||
| case typ == "text" || typ == "input_text": | ||
| if s, ok := pm["text"].(string); ok { | ||
| total += textTokens(s) | ||
| } | ||
| case typ == "image_url" || typ == "input_image" || typ == "image": | ||
| total += imagePromptTokenCost | ||
| case typ == "video_url" || typ == "input_video" || typ == "video": | ||
| total += videoPromptTokenCost | ||
| default: | ||
| if b, err := json.Marshal(pm); err == nil { | ||
| total += len(b) / 4 | ||
| } | ||
| } | ||
| } | ||
| return total | ||
| default: | ||
| return approximateTokenCount(content) | ||
| } | ||
| } | ||
|
|
||
| // messagesPromptTokens sums media-aware routing content tokens across a messages | ||
| // array. Falls back to the len/4 heuristic when messages isn't the standard | ||
| // array shape. | ||
| func messagesPromptTokens(messages any) int { | ||
| arr, ok := messages.([]any) | ||
| if !ok { | ||
| return approximateTokenCount(messages) | ||
| } | ||
| total := 0 | ||
| for _, m := range arr { | ||
| mm, ok := m.(map[string]any) | ||
| if !ok { | ||
| total += approximateTokenCount(m) | ||
| continue | ||
| } | ||
| total += 4 // small per-message framing (role + delimiters) | ||
| total += messageContentTokens(mm["content"]) | ||
| } | ||
| return total | ||
| } | ||
|
|
||
| // inputPromptTokens estimates the Responses API `input` field. A string input | ||
| // is plain text (len/4). Structured input is an array of message-like items with | ||
| // `content` parts, so reuse the same media-aware content estimator as chat | ||
| // messages instead of counting JSON wrapper bytes. | ||
| func inputPromptTokens(input any) int { | ||
| switch x := input.(type) { | ||
| case string: | ||
| return approximateTokenCount(x) | ||
| case []any: | ||
| total := 0 | ||
| for _, item := range x { | ||
| switch m := item.(type) { | ||
| case string: | ||
| total += approximateTokenCount(m) | ||
| case map[string]any: | ||
| content, ok := m["content"] | ||
| if !ok { | ||
| total += approximateTokenCount(m) | ||
| continue | ||
| } | ||
| total += 4 // role/type framing, matching messagesPromptTokens. | ||
| total += messageContentTokens(content) | ||
| default: | ||
| total += approximateTokenCount(item) | ||
| } | ||
| } | ||
| return total | ||
| default: | ||
| return approximateTokenCount(input) | ||
| } | ||
| } | ||
|
|
||
| // contentPartsHaveMedia reports whether a `content` value (a content-part array) | ||
| // carries any image/video part. | ||
| func contentPartsHaveMedia(content any) bool { | ||
| parts, ok := content.([]any) | ||
| if !ok { | ||
| return false | ||
| } | ||
| for _, part := range parts { | ||
| pm, ok := part.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| if typ, _ := pm["type"].(string); isMediaPartType(typ) { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // detectMediaRequirement reports whether the request carries image/video input. | ||
| // The coordinator sees plaintext at this point (sealedTransport decrypts before | ||
| // the handler), so this drives the vision routing gate and the fail-fast "no | ||
| // vision-capable provider" response. It scans both the Chat Completions | ||
| // `messages[].content` parts and the Responses API `input[].content` parts so a | ||
| // media request on either surface is gated (never silently routed text-blind). | ||
| func detectMediaRequirement(parsed map[string]any) bool { | ||
| if messages, ok := parsed["messages"].([]any); ok { | ||
| for _, m := range messages { | ||
| if mm, ok := m.(map[string]any); ok && contentPartsHaveMedia(mm["content"]) { | ||
| return true | ||
| } | ||
| } | ||
| } | ||
| // Responses API: `input` may be a string (no media) or an array of items, | ||
| // each carrying `content` parts in the same image_url/input_image shape. | ||
| if input, ok := parsed["input"].([]any); ok { | ||
| for _, item := range input { | ||
| if im, ok := item.(map[string]any); ok && contentPartsHaveMedia(im["content"]) { | ||
| return true | ||
| } | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // isInlineDataURI reports whether a media reference is an inline base64 data: URI | ||
| // — the ONLY form the provider's E2E-encrypted VLM path accepts (see | ||
| // VLMRequestInference.MediaError.invalidURL, which 400s anything else). | ||
| func isInlineDataURI(s string) bool { | ||
| return strings.HasPrefix(strings.TrimSpace(s), "data:") | ||
| } | ||
|
|
||
| // mediaPartURLString returns the URL/inline reference carried by a media content | ||
| // part and whether the part IS a media part. Covers OpenAI chat (image_url/ | ||
| // video_url objects or bare strings), OpenAI Responses (input_image/input_video), | ||
| // and Anthropic source blocks ({type:"image"|"video", source:{type,…}}). A media | ||
| // part whose reference can't be read returns ("", true) so the caller fails OPEN. | ||
| func mediaPartURLString(pm map[string]any) (ref string, isMedia bool) { | ||
| typ, _ := pm["type"].(string) | ||
| if !isMediaPartType(typ) { | ||
| return "", false | ||
| } | ||
| switch typ { | ||
| case "image_url", "input_image", "video_url", "input_video": | ||
| field := "image_url" | ||
| if typ == "video_url" || typ == "input_video" { | ||
| field = "video_url" | ||
| } | ||
| switch v := pm[field].(type) { | ||
| case string: | ||
| return v, true | ||
| case map[string]any: | ||
| if u, ok := v["url"].(string); ok { | ||
| return u, true | ||
| } | ||
| } | ||
| return "", true | ||
| case "image", "video": // Anthropic source block | ||
| if src, ok := pm["source"].(map[string]any); ok { | ||
| switch st, _ := src["type"].(string); st { | ||
| case "url": | ||
| u, _ := src["url"].(string) | ||
| return u, true // remote reference | ||
| case "base64": | ||
| // Inline raw base64 (not a data: URI). Treated as inline/OK in v1 — | ||
| // the provider's Anthropic path accepts it; only remote refs are the | ||
| // production storm. Marked inline so it is never rejected here. | ||
| return "data:anthropic-inline-base64", true | ||
| } | ||
| } | ||
| return "", true | ||
| } | ||
| return "", true | ||
| } | ||
|
|
||
| // validateMediaParts walks every media part in a chat (messages[]) or Responses | ||
| // (input[]) body and returns the first REMOTE / non-inline media reference. The | ||
| // provider VLM path accepts ONLY inline data: URIs, so a remote http(s)://, | ||
| // file://, or otherwise non-data: reference is the production gemma-vision 400 | ||
| // source. Returns ok=false with the offending reference so the caller rejects it | ||
| // pre-dispatch (one clean 400) instead of dispatching and 400ing across the fleet. | ||
| // Unknown/unreadable part shapes fall through (fail-OPEN) — never wrongly 400 a | ||
| // body we don't model. | ||
| func validateMediaParts(parsed map[string]any) (badRef string, ok bool) { | ||
| check := func(content any) (string, bool) { | ||
| parts, ok := content.([]any) | ||
| if !ok { | ||
| return "", true | ||
| } | ||
| for _, p := range parts { | ||
| pm, ok := p.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| ref, isMedia := mediaPartURLString(pm) | ||
| if !isMedia || ref == "" { | ||
| continue | ||
| } | ||
| if !isInlineDataURI(ref) { | ||
| return ref, false | ||
| } | ||
| } | ||
| return "", true | ||
| } | ||
| if msgs, ok := parsed["messages"].([]any); ok { | ||
| for _, m := range msgs { | ||
| if mm, ok := m.(map[string]any); ok { | ||
| if ref, good := check(mm["content"]); !good { | ||
| return ref, false | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if input, ok := parsed["input"].([]any); ok { | ||
| for _, it := range input { | ||
| if im, ok := it.(map[string]any); ok { | ||
| if ref, good := check(im["content"]); !good { | ||
| return ref, false | ||
| } | ||
| } | ||
| } | ||
| } | ||
| return "", true | ||
| } | ||
|
|
||
| // visionRejectRemoteEnabled gates the C4 pre-dispatch remote-media-URL rejection. | ||
| // Default ON; DARKBLOOM_VISION_REJECT_REMOTE_URLS=false (or 0) restores the prior | ||
| // dispatch-then-provider-400 behavior (still bounded by C1). Read live so the | ||
| // kill switch toggles without a redeploy. | ||
| func visionRejectRemoteEnabled() bool { | ||
| v := strings.TrimSpace(os.Getenv("DARKBLOOM_VISION_REJECT_REMOTE_URLS")) | ||
| if v == "" { | ||
| return true | ||
| } | ||
| on, err := strconv.ParseBool(v) | ||
| return err != nil || on | ||
| } | ||
|
|
||
| // rejectRemoteMediaURLs fails a vision request fast (one terminal 400) when any | ||
| // media part carries a remote/non-inline URL, mirroring the provider's data:-only | ||
| // contract. Pre-dispatch — no provider is contacted. handled=true => caller returns. | ||
| func (s *Server) rejectRemoteMediaURLs(w http.ResponseWriter, r *http.Request, parsed map[string]any, model, publicModel string, requiresVision, hasTools bool) (handled bool) { | ||
| if !requiresVision || !visionRejectRemoteEnabled() { | ||
| return false | ||
| } | ||
| badRef, ok := validateMediaParts(parsed) | ||
| if ok { | ||
| return false | ||
| } | ||
| shown := badRef | ||
| if len(shown) > 200 { | ||
| shown = shown[:200] + "…" | ||
| } | ||
| stream, _ := parsed["stream"].(bool) | ||
| s.recordRejection(rejectionInfo{ | ||
| r: r, | ||
| stage: "validation", | ||
| reasonCode: "bad_param", | ||
| httpStatus: http.StatusBadRequest, | ||
| keyID: keyIDFromContext(r.Context()), | ||
| consumerKeyHash: store.HashKey(consumerKeyFromContext(r.Context())), | ||
| requestedModel: publicModel, | ||
| resolvedModel: model, | ||
| stream: stream, | ||
| requiresVision: true, | ||
| hasTools: hasTools, | ||
| params: rejectionSamplingParams(parsed), | ||
| }) | ||
| s.ddIncr("inference.media_remote_url_rejected", []string{"model:" + model}) | ||
| writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", | ||
| "image/video input must be an inline base64 data: URI (e.g. \"data:image/jpeg;base64,…\"); "+ | ||
| "remote http(s):// and file:// media URLs are not supported on this end-to-end-encrypted endpoint. Got: "+shown, | ||
| withParam("messages"))) | ||
| return true | ||
| } | ||
|
|
||
| // requestHasTools reports whether the request carries a non-empty top-level | ||
| // "tools" array (Chat Completions and Responses API share the field name). | ||
| // Drives Traits.HasTools so tool-bearing requests only route to providers whose | ||
| // binaries survive tool-schema template rendering (version floor + per-model | ||
| // template_render_ok gate in the scheduler). | ||
| func requestHasTools(parsed map[string]any) bool { | ||
| tools, ok := parsed["tools"].([]any) | ||
| return ok && len(tools) > 0 | ||
| } | ||
|
|
||
| func requestCacheAffinityKey(parsed map[string]any) string { | ||
| raw, ok := parsed["prompt_cache_key"].(string) | ||
| if !ok || raw == "" { | ||
| return "" | ||
| } | ||
| const maxPromptCacheKeyBytes = 512 | ||
| if len(raw) > maxPromptCacheKeyBytes { | ||
| return "" | ||
| } | ||
| sum := sha256.Sum256([]byte(raw)) | ||
| return fmt.Sprintf("%x", sum[:]) | ||
| } | ||
|
|
||
| func estimateRequestedMaxTokens(parsed map[string]any) int { | ||
| for _, key := range []string{"max_tokens", "max_completion_tokens", "max_output_tokens"} { | ||
| if n, ok := intFromRequestValue(parsed[key]); ok && n > 0 { | ||
| if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 { | ||
| return n * copies | ||
| } | ||
| return n | ||
| } | ||
| } | ||
| if copies, ok := intFromRequestValue(parsed["n"]); ok && copies > 1 { | ||
| return 256 * copies | ||
| } | ||
| return 256 | ||
| } | ||
|
|
||
| func parseProviderSerialAllowlist(parsed map[string]any) ([]string, bool, error) { | ||
| var rawValues []any | ||
| provided := false | ||
| for _, key := range []string{"provider_serial", "provider_serials"} { | ||
| v, ok := parsed[key] | ||
| if !ok { | ||
| continue | ||
| } | ||
| provided = true | ||
| switch x := v.(type) { | ||
| case string: | ||
| rawValues = append(rawValues, x) | ||
| case []any: | ||
| rawValues = append(rawValues, x...) | ||
| default: | ||
| return nil, true, fmt.Errorf("%s must be a string or array of strings", key) | ||
| } | ||
| } | ||
| if !provided { | ||
| return nil, false, nil | ||
| } | ||
|
|
||
| seen := make(map[string]struct{}, len(rawValues)) | ||
| ids := make([]string, 0, len(rawValues)) | ||
| for _, raw := range rawValues { | ||
| id, ok := raw.(string) | ||
| if !ok { | ||
| return nil, true, fmt.Errorf("provider_serials must contain only strings") | ||
| } | ||
| id = strings.TrimSpace(id) | ||
| if id == "" { | ||
| continue | ||
| } | ||
| if _, exists := seen[id]; exists { | ||
| continue | ||
| } | ||
| seen[id] = struct{}{} | ||
| ids = append(ids, id) | ||
| } | ||
| if len(ids) == 0 { | ||
| return nil, true, fmt.Errorf("provider_serials must include at least one provider serial") | ||
| } | ||
| return ids, true, nil | ||
| } | ||
|
|
||
| func stripProviderRoutingFields(parsed map[string]any) bool { | ||
| changed := false | ||
| for _, key := range []string{"provider_serial", "provider_serials"} { | ||
| if _, ok := parsed[key]; ok { | ||
| delete(parsed, key) | ||
| changed = true | ||
| } | ||
| } | ||
| return changed | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 Large 561-line file mixing pure functions with Server-dependent methods
💡 Suggestion: Split into pure utility functions (token counting, media detection) and Server-dependent methods (rejection handling with telemetry)
📊 Score: 2×3 = 6 · Category: misplaced responsibility
| import Foundation | ||
| #if canImport(os) | ||
| import os | ||
| #endif | ||
|
|
||
| /// Unified logger that uses os.Logger on macOS. Internal access so the | ||
| /// `ProviderLoop` companion extension files (e.g. `+SSEParser.swift`) can re-use | ||
| /// it for their file-scope loggers — `parseStreamChunk` is a `static` method and | ||
| /// can't reach the per-instance logger on the actor. | ||
| struct ProviderLogger: Sendable { | ||
| #if canImport(os) | ||
| private let osLogger: os.Logger | ||
| #endif | ||
| private let category: String | ||
|
|
||
| init(subsystem: String, category: String) { | ||
| self.category = category | ||
| #if canImport(os) | ||
| self.osLogger = os.Logger(subsystem: subsystem, category: category) | ||
| #endif | ||
| } | ||
|
|
||
| func info(_ message: String) { | ||
| #if canImport(os) | ||
| osLogger.info("\(message, privacy: .public)") | ||
| #else | ||
| print("[\(category)] INFO: \(message)") | ||
| #endif | ||
| } | ||
|
|
||
| func warning(_ message: String) { | ||
| #if canImport(os) | ||
| osLogger.warning("\(message, privacy: .public)") | ||
| #else | ||
| print("[\(category)] WARN: \(message)") | ||
| #endif | ||
| } | ||
|
|
||
| func error(_ message: String) { | ||
| #if canImport(os) | ||
| osLogger.error("\(message, privacy: .public)") | ||
| #else | ||
| print("[\(category)] ERROR: \(message)") | ||
| #endif | ||
| } | ||
| } |
There was a problem hiding this comment.
🔵 [INFO] 🧩 Custom logger wrapper around os.Logger with conditional compilation
💡 Suggestion: Consider using Swift's built-in Logger from the Logging library or a more established logging framework instead of a custom wrapper
📊 Score: 2×3 = 6 · Category: reinventing stdlib
… chore/tech-debt-godfiles Reconciles the stacked god-file branch against the advanced base, keeping BOTH this branch's god-file split AND the base's PR-#444 review fixes: - e2e: route via ReserveProviderEx through a local findRoutableProvider helper after Registry.FindProvider removal (1b69036) - provider: split KV-quant benchmark out of the shipped darkbloom into a new ProviderBenchmarkKVQuant target; kv-quant-gate/kv-attn-selftest and the KVQuant tests re-pointed to it (9cc1a60) - coordinator: tighten maybeFallbackAlias branching, behavior-preserving (0497c00) Auto-merged with no conflicts: the consumer.go god-file split and the base's maybeFallbackAlias edit touch disjoint regions. Verified green: repo-root go build/vet ./..., coordinator go test ./..., swift build (all targets), swift test (1059 tests, 74 suites). Shipped darkbloom links 0 ProviderBenchmarkKVQuant symbols; kv-quant-gate links them as expected. Co-authored-by: Cursor <cursoragent@cursor.com>
…lready contains all #444 content; resolve overlaps to ours
… (stacked on #444) (#445) * refactor(provider): delete dead single-request inference/SSE/formatter stack The production inference path is MultiModelBatchSchedulerEngine+Translation + BatchScheduler. The single-request driver, its SSE formatter, and the chat-prompt formatter were only referenced by tests. Remove them and the types they orphaned: - delete SingleRequestInference.swift, SSEChunkFormatting.swift, ChatPromptFormatting.swift - drop orphaned InferenceUsage + UsageAccumulator (UsageAccounting.swift), keeping the live StreamedGenerationUsage / CancelledGenerationTerminal - drop orphaned SSEChunk + streaming ChatCompletionChunk/ChunkChoice/ ChunkDelta (ChatRequest.swift); keep ChunkUsage + ChatCompletionResponse (still used by the live response path / InferenceLiveTests) - trim InferenceTests.swift to the live cancellation-registry test - fix the now-dangling ChatPromptFormatter mention in the translate() doc Behavior-preserving: swift build green; swift test 1049 pass / 0 fail (baseline 1054 minus the 5 removed dead tests). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): delete dead parallel routing implementation Remove the score-based FindProvider / FindProviderWithTrust / ScoreProvider cluster (plus the now-unused TrustMultiplier and the MaxConcurrentRequests alias) from the registry. Production dispatch routes through the cost-based ReserveProviderEx -> selectBestCandidate path; these functions were a second routing brain reachable only from tests, giving false confidence by testing behavior that diverged from production (6m vs 16m challenge freshness window, score-vs-cost selection, crashed-provider routing). Tests that exercised only the dead path (scoring math, score-based selection, dead-specific challenge boundaries, crashed-only routing) are deleted. Tests that used FindProvider as a routability probe for gates SHARED with production (privacy caps, manifest/SIP checks, trust, catalog, eviction, challenge freshness, concurrency cap) are repointed to a findRoutableProvider helper that drives the real ReserveProviderEx path; stale challenge times were bumped past the 16m production freshness window to preserve staleness coverage. No production behavior change. build/vet/test/gofmt all green. Co-authored-by: Cursor <cursoragent@cursor.com> * chore(coordinator): untrack built binary and drop dead R2 site-packages config - git rm --cached coordinator/coordinator: the 40MB built binary was tracked despite already being in .gitignore. Stop committing the artifact. - Remove Server.r2SitePackagesCDNURL plus its ServerConfig field and env wiring (EIGENINFERENCE_R2_SITE_PACKAGES_CDN_URL). It was written from env but never read: install.sh stopped substituting __DARKBLOOM_R2_SITE_PACKAGES_CDN_URL__ post-Swift-cutover (v0.5.0+), so the field was pure dead config. Co-authored-by: Cursor <cursoragent@cursor.com> * chore(coordinator): stop tracking the built coordinator binary The 40MB coordinator/coordinator artifact was tracked despite already being listed in .gitignore. Untrack it so builds don't keep committing the binary. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): collapse dead config->env re-export shim config/config.go only re-exported env helpers (EnvOr/FirstNonEmpty/EnvFloat/ EnvInt). FirstNonEmpty/EnvFloat/EnvInt had zero callers, and EnvOr was used only by app_config.go in the same package. Point app_config.go at env.EnvOr directly, move the package doc onto app_config.go, and delete the shim so there is a single env-helper source of truth. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): merge alias capacity/TTFT fallbacks into one helper maybeFallbackAliasCapacity and maybeFallbackAliasTTFT were near-identical copies differing only in the TTFT ceiling check and the failure-path return model. Merge into maybeFallbackAlias(parsed, mode, ...) with an aliasFallbackMode (capacity vs ttft) flag; capacity callers pass ttftThreshold=0. Failure-path return semantics are preserved exactly (capacity reports currentModel, TTFT reports the probed previous build). No behavior change. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): extract Benchmark into its own ProviderBenchmark target Benchmark/ (~5.2k LOC, incl. fatalError protocol-conformance stub caches) was compiled into the ProviderCore library and thus linked into every provider/enclave binary. Move it into a dedicated `ProviderBenchmark` SwiftPM target that depends on ProviderCore, so only the benchmark-bearing executables link it (darkbloom `benchmark`, kv-quant-gate, kv-attn-selftest, kv-engine-demo) and the shipped enclave/publish/core surface no longer carries benchmark code. - new target `ProviderBenchmark` (Sources/ProviderBenchmark), 18 files moved - the two engine-facing types the live scheduler needs stay in ProviderCore: KVQuantCandidateMode (split out of KVQuantTypes.swift) and the self-contained KVQuantPolicy.swift, now under ProviderCore/Inference/KVQuant - widen KVEstimation / parseModelArchitecture / resolvedKVBytesPerToken / ModelArchitecture to public (already used by the live scheduler) so the benchmark target can reach them across the module boundary - wire ProviderBenchmark into the four executables + the four benchmark test files; add `import ProviderCore` to the moved files Behavior-preserving: swift build green; swift test 1049 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * chore(coordinator): drop dead vllm-mlx full-stack test and fix stale comments - Delete api/fullstack_integration_test.go (~1155 LOC): it spawned real vllm-mlx subprocesses (the retired subprocess backend), was gated behind LIVE_FULLSTACK_TEST=1 (never run in CI), and the repo-root e2e/ harness covers the current MLX-Swift provider over the WebSocket relay. - Refresh stale vllm-mlx comments to MLX-Swift / E2E-relay reality in consumer.go, protocol/messages.go, and registry.go (comments only; no wire-type changes). - Fix install.sh path comment (internal/api -> api) in server.go. - Reword the telemetry symmetry test comment so it states the Go test pins the shape and the Swift/TS mirrors must match, without asserting mirror tests this package cannot see. Co-authored-by: Cursor <cursoragent@cursor.com> * test(provider): add telemetry wire-symmetry tests mirroring the Go canonical coordinator/protocol/telemetry_symmetry_test.go references a Swift mirror that did not exist. Add Tests/ProviderCoreTests/TelemetrySymmetryTests.swift to pin: - enum casing (source=provider, severity=error, kind=backend_crash) - snake_case keys + nil-optional omission (machine_id/account_id/request_id/ stack/fields), matching Go omitempty - the TelemetryKind set + count (mirror of Go KnownKinds()) - TelemetrySource/TelemetrySeverity raw values Add CaseIterable to TelemetryKind (the Swift equivalent of KnownKinds()) and fix two stale `coordinator/internal/protocol|api/...` comment paths to the current `coordinator/protocol|api/...` locations. swift build green; swift test 1052 pass / 0 fail (+3 new). Co-authored-by: Cursor <cursoragent@cursor.com> * fix(coordinator): stop silently swallowing billing/credit and WS-send errors Replace `_ = ...` discards with explicit error handling + a DogStatsD counter: - Stripe webhook: CompleteBillingSession / Referral.Apply failures now log an error and increment billing.{session_complete,referral_apply}_failed instead of vanishing after a successful deposit. - Settlement: failed consumer refund and platform-fee Credit calls now log an error and increment billing.credit_failed{op} — these are financial and must never be silent (a dropped refund over-charges the consumer). - Provider WS: best-effort EnqueueText sends (runtime/trust status) now log at debug and increment provider.enqueue_failed{msg} so a wedged send is visible. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): centralize Harmony assistant-message stripping The assistant-message Harmony wrapper (role guard + content/thinking/ reasoning_content field iteration) was duplicated in JinjaSanitization and TemplateRenderCheck. Centralize it in ProviderCoreFoundation alongside the existing string transform: - add `harmonyAssistantTextFields` + `stripHarmonyFramingFromAssistantMessage` in HarmonySanitization.swift (Linux-buildable, no Apple deps) - TemplateRenderCheck now calls the shared helper (drops its private copy) - JinjaSanitization iterates the shared field-list constant - BatchScheduler already routes through the shared `stripHarmonyChannelFraming` Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * test(coordinator): cover auth/env/telemetry; doc pendingModelLoads routing scope - auth: first unit tests for the previously-untested package — PEM/key parsing (NewPrivyAuth), ES256 JWT verification incl. expiry/issuer/audience/wrong-key/ alg-confusion rejection (VerifyToken), user provisioning (GetOrCreateUser), and Config.Check. Uses a real in-memory store and real generated keys; no mocks of the unit under test. - env: 100% coverage of EnvOr/FirstNonEmpty/EnvFloat/EnvInt/EnvBool. - telemetry: Emitter default-fill, metric counting (capturing fake sink), nil-emitter/nil-sink safety, and version defaulting. - registry: document that pendingModelLoads is consulted ONLY by warm-pool/swap planning and never participates in routing/admission (see AGENTS.md state model). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): consolidate formatDuration into DurationFormatting Four near-identical duration formatters had diverged in user-visible ways (download progress shows seconds; idle/status logs use compact "2h30m"; the scheduler uses spaced "2h 30m"; trailing zero-minute elision differed). Add one `DurationFormatting.compact` helper whose flags reproduce each style exactly, and delegate the four call-site helpers to it. Output is unchanged at every call site; the logic now lives in one place. Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): extract request introspection helpers from consumer.go Move the pure request-introspection / token-estimation cluster out of the 5.7k-LOC consumer.go into a focused request_introspection.go (token + billing cost estimation, media/tool detection, cache-affinity keys, provider-serial allowlist parsing, and the media-cost constants). Same package, no behavior change — these functions have no Server dependency. consumer.go drops from ~5700 to ~5311 LOC. Co-authored-by: Cursor <cursoragent@cursor.com> * fix(provider): surface CoordinatorClient encode failures instead of silent {} Heartbeat / outbound / inference-error encoding used `try?` and degraded to hardcoded `{}` or minimal JSON on failure — silent protocol corruption the coordinator can't attribute. Replace with do/catch that logs at error and emits a `protocol_error` telemetry event, and make the inference-error path fall back to a parseable, correctly-typed payload (carrying the real request_id/error/status_code) rather than `{}`. Behavior-preserving on the success path: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): replace force casts with guards; modernize a GCD timeout - PersistentEnclaveKey: guard the Keychain `result` optional and throw a typed keyLookupFailed instead of force-unwrapping nil (the CFTypeRef->SecKey cast is compiler-guaranteed). - ProtocolSafeQuantizedKVCache.copy(): assert the inner-copy type via guard + preconditionFailure with a clear message rather than a blind `as!`. - ProviderLoop.waitForPreloads: replace `DispatchQueue.global().asyncAfter` with a structured `Task.sleep(for:)` timeout (OneShotBoolContinuation still dedupes, so the first-resume-wins race is unchanged). Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): extract API key handlers from consumer.go Move the consumer-facing API key management endpoints and their validation/response helpers (create/list/get/update/rotate/delete, apiKeyToResponse, validateKeyLimitInputs, keyModelAllowed, checkKeySpendCap, applyKeyPatch) into apikey_handlers.go, matching the existing apikey_handlers_test.go. Same package, no behavior change. consumer.go drops from ~5311 to ~4884 LOC. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): extract ProviderLoopConfig and ProviderLogger from ProviderLoop First, low-risk step of breaking up the ProviderLoop god file: move the two standalone top-level value types out of the 3.6k-line ProviderLoop.swift into their own single-responsibility files (no actor internals touched, no access loosened). The loop file now holds the actor + its behavior, not its inputs or its logger type. Note: the actor's instance-method sections are intentionally NOT split across files here — ProviderLoop deliberately keeps method extensions in-file to reach its `private` model registry without loosening ~40 members to `internal` (see the documented rationale at the local-endpoint extension). That deeper split is deferred pending an explicit encapsulation decision. Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * test(provider): cover the darkbloom-publish hash CLI entrypoint darkbloom-publish (the security-critical model-manifest hasher used by the registry publish flow) had zero tests on its CLI surface. Add a DarkbloomPublishTests target exercising the `hash` subcommand end-to-end against a temp snapshot dir: - writes a manifest.json that decodes back to the expected id/version/files/ 64-hex aggregate - hashing is deterministic across runs on identical bytes - rejects unsafe model ids and version tags (spaces, '..', '/', empty) before any hashing happens (The ManifestBuilder library itself remains covered by ProviderCoreFoundationTests.) swift build green; swift test 1052 swift-testing + 4 new XCTest pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * chore(provider): remove unused dev-only harness targets vlm-smoke (explicitly "Safe to delete") and kv-engine-demo (DAR-318 capacity demo, "NOT a product") are dev-only executables referenced nowhere outside Package.swift — no CI, scripts, Makefile, or docs build them. Delete both (~1040 LOC) to shrink the default build surface. Kept kv-se-harness: it is the only way to exercise the Secure-Enclave KEK + Keychain round-trip on real SE hardware (a path `swift test` cannot reach), so it retains test value despite not being a product. Behavior-preserving: swift build green; swift test 1052 pass / 0 fail. Co-authored-by: Cursor <cursoragent@cursor.com> * test(coordinator): behavior-lock the five routing-eligibility gate fns Characterization suite pinning the exact current decisions of providerPassesRoutingGatesLockedEx, providerCanRouteBuildLocked, providerHasWarmModelLocked, publiclyRoutableLocked, warmPoolCandidateReasonLocked (+ modelLoadCandidatePendingLocked) across a matrix of provider states, including the intentional differences (owner self-route relaxation, breaker-ignoring preflight bypass, warm-only loaded check, model-agnostic public check, granular warm-pool reason labels). Pre-refactor safety net for the routing-gate pipeline unification. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): split ProviderLoop god-file by concern (3577->525 LOC) Split the 3,577-LOC ProviderLoop actor into focused same-module extension files mirroring the existing ProviderLoop+SSEParser/+ErrorMapping pattern: Serve, InferenceHandler, Preload, Prefetch, Testing, Trust, MemoryProtection, IdleTimeout, Capacity, AutoUpdate, ModelLoading, Cancellation, AttestationChallenge, LocalEndpoint. Method bodies are byte-identical; only access control changed. Stored state and methods reached across the split were widened private->internal (Swift private is file-scoped); purely-local members stay private. Behavior unchanged. Build green; swift test 1052/73 pass (unchanged from baseline). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): unify the routing-eligibility gate pipeline Five functions re-implemented overlapping subsets of the provider eligibility gates. Extract the two exactly-shared sub-pipelines into registry/routing_eligibility.go: - providerLivenessGateLocked: status/private-only/trust/runtime/ private-text/challenge-freshness, with minTrust+allowPrivate relaxation - providerServesRoutableModelLocked: catalog membership + dedicated-box rule Repoint providerPassesRoutingGatesLockedEx, providerCanRouteBuildLocked, providerHasWarmModelLocked, publiclyRoutableLocked, and modelLoadCandidatePendingLocked onto them — exact semantics preserved (verified by the characterization suite). warmPoolCandidateReasonLocked is kept separate (documented) because it emits granular per-gate reason labels that a boolean helper cannot preserve, and interleaves warm-pool-specific gates whose order determines the reported reason. Behavior-preserving: routing-gate characterization suite + full coordinator go test ./... stay green. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): split BatchScheduler god-file by concern (2503->538 LOC) Split the BatchScheduler actor core into focused same-module extension files alongside the existing +EngineBridge/+KVEstimation/+Telemetry/+Liveness/ +KVQuantScheme: ModelLifecycle, EngineFactory, CheckpointRestore, Admission, Submit, PrefixCacheSizing, Testing. Method bodies are byte-identical; only access control changed. 13 members reached across the split were widened private->internal (incl. resolving a checkpointLayerSignatures static-vs-instance name shadow); purely-local members stay private. Behavior unchanged. Build green; swift test 1052/73 pass (unchanged from baseline). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): extract shared inference admission preflight handleChatCompletions and handleGenericInference carried byte-identical copies (~290 lines each) of the self-route / prefer / public capacity-and-TTFT preflight: QuickCapacityCheck -> alias-capacity fallback -> unservable shed -> model-too-large -> capacity 429 / queue-spill -> no-eligible-provider shed -> TTFT gate, with identical rejections and routing.decisions metrics. Extract runInferenceAdmission (api/inference_admission.go). The only divergence (verified by diffing the two blocks) is the forward-body refresh after an alias fallback rewrites parsed["model"]: chat re-marshals its threaded rawBody (re-lowering Responses input->chat, which can 400), generic rebuilds from parsed at dispatch. That single difference is threaded as the onModelFallback callback (nil for generic). Behavior-preserving: full api go test ./api/... (incl. failover, shed, servability, dedicated, cold-dispatch, route-outcome suites) stays green. consumer.go 4884 -> 4407 LOC. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): split ModelCatalog god-file by concern (1202->171 LOC) Split ModelCatalog.swift into focused files: catalog data types stay in ModelCatalog.swift; ModelCatalogClient (coordinator catalog HTTP client), ModelDownloader core, and the ModelPrefetcher abstraction move to their own files. ModelDownloader's interleaved methods split into +Download (manifest/ legacy orchestration), +Prefetch (resume-aware prefetch), and +HTTP (file fetch/stream/hash/publish) extensions. Bodies byte-identical; ModelDownloader's 4 stored props + cross-file private methods and CatalogResponse widened private->internal. Behavior unchanged. Build green; swift test 1052/73 pass (unchanged from baseline). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): extract shared inference balance reservation handleChatCompletions and handleGenericInference held identical pre-flight balance reservation + per-key spend-cap blocks (reservationCost -> checkKeySpendCap -> reserveInitialBalance with the same insufficient_quota / insufficient_funds / DB-error rejections). Extract reserveInferenceBalance (api/inference_admission.go), returning (reservedMicroUSD, serviceReservation, handled). Behavior-preserving: full api go test ./api/... stays green. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): split StartCommand god-file by concern (1133->121 LOC) Split the Start command struct into same-target extension files: core keeps the command declaration, options, and run() orchestrator; serving modes (standalone/foreground/scheduled), preflight + inline login, launchd daemon install, interactive catalog picker, and the raw-mode TUI picker move to StartCommand+Modes/+Preflight/+Daemon/+Picker/+TUIPicker. Bodies byte-identical; 7 private methods reached across the split widened to internal. Behavior unchanged. Build green; swift test 1052/73 pass (unchanged from baseline). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): segregate store.Store into domain sub-interfaces The ~150-method Store god-interface is split into 12 cohesive domain sub-interfaces (APIKeyStore, UsageStore, TelemetryStore, LedgerStore, BillingStore, ModelRegistryStore, ReleaseStore, UserStore, DeviceAuthStore, InviteStore, ProviderEarningsStore, ProviderStore) in store/interface_domains.go; Store now embeds all 12. Pure interface refactor: every method keeps its exact signature, both MemoryStore and PostgresStore are unchanged and still satisfy the composed Store (compile-time assertions), and a reflect-based method-set dump confirms the flattened Store exposes the identical 150 methods before and after. store go test ./store/... stays green (Postgres tests run only when DATABASE_URL is set). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): split CoordinatorClient god-file by concern (1173->213 LOC) Extract self-contained top-level types into their own files (CoordinatorClient Types: events/config/messages/errors; State: atomic stats + provider state + concurrency primitives; OutboundRouter; ReachabilityMonitor) and split the actor into +Connection (reconnect/session/receive + telemetry), +Inbound (frame->event dispatch), +Registration (register + heartbeat), and +Outbound (wire encoding). Bodies byte-identical; 21 members reached across the split widened private->internal. The file-private `Logger` typealias was renamed to a unique `CoordinatorWSLogger` (kept internal) so widening the actor's logger property doesn't shadow `Logging.Logger`/`os.Logger` elsewhere in the module. Build green; swift test 1052/73 pass (unchanged from baseline). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): split api/consumer.go into focused files Same-package modularization of the consumer god-file (no behavior change): - httputil.go: writeJSON + the OpenAI-compatible errorResponse envelope - responses_translate.go: Responses-API request lowering (input/tools/ tool_choice/text.format -> chat-completions shape) - models_endpoints.go: GET /v1/models and /v1/models/{id} + alias listing consumer.go 4884 -> 3794 LOC. Full api go test ./api/... stays green. Co-authored-by: Cursor <cursoragent@cursor.com> * test(provider): cover doctor command pure logic (CLI test seams) Add DoctorChecksTests for buildDoctorChecks (snapshot-driven hardware/config/ model-count checks + the deterministic check backbone), describeMDMEnrollment, and CheckStatus markers. Environment-independent: only asserts snapshot-driven behavior, so host Metal/SIP/codesign state can't flake the suite. swift test 1059/74 pass (baseline 1052/73 + 7 new; no production changes). Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): extract APNs code-attest from api/provider.go Move the v0.6.0 APNs code-identity attestation flow (codeAttestMetric, codeAttestLoop, tryCrossVersionReuse, maybeRearmCodeAttest, sendCodeIdentityChallenge, handleCodeAttestationResponse) into api/provider_codeattest.go. Same-package move, no behavior change; provider.go 3563 -> 3150 LOC. Full api go test ./api/... stays green. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): extract provider persistence from registry.go Move the registry<->store persistence bridge (SetStore, LoadStoredProviders, RestoreProviderState, providerRecordStats, PersistProvider[Throttled], persistReputation[Throttled], persistProviderNow) into registry/persistence.go. Same-package move, no behavior change; registry.go 4693 -> 4365 LOC. registry go test ./registry/... stays green. Co-authored-by: Cursor <cursoragent@cursor.com> * fix(e2e): route via ReserveProviderEx after FindProvider removal Round-1 cleanup deleted registry.FindProvider but only re-ran go test inside coordinator/, missing the e2e package. The lone surviving caller in TestIntegration_SwiftProviderRealRoutingGates broke the e2e build (Blacksmith e2e/Build). Mirror the in-coordinator migration: add a local findRoutableProvider helper that probes routability through the production ReserveProviderEx path (same structural/privacy/trust/challenge/capacity gates), reserves then releases capacity, and returns the selected provider. No deleted methods or shims reintroduced. Verified: go build ./... and go vet ./... (compiles e2e/) green from repo root. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(provider): split KV-quant benchmark out of shipped darkbloom scripts/install.sh ships the `darkbloom` binary, but the `darkbloom` target depended on ProviderBenchmark, which carried the KV-quant gate/eval code AND the fatalError protocol-conformance KV-cache stubs (ProtocolSafeQuantizedKVCache, the V-only/BFloat16 caches). Those benchmark-only stubs were linking into every installed provider, defeating round-1's goal. Split ProviderBenchmark into two targets along the existing clean boundary (verified: the lightweight runners and the KVQuant/ dir do not reference each other): - ProviderBenchmark (lean): ModelBenchmark, ThroughputSweep(+report), DecodeBandwidthModel — the only benchmark code `darkbloom benchmark` needs. Still linked by `darkbloom`, so the command keeps working. - ProviderBenchmarkKVQuant (heavy, has the fatalError stubs): the KVQuant/ dir, linked ONLY by kv-quant-gate, kv-attn-selftest, and their tests — never by `darkbloom`. kv-quant-gate / kv-attn-selftest / the 3 KVQuant test files now import ProviderBenchmarkKVQuant; the darkbloom benchmark command and ThroughputSweepTests keep importing the lean ProviderBenchmark. Bundle-safe: release builds by product (`swift build -c release --product darkbloom`); product names, install paths, and LatestProviderVersion are unchanged. Verified: `swift build --build-tests` green (all targets + tests). nm proof: darkbloom links 0 ProviderBenchmarkKVQuant symbols (939 lean ProviderBenchmark symbols intact); kv-quant-gate links 3694. Co-authored-by: Cursor <cursoragent@cursor.com> * refactor(coordinator): tighten maybeFallbackAlias branching Behavior-preserving cleanup of the alias capacity/TTFT fallback helper. Both fallback modes already share a SINGLE Previous-build capacity probe (QuickCapacityCheckWithTTFTForRequest) — the desired build is probed once at the call site and the previous build once here, on different models, so there is no redundant capacity check to remove. This reduces the residual branching instead: - merge the two identical modelShed / !IsModelInCatalog early-return guards (both pure, both return the same tuple) into one - hoist the duplicated `mode == aliasFallbackTTFT` test into a single `enforceTTFT` bool and drop the `tooSlow` temp The failure-path model selection is unchanged (TTFT mode reports the probed Previous build the caller uses as the alternate TTFT estimate; capacity mode keeps the current build it discards), so all returns are byte-identical to before. Verified: coordinator/api alias fallback tests + full `go test ./...` green. Co-authored-by: Cursor <cursoragent@cursor.com> * fix(provider): widen split-file symbols to internal (ProviderLoopError, OSAllocatedUnfairLock) — fixes #445 clean-build access errors The god-file split moved cross-file references that the warm incremental .build cache never recompiled, hiding two access violations: - ProviderLoopError (ProviderLoop.swift) is thrown from ProviderLoop+Serve.swift. The throw is inside `#if !DEBUG`, so `swift build` (debug) compiled it out and masked the error; a release build fails with "'ProviderLoopError' is inaccessible due to 'private' protection level". Widen private -> internal. - OSAllocatedUnfairLock shim (CoordinatorClientState.swift) is instantiated in OutboundRouter.swift. While `private` it silently resolved to Apple's os.OSAllocatedUnfairLock (OutboundRouter imports os) rather than the project shim. Widen private -> internal so OutboundRouter uses the intended shim (matching PongTracker/ManagedAtomic). Validated: `swift build -c release --product darkbloom` (whole-module, #if !DEBUG active) reaches "Build complete"; `swift test` -> 1059/1059 pass. Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: Cursor <cursoragent@cursor.com>

Summary
Behavior-preserving tech-debt cleanup + first-pass modularization across the coordinator (Go) and provider (Swift), done in parallel in an isolated worktree. 20 commits, every one green (
go build/vet/test,swift build/test,gofmt). No protocol/telemetry wire changes;libs/untouched; no root files changed.Highlights:
ScoreProvider/FindProvider/FindProviderWithTrust+ orphans) and its false-confidence tests; the deadvllm-mlxfull-stack test; deadr2SitePackagesCDNURLconfig; theenv→configre-export shim; provider's dead single-request inference/SSE/formatter stack; unused dev-only targets (vlm-smoke,kv-engine-demo). The 40 MB trackedcoordinatorbinary is now untracked.Benchmark/(incl.fatalErrorstub caches) extracted into its ownProviderBenchmarkSwiftPM target, no longer linked into every provider.formatDurationeach unified to a single helper.{}on encode failure (both now log + emit telemetry).auth/tests (incl. JWT alg-confusion) +env/telemetry; provider telemetry wire-symmetry test (makes the Go symmetry test's cross-language claim real) +darkbloom-publishhashing tests.consumer.go5,700→4,884 (request_introspection.go+apikey_handlers.go);ProviderLoop.swift3,660→3,577 (ProviderLoopConfig+ProviderLogger).The deepest god-file splits (routing-gate pipeline unification,
runInferenceRequestorchestrator,store.Storeinterface segregation,registry.go/provider.gosplits; deepProviderLoop/BatchScheduleractor splits) are intentionally deferred to the stacked follow-up branchchore/tech-debt-godfiles(base = this branch) — they're either high-drift-risk or require an access-control tradeoff that warrants its own review.Before / After
Behavior — externally unchanged; dead paths and silent failures removed
flowchart LR subgraph Before R1[dispatch] --> P1["ReserveProviderEx → selectBestCandidate"] R1 -. "dead (tests only)" .-> S1["ScoreProvider / FindProvider"] B1["Stripe webhook / settlement"] --> E1["errors silently swallowed"] end subgraph After R2[dispatch] --> P2["ReserveProviderEx → selectBestCandidate"] B2["Stripe webhook / settlement"] --> E2["errors logged + DogStatsD metric"] endCode structure
flowchart TB subgraph Before C1["api/consumer.go ~5,700 LOC (inference + keys + introspection)"] PC1["ProviderCore library"] --> BM1["Benchmark/ ~4.4k LOC + fatalError stubs"] PL1["ProviderLoop.swift 3,660 (inline config/logger)"] D1["dead: single-request inference / SSE / formatter"] end subgraph After C2["api/consumer.go 4,884"] --> RI["request_introspection.go"] C2 --> AK["apikey_handlers.go"] PC2["ProviderCore library"] BM2["ProviderBenchmark target (bench / CLI only)"] PL2["ProviderLoop.swift 3,577"] --> CFG["ProviderLoopConfig.swift"] PL2 --> LOG["ProviderLogger.swift"] endTest plan
go build ./...+go vet ./...cleango test ./...green (registry suite also verified under-race)swift buildgreen;swift testno new failures (1052 swift-testing + new XCTest suites, 0 fail)gofmt -lempty; branch scoped tocoordinator/+provider-swift/onlyNotes
Protocol/mirror needs nothing.chore/tech-debt-godfiles(base = this branch).Need help on this PR? Tag
/codesmithwith what you need. Autofix is disabled.