diff --git a/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs b/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs index 6f61894..1ccacf2 100644 --- a/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs +++ b/src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs @@ -3,7 +3,6 @@ using System.Net.WebSockets; using System.Text; using System.Text.Json; -using System.Threading; using System.Threading.Tasks; using FeatBit.Sdk.Server.Concurrent; using FeatBit.Sdk.Server.Options; @@ -67,11 +66,7 @@ private static FbWebSocket DefaultFbWebSocketFactory(FbOptions options) public Task StartAsync() { - Task.Run(() => - { - var cts = new CancellationTokenSource(_options.ConnectTimeout); - return _webSocket.ConnectAsync(cts.Token); - }); + Task.Run(() => _webSocket.ConnectAsync()); return _initTcs.Task; } diff --git a/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs b/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs index 3a24a4d..3df6a89 100644 --- a/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs +++ b/src/FeatBit.ServerSdk/Transport/FbWebSocket.cs @@ -24,7 +24,7 @@ internal sealed partial class FbWebSocket new ReadOnlyMemory(Encoding.UTF8.GetBytes("{\"messageType\":\"ping\",\"data\":{}}")); private readonly FbOptions _options; - private readonly Func _transportFactory; + private readonly Func _transportFactory; private readonly Func _webSocketUriResolver; private WebSocketTransport _transport; private Task _receiveTask; @@ -38,7 +38,7 @@ internal sealed partial class FbWebSocket internal FbWebSocket( FbOptions options, - Func transportFactory = null, + Func transportFactory = null, Func webSocketUriResolver = null) { _options = options; @@ -52,7 +52,7 @@ internal FbWebSocket( _logger = _loggerFactory.CreateLogger(); } - public async Task ConnectAsync(CancellationToken cancellationToken = default, bool isReconnecting = false) + public async Task ConnectAsync(bool isReconnecting = false, CancellationToken cancellationToken = default) { Log.Starting(_logger); @@ -64,7 +64,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo } var transportFactory = _transportFactory ?? DefaultWebSocketTransportFactory; - var transport = transportFactory(); + var transport = transportFactory(_options); if (transport == null) { throw new InvalidOperationException("Configured WebSocketTransportFactory did not return a value."); @@ -75,7 +75,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo { // starts the transport Log.StartingTransport(_logger, "WebSockets", webSocketUri); - await transport.StartAsync(webSocketUri, _options.CloseTimeout, cancellationToken); + await transport.StartAsync(webSocketUri, cancellationToken); } catch (Exception ex) { @@ -108,9 +108,9 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo Log.Started(_logger); } - private WebSocketTransport DefaultWebSocketTransportFactory() + private WebSocketTransport DefaultWebSocketTransportFactory(FbOptions options) { - return new WebSocketTransport(_loggerFactory); + return new WebSocketTransport(options, _loggerFactory); } private static Uri DefaultWebSocketUriResolver(FbOptions options) @@ -254,7 +254,7 @@ private async Task ReconnectAsync() try { - await ConnectAsync(_stopCts.Token, isReconnecting: true).ConfigureAwait(false); + await ConnectAsync(isReconnecting: true, _stopCts.Token).ConfigureAwait(false); Log.Reconnected(_logger, retryTimes, DateTime.UtcNow - reconnectStartTime); diff --git a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs index 30f469d..ca123fd 100644 --- a/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs +++ b/src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using FeatBit.Sdk.Server.Http; +using FeatBit.Sdk.Server.Options; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -21,20 +22,18 @@ internal sealed partial class WebSocketTransport : IDuplexPipe public WebSocketCloseStatus? CloseStatus => _webSocket?.CloseStatus; public string CloseDescription => _webSocket?.CloseStatusDescription; + private readonly FbOptions _options; private readonly Func> _webSocketFactory; private WebSocket _webSocket; private IDuplexPipe _transport; private IDuplexPipe _application; - private TimeSpan _closeTimeout; private readonly CancellationTokenSource _stopCts = new CancellationTokenSource(); private volatile bool _aborted; private Task Running { get; set; } = Task.CompletedTask; private readonly ILogger _logger; - private static readonly TimeSpan DefaultCloseTimeout = TimeSpan.FromSeconds(5); - // 1MB private const long DefaultBufferSize = 1024 * 1024; @@ -49,20 +48,17 @@ internal sealed partial class WebSocketTransport : IDuplexPipe ); public WebSocketTransport( + FbOptions options, ILoggerFactory loggerFactory = null, Func> webSocketFactory = null) { + _options = options; _logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger(); _webSocketFactory = webSocketFactory; } - public async Task StartAsync( - Uri uri, - TimeSpan? closeTimeout = null, - CancellationToken cancellationToken = default) + public async Task StartAsync(Uri uri, CancellationToken cancellationToken = default) { - _closeTimeout = closeTimeout ?? DefaultCloseTimeout; - // Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa) var pair = DuplexPipe.CreateConnectionPair(DefaultPipeOptions, DefaultPipeOptions); @@ -88,7 +84,7 @@ public async Task StartAsync( Log.StartedTransport(_logger); } - private static async Task DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken) + private async Task DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken) { var webSocket = new ClientWebSocket(); @@ -104,11 +100,20 @@ private static async Task DefaultWebSocketFactory(Uri uri, Cancellati try { - await webSocket.ConnectAsync(uri, cancellationToken); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(_options.ConnectTimeout); + + await webSocket.ConnectAsync(uri, cts.Token).ConfigureAwait(false); } - catch + catch (Exception ex) { webSocket.Dispose(); + + if (ex is OperationCanceledException && !cancellationToken.IsCancellationRequested) + { + throw new TimeoutException("Connect timed out.", ex); + } + throw; } @@ -128,7 +133,7 @@ private async Task ProcessSocketAsync(WebSocket socket) // Wait for send or receive to complete var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false); - _stopCts.CancelAfter(_closeTimeout); + _stopCts.CancelAfter(_options.CloseTimeout); if (trigger == receiving) { @@ -139,7 +144,7 @@ private async Task ProcessSocketAsync(WebSocket socket) // Cancel the application so that ReadAsync yields _application.Input.CancelPendingRead(); - var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token)) + var resultTask = await Task.WhenAny(sending, Task.Delay(_options.CloseTimeout, _stopCts.Token)) .ConfigureAwait(false); if (resultTask != sending) { @@ -359,7 +364,7 @@ public async Task StopAsync() _application.Input.CancelPendingRead(); // Start ungraceful close timer - _stopCts.CancelAfter(_closeTimeout); + _stopCts.CancelAfter(_options.CloseTimeout); try { diff --git a/tests/FeatBit.ServerSdk.Tests/TestApp.cs b/tests/FeatBit.ServerSdk.Tests/TestApp.cs index d9c5ea3..7df0a55 100644 --- a/tests/FeatBit.ServerSdk.Tests/TestApp.cs +++ b/tests/FeatBit.ServerSdk.Tests/TestApp.cs @@ -22,11 +22,12 @@ internal Uri GetWsUri(string op) return wsUri; } - internal WebSocketTransport CreateWebSocketTransport() + internal WebSocketTransport CreateWebSocketTransport(FbOptions options) { var client = Server.CreateWebSocketClient(); return new WebSocketTransport( + options, webSocketFactory: (uri, cancellationToken) => client.ConnectAsync(uri, cancellationToken) ); } diff --git a/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs b/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs index d24509d..da4d709 100644 --- a/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs +++ b/tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs @@ -1,6 +1,8 @@ using System.Net.WebSockets; using System.Text; +using FeatBit.Sdk.Server.Options; + namespace FeatBit.Sdk.Server.Transport; [Collection(nameof(TestApp))] @@ -16,7 +18,8 @@ public WebSocketsTransportTests(TestApp app) [Fact] public async Task StartAndStopAsync() { - var transport = _app.CreateWebSocketTransport(); + var options = new FbOptionsBuilder().Build(); + var transport = _app.CreateWebSocketTransport(options); var uri = _app.GetWsUri("echo"); await transport.StartAsync(uri); @@ -32,7 +35,8 @@ public async Task StartAndStopAsync() [Fact] public async Task SendReceiveAsync() { - var transport = _app.CreateWebSocketTransport(); + var options = new FbOptionsBuilder().Build(); + var transport = _app.CreateWebSocketTransport(options); var uri = _app.GetWsUri("echo"); await transport.StartAsync(uri); @@ -57,4 +61,16 @@ public async Task SendReceiveAsync() await transport.StopAsync(); } + + [Fact] + public async Task StartAsyncShouldThrowTimeoutExceptionWhenConnectTimesOut() + { + var options = new FbOptionsBuilder() + .ConnectTimeout(TimeSpan.FromTicks(1)) + .Build(); + var transport = new WebSocketTransport(options); + var uri = _app.GetWsUri("echo"); + + await Assert.ThrowsAsync(() => transport.StartAsync(uri)); + } } \ No newline at end of file