From 9be9c1d2934b3565ce183f944d4aa339026c9faa Mon Sep 17 00:00:00 2001 From: George Helyar <3225358+ghelyar@users.noreply.github.com> Date: Wed, 18 Jun 2025 17:23:49 +0100 Subject: [PATCH 1/3] fix: always use ConnectTimeout when connecting the WebSocket --- .../DataSynchronizer/WebSocketDataSynchronizer.cs | 4 +--- src/FeatBit.ServerSdk/Transport/FbWebSocket.cs | 10 +++++----- src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs | 10 ++++++++-- tests/FeatBit.ServerSdk.Tests/TestApp.cs | 3 ++- .../Transport/WebSocketsTransportTests.cs | 8 ++++++-- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs b/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs index 6f61894..9f71b87 100644 --- a/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs +++ b/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs @@ -3,7 +3,6 @@ using System.Net.WebSockets; using System.Text; using System.Text.Json; -using System.Threading; using System.Threading.Tasks; using FeatBit.Sdk.Server.Concurrent; using FeatBit.Sdk.Server.Options; @@ -69,8 +68,7 @@ public Task StartAsync() { Task.Run(() => { - var cts = new CancellationTokenSource(_options.ConnectTimeout); - return _webSocket.ConnectAsync(cts.Token); + return _webSocket.ConnectAsync(); }); return _initTcs.Task; diff --git a/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs b/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs index 3a24a4d..e4911ad 100644 --- a/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs +++ b/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs @@ -24,7 +24,7 @@ internal sealed partial class FbWebSocket new ReadOnlyMemory(Encoding.UTF8.GetBytes("{\"messageType\":\"ping\",\"data\":{}}")); private readonly FbOptions _options; - private readonly Func _transportFactory; + private readonly Func _transportFactory; private readonly Func _webSocketUriResolver; private WebSocketTransport _transport; private Task _receiveTask; @@ -38,7 +38,7 @@ internal sealed partial class FbWebSocket internal FbWebSocket( FbOptions options, - Func transportFactory = null, + Func transportFactory = null, Func webSocketUriResolver = null) { _options = options; @@ -64,7 +64,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo } var transportFactory = _transportFactory ?? DefaultWebSocketTransportFactory; - var transport = transportFactory(); + var transport = transportFactory(_options); if (transport == null) { throw new InvalidOperationException("Configured WebSocketTransportFactory did not return a value."); @@ -108,9 +108,9 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo Log.Started(_logger); } - private WebSocketTransport DefaultWebSocketTransportFactory() + private WebSocketTransport DefaultWebSocketTransportFactory(FbOptions options) { - return new WebSocketTransport(_loggerFactory); + return new WebSocketTransport(options, _loggerFactory); } private static Uri DefaultWebSocketUriResolver(FbOptions options) diff --git a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs index 30f469d..079bfd8 100644 --- a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs +++ b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using FeatBit.Sdk.Server.Http; +using FeatBit.Sdk.Server.Options; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -21,6 +22,7 @@ internal sealed partial class WebSocketTransport : IDuplexPipe public WebSocketCloseStatus? CloseStatus => _webSocket?.CloseStatus; public string CloseDescription => _webSocket?.CloseStatusDescription; + private readonly FbOptions _options; private readonly Func> _webSocketFactory; private WebSocket _webSocket; @@ -49,10 +51,12 @@ internal sealed partial class WebSocketTransport : IDuplexPipe ); public WebSocketTransport( + FbOptions options, ILoggerFactory loggerFactory = null, Func> webSocketFactory = null) { _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); + _options = options; _webSocketFactory = webSocketFactory; } @@ -88,7 +92,7 @@ public async Task StartAsync( Log.StartedTransport(_logger); } - private static async Task DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken) + private async Task DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken) { var webSocket = new ClientWebSocket(); @@ -104,7 +108,9 @@ private static async Task DefaultWebSocketFactory(Uri uri, Cancellati try { - await webSocket.ConnectAsync(uri, cancellationToken); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(_options.ConnectTimeout); + await webSocket.ConnectAsync(uri, cts.Token); } catch { diff --git a/tests/FeatBit.ServerSdk.Tests/TestApp.cs b/tests/FeatBit.ServerSdk.Tests/TestApp.cs index d9c5ea3..7df0a55 100644 --- a/tests/FeatBit.ServerSdk.Tests/TestApp.cs +++ b/tests/FeatBit.ServerSdk.Tests/TestApp.cs @@ -22,11 +22,12 @@ internal Uri GetWsUri(string op) return wsUri; } - internal WebSocketTransport CreateWebSocketTransport() + internal WebSocketTransport CreateWebSocketTransport(FbOptions options) { var client = Server.CreateWebSocketClient(); return new WebSocketTransport( + options, webSocketFactory: (uri, cancellationToken) => client.ConnectAsync(uri, cancellationToken) ); } diff --git a/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs b/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs index d24509d..8917cc7 100644 --- a/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs +++ b/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs @@ -1,6 +1,8 @@ using System.Net.WebSockets; using System.Text; +using FeatBit.Sdk.Server.Options; + namespace FeatBit.Sdk.Server.Transport; [Collection(nameof(TestApp))] @@ -16,7 +18,8 @@ public WebSocketsTransportTests(TestApp app) [Fact] public async Task StartAndStopAsync() { - var transport = _app.CreateWebSocketTransport(); + var options = new FbOptionsBuilder().Build(); + var transport = _app.CreateWebSocketTransport(options); var uri = _app.GetWsUri("echo"); await transport.StartAsync(uri); @@ -32,7 +35,8 @@ public async Task StartAndStopAsync() [Fact] public async Task SendReceiveAsync() { - var transport = _app.CreateWebSocketTransport(); + var options = new FbOptionsBuilder().Build(); + var transport = _app.CreateWebSocketTransport(options); var uri = _app.GetWsUri("echo"); await transport.StartAsync(uri); From 9ae94c406fd167f438a9113008faaefc2f0fc7e4 Mon Sep 17 00:00:00 2001 From: George Helyar <3225358+ghelyar@users.noreply.github.com> Date: Thu, 19 Jun 2025 09:07:29 +0100 Subject: [PATCH 2/3] wrap exception in TimeoutException --- .../Transport/WebSocketTransport.cs | 10 ++++++++-- .../Transport/WebSocketsTransportTests.cs | 12 ++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs index 079bfd8..4bfe3ab 100644 --- a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs +++ b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs @@ -112,9 +112,15 @@ private async Task DefaultWebSocketFactory(Uri uri, CancellationToken cts.CancelAfter(_options.ConnectTimeout); await webSocket.ConnectAsync(uri, cts.Token); } - catch + catch (Exception ex) { webSocket.Dispose(); + + if (ex is OperationCanceledException && !cancellationToken.IsCancellationRequested) + { + throw new TimeoutException("Connect timed out.", ex); + } + throw; } @@ -385,4 +391,4 @@ public async Task StopAsync() // Transport stopped. } } -} \ No newline at end of file +} diff --git a/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs b/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs index 8917cc7..da4d709 100644 --- a/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs +++ b/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs @@ -61,4 +61,16 @@ public async Task SendReceiveAsync() await transport.StopAsync(); } + + [Fact] + public async Task StartAsyncShouldThrowTimeoutExceptionWhenConnectTimesOut() + { + var options = new FbOptionsBuilder() + .ConnectTimeout(TimeSpan.FromTicks(1)) + .Build(); + var transport = new WebSocketTransport(options); + var uri = _app.GetWsUri("echo"); + + await Assert.ThrowsAsync(() => transport.StartAsync(uri)); + } } \ No newline at end of file From 1096f0935fb69ffe1219f99caa1f64a8364621ab Mon Sep 17 00:00:00 2001 From: deleteLater Date: Thu, 19 Jun 2025 23:58:28 +0800 Subject: [PATCH 3/3] nit changes --- .../WebSocketDataSynchronizer.cs | 5 +--- .../Transport/FbWebSocket.cs | 6 ++--- .../Transport/WebSocketTransport.cs | 23 +++++++------------ 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs b/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs index 9f71b87..1ccacf2 100644 --- a/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs +++ b/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs @@ -66,10 +66,7 @@ private static FbWebSocket DefaultFbWebSocketFactory(FbOptions options) public Task StartAsync() { - Task.Run(() => - { - return _webSocket.ConnectAsync(); - }); + Task.Run(() => _webSocket.ConnectAsync()); return _initTcs.Task; } diff --git a/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs b/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs index e4911ad..3df6a89 100644 --- a/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs +++ b/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs @@ -52,7 +52,7 @@ internal FbWebSocket( _logger = _loggerFactory.CreateLogger(); } - public async Task ConnectAsync(CancellationToken cancellationToken = default, bool isReconnecting = false) + public async Task ConnectAsync(bool isReconnecting = false, CancellationToken cancellationToken = default) { Log.Starting(_logger); @@ -75,7 +75,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo { // starts the transport Log.StartingTransport(_logger, "WebSockets", webSocketUri); - await transport.StartAsync(webSocketUri, _options.CloseTimeout, cancellationToken); + await transport.StartAsync(webSocketUri, cancellationToken); } catch (Exception ex) { @@ -254,7 +254,7 @@ private async Task ReconnectAsync() try { - await ConnectAsync(_stopCts.Token, isReconnecting: true).ConfigureAwait(false); + await ConnectAsync(isReconnecting: true, _stopCts.Token).ConfigureAwait(false); Log.Reconnected(_logger, retryTimes, DateTime.UtcNow - reconnectStartTime); diff --git a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs index 4bfe3ab..ca123fd 100644 --- a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs +++ b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs @@ -29,14 +29,11 @@ internal sealed partial class WebSocketTransport : IDuplexPipe private IDuplexPipe _transport; private IDuplexPipe _application; - private TimeSpan _closeTimeout; private readonly CancellationTokenSource _stopCts = new CancellationTokenSource(); private volatile bool _aborted; private Task Running { get; set; } = Task.CompletedTask; private readonly ILogger _logger; - private static readonly TimeSpan DefaultCloseTimeout = TimeSpan.FromSeconds(5); - // 1MB private const long DefaultBufferSize = 1024 * 1024; @@ -55,18 +52,13 @@ public WebSocketTransport( ILoggerFactory loggerFactory = null, Func> webSocketFactory = null) { - _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); _options = options; + _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); _webSocketFactory = webSocketFactory; } - public async Task StartAsync( - Uri uri, - TimeSpan? closeTimeout = null, - CancellationToken cancellationToken = default) + public async Task StartAsync(Uri uri, CancellationToken cancellationToken = default) { - _closeTimeout = closeTimeout ?? DefaultCloseTimeout; - // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) var pair = DuplexPipe.CreateConnectionPair(DefaultPipeOptions, DefaultPipeOptions); @@ -110,7 +102,8 @@ private async Task DefaultWebSocketFactory(Uri uri, CancellationToken { using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_options.ConnectTimeout); - await webSocket.ConnectAsync(uri, cts.Token); + + await webSocket.ConnectAsync(uri, cts.Token).ConfigureAwait(false); } catch (Exception ex) { @@ -140,7 +133,7 @@ private async Task ProcessSocketAsync(WebSocket socket) // Wait for send or receive to complete var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false); - _stopCts.CancelAfter(_closeTimeout); + _stopCts.CancelAfter(_options.CloseTimeout); if (trigger == receiving) { @@ -151,7 +144,7 @@ private async Task ProcessSocketAsync(WebSocket socket) // Cancel the application so that ReadAsync yields _application.Input.CancelPendingRead(); - var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token)) + var resultTask = await Task.WhenAny(sending, Task.Delay(_options.CloseTimeout, _stopCts.Token)) .ConfigureAwait(false); if (resultTask != sending) { @@ -371,7 +364,7 @@ public async Task StopAsync() _application.Input.CancelPendingRead(); // Start ungraceful close timer - _stopCts.CancelAfter(_closeTimeout); + _stopCts.CancelAfter(_options.CloseTimeout); try { @@ -391,4 +384,4 @@ public async Task StopAsync() // Transport stopped. } } -} +} \ No newline at end of file