From 9dc043ad3a384fd9197676a7a8bfc0123d3479f4 Mon Sep 17 00:00:00 2001 From: Valreb001 Date: Sat, 27 Jun 2026 21:45:01 +0100 Subject: [PATCH 1/4] ci(api): declare MSRV 1.75 and add MSRV CI workflow - Add rust-version = '1.75' to services/api/Cargo.toml (determined by axum 0.7 / sqlx 0.8 / tower-http 0.6 requirements, matching the rust:1.75-slim image already pinned in the root Dockerfile) - Add .github/workflows/msrv.yml that reads the MSRV dynamically from Cargo.toml, installs the toolchain via dtolnay/rust-toolchain, and runs cargo check + cargo build --release - Document MSRV policy in CONTRIBUTING.md --- .github/workflows/msrv.yml | 54 ++++++++++++++++++++++++++++++++++++++ CONTRIBUTING.md | 29 ++++++++++++++++++++ services/api/Cargo.toml | 8 ++++++ 3 files changed, 91 insertions(+) create mode 100644 .github/workflows/msrv.yml diff --git a/.github/workflows/msrv.yml b/.github/workflows/msrv.yml new file mode 100644 index 0000000..73887f6 --- /dev/null +++ b/.github/workflows/msrv.yml @@ -0,0 +1,54 @@ +name: MSRV Check + +on: + push: + branches: [main, develop] + paths: + - "services/api/**" + - ".github/workflows/msrv.yml" + pull_request: + branches: [main, develop] + paths: + - "services/api/**" + - ".github/workflows/msrv.yml" + +jobs: + msrv: + name: Build with Minimum Supported Rust Version + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Read MSRV from Cargo.toml + id: msrv + run: | + MSRV=$(grep '^rust-version' services/api/Cargo.toml | sed 's/.*= *"\(.*\)"/\1/') + echo "version=$MSRV" >> "$GITHUB_OUTPUT" + echo "MSRV detected: $MSRV" + + - name: Install MSRV toolchain (${{ steps.msrv.outputs.version }}) + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ steps.msrv.outputs.version }} + + - name: Cache cargo registry + uses: actions/cache@v5 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + services/api/target + key: ${{ runner.os }}-cargo-msrv-${{ hashFiles('services/api/Cargo.lock') }} + + - name: Install 
system dependencies + run: | + sudo apt-get update + sudo apt-get install -y pkg-config libssl-dev + + - name: cargo check (MSRV) + run: cargo check --all-targets + working-directory: services/api + + - name: cargo build (MSRV) + run: cargo build --release + working-directory: services/api diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a588632..d8bfd47 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -223,6 +223,35 @@ make test --- +## Minimum Supported Rust Version (MSRV) + +The `services/api` crate declares a `rust-version` field in its `Cargo.toml`. +This is the **oldest** Rust toolchain version the crate is guaranteed to compile on. + +### Current MSRV + +| Crate | MSRV | +|-------|------| +| `predictiq-api` (`services/api`) | **1.75.0** | + +### Policy + +- The MSRV is set to the version required by the most-restrictive direct dependency + (currently `axum 0.7`, `sqlx 0.8`, and `tower-http 0.6`, all of which require ≥ 1.75). +- Bumping the MSRV is a **semver-minor** change and must be documented in `CHANGELOG.md` + via a `chore(api): bump MSRV to X.Y.Z` commit. +- A dedicated CI job (`.github/workflows/msrv.yml`) installs the declared MSRV toolchain + using `rustup` and runs `cargo check` + `cargo build --release` against it on every PR + that touches `services/api/`. +- To verify the MSRV locally: + + ```bash + rustup toolchain install 1.75 + rustup run 1.75 cargo check --manifest-path services/api/Cargo.toml + ``` + +--- + ## Code Style ### Rust diff --git a/services/api/Cargo.toml b/services/api/Cargo.toml index bab4d7e..812828e 100644 --- a/services/api/Cargo.toml +++ b/services/api/Cargo.toml @@ -3,6 +3,14 @@ name = "predictiq-api" version = "0.1.0" edition = "2021" publish = false +# Minimum Supported Rust Version (MSRV). 
+# Determined by the most-restrictive dependency in this crate: +# - axum 0.7 requires Rust ≥ 1.75 +# - sqlx 0.8 requires Rust ≥ 1.75 +# - tower-http 0.6 requires Rust ≥ 1.75 +# The root Dockerfile already pins rust:1.75-slim for production builds. +# See CONTRIBUTING.md for the MSRV policy. +rust-version = "1.75" [[bin]] name = "predictiq-api" From b20b819962113f936514813a0eaf55e5d352356d Mon Sep 17 00:00:00 2001 From: Valreb001 Date: Sat, 27 Jun 2026 21:46:38 +0100 Subject: [PATCH 2/4] feat(tts): add multi-stage Dockerfile and Docker Compose entry - Replace single-stage Dockerfile with a two-stage build: builder (node:20-alpine) compiles TypeScript; runtime (node:20-alpine) installs production deps only and runs as non-root user ttsuser - Add engines field to package.json to pin Node.js 20 LTS - Add HEALTHCHECK calling /health/live with 30s interval - Add tts service to docker-compose.tracing.yml with OTEL Collector dependency, tts-output volume, resource limits, and healthcheck --- docker-compose.tracing.yml | 45 +++++++++++++++++++++++++++++ services/tts/Dockerfile | 59 ++++++++++++++++++++++++++------------ services/tts/package.json | 3 ++ 3 files changed, 89 insertions(+), 18 deletions(-) diff --git a/docker-compose.tracing.yml b/docker-compose.tracing.yml index ca81907..2d547c2 100644 --- a/docker-compose.tracing.yml +++ b/docker-compose.tracing.yml @@ -74,6 +74,51 @@ services: networks: - predictiq-tracing + # ── TTS Service ────────────────────────────────────────────────────────────── + # Included so the TTS service can export traces to the OpenTelemetry Collector + # running in this compose stack. 
+ tts: + build: + context: ./services/tts + dockerfile: Dockerfile + container_name: predictiq-tts + ports: + - "3000:3000" + environment: + - TTS_PROVIDER=${TTS_PROVIDER:-elevenlabs} + - ELEVENLABS_API_KEY=${ELEVENLABS_API_KEY:-} + - GOOGLE_APPLICATION_CREDENTIALS=${GOOGLE_APPLICATION_CREDENTIALS:-} + - TTS_OUTPUT_DIR=/tmp/tts-output + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 + - OTEL_SERVICE_NAME=predictiq-tts + volumes: + - tts-output:/tmp/tts-output + healthcheck: + test: + - "CMD" + - "node" + - "-e" + - "require('http').get('http://localhost:3000/health/live',(r)=>{process.exit(r.statusCode===200?0:1)}).on('error',()=>process.exit(1))" + interval: 30s + timeout: 5s + start_period: 10s + retries: 3 + depends_on: + - otel-collector + deploy: + resources: + limits: + cpus: '0.5' + memory: 512M + reservations: + cpus: '0.25' + memory: 256M + networks: + - predictiq-tracing + networks: predictiq-tracing: driver: bridge + +volumes: + tts-output: diff --git a/services/tts/Dockerfile b/services/tts/Dockerfile index 54f26fa..2a9bd5e 100644 --- a/services/tts/Dockerfile +++ b/services/tts/Dockerfile @@ -1,28 +1,51 @@ -# TTS Service Dockerfile -# Builds and runs the Text-to-Speech service with health checks +# Multi-stage build for PredictIQ TTS Service +# Pinned to Node.js 20 LTS (matching the "engines" field in package.json) -FROM node:20-alpine +# ── Stage 1: Builder ───────────────────────────────────────────────────────── +FROM node:20-alpine AS builder -WORKDIR /app +WORKDIR /build -# Install dependencies -COPY package*.json ./ -RUN npm ci --only=production +# Copy dependency manifests first for layer caching +COPY package.json package-lock.json ./ -# Copy source code -COPY src ./src +# Install all dependencies (including devDependencies needed for tsc) +RUN npm ci + +# Copy source and compile TypeScript COPY tsconfig.json ./ +COPY src ./src +RUN npm run build + +# ── Stage 2: Runtime ───────────────────────────────────────────────────────── +FROM 
node:20-alpine AS runtime + +# Create a non-root user to prevent container-escape privilege escalation +RUN addgroup -S ttsgroup && adduser -S ttsuser -G ttsgroup + +WORKDIR /app + +# Copy dependency manifests and install production dependencies only +COPY package.json package-lock.json ./ +RUN npm ci --omit=dev && npm cache clean --force + +# Copy compiled output from the builder stage +COPY --from=builder /build/dist ./dist + +# Set ownership to the non-root user before switching +RUN chown -R ttsuser:ttsgroup /app -# Build TypeScript -RUN npm run build 2>/dev/null || npx tsc +USER ttsuser EXPOSE 3000 -# Health check endpoint -# Checks Google Cloud TTS API connectivity and service health -# Returns 200 with {"status": "ok"} when healthy -# Returns 503 with error details when upstream is unavailable -HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \ - CMD node -e "require('http').get('http://localhost:3000/health', (r) => {if (r.statusCode !== 200) throw new Error(r.statusCode)})" || exit 1 +# HEALTHCHECK calls /health/live — a fast, process-level liveness probe that +# does not depend on external TTS providers and therefore never produces false +# negatives due to upstream unavailability. +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ + CMD node -e "\ + require('http').get('http://localhost:3000/health/live', (r) => { \ + process.exit(r.statusCode === 200 ? 
0 : 1); \ + }).on('error', () => process.exit(1));" -CMD ["npm", "start"] +CMD ["node", "dist/server.js"] diff --git a/services/tts/package.json b/services/tts/package.json index 4cd2550..857222e 100644 --- a/services/tts/package.json +++ b/services/tts/package.json @@ -5,6 +5,9 @@ "private": true, "main": "dist/TTSService.js", "types": "dist/TTSService.d.ts", + "engines": { + "node": ">=20.0.0 <21.0.0" + }, "scripts": { "build": "tsc", "dev": "ts-node src/server.ts", From d5048f0185a6e2096000514d035cc4447c895250 Mon Sep 17 00:00:00 2001 From: Valreb001 Date: Sat, 27 Jun 2026 21:49:33 +0100 Subject: [PATCH 3/4] fix(tts): implement real dependency probes in /health/ready MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - checkOutputDirectory: write + delete a probe file to confirm writability - checkElevenLabs: make a live GET /v1/user call (5 s timeout) to verify API key validity and network reachability; returns 503 on 401 or network errors - checkGoogle: validate key file exists, parses as JSON, and contains project_id + private_key; same for inline credentials object - checkJobQueueDepth: count pending+processing jobs against MAX_QUEUE_DEPTH (500); warns at 80 %, errors at 100 % — surfaced in /health/ready - readiness() now aggregates all four probes and returns 503 with a structured JSON body listing every failing check when any fail - /health/ready response body now includes a checks map alongside status, message, and timestamp --- services/tts/src/HealthCheck.ts | 234 +++++++++++++++++++++++++------- 1 file changed, 183 insertions(+), 51 deletions(-) diff --git a/services/tts/src/HealthCheck.ts b/services/tts/src/HealthCheck.ts index e650e36..deac09f 100644 --- a/services/tts/src/HealthCheck.ts +++ b/services/tts/src/HealthCheck.ts @@ -17,6 +17,17 @@ import { TTSConfig, TTSService } from "./TTSService"; import fs from "fs/promises"; import path from "path"; +// 
--------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +/** + * Maximum number of in-flight + pending jobs allowed before the readiness + * probe returns a degraded/503 response. Callers should stop sending new + * work once this threshold is reached to prevent unbounded memory growth. + */ +export const MAX_QUEUE_DEPTH = 500; + // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- @@ -31,6 +42,7 @@ export interface HealthCheckResult { elevenlabs?: HealthCheckStatus; google?: HealthCheckStatus; jobStore: HealthCheckStatus; + jobQueueDepth?: HealthCheckStatus; }; message: string; } @@ -64,11 +76,12 @@ export class HealthChecker { const tracer = trace.getTracer("tts-health-check"); return tracer.startActiveSpan("health_check", async (span) => { try { - const checks = { + const checks: HealthCheckResult["checks"] = { service: this.checkService(), outputDirectory: await this.checkOutputDirectory(), jobStore: this.checkJobStore(), - } as any; + jobQueueDepth: this.checkJobQueueDepth(), + }; // Check configured providers if (this.config.elevenlabs) { @@ -80,7 +93,7 @@ export class HealthChecker { } // Determine overall status - const allStatuses = Object.values(checks).map((c: any) => c.status); + const allStatuses = Object.values(checks).map((c) => c.status); const hasError = allStatuses.includes("error"); const hasWarning = allStatuses.includes("warning"); @@ -113,23 +126,57 @@ export class HealthChecker { } /** - * Lightweight readiness check — returns quickly with minimal dependencies. - * Suitable for Kubernetes readiness probes. + * Readiness check — backs /health/ready and is intended for Kubernetes + * readiness probes (the Docker HEALTHCHECK targets /health/live instead). 
+ * + * Performs real dependency probes so that the service is only marked + * "ready" when it can actually process TTS requests: + * 1. TTS provider reachability (lightweight API / credential validation) + * 2. Output directory writability + * 3. Job queue depth (guards against unbounded memory growth) + * + * Returns 503 with a structured JSON body if any check fails. */ - async readiness(): Promise { - try { - // Check if service is responsive - const job = this.service.listJobs(); - return { - status: "ok", - message: "Service is ready to accept requests", - }; - } catch (error) { + async readiness(): Promise<{ + status: "ok" | "error"; + message: string; + checks: Record; + }> { + const checks: Record = {}; + + // 1. Output directory writability + checks.outputDirectory = await this.checkOutputDirectory(); + + // 2. TTS provider reachability + if (this.config.elevenlabs) { + checks.elevenlabs = await this.checkElevenLabs(); + } + if (this.config.google) { + checks.google = await this.checkGoogle(); + } + + // 3. 
Job queue depth + checks.jobQueueDepth = this.checkJobQueueDepth(); + + const hasError = Object.values(checks).some((c) => c.status === "error"); + + if (hasError) { + const failingChecks = Object.entries(checks) + .filter(([, c]) => c.status === "error") + .map(([k, c]) => `${k}: ${c.message}`) + .join("; "); return { status: "error", - message: `Service not ready: ${String(error)}`, + message: `Service not ready — failing checks: ${failingChecks}`, + checks, }; } + + return { + status: "ok", + message: "Service is ready to accept requests", + checks, + }; } /** @@ -171,10 +218,10 @@ export class HealthChecker { try { const dir = this.config.outputDir; - // Check if directory exists and is writable + // Ensure directory exists await fs.mkdir(dir, { recursive: true }); - // Try to write a test file + // Verify writability by creating and immediately removing a probe file const testFile = path.join(dir, `.health-check-${Date.now()}`); await fs.writeFile(testFile, "health-check"); await fs.unlink(testFile); @@ -210,6 +257,49 @@ export class HealthChecker { } } + /** + * Check that the active job queue has not exceeded MAX_QUEUE_DEPTH. + * Counts jobs in "pending" and "processing" states as in-flight work. 
+ */ + private checkJobQueueDepth(): HealthCheckStatus { + try { + const pending = this.service.listJobs("pending").length; + const processing = this.service.listJobs("processing").length; + const depth = pending + processing; + + if (depth >= MAX_QUEUE_DEPTH) { + return { + status: "error", + message: `Job queue depth exceeded limit: ${depth}/${MAX_QUEUE_DEPTH} (pending=${pending}, processing=${processing})`, + }; + } + + // Warn at 80 % capacity + if (depth >= MAX_QUEUE_DEPTH * 0.8) { + return { + status: "warning", + message: `Job queue depth near limit: ${depth}/${MAX_QUEUE_DEPTH} (pending=${pending}, processing=${processing})`, + }; + } + + return { + status: "ok", + message: `Job queue depth is healthy: ${depth}/${MAX_QUEUE_DEPTH}`, + }; + } catch (error) { + return { + status: "error", + message: `Job queue depth check failed: ${String(error)}`, + }; + } + } + + /** + * ElevenLabs probe — makes a lightweight API call (GET /v1/user) to verify + * the API key is valid and the endpoint is reachable. A basic key-length + * check runs first; network failures and HTTP 401 are reported as errors, + * and any other non-2xx response is reported as a warning. + */ private async checkElevenLabs(): Promise { const start = Date.now(); try { @@ -223,19 +313,56 @@ export class HealthChecker { return { status: "error", message: "ElevenLabs API key not set" }; } - // Verify API key format (basic check) if (apiKey.length < 10) { return { status: "error", - message: "ElevenLabs API key appears invalid", + message: "ElevenLabs API key appears invalid (too short)", + }; + } + + // Lightweight probe: GET /v1/user — returns 200 for valid keys, 401 + // for invalid keys, and will throw on network errors. 
+ let res: Response; + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 5000); + try { + res = await fetch("https://api.elevenlabs.io/v1/user", { + method: "GET", + headers: { "xi-api-key": apiKey }, + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } + } catch (networkErr) { + // Network unreachable — surface as error so readiness probe fails + return { + status: "error", + message: `ElevenLabs unreachable: ${String(networkErr)}`, + latency: Date.now() - start, + }; + } + + if (res.status === 401) { + return { + status: "error", + message: "ElevenLabs API key rejected (HTTP 401)", + latency: Date.now() - start, + }; + } + + if (!res.ok) { + return { + status: "warning", + message: `ElevenLabs probe returned HTTP ${res.status}`, + latency: Date.now() - start, }; } - // Optional: Make a lightweight API call to verify connectivity - // For now, just verify the key is present return { status: "ok", - message: "ElevenLabs is configured and accessible", + message: "ElevenLabs is reachable and API key is valid", latency: Date.now() - start, }; } catch (error) { @@ -247,6 +374,13 @@ export class HealthChecker { } } + /** + * Google TTS probe — validates credentials format and, when a key file is + * provided, verifies it exists and contains valid JSON with the required + * fields. A real synthesizeSpeech call is intentionally avoided here to + * keep probe latency low; the circuit breaker (TTSService) handles runtime + * failure detection. 
+ */ private async checkGoogle(): Promise { const start = Date.now(); try { @@ -263,13 +397,19 @@ export class HealthChecker { }; } - // If using keyFilename, verify file exists and is readable + // If using keyFilename, verify file exists and is readable valid JSON if (keyFilename) { try { await fs.access(keyFilename); - // Verify it's valid JSON const content = await fs.readFile(keyFilename, "utf-8"); - JSON.parse(content); + const parsed = JSON.parse(content) as Record; + if (!parsed["project_id"] || !parsed["private_key"]) { + return { + status: "error", + message: "Google TTS key file is missing required fields (project_id, private_key)", + latency: Date.now() - start, + }; + } } catch (err) { return { status: "error", @@ -279,31 +419,21 @@ export class HealthChecker { } } - // Attempt to verify API connectivity by checking credentials format - // In production, this could make a lightweight API call to Google Cloud TTS - try { - // Validate credentials structure if provided - if (credentials && typeof credentials === "object") { - const creds = credentials as any; - if (!creds.project_id || !creds.private_key) { - return { - status: "error", - message: "Google TTS credentials missing required fields", - latency: Date.now() - start, - }; - } + // Validate inline credentials structure when provided + if (credentials && typeof credentials === "object") { + const creds = credentials as Record; + if (!creds["project_id"] || !creds["private_key"]) { + return { + status: "error", + message: "Google TTS credentials missing required fields (project_id, private_key)", + latency: Date.now() - start, + }; } - } catch (err) { - return { - status: "error", - message: `Google TTS credentials validation failed: ${String(err)}`, - latency: Date.now() - start, - }; } return { status: "ok", - message: "Google TTS is configured and accessible", + message: "Google TTS credentials are valid and accessible", latency: Date.now() - start, }; } catch (error) { @@ -315,21 +445,21 @@ 
export class HealthChecker { } } - private buildMessage(status: string, checks: any): string { + private buildMessage(status: string, checks: Record): string { const parts: string[] = []; if (status === "healthy") { parts.push("✅ All systems operational"); } else if (status === "degraded") { parts.push("⚠️ Service is degraded"); - Object.entries(checks).forEach(([key, check]: [string, any]) => { + Object.entries(checks).forEach(([key, check]) => { if (check.status === "warning") { parts.push(` - ${key}: ${check.message}`); } }); } else { parts.push("❌ Service is unhealthy"); - Object.entries(checks).forEach(([key, check]: [string, any]) => { + Object.entries(checks).forEach(([key, check]) => { if (check.status === "error") { parts.push(` - ${key}: ${check.message}`); } @@ -345,11 +475,12 @@ export class HealthChecker { // --------------------------------------------------------------------------- /** - * Express/Fastify middleware for health check endpoint. + * Express/Fastify middleware for health check endpoints. + * * Usage: - * app.get('/health', createHealthCheckHandler(healthChecker)); + * app.get('/health', createHealthCheckHandler(healthChecker)); * app.get('/health/ready', createReadinessHandler(healthChecker)); - * app.get('/health/live', createLivenessHandler(healthChecker)); + * app.get('/health/live', createLivenessHandler(healthChecker)); */ export function createHealthCheckHandler(healthChecker: HealthChecker) { @@ -376,6 +507,7 @@ export function createReadinessHandler(healthChecker: HealthChecker) { res.status(statusCode).json({ status: result.status === "ok" ? 
"ready" : "not_ready", message: result.message, + checks: result.checks, timestamp: new Date().toISOString(), }); } catch (error) { From af2211d845232be5f8aa15960710f0d60101f89c Mon Sep 17 00:00:00 2001 From: Valreb001 Date: Sat, 27 Jun 2026 22:18:29 +0100 Subject: [PATCH 4/4] feat(tts): add per-provider circuit breaker using opossum - Import opossum and add CircuitBreakerState/CircuitBreakerConfig types - Default config: 5-failure threshold (volumeThreshold), 30 s rolling window (rollingCountTimeout), 30 s half-open interval (resetTimeout), 10 s per-call timeout - TTSService._initCircuitBreakers builds one opossum CircuitBreaker per configured provider (elevenlabs, google) and wires open/halfOpen/close log events - TTSService._callProvider routes all provider calls through breaker.fire() so the open circuit fast-fails with HTTP 503 without making a network call - TTSService.getCircuitBreakerStates returns state snapshots exposed in /health/ready response body - HealthCheck.readiness includes circuit breaker states; open breakers produce an 'error' check entry causing a 503 readiness response - Add circuit-breaker.test.ts with 5 tests: initial closed state, trips open after threshold, fast-fail without network call, half-open transition, and empty state when no providers configured --- services/tts/package-lock.json | 26 ++- services/tts/package.json | 4 +- services/tts/src/HealthCheck.ts | 22 +++ services/tts/src/TTSService.ts | 170 +++++++++++++++++- .../tts/src/__tests__/circuit-breaker.test.ts | 163 +++++++++++++++++ 5 files changed, 382 insertions(+), 3 deletions(-) create mode 100644 services/tts/src/__tests__/circuit-breaker.test.ts diff --git a/services/tts/package-lock.json b/services/tts/package-lock.json index 340b9b8..b1a622a 100644 --- a/services/tts/package-lock.json +++ b/services/tts/package-lock.json @@ -16,16 +16,21 @@ "@opentelemetry/resources": "^2.7.1", "@opentelemetry/sdk-node": "^0.218.0", "@opentelemetry/semantic-conventions": "^1.41.1", - 
"express": "^5.2.1" + "express": "^5.2.1", + "opossum": "^8.3.0" }, "devDependencies": { "@types/express": "^5.0.6", "@types/jest": "30.0.0", "@types/node": "^25.9.1", + "@types/opossum": "^8.1.8", "jest": "^30.4.2", "ts-jest": "^29.4.11", "ts-node": "^10.9.0", "typescript": "^6.0.3" + }, + "engines": { + "node": ">=20.0.0 <21.0.0" } }, "node_modules/@babel/code-frame": { @@ -1938,6 +1943,16 @@ "undici-types": ">=7.24.0 <7.24.7" } }, + "node_modules/@types/opossum": { + "version": "8.1.8", + "resolved": "https://registry.npmjs.org/@types/opossum/-/opossum-8.1.8.tgz", + "integrity": "sha512-FTpWfHcpMgdLIdaTpL1D/nTlIH03T62RIQfzmwdEDAndYJwHDdUTkBvgXhaVp/zN74woiWEyb6DSNuZHPt1gmA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/qs": { "version": "6.15.1", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.15.1.tgz", @@ -5131,6 +5146,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/opossum": { + "version": "8.3.0", + "resolved": "https://registry.npmjs.org/opossum/-/opossum-8.3.0.tgz", + "integrity": "sha512-Q5BQ3uwoPCBjkFVykwL8t0kr1gM8u41WUGMvzRXINeWMyu/OdqCUjqfrys7IQnvbWI8tR/ymAyDQpJU1/JyaCw==", + "license": "Apache-2.0", + "engines": { + "node": "^22 || ^21 || ^20 || ^18 || ^16" + } + }, "node_modules/p-limit": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz", diff --git a/services/tts/package.json b/services/tts/package.json index 857222e..89c5786 100644 --- a/services/tts/package.json +++ b/services/tts/package.json @@ -27,12 +27,14 @@ "@opentelemetry/resources": "^2.7.1", "@opentelemetry/sdk-node": "^0.218.0", "@opentelemetry/semantic-conventions": "^1.41.1", - "express": "^5.2.1" + "express": "^5.2.1", + "opossum": "^8.3.0" }, "devDependencies": { "@types/express": "^5.0.6", "@types/jest": "30.0.0", "@types/node": "^25.9.1", + "@types/opossum": "^8.1.8", "jest": "^30.4.2", "ts-jest": "^29.4.11", "ts-node": "^10.9.0", diff --git 
a/services/tts/src/HealthCheck.ts b/services/tts/src/HealthCheck.ts index deac09f..e393fb6 100644 --- a/services/tts/src/HealthCheck.ts +++ b/services/tts/src/HealthCheck.ts @@ -134,6 +134,7 @@ export class HealthChecker { * 1. TTS provider reachability (lightweight API / credential validation) * 2. Output directory writability * 3. Job queue depth (guards against unbounded memory growth) + * 4. Circuit breaker states (open breakers → service not ready) * * Returns 503 with a structured JSON body if any check fails. */ @@ -141,6 +142,7 @@ export class HealthChecker { status: "ok" | "error"; message: string; checks: Record; + circuitBreakers?: Record; }> { const checks: Record = {}; @@ -158,6 +160,23 @@ export class HealthChecker { // 3. Job queue depth checks.jobQueueDepth = this.checkJobQueueDepth(); + // 4. Circuit breaker states — open breakers make the service effectively + // unable to process new jobs, so mark as error to fail the readiness probe. + const cbStates = this.service.getCircuitBreakerStates(); + for (const [provider, state] of Object.entries(cbStates)) { + if (state.state === "open") { + checks[`circuitBreaker_${provider}`] = { + status: "error", + message: `${provider} circuit breaker is OPEN (${state.failures} failures) — fast-failing`, + }; + } else if (state.state === "halfOpen") { + checks[`circuitBreaker_${provider}`] = { + status: "warning", + message: `${provider} circuit breaker is HALF-OPEN — probing for recovery`, + }; + } + } + const hasError = Object.values(checks).some((c) => c.status === "error"); if (hasError) { @@ -169,6 +188,7 @@ export class HealthChecker { status: "error", message: `Service not ready — failing checks: ${failingChecks}`, checks, + circuitBreakers: cbStates, }; } @@ -176,6 +196,7 @@ export class HealthChecker { status: "ok", message: "Service is ready to accept requests", checks, + circuitBreakers: cbStates, }; } @@ -508,6 +529,7 @@ export function createReadinessHandler(healthChecker: HealthChecker) { status: 
result.status === "ok" ? "ready" : "not_ready", message: result.message, checks: result.checks, + circuitBreakers: result.circuitBreakers, timestamp: new Date().toISOString(), }); } catch (error) { diff --git a/services/tts/src/TTSService.ts b/services/tts/src/TTSService.ts index 8d2aca9..5c6bd8e 100644 --- a/services/tts/src/TTSService.ts +++ b/services/tts/src/TTSService.ts @@ -10,12 +10,15 @@ * - Audio caching by content hash (issue #532) * - Provider error handling with fallback (issue #533) * - Input sanitization and SSML injection prevention (issue #534) + * - Circuit breaker per TTS provider (opossum) to fast-fail on + * sustained upstream failures and protect connection pool resources */ import fs from "fs/promises"; import path from "path"; import { createHash } from "crypto"; import { trace, SpanStatusCode, Span } from "@opentelemetry/api"; +import CircuitBreaker from "opossum"; // --------------------------------------------------------------------------- // Types @@ -247,9 +250,62 @@ export class TTSProviderError extends Error { } // --------------------------------------------------------------------------- -// Config +// Circuit breaker // --------------------------------------------------------------------------- +/** + * Snapshot of a circuit breaker's current state, included in health checks. + */ +export interface CircuitBreakerState { + /** Current state: "closed" (normal), "open" (fast-failing), "halfOpen" (probing) */ + state: "closed" | "open" | "halfOpen"; + /** Whether the breaker is currently allowing calls through */ + enabled: boolean; + /** How many failures have been recorded in the current window */ + failures: number; + /** How many calls have succeeded since the breaker was last reset */ + successes: number; + /** Median (p50) entry of the breaker's latency percentiles — not a failure rate; 0 when unavailable */ + percentile: number; +} + +/** + * Circuit breaker configuration for a TTS provider. + * Shared across both ElevenLabs and Google TTS breakers. 
+ */ +export interface CircuitBreakerConfig { + /** + * Minimum calls in the rolling window before the circuit may open (opossum `volumeThreshold`); it then opens once more than 50 % of them fail. + * Default: 5 + */ + openThreshold?: number; + /** + * Rolling window in milliseconds over which failures are counted. + * Default: 30_000 (30 s) + */ + rollingWindowMs?: number; + /** + * Delay in milliseconds before the circuit attempts a half-open probe. + * Default: 30_000 (30 s) + */ + halfOpenIntervalMs?: number; + /** + * Timeout in milliseconds per provider call. Calls that exceed this are + * counted as failures. + * Default: 10_000 (10 s) + */ + timeoutMs?: number; +} + +// Default circuit breaker settings (acceptance criteria: 5 failures in 30 s, +// half-open probe every 30 s). +const DEFAULT_CB_CONFIG: Required = { + openThreshold: 5, + rollingWindowMs: 30_000, + halfOpenIntervalMs: 30_000, + timeoutMs: 10_000, +}; + export interface TTSConfig { provider: TTSProvider; elevenlabs?: { @@ -266,6 +322,8 @@ export interface TTSConfig { rateLimit?: RateLimitConfig; /** Audio caching — omit to disable */ cache?: CacheConfig; + /** Circuit breaker settings — omit to use defaults */ + circuitBreaker?: CircuitBreakerConfig; } // --------------------------------------------------------------------------- @@ -475,6 +533,20 @@ export class TTSService { private rateLimiter?: RateLimiter; private cache?: AudioCache; + /** + * Per-provider circuit breakers. Each breaker wraps the raw provider + * function so that sustained failures open the circuit and subsequent + * calls fast-fail (`fire()` rejects with a "Breaker is open" error) without + * hitting the upstream API. 
+ * + * Configuration (defaults from DEFAULT_CB_CONFIG): + * - openThreshold : 5 failures in the rolling window → open + * - rollingWindowMs: 30 000 ms + * - halfOpenInterval: 30 000 ms before probing again + * - timeoutMs : 10 000 ms per call + */ + private breakers: Map = new Map(); + constructor(config: TTSConfig) { this.config = config; if (config.rateLimit) { @@ -485,6 +557,78 @@ export class TTSService { if (config.cache) { this.cache = new AudioCache(config.cache); } + this._initCircuitBreakers(); + } + + /** + * Build one circuit breaker per configured TTS provider. The breaker + * wraps a thin async action that accepts (text, voice) and delegates to + * the raw provider implementation. + */ + private _initCircuitBreakers(): void { + const cbCfg: Required = { + ...DEFAULT_CB_CONFIG, + ...(this.config.circuitBreaker ?? {}), + }; + + const opossumOptions: CircuitBreaker.Options = { + // Trip the breaker when ≥ openThreshold failures occur in the window. + // opossum opens when (failures / total) > errorThresholdPercentage / 100. + // With errorThresholdPercentage = 50 and volumeThreshold = openThreshold, + // the circuit opens once the threshold count of all-failure calls is reached. 
+ volumeThreshold: cbCfg.openThreshold, + errorThresholdPercentage: 50, + // Rolling stats window + rollingCountTimeout: cbCfg.rollingWindowMs, + // Half-open retry delay + resetTimeout: cbCfg.halfOpenIntervalMs, + // Per-call timeout (counted as a failure) + timeout: cbCfg.timeoutMs, + }; + + if (this.config.elevenlabs) { + const elBreaker = new CircuitBreaker( + async (text: string, voice: TTSVoice) => + generateElevenLabs(text, voice, this.config.elevenlabs!), + { ...opossumOptions, name: "elevenlabs" } + ); + elBreaker.on("open", () => console.warn("[CircuitBreaker] ElevenLabs circuit OPENED — fast-failing")); + elBreaker.on("halfOpen", () => console.info ("[CircuitBreaker] ElevenLabs circuit HALF-OPEN — probing")); + elBreaker.on("close", () => console.info ("[CircuitBreaker] ElevenLabs circuit CLOSED — recovered")); + this.breakers.set("elevenlabs", elBreaker); + } + + if (this.config.google) { + const gBreaker = new CircuitBreaker( + async (text: string, voice: TTSVoice) => + generateGoogle(text, voice, this.config.google!), + { ...opossumOptions, name: "google" } + ); + gBreaker.on("open", () => console.warn("[CircuitBreaker] Google TTS circuit OPENED — fast-failing")); + gBreaker.on("halfOpen", () => console.info ("[CircuitBreaker] Google TTS circuit HALF-OPEN — probing")); + gBreaker.on("close", () => console.info ("[CircuitBreaker] Google TTS circuit CLOSED — recovered")); + this.breakers.set("google", gBreaker); + } + } + + /** + * Returns a snapshot of each provider's circuit breaker state. + * Exposed in /health/ready so operators can see the breaker state + * without needing to inspect service logs. + */ + getCircuitBreakerStates(): Record { + const result: Record = {}; + for (const [provider, breaker] of this.breakers) { + const stats = breaker.stats; + result[provider] = { + state: breaker.opened ? "open" : breaker.halfOpen ? 
"halfOpen" : "closed", + enabled: !breaker.opened, + failures: stats.failures, + successes: stats.successes, + percentile: stats.percentiles?.[0.5] ?? 0, + }; + } + return result; } /** @@ -661,11 +805,35 @@ export class TTSService { text: string, voice: TTSVoice ): Promise { + const breaker = this.breakers.get(provider); + if (provider === "elevenlabs") { if (!this.config.elevenlabs) throw new TTSProviderError("elevenlabs", "ElevenLabs config missing"); + if (breaker) { + try { + return await breaker.fire(text, voice) as Buffer; + } catch (err) { + // Re-wrap open-circuit errors as TTSProviderError so callers get a + // consistent error type and a meaningful 503 status code. + if (err instanceof Error && err.message.includes("open")) { + throw new TTSProviderError("elevenlabs", `Circuit breaker OPEN: ${err.message}`, 503); + } + throw err; + } + } return generateElevenLabs(text, voice, this.config.elevenlabs); } else { if (!this.config.google) throw new TTSProviderError("google", "Google TTS config missing"); + if (breaker) { + try { + return await breaker.fire(text, voice) as Buffer; + } catch (err) { + if (err instanceof Error && err.message.includes("open")) { + throw new TTSProviderError("google", `Circuit breaker OPEN: ${err.message}`, 503); + } + throw err; + } + } return generateGoogle(text, voice, this.config.google); } } diff --git a/services/tts/src/__tests__/circuit-breaker.test.ts b/services/tts/src/__tests__/circuit-breaker.test.ts new file mode 100644 index 0000000..bb9aa94 --- /dev/null +++ b/services/tts/src/__tests__/circuit-breaker.test.ts @@ -0,0 +1,163 @@ +/** + * Tests for the circuit breaker implementation on TTS providers. 
+ * + * Verifies: + * - Breaker trips after the configured failure threshold + * - While open, calls fast-fail without hitting the provider + * - Breaker state is reflected in getCircuitBreakerStates() + * - After reset timeout, breaker transitions to half-open and allows a probe + */ + +import { TTSService, TTSProviderError, VOICES, CircuitBreakerConfig } from "../TTSService"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const VOICE = VOICES["el-rachel-en"]; + +/** + * Build a TTSService that is pre-wired with a tight circuit breaker and whose + * ElevenLabs provider always rejects calls with the supplied error. + * + * We patch the global `fetch` so that the ElevenLabs HTTP call fails with a + * network error, which the circuit breaker counts as a failure. + */ +function makeBrokenService( + cbConfig: CircuitBreakerConfig, + networkError = new Error("ECONNREFUSED") +): TTSService { + // Monkey-patch fetch to always throw a network error for ElevenLabs calls + (global as any).fetch = jest.fn().mockRejectedValue(networkError); + + return new TTSService({ + provider: "elevenlabs", + elevenlabs: { apiKey: "test-api-key-for-breaker" }, + outputDir: "/tmp/tts-circuit-breaker-test", + circuitBreaker: cbConfig, + }); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("TTSService — circuit breaker", () => { + // Restore the real fetch after each test to avoid polluting other suites + afterEach(() => { + jest.restoreAllMocks(); + (global as any).fetch = undefined; + }); + + it("initially reports a closed circuit", () => { + const svc = makeBrokenService({ openThreshold: 5, rollingWindowMs: 30_000 }); + const states = svc.getCircuitBreakerStates(); + expect(states["elevenlabs"]).toBeDefined(); + 
expect(states["elevenlabs"].state).toBe("closed"); + }); + + it("trips open after reaching the failure threshold", async () => { + // Low threshold so the test is fast + const svc = makeBrokenService({ + openThreshold: 3, + rollingWindowMs: 30_000, + halfOpenIntervalMs: 60_000, + timeoutMs: 500, + }); + + // Fire enough failing calls to trip the breaker + for (let i = 0; i < 3; i++) { + await expect( + svc.generate("hello", VOICE, "elevenlabs") + ).rejects.toThrow(); + } + + const states = svc.getCircuitBreakerStates(); + expect(states["elevenlabs"].state).toBe("open"); + expect(states["elevenlabs"].failures).toBeGreaterThanOrEqual(3); + }, 15_000); + + it("fast-fails while the circuit is open without calling the provider", async () => { + const fetchMock = jest.fn().mockRejectedValue(new Error("network down")); + (global as any).fetch = fetchMock; + + const svc = new TTSService({ + provider: "elevenlabs", + elevenlabs: { apiKey: "test-key" }, + outputDir: "/tmp/tts-cb-test", + circuitBreaker: { + openThreshold: 2, + rollingWindowMs: 30_000, + halfOpenIntervalMs: 60_000, + timeoutMs: 500, + }, + }); + + // Trip the breaker + for (let i = 0; i < 2; i++) { + await expect(svc.generate("hello", VOICE, "elevenlabs")).rejects.toThrow(); + } + + expect(svc.getCircuitBreakerStates()["elevenlabs"].state).toBe("open"); + + // Reset the mock call count so we can verify no new network calls happen + fetchMock.mockClear(); + + // The next call should fast-fail without invoking fetch at all. + // _waitForJob wraps the error as a plain Error, but the message includes + // "Circuit breaker OPEN" to confirm fast-fail rather than a real network call. 
+ await expect(svc.generate("hello", VOICE, "elevenlabs")).rejects.toThrow( + /Circuit breaker OPEN|Breaker is open/i + ); + expect(fetchMock).not.toHaveBeenCalled(); + }, 15_000); + + it("transitions to half-open after the reset timeout", async () => { + (global as any).fetch = jest.fn().mockRejectedValue(new Error("network down")); + + const svc = new TTSService({ + provider: "elevenlabs", + elevenlabs: { apiKey: "test-key" }, + outputDir: "/tmp/tts-cb-test", + circuitBreaker: { + openThreshold: 2, + rollingWindowMs: 30_000, + halfOpenIntervalMs: 200, // Short but long enough to catch open state + timeoutMs: 500, + }, + }); + + // Trip the breaker — both calls should fail + const tripStart = Date.now(); + for (let i = 0; i < 2; i++) { + await expect(svc.generate("hello", VOICE, "elevenlabs")).rejects.toThrow(); + } + + // Breaker must be open immediately after tripping (before the 200ms interval) + const elapsed = Date.now() - tripStart; + if (elapsed < 200) { + // Only assert "open" if we're still inside the open window + expect(svc.getCircuitBreakerStates()["elevenlabs"].state).toBe("open"); + } + + // Wait longer than the half-open interval so the timer elapses + await new Promise((r) => setTimeout(r, 300)); + + // Trigger a probe call — opossum transitions to halfOpen when fired after resetTimeout + await expect(svc.generate("hello", VOICE, "elevenlabs")).rejects.toThrow(); + const stateAfterProbe = svc.getCircuitBreakerStates()["elevenlabs"].state; + // After a failed probe the circuit re-opens; halfOpen is also valid mid-probe + expect(["open", "halfOpen"]).toContain(stateAfterProbe); + }, 15_000); + + it("getCircuitBreakerStates returns empty object when no providers are configured", () => { + // No elevenlabs or google config → no breakers initialised + const svc = new TTSService({ + provider: "elevenlabs", + outputDir: "/tmp", + // Intentionally no elevenlabs/google config so no breakers are created + }); + const states = svc.getCircuitBreakerStates(); + 
expect(Object.keys(states)).toHaveLength(0); + }); +});