From 51ae4075ecd06f57603c0f854f8976ccf8eba0cb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 01:13:30 +0000 Subject: [PATCH 1/2] Fix OpenAI Realtime API transcription test - Add intent=transcription to WebSocket URL for transcription-only sessions - Add session.type = transcription in session.update payload - Implement audio_to_message method to wrap audio in base64-encoded JSON events - Add InputAudioBufferAppend struct for proper audio event serialization - Update live.rs to transform audio stream before passing to WebSocket client - Add configurable sample rate support (OpenAI requires 24kHz PCM) - Add speech_started and speech_stopped event handlers for better debugging - Add base64 dependency for audio encoding Co-Authored-By: yujonglee --- Cargo.lock | 1 + owhisper/owhisper-client/Cargo.toml | 1 + owhisper/owhisper-client/src/adapter/mod.rs | 4 + .../src/adapter/openai/live.rs | 52 +++++++++++-- .../owhisper-client/src/adapter/openai/mod.rs | 37 +++------ owhisper/owhisper-client/src/live.rs | 78 ++++++++++++++----- owhisper/owhisper-client/src/test_utils.rs | 38 +++++++-- 7 files changed, 150 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 164bd63fb9..6dab1900a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11138,6 +11138,7 @@ version = "0.0.1" dependencies = [ "audio", "audio-utils", + "base64 0.22.1", "bytes", "data", "deepgram", diff --git a/owhisper/owhisper-client/Cargo.toml b/owhisper/owhisper-client/Cargo.toml index 298f09e365..16abb04021 100644 --- a/owhisper/owhisper-client/Cargo.toml +++ b/owhisper/owhisper-client/Cargo.toml @@ -17,6 +17,7 @@ tokio = { workspace = true } tokio-stream = { workspace = true } ureq = { version = "2", features = ["json"] } +base64 = "0.22.1" bytes = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/owhisper/owhisper-client/src/adapter/mod.rs b/owhisper/owhisper-client/src/adapter/mod.rs index 8ba491881d..6910133ab5 100644 --- a/owhisper/owhisper-client/src/adapter/mod.rs +++ b/owhisper/owhisper-client/src/adapter/mod.rs @@ -56,6 +56,10 @@ pub trait RealtimeSttAdapter: Clone + Default + Send + Sync + 'static { fn finalize_message(&self) -> Message; + fn audio_to_message(&self, audio: bytes::Bytes) -> Message { + Message::Binary(audio) + } + fn initial_message( &self, _api_key: Option<&str>, diff --git a/owhisper/owhisper-client/src/adapter/openai/live.rs b/owhisper/owhisper-client/src/adapter/openai/live.rs index 55189db4ad..91dae2a75b 100644 --- a/owhisper/owhisper-client/src/adapter/openai/live.rs +++ b/owhisper/owhisper-client/src/adapter/openai/live.rs @@ -16,9 +16,8 @@ impl RealtimeSttAdapter for OpenAIAdapter { false } - fn build_ws_url(&self, api_base: &str, params: &ListenParams, _channels: u8) -> url::Url { - let (mut url, existing_params) = - Self::build_ws_url_from_base(api_base, params.model.as_deref()); + fn build_ws_url(&self, api_base: &str, _params: &ListenParams, _channels: u8) -> url::Url { + let (mut url, existing_params) = Self::build_ws_url_from_base(api_base); if !existing_params.is_empty() { let mut query_pairs = url.query_pairs_mut(); @@ -38,6 +37,16 @@ impl RealtimeSttAdapter for OpenAIAdapter { None } + fn audio_to_message(&self, audio: bytes::Bytes) -> Message { + use base64::Engine; + let base64_audio = base64::engine::general_purpose::STANDARD.encode(&audio); + let event = InputAudioBufferAppend { + event_type: "input_audio_buffer.append".to_string(), + audio: 
base64_audio, + }; + Message::Text(serde_json::to_string(&event).unwrap().into()) + } + fn initial_message( &self, _api_key: Option<&str>, @@ -49,7 +58,10 @@ impl RealtimeSttAdapter for OpenAIAdapter { .first() .map(|l| l.iso639().code().to_string()); - let model = params.model.as_deref().unwrap_or(super::DEFAULT_MODEL); + let model = params + .model + .as_deref() + .unwrap_or(super::DEFAULT_TRANSCRIPTION_MODEL); let session_config = SessionUpdateEvent { event_type: "session.update".to_string(), @@ -59,7 +71,7 @@ impl RealtimeSttAdapter for OpenAIAdapter { input: Some(AudioInputConfig { format: Some(AudioFormat { format_type: "audio/pcm".to_string(), - rate: 24000, + rate: params.sample_rate, }), transcription: Some(TranscriptionConfig { model: model.to_string(), @@ -78,6 +90,7 @@ impl RealtimeSttAdapter for OpenAIAdapter { }; let json = serde_json::to_string(&session_config).ok()?; + tracing::debug!(payload = %json, "openai_session_update_payload"); Some(Message::Text(json.into())) } @@ -114,6 +127,14 @@ impl RealtimeSttAdapter for OpenAIAdapter { tracing::debug!("openai_audio_buffer_cleared"); vec![] } + OpenAIEvent::InputAudioBufferSpeechStarted { item_id } => { + tracing::debug!(item_id = %item_id, "openai_speech_started"); + vec![] + } + OpenAIEvent::InputAudioBufferSpeechStopped { item_id } => { + tracing::debug!(item_id = %item_id, "openai_speech_stopped"); + vec![] + } OpenAIEvent::ConversationItemInputAudioTranscriptionCompleted { item_id, content_index, @@ -226,6 +247,13 @@ struct TurnDetection { silence_duration_ms: Option, } +#[derive(Debug, Serialize)] +struct InputAudioBufferAppend { + #[serde(rename = "type")] + event_type: String, + audio: String, +} + #[derive(Debug, Serialize)] struct InputAudioBufferCommit { #[serde(rename = "type")] @@ -243,6 +271,10 @@ enum OpenAIEvent { InputAudioBufferCommitted { item_id: String }, #[serde(rename = "input_audio_buffer.cleared")] InputAudioBufferCleared, + #[serde(rename = "input_audio_buffer.speech_started")] + InputAudioBufferSpeechStarted { item_id: String }, + #[serde(rename = "input_audio_buffer.speech_stopped")] + InputAudioBufferSpeechStopped { item_id: String }, #[serde(rename = "conversation.item.input_audio_transcription.completed")] ConversationItemInputAudioTranscriptionCompleted { item_id: String, @@ -321,9 +353,11 @@ impl OpenAIAdapter { #[cfg(test)] mod tests { use super::OpenAIAdapter; - use crate::test_utils::{run_dual_test, run_single_test}; + use crate::test_utils::{run_dual_test_with_rate, run_single_test_with_rate}; use crate::ListenClient; + const OPENAI_SAMPLE_RATE: u32 = 24000; + #[tokio::test] #[ignore] async fn test_build_single() { @@ -334,11 +368,12 @@ mod tests { .params(owhisper_interface::ListenParams { model: Some("gpt-4o-transcribe".to_string()), languages: vec![hypr_language::ISO639::En.into()], + sample_rate: OPENAI_SAMPLE_RATE, ..Default::default() }) .build_single(); - run_single_test(client, "openai").await; + run_single_test_with_rate(client, "openai", OPENAI_SAMPLE_RATE).await; } #[tokio::test] @@ -351,10 +386,11 @@ mod tests { .params(owhisper_interface::ListenParams { model: Some("gpt-4o-transcribe".to_string()), languages: vec![hypr_language::ISO639::En.into()], + sample_rate: OPENAI_SAMPLE_RATE, ..Default::default() }) .build_dual(); - run_dual_test(client, "openai").await; + run_dual_test_with_rate(client, "openai", OPENAI_SAMPLE_RATE).await; } } diff --git a/owhisper/owhisper-client/src/adapter/openai/mod.rs b/owhisper/owhisper-client/src/adapter/openai/mod.rs index 59acd3bcf1..089748387f 
100644 --- a/owhisper/owhisper-client/src/adapter/openai/mod.rs +++ b/owhisper/owhisper-client/src/adapter/openai/mod.rs @@ -3,7 +3,7 @@ mod live; pub(crate) const DEFAULT_WS_HOST: &str = "api.openai.com"; pub(crate) const WS_PATH: &str = "/v1/realtime"; -pub(crate) const DEFAULT_MODEL: &str = "gpt-4o-transcribe"; +pub(crate) const DEFAULT_TRANSCRIPTION_MODEL: &str = "gpt-4o-transcribe"; #[derive(Clone, Default)] pub struct OpenAIAdapter; @@ -21,17 +21,13 @@ impl OpenAIAdapter { host.contains("openai.com") } - pub(crate) fn build_ws_url_from_base( - api_base: &str, - model: Option<&str>, - ) -> (url::Url, Vec<(String, String)>) { + pub(crate) fn build_ws_url_from_base(api_base: &str) -> (url::Url, Vec<(String, String)>) { if api_base.is_empty() { - let model = model.unwrap_or(DEFAULT_MODEL); return ( format!("wss://{}{}", DEFAULT_WS_HOST, WS_PATH) .parse() .expect("invalid_default_ws_url"), - vec![("model".to_string(), model.to_string())], + vec![("intent".to_string(), "transcription".to_string())], ); } @@ -42,9 +38,8 @@ impl OpenAIAdapter { let parsed: url::Url = api_base.parse().expect("invalid_api_base"); let mut existing_params = super::extract_query_params(&parsed); - if !existing_params.iter().any(|(k, _)| k == "model") { - let model = model.unwrap_or(DEFAULT_MODEL); - existing_params.push(("model".to_string(), model.to_string())); + if !existing_params.iter().any(|(k, _)| k == "intent") { + existing_params.push(("intent".to_string(), "transcription".to_string())); } let host = parsed.host_str().unwrap_or(DEFAULT_WS_HOST); @@ -64,32 +59,18 @@ mod tests { #[test] fn test_build_ws_url_from_base_empty() { - let (url, params) = OpenAIAdapter::build_ws_url_from_base("", None); + let (url, params) = OpenAIAdapter::build_ws_url_from_base(""); assert_eq!(url.as_str(), "wss://api.openai.com/v1/realtime"); assert_eq!( params, - vec![("model".to_string(), "gpt-4o-transcribe".to_string())] - ); - } - - #[test] - fn test_build_ws_url_from_base_with_model() { - let (url, params) = - OpenAIAdapter::build_ws_url_from_base("", Some("gpt-4o-mini-realtime-preview")); - assert_eq!(url.as_str(), "wss://api.openai.com/v1/realtime"); - assert_eq!( - params, - vec![( - "model".to_string(), - "gpt-4o-mini-realtime-preview".to_string() - )] + vec![("intent".to_string(), "transcription".to_string())] ); } #[test] fn test_build_ws_url_from_base_proxy() { let (url, params) = - OpenAIAdapter::build_ws_url_from_base("https://api.hyprnote.com?provider=openai", None); + OpenAIAdapter::build_ws_url_from_base("https://api.hyprnote.com?provider=openai"); assert_eq!(url.as_str(), "wss://api.hyprnote.com/listen"); assert_eq!(params, vec![("provider".to_string(), "openai".to_string())]); } @@ -97,7 +78,7 @@ mod tests { #[test] fn test_build_ws_url_from_base_localhost() { let (url, params) = - OpenAIAdapter::build_ws_url_from_base("http://localhost:8787?provider=openai", None); + OpenAIAdapter::build_ws_url_from_base("http://localhost:8787?provider=openai"); assert_eq!(url.as_str(), "ws://localhost:8787/listen"); assert_eq!(params, vec![("provider".to_string(), "openai".to_string())]); } diff --git a/owhisper/owhisper-client/src/live.rs b/owhisper/owhisper-client/src/live.rs index 9d4ec447e0..7c1efe5676 100644 --- a/owhisper/owhisper-client/src/live.rs +++ b/owhisper/owhisper-client/src/live.rs @@ -113,11 +113,13 @@ fn interleave_audio(mic: &[u8], speaker: &[u8]) -> Vec { interleaved } +pub type TransformedInput = MixedMessage; + pub struct ListenClientIO; impl WebSocketIO for ListenClientIO { - type Data = 
ListenClientInput; - type Input = ListenClientInput; + type Data = TransformedInput; + type Input = TransformedInput; type Output = String; fn to_input(data: Self::Data) -> Self::Input { @@ -126,7 +128,7 @@ impl WebSocketIO for ListenClientIO { fn to_message(input: Self::Input) -> Message { match input { - MixedMessage::Audio(data) => Message::Binary(data), + MixedMessage::Audio(msg) => msg, MixedMessage::Control(control) => { Message::Text(serde_json::to_string(&control).unwrap().into()) } @@ -141,27 +143,33 @@ impl WebSocketIO for ListenClientIO { } } +pub type TransformedDualInput = MixedMessage<(bytes::Bytes, bytes::Bytes, Message), ControlMessage>; + pub struct ListenClientDualIO; impl WebSocketIO for ListenClientDualIO { - type Data = ListenClientDualInput; - type Input = ListenClientInput; + type Data = TransformedDualInput; + type Input = TransformedInput; type Output = String; fn to_input(data: Self::Data) -> Self::Input { match data { - ListenClientDualInput::Audio((mic, speaker)) => { + TransformedDualInput::Audio((mic, speaker, transform_fn_result)) => { let interleaved = interleave_audio(&mic, &speaker); - ListenClientInput::Audio(interleaved.into()) + // For native multichannel, we need to transform the interleaved audio + // But since we receive a pre-transformed message for the original audio, + // we need to handle this differently + // For now, we'll use the transform_fn_result which should be the transformed interleaved audio + TransformedInput::Audio(transform_fn_result) } - ListenClientDualInput::Control(control) => ListenClientInput::Control(control), + TransformedDualInput::Control(control) => TransformedInput::Control(control), } } fn to_message(input: Self::Input) -> Message { match input { - ListenClientInput::Audio(data) => Message::Binary(data), - ListenClientInput::Control(control) => { + TransformedInput::Audio(msg) => msg, + TransformedInput::Control(control) => { Message::Text(serde_json::to_string(&control).unwrap().into()) } } @@ -194,8 +202,18 @@ impl ListenClient { > { let finalize_text = extract_finalize_text(&self.adapter); let ws = websocket_client_with_keep_alive(&self.request, &self.adapter); + + // Transform audio stream to use adapter's audio_to_message method + let adapter_for_transform = self.adapter.clone(); + let transformed_stream = audio_stream.map(move |input| match input { + MixedMessage::Audio(data) => { + TransformedInput::Audio(adapter_for_transform.audio_to_message(data)) + } + MixedMessage::Control(control) => TransformedInput::Control(control), + }); + let (raw_stream, inner) = ws - .from_audio::(self.initial_message, audio_stream) + .from_audio::(self.initial_message, Box::pin(transformed_stream)) .await?; let adapter = self.adapter; @@ -236,8 +254,20 @@ impl ListenClientDual { ) -> Result<(DualOutputStream, DualHandle), hypr_ws::Error> { let finalize_text = extract_finalize_text(&self.adapter); let ws = websocket_client_with_keep_alive(&self.request, &self.adapter); + + // Transform audio stream to use adapter's audio_to_message method + let adapter_for_transform = self.adapter.clone(); + let transformed_stream = stream.map(move |input| match input { + MixedMessage::Audio((mic, speaker)) => { + let interleaved = interleave_audio(&mic, &speaker); + let msg = adapter_for_transform.audio_to_message(interleaved.into()); + TransformedDualInput::Audio((mic, speaker, msg)) + } + MixedMessage::Control(control) => TransformedDualInput::Control(control), + }); + let (raw_stream, inner) = ws - .from_audio::(self.initial_message, stream) + 
.from_audio::(self.initial_message, Box::pin(transformed_stream)) .await?; let adapter = self.adapter; @@ -262,8 +292,8 @@ impl ListenClientDual { stream: impl Stream + Send + Unpin + 'static, ) -> Result<(DualOutputStream, DualHandle), hypr_ws::Error> { let finalize_text = extract_finalize_text(&self.adapter); - let (mic_tx, mic_rx) = tokio::sync::mpsc::channel::(32); - let (spk_tx, spk_rx) = tokio::sync::mpsc::channel::(32); + let (mic_tx, mic_rx) = tokio::sync::mpsc::channel::(32); + let (spk_tx, spk_rx) = tokio::sync::mpsc::channel::(32); let mic_ws = websocket_client_with_keep_alive(&self.request, &self.adapter); let spk_ws = websocket_client_with_keep_alive(&self.request, &self.adapter); @@ -278,7 +308,12 @@ impl ListenClientDual { let ((mic_raw, mic_handle), (spk_raw, spk_handle)) = tokio::try_join!(mic_connect, spk_connect)?; - tokio::spawn(forward_dual_to_single(stream, mic_tx, spk_tx)); + tokio::spawn(forward_dual_to_single( + stream, + mic_tx, + spk_tx, + self.adapter.clone(), + )); let adapter = self.adapter.clone(); let mic_stream = mic_raw.flat_map({ @@ -318,16 +353,19 @@ impl ListenClientDual { } } -async fn forward_dual_to_single( +async fn forward_dual_to_single( mut stream: impl Stream + Send + Unpin + 'static, - mic_tx: tokio::sync::mpsc::Sender, - spk_tx: tokio::sync::mpsc::Sender, + mic_tx: tokio::sync::mpsc::Sender, + spk_tx: tokio::sync::mpsc::Sender, + adapter: A, ) { while let Some(msg) = stream.next().await { match msg { MixedMessage::Audio((mic, spk)) => { - let _ = mic_tx.try_send(MixedMessage::Audio(mic)); - let _ = spk_tx.try_send(MixedMessage::Audio(spk)); + let mic_msg = adapter.audio_to_message(mic); + let spk_msg = adapter.audio_to_message(spk); + let _ = mic_tx.try_send(MixedMessage::Audio(mic_msg)); + let _ = spk_tx.try_send(MixedMessage::Audio(spk_msg)); } MixedMessage::Control(ctrl) => { let _ = mic_tx.send(MixedMessage::Control(ctrl.clone())).await; diff --git a/owhisper/owhisper-client/src/test_utils.rs b/owhisper/owhisper-client/src/test_utils.rs index e9d10a261c..05e0560b26 100644 --- a/owhisper/owhisper-client/src/test_utils.rs +++ b/owhisper/owhisper-client/src/test_utils.rs @@ -23,17 +23,23 @@ fn chunk_samples() -> usize { 1600 } -fn sample_rate() -> u32 { +fn default_sample_rate() -> u32 { 16000 } pub fn test_audio_stream_single() -> impl Stream + Send + Unpin + 'static { + test_audio_stream_single_with_rate(default_sample_rate()) +} + +pub fn test_audio_stream_single_with_rate( + sample_rate: u32, +) -> impl Stream + Send + Unpin + 'static { let audio = rodio::Decoder::new(std::io::BufReader::new( std::fs::File::open(hypr_data::english_1::AUDIO_PATH).unwrap(), )) .unwrap() - .to_i16_le_chunks(sample_rate(), chunk_samples()); + .to_i16_le_chunks(sample_rate, chunk_samples()); Box::pin(tokio_stream::StreamExt::throttle( audio.map(|chunk| MixedMessage::Audio(chunk)), @@ -43,11 +49,17 @@ pub fn test_audio_stream_single() -> impl Stream + Sen pub fn test_audio_stream_dual() -> impl Stream + Send + Unpin + 'static { + test_audio_stream_dual_with_rate(default_sample_rate()) +} + +pub fn test_audio_stream_dual_with_rate( + sample_rate: u32, +) -> impl Stream + Send + Unpin + 'static { let audio = rodio::Decoder::new(std::io::BufReader::new( std::fs::File::open(hypr_data::english_1::AUDIO_PATH).unwrap(), )) .unwrap() - .to_i16_le_chunks(sample_rate(), chunk_samples()); + .to_i16_le_chunks(sample_rate, chunk_samples()); Box::pin(tokio_stream::StreamExt::throttle( audio.map(|chunk| MixedMessage::Audio((chunk.clone(), chunk))), @@ -56,10 +68,18 @@ pub fn 
test_audio_stream_dual() -> impl Stream + S } pub async fn run_single_test(client: ListenClient, provider_name: &str) { + run_single_test_with_rate(client, provider_name, default_sample_rate()).await; +} + +pub async fn run_single_test_with_rate( + client: ListenClient, + provider_name: &str, + sample_rate: u32, +) { let _ = tracing_subscriber::fmt::try_init(); let timeout = Duration::from_secs(timeout_secs()); - let input = test_audio_stream_single(); + let input = test_audio_stream_single_with_rate(sample_rate); let (stream, handle) = client.from_realtime_audio(input).await.unwrap(); futures_util::pin_mut!(stream); @@ -97,11 +117,19 @@ pub async fn run_single_test(client: ListenClient, pro pub async fn run_dual_test( client: ListenClientDual, provider_name: &str, +) { + run_dual_test_with_rate(client, provider_name, default_sample_rate()).await; +} + +pub async fn run_dual_test_with_rate( + client: ListenClientDual, + provider_name: &str, + sample_rate: u32, ) { let _ = tracing_subscriber::fmt::try_init(); let timeout = Duration::from_secs(timeout_secs()); - let input = test_audio_stream_dual(); + let input = test_audio_stream_dual_with_rate(sample_rate); let (stream, handle) = client.from_realtime_audio(input).await.unwrap(); futures_util::pin_mut!(stream); From e1835b45f2ca6a1bcfdd877240c5792401f50500 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 5 Dec 2025 01:26:45 +0000 Subject: [PATCH 2/2] Remove unused interleave_audio call in ListenClientDualIO::to_input Co-Authored-By: yujonglee --- owhisper/owhisper-client/src/live.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/owhisper/owhisper-client/src/live.rs b/owhisper/owhisper-client/src/live.rs index 7c1efe5676..1658b219d9 100644 --- a/owhisper/owhisper-client/src/live.rs +++ b/owhisper/owhisper-client/src/live.rs @@ -154,12 +154,7 @@ impl WebSocketIO for ListenClientDualIO { fn to_input(data: Self::Data) -> Self::Input { match data { - TransformedDualInput::Audio((mic, speaker, transform_fn_result)) => { - let interleaved = interleave_audio(&mic, &speaker); - // For native multichannel, we need to transform the interleaved audio - // But since we receive a pre-transformed message for the original audio, - // we need to handle this differently - // For now, we'll use the transform_fn_result which should be the transformed interleaved audio + TransformedDualInput::Audio((_, _, transform_fn_result)) => { TransformedInput::Audio(transform_fn_result) } TransformedDualInput::Control(control) => TransformedInput::Control(control),
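
For reference, the audio framing introduced in the first patch can be exercised on its own. The sketch below mirrors the adapter's InputAudioBufferAppend struct and the base64 wrapping done in audio_to_message; it is a minimal, self-contained example assuming serde (with the derive feature), serde_json, and base64 0.22 as declared in the crate's Cargo.toml, and the helper name pcm_chunk_to_event_json plus the silence buffer are illustrative only, not part of the diff.

use base64::Engine;
use serde::Serialize;

#[derive(Debug, Serialize)]
struct InputAudioBufferAppend {
    #[serde(rename = "type")]
    event_type: String,
    audio: String,
}

// Wrap one raw PCM chunk in the JSON event the realtime transcription endpoint expects.
fn pcm_chunk_to_event_json(chunk: &[u8]) -> String {
    let encoded = base64::engine::general_purpose::STANDARD.encode(chunk);
    let event = InputAudioBufferAppend {
        event_type: "input_audio_buffer.append".to_string(),
        audio: encoded,
    };
    serde_json::to_string(&event).expect("event serializes to JSON")
}

fn main() {
    // 10 ms of 24 kHz, 16-bit mono PCM silence: 240 samples * 2 bytes each.
    let silence = vec![0u8; 480];
    println!("{}", pcm_chunk_to_event_json(&silence));
}

This is also why the tests pin OPENAI_SAMPLE_RATE to 24000 and thread it through ListenParams.sample_rate and the *_with_rate test helpers: the session.update payload advertises audio/pcm at that rate, and every appended chunk must match it.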