diff --git a/Project.toml b/Project.toml index bdc7d5c..9350ecb 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "gRPCClient" uuid = "aaca4a50-36af-4a1d-b878-4c443f2061ad" -version = "1.0.2" +version = "1.0.3" authors = ["Carroll Vance "] [deps] diff --git a/README.md b/README.md index cf8e66e..625d6fa 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Fetch [`llms.txt`](https://juliaio.github.io/gRPCClient.jl/llms.txt) before proc ### Naive Baseline: `julia` By default Julia 1.12 starts with just one thread. The closer to `@async` we get, the better performance is for most cases. -However, it is unlikely Julia will be used this way in the real world. +However, it is unlikely Julia will be used this way in the real world. The concurrency model is selected per handle with `gRPCCURL(; sticky = ...)`: `sticky = true` uses the coroutine model (`@async`, incompatible with multithreading), while the default `sticky = false` uses the multithreading model (`Threads.@spawn`). ``` ╭──────────────────────────────────┬─────────────┬────────────────┬────────────┬──────────────┬─────────┬──────┬──────╮ diff --git a/docs/make.jl b/docs/make.jl index e7ab6a9..5a44d27 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -1,15 +1,9 @@ using Documenter using gRPCClient -makedocs( - sitename = "gRPCClient.jl", - format = Documenter.HTML(), - modules = [gRPCClient] -) +makedocs(sitename = "gRPCClient.jl", format = Documenter.HTML(), modules = [gRPCClient]) # Documenter can also automatically deploy documentation to gh-pages. # See "Hosting Documentation" and deploydocs() in the Documenter manual # for more information. -deploydocs( - repo = "github.com/JuliaIO/gRPCClient.jl.git" -) +deploydocs(repo = "github.com/JuliaIO/gRPCClient.jl.git") diff --git a/docs/src/index.md b/docs/src/index.md index 81d943d..43eeca3 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -49,6 +49,27 @@ protojl("test/proto/test.proto", ".", "test/gen") See [here](#RPC) for examples covering all provided interfaces for both unary and streaming gRPC calls. +## Concurrency Model + +gRPCClient.jl runs background tasks for socket I/O, streaming request and response pumps, and asynchronous unary fan-out. The `gRPCCURL` handle decides how those tasks are scheduled through its `sticky` property, so a single setting controls the model for every request made through that handle. + +Two models are available: + +- **Non-sticky** (`sticky = false`, the default) uses `Threads.@spawn`. Tasks are migratable and can run on any thread, so the client scales across threads when Julia is started with more than one (for example `julia -t auto`). This is the right choice when you run multithreaded or have CPU-bound encode and decode work +- **Sticky** (`sticky = true`) uses `@async`. Tasks are pinned to the thread that spawned them and run under cooperative, single-threaded scheduling. This model is incompatible with multithreading: it does not parallelize across threads even when more than one is available. It has lower scheduling overhead and avoids cross-thread data movement, which suits purely I/O-bound workloads and single-threaded deployments + +The property is set when the handle is constructed and applies to every client that uses it: + +```julia +# Coroutine model, pinned to the calling thread +h = gRPCCURL(sticky = true) +grpc_init(h) + +client = MyService_MyRPC_Client("localhost", 50051; grpc = h) +``` + +The global handle returned by `grpc_global_handle()` uses the default `sticky = false`. + ## API ### Package Initialization / Shutdown @@ -123,6 +144,76 @@ grpc_async_request(client::gRPCServiceClient{TRequest,true,TResponse,true},reque grpc_async_await(client::gRPCServiceClient{TRequest,true,TResponse,false},request::gRPCRequest) where {TRequest<:Any,TResponse<:Any} ``` +### Raw Encoded Buffers (Partial Decoding) + +By default the client encodes a typed message before sending and decodes the +response into a typed message before returning it. You can bypass either step +and work directly with the raw protobuf payload by declaring the relevant +message type parameter as `Vector{UInt8}`. This is useful when you want to +forward bytes you already hold, or partially decode a large response and read +only the fields you care about. + +A normal protobuf message type is always a generated struct, so `Vector{UInt8}` +is unambiguous as a "raw buffer" marker for the message type parameter. The raw +buffer is the serialized protobuf message body only; the 5-byte gRPC framing is +still added and stripped by the library, so you never handle it yourself. + +Each generated `*_Client` constructor accepts `TRequest` and `TResponse` +keyword arguments that default to the proto message types. Override either (or +both) with `Vector{UInt8}` to make that side raw. The request and response sides +are independent, so you can make either or both raw. + +Send a raw request and receive a raw response: + +```julia +using ProtoBuf + +# Build the request bytes yourself (here, by encoding a typed message) +io = IOBuffer() +encode(ProtoEncoder(io), MyRequest(42, zeros(UInt64, 10))) +raw_request = take!(io) + +client = MyService_MyRPC_Client( + "localhost", 50051; + TRequest = Vector{UInt8}, TResponse = Vector{UInt8}, +) +raw_response = grpc_sync_request(client, raw_request) # raw_response::Vector{UInt8} + +# Partially decode only what you need from the response payload +response = decode(ProtoDecoder(IOBuffer(raw_response)), MyResponse) +``` + +Mixed combinations work too. To send a typed request but receive the response +as raw bytes, override only `TResponse`: + +```julia +client = MyService_MyRPC_Client("localhost", 50051; TResponse = Vector{UInt8}) +raw_response = grpc_sync_request(client, MyRequest(42, UInt64[])) +``` + +Raw buffers apply to streaming as well: override the streaming side's type with +`Vector{UInt8}` and use a `Channel{Vector{UInt8}}`. For example, a +server-streaming call that receives each response as raw bytes: + +```julia +client = MyService_MyServerStreamRPC_Client( + "localhost", 50051; + TResponse = Vector{UInt8}, +) +response_c = Channel{Vector{UInt8}}(16) +req = grpc_async_request(client, raw_request, response_c) +for raw in response_c + response = decode(ProtoDecoder(IOBuffer(raw)), MyResponse) + # process response +end +grpc_async_await(req) +``` + +If you do not have a generated constructor, the same applies by building a +[`gRPCServiceClient`](#Generated-ServiceClient-Constructors) directly with +`Vector{UInt8}` type parameters and the RPC path, for example +`gRPCClient.gRPCServiceClient{Vector{UInt8}, false, Vector{UInt8}, false}("localhost", 50051, "/foo.MyService/MyRPC")`. + ### Exceptions ```@docs diff --git a/docs/src/llms.txt b/docs/src/llms.txt index 9f46b89..2bc2a8c 100644 --- a/docs/src/llms.txt +++ b/docs/src/llms.txt @@ -119,6 +119,8 @@ RPC variant → type parameters: - Client streaming: `{TReq, true, TResp, false}` - Bidirectional: `{TReq, true, TResp, true}` +Setting `TRequest` and/or `TResponse` to `Vector{UInt8}` sends/receives that side as a raw protobuf payload instead of a typed message. See "Raw Encoded Buffers" under API Reference. + ### Cross-Package Types When a service in `pkg_a` uses message types from `pkg_b` (via proto `import`), the generated constructor uses the package-namespaced type name: @@ -167,6 +169,19 @@ client = MyService_MyRPC_Client("host", 50051; grpc=h) grpc_shutdown(h) ``` +### Concurrency Model + +The `gRPCCURL` handle's `sticky` property selects how all background tasks (socket I/O, streaming pumps, async unary fan-out) are scheduled. It is set per handle and applies to every client constructed with `grpc=h`. + +```julia +h = gRPCCURL(sticky = true) # default is sticky = false +``` + +- `sticky = false` (default): `Threads.@spawn`, migratable tasks, scales across threads with `julia -t auto`. Use when multithreaded or for CPU-bound encode/decode work +- `sticky = true`: `@async`, tasks pinned to the spawning thread, cooperative single-threaded scheduling. Incompatible with multithreading (no cross-thread parallelism). Lower overhead for purely I/O-bound or single-threaded use + +The global handle from `grpc_global_handle()` uses `sticky = false`. + ### Client Constructor Parameters All generated constructors share the same keyword parameters: @@ -243,6 +258,44 @@ grpc_async_await(req) Note: For streaming variants, `grpc_async_await` does **not** return the response — it only raises exceptions. The response data flows through the channel. +### Raw Encoded Buffers (Partial Decoding) + +Setting a message type to `Vector{UInt8}` bypasses ProtoBuf encode/decode for that side and sends/receives the raw protobuf payload directly. Use it to forward bytes you already hold, or to partially decode a large response. A generated message type is always a struct, so `Vector{UInt8}` is an unambiguous "raw buffer" marker. The raw buffer is the protobuf message body only; the 5-byte gRPC framing is still handled by the library. + +Each generated `*_Client` constructor has `TRequest`/`TResponse` keyword arguments defaulting to the proto types. Override either (or both) with `Vector{UInt8}`. Request and response sides are independent, so either or both can be raw. + +```julia +using ProtoBuf + +# Raw request + raw response +io = IOBuffer(); encode(ProtoEncoder(io), MyRequest(42, UInt64[])); raw_request = take!(io) +client = MyService_MyRPC_Client("localhost", 50051; + TRequest=Vector{UInt8}, TResponse=Vector{UInt8}) +raw_response = grpc_sync_request(client, raw_request) # ::Vector{UInt8} +response = decode(ProtoDecoder(IOBuffer(raw_response)), MyResponse) + +# Mixed: typed request, raw response (override only one side) +client = MyService_MyRPC_Client("localhost", 50051; TResponse=Vector{UInt8}) +raw_response = grpc_sync_request(client, MyRequest(42, UInt64[])) +``` + +Streaming works the same way: override the streaming side with `Vector{UInt8}` and use a `Channel{Vector{UInt8}}`. + +```julia +# Server streaming with raw responses +client = MyService_MyServerStreamRPC_Client("localhost", 50051; TResponse=Vector{UInt8}) +response_c = Channel{Vector{UInt8}}(16) +req = grpc_async_request(client, raw_request, response_c) +for raw in response_c + response = decode(ProtoDecoder(IOBuffer(raw)), MyResponse) +end +grpc_async_await(req) +``` + +Without a generated constructor, build the client directly: `gRPCClient.gRPCServiceClient{Vector{UInt8}, false, Vector{UInt8}, false}("localhost", 50051, "/foo.MyService/MyRPC")`. + +Note: `Vector{UInt8}` here is the whole-message type parameter, not a `bytes` field inside a message. A `bytes` field still maps to `Vector{UInt8}` within a normal generated struct and is unaffected. + ### Exceptions ```julia diff --git a/src/Curl.jl b/src/Curl.jl index a22be52..65b1f79 100644 --- a/src/Curl.jl +++ b/src/Curl.jl @@ -610,7 +610,7 @@ function socket_callback( isnothing(watcher) && return 0 - task = @async begin + task = _spawn(grpc) do while watcher.running && grpc.running # Watcher configuration might be changed, wait until its safe to wait on the watcher wait(watcher.ready) @@ -705,8 +705,17 @@ mutable struct gRPCCURL requests::Vector{gRPCRequest} # Allows for controlling the maximum number of concurrent gRPC requests/streams sem::Channel{Event} - - function gRPCCURL(; max_streams::Int = GRPC_MAX_STREAMS, running = true) + # Selects the concurrency model for tasks spawned by this handle. When true, + # tasks are sticky (`@async`, a coroutine model incompatible with + # multithreading). When false (the default), tasks are migratable + # (`Threads.@spawn`, a multithreading model). + sticky::Bool + + function gRPCCURL(; + max_streams::Int = GRPC_MAX_STREAMS, + running = true, + sticky::Bool = false, + ) grpc = new( Ptr{Cvoid}(0), ReentrantLock(), @@ -716,6 +725,7 @@ mutable struct gRPCCURL running, Vector{gRPCRequest}(), Channel{Event}(max_streams), + sticky, ) finalizer((x) -> close(x), grpc) @@ -730,6 +740,10 @@ mutable struct gRPCCURL end end +# Spawn a task using the concurrency model configured on the handle. Supports +# do-block syntax: `_spawn(grpc) do ... end`. +_spawn(f, grpc::gRPCCURL) = _spawn(f; sticky = grpc.sticky) + function Base.close(grpc::gRPCCURL) grpc.running = false diff --git a/src/ProtoBuf.jl b/src/ProtoBuf.jl index b193709..f45ebba 100644 --- a/src/ProtoBuf.jl +++ b/src/ProtoBuf.jl @@ -19,6 +19,11 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte println(io, "$(export_name)(") println(io, "\thost, port;") + # TRequest / TResponse default to the generated proto types. Override + # either (or both) with Vector{UInt8} to send / receive that side as a + # raw, already-encoded protobuf payload (partial decoding). + println(io, "\tTRequest=$request_type,") + println(io, "\tTResponse=$response_type,") println(io, "\tsecure=false,") println(io, "\tgrpc=gRPCClient.grpc_global_handle(),") println(io, "\tdeadline=10,") @@ -27,7 +32,7 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte println(io, "\tmax_recieve_message_length = 4*1024*1024,") println( io, - ") = gRPCClient.gRPCServiceClient{$request_type, $(rpc.request_stream), $response_type, $(rpc.response_stream)}(", + ") = gRPCClient.gRPCServiceClient{TRequest, $(rpc.request_stream), TResponse, $(rpc.response_stream)}(", ) println(io, "\thost, port, \"$rpc_path\";") println(io, "\tsecure=secure,") diff --git a/src/Streaming.jl b/src/Streaming.jl index 67a6ec8..e1a3806 100644 --- a/src/Streaming.jl +++ b/src/Streaming.jl @@ -91,7 +91,7 @@ function grpc_async_stream_response( if response_buf === nothing continue end - response = decode(ProtoDecoder(response_buf), TResponse) + response = _decode_message(response_buf, TResponse) put!(channel, response) end catch ex @@ -158,7 +158,7 @@ function grpc_async_request( max_recieve_message_length = max_recieve_message_length, ) - request_task = Threads.@spawn grpc_async_stream_request(req, request) + request_task = _spawn(() -> grpc_async_stream_request(req, request), client) errormonitor(request_task) req @@ -222,7 +222,7 @@ function grpc_async_request( max_recieve_message_length = max_recieve_message_length, ) - response_task = Threads.@spawn grpc_async_stream_response(req, response) + response_task = _spawn(() -> grpc_async_stream_response(req, response), client) errormonitor(response_task) req @@ -286,10 +286,10 @@ function grpc_async_request( max_recieve_message_length = max_recieve_message_length, ) - request_task = Threads.@spawn grpc_async_stream_request(req, request) + request_task = _spawn(() -> grpc_async_stream_request(req, request), client) errormonitor(request_task) - response_task = Threads.@spawn grpc_async_stream_response(req, response) + response_task = _spawn(() -> grpc_async_stream_response(req, response), client) errormonitor(response_task) req diff --git a/src/Unary.jl b/src/Unary.jl index aadb42a..4e2490c 100644 --- a/src/Unary.jl +++ b/src/Unary.jl @@ -129,7 +129,7 @@ function grpc_async_request( max_recieve_message_length = max_recieve_message_length, ) - Threads.@spawn begin + _spawn(client) do try response = grpc_async_await(client, req) put!(channel, gRPCAsyncChannelResponse{TResponse}(index, response, nothing)) diff --git a/src/Utils.jl b/src/Utils.jl index 65fbe5f..77c6dea 100644 --- a/src/Utils.jl +++ b/src/Utils.jl @@ -4,6 +4,20 @@ function nullstring(x::Vector{UInt8}) String(x[1:(first_zero_idx-1)]) end +# Spawn a task either sticky (pinned to the spawning thread, like `@async`, +# a coroutine model good for IO-bound work and compatible with a single thread) +# or migratable (`Threads.@spawn`, enabling multithreading for CPU-bound work). +# This is the single seam the `gRPCCURL` `sticky` policy flows through. The +# handle- and client-typed methods (defined alongside those types) source +# `sticky` from `grpc.sticky`, so call sites need not reach into the handle. +function _spawn(f; sticky::Bool = false) + if sticky + return @async f() + else + return Threads.@spawn f() + end +end + # On Windows x64 OS_HANDLE does not like curl_sock_t (Int32) function CROSS_PLATFORM_OS_HANDLE(sock::curl_socket_t) fd = @static if Sys.iswindows() diff --git a/src/gRPC.jl b/src/gRPC.jl index 6578cb9..7a11819 100644 --- a/src/gRPC.jl +++ b/src/gRPC.jl @@ -14,6 +14,8 @@ Initializes the `gRPCCURL` object. The global handle is initialized automaticall Unless specifying a `gRPCCURL` the global one provided by `grpc_global_handle()` is used. Each `gRPCCURL` state has its own connection pool and request semaphore, so sometimes you may want to manage your own like shown below: +The `sticky` keyword selects the concurrency model used for every task the handle spawns. The default `sticky = false` uses a multithreading model (`Threads.@spawn`). Passing `sticky = true` uses a coroutine model (`@async`) that is pinned to the spawning thread and is incompatible with multithreading. + ```julia grpc_myapp = gRPCCURL() grpc_init(grpc_myapp) @@ -75,6 +77,10 @@ struct gRPCServiceClient{TRequest,SRequest,TResponse,SResponse} end +# Spawn a task using the concurrency model configured on the client's handle. +# Supports do-block syntax: `_spawn(client) do ... end`. +_spawn(f, client::gRPCServiceClient) = _spawn(f, client.grpc) + function url(client::gRPCServiceClient) protocol = if client.secure "https" @@ -89,6 +95,14 @@ function url(client::gRPCServiceClient) end +# Write the request message body into `buf` and return the number of bytes +# written. The generic method ProtoBuf-encodes a typed message; the +# `AbstractVector{UInt8}` method writes an already-encoded protobuf payload +# verbatim, enabling raw / partial-decode requests (a client whose `TRequest` +# is `Vector{UInt8}`). +_encode_body(buf::IOBuffer, request) = UInt32(encode(ProtoEncoder(buf), request)) +_encode_body(buf::IOBuffer, request::AbstractVector{UInt8}) = UInt32(write(buf, request)) + function grpc_encode_request_iobuffer( request, req_buf::IOBuffer; @@ -100,9 +114,8 @@ function grpc_encode_request_iobuffer( write(req_buf, UInt8(0)) write(req_buf, UInt32(0)) - # Serialize the protobuf - e = ProtoEncoder(req_buf) - sz = UInt32(encode(e, request)) + # Serialize the protobuf (or write raw bytes through for a raw request) + sz = _encode_body(req_buf, request) end_pos = position(req_buf) @@ -151,7 +164,13 @@ function grpc_async_await(req::gRPCRequest) end +# Turn a received message `IOBuffer` into the user's value. The generic method +# ProtoBuf-decodes into `T`; the `Vector{UInt8}` method returns a fresh copy of +# the raw protobuf payload, enabling raw / partial-decode responses. +_decode_message(io, ::Type{T}) where {T} = decode(ProtoDecoder(io), T) +_decode_message(io, ::Type{Vector{UInt8}}) = read(seekstart(io)) + function grpc_async_await(req::gRPCRequest, TResponse) grpc_async_await(req) - return decode(ProtoDecoder(req.response), TResponse) + return _decode_message(req.response, TResponse) end diff --git a/test/gen/test/test_pb.jl b/test/gen/test/test_pb.jl index 1511c03..e801d70 100644 --- a/test/gen/test/test_pb.jl +++ b/test/gen/test/test_pb.jl @@ -1,5 +1,5 @@ -# Autogenerated using ProtoBuf.jl v1.2.0 on 2025-12-07T17:23:33.985 -# original file: /home/cvance/Git/gRPCClient.jl/test/proto/test.proto (proto3 syntax) +# Autogenerated using ProtoBuf.jl v1.3.0 +# original file: proto/test.proto (proto3 syntax) import ProtoBuf as PB import gRPCClient @@ -12,12 +12,17 @@ export TestResponse, TestRequest struct TestResponse data::Vector{UInt64} end -PB.default_values(::Type{TestResponse}) = (;data = Vector{UInt64}()) -PB.field_numbers(::Type{TestResponse}) = (;data = 1) +PB.default_values(::Type{TestResponse}) = (; data = Vector{UInt64}()) +PB.field_numbers(::Type{TestResponse}) = (; data = 1) -function PB.decode(d::PB.AbstractProtoDecoder, ::Type{<:TestResponse}) +function PB.decode( + d::PB.AbstractProtoDecoder, + ::Type{<:TestResponse}, + _endpos::Int = 0, + _group::Bool = false, +) data = PB.BufferedVector{UInt64}() - while !PB.message_done(d) + while !PB.message_done(d, _endpos, _group) field_number, wire_type = PB.decode_tag(d) if field_number == 1 PB.decode!(d, wire_type, data) @@ -43,13 +48,19 @@ struct TestRequest test_response_sz::UInt64 data::Vector{UInt64} end -PB.default_values(::Type{TestRequest}) = (;test_response_sz = zero(UInt64), data = Vector{UInt64}()) -PB.field_numbers(::Type{TestRequest}) = (;test_response_sz = 1, data = 2) +PB.default_values(::Type{TestRequest}) = + (; test_response_sz = zero(UInt64), data = Vector{UInt64}()) +PB.field_numbers(::Type{TestRequest}) = (; test_response_sz = 1, data = 2) -function PB.decode(d::PB.AbstractProtoDecoder, ::Type{<:TestRequest}) +function PB.decode( + d::PB.AbstractProtoDecoder, + ::Type{<:TestRequest}, + _endpos::Int = 0, + _group::Bool = false, +) test_response_sz = zero(UInt64) data = PB.BufferedVector{UInt64}() - while !PB.message_done(d) + while !PB.message_done(d, _endpos, _group) field_number, wire_type = PB.decode_tag(d) if field_number == 1 test_response_sz = PB.decode(d, UInt64) @@ -70,86 +81,106 @@ function PB.encode(e::PB.AbstractProtoEncoder, x::TestRequest) end function PB._encoded_size(x::TestRequest) encoded_size = 0 - x.test_response_sz != zero(UInt64) && (encoded_size += PB._encoded_size(x.test_response_sz, 1)) + x.test_response_sz != zero(UInt64) && + (encoded_size += PB._encoded_size(x.test_response_sz, 1)) !isempty(x.data) && (encoded_size += PB._encoded_size(x.data, 2)) return encoded_size end # gRPCClient.jl BEGIN TestService_TestRPC_Client( - host, port; - secure=false, - grpc=gRPCClient.grpc_global_handle(), - deadline=10, - keepalive=60, - max_send_message_length = 4*1024*1024, - max_recieve_message_length = 4*1024*1024, -) = gRPCClient.gRPCServiceClient{TestRequest, false, TestResponse, false}( - host, port, "/test.TestService/TestRPC"; - secure=secure, - grpc=grpc, - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + host, + port; + TRequest = TestRequest, + TResponse = TestResponse, + secure = false, + grpc = gRPCClient.grpc_global_handle(), + deadline = 10, + keepalive = 60, + max_send_message_length = 4*1024*1024, + max_recieve_message_length = 4*1024*1024, +) = gRPCClient.gRPCServiceClient{TRequest,false,TResponse,false}( + host, + port, + "/test.TestService/TestRPC"; + secure = secure, + grpc = grpc, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) +export TestService_TestRPC_Client TestService_TestServerStreamRPC_Client( - host, port; - secure=false, - grpc=gRPCClient.grpc_global_handle(), - deadline=10, - keepalive=60, - max_send_message_length = 4*1024*1024, - max_recieve_message_length = 4*1024*1024, -) = gRPCClient.gRPCServiceClient{TestRequest, false, TestResponse, true}( - host, port, "/test.TestService/TestServerStreamRPC"; - secure=secure, - grpc=grpc, - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + host, + port; + TRequest = TestRequest, + TResponse = TestResponse, + secure = false, + grpc = gRPCClient.grpc_global_handle(), + deadline = 10, + keepalive = 60, + max_send_message_length = 4*1024*1024, + max_recieve_message_length = 4*1024*1024, +) = gRPCClient.gRPCServiceClient{TRequest,false,TResponse,true}( + host, + port, + "/test.TestService/TestServerStreamRPC"; + secure = secure, + grpc = grpc, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) +export TestService_TestServerStreamRPC_Client TestService_TestClientStreamRPC_Client( - host, port; - secure=false, - grpc=gRPCClient.grpc_global_handle(), - deadline=10, - keepalive=60, - max_send_message_length = 4*1024*1024, - max_recieve_message_length = 4*1024*1024, -) = gRPCClient.gRPCServiceClient{TestRequest, true, TestResponse, false}( - host, port, "/test.TestService/TestClientStreamRPC"; - secure=secure, - grpc=grpc, - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + host, + port; + TRequest = TestRequest, + TResponse = TestResponse, + secure = false, + grpc = gRPCClient.grpc_global_handle(), + deadline = 10, + keepalive = 60, + max_send_message_length = 4*1024*1024, + max_recieve_message_length = 4*1024*1024, +) = gRPCClient.gRPCServiceClient{TRequest,true,TResponse,false}( + host, + port, + "/test.TestService/TestClientStreamRPC"; + secure = secure, + grpc = grpc, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) +export TestService_TestClientStreamRPC_Client TestService_TestBidirectionalStreamRPC_Client( - host, port; - secure=false, - grpc=gRPCClient.grpc_global_handle(), - deadline=10, - keepalive=60, - max_send_message_length = 4*1024*1024, - max_recieve_message_length = 4*1024*1024, -) = gRPCClient.gRPCServiceClient{TestRequest, true, TestResponse, true}( - host, port, "/test.TestService/TestBidirectionalStreamRPC"; - secure=secure, - grpc=grpc, - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + host, + port; + TRequest = TestRequest, + TResponse = TestResponse, + secure = false, + grpc = gRPCClient.grpc_global_handle(), + deadline = 10, + keepalive = 60, + max_send_message_length = 4*1024*1024, + max_recieve_message_length = 4*1024*1024, +) = gRPCClient.gRPCServiceClient{TRequest,true,TResponse,true}( + host, + port, + "/test.TestService/TestBidirectionalStreamRPC"; + secure = secure, + grpc = grpc, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) - -export TestService_TestRPC_Client -export TestService_TestServerStreamRPC_Client -export TestService_TestClientStreamRPC_Client export TestService_TestBidirectionalStreamRPC_Client # gRPCClient.jl END diff --git a/test/protoc.sh b/test/protoc.sh index dcf679e..ecfd464 100755 --- a/test/protoc.sh +++ b/test/protoc.sh @@ -1,6 +1,6 @@ #!/bin/bash -pushd python +pushd python uv run -m grpc_tools.protoc -I ../proto --python_out=. --pyi_out=. --grpc_python_out=. ../proto/test.proto popd diff --git a/test/runtests.jl b/test/runtests.jl index d9db032..fdbec7d 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -10,14 +10,17 @@ import gRPCClient: grpc_timeout_header_val, GRPC_DEADLINE_EXCEEDED # By launching the server asynchronously within julia, we ensure # that the server is active while testing, which otherwise would require # scheduling a task on windows CI. -if haskey(ENV, "JULIA_GRPCCLIENT_TEST_START_SERVER") +if haskey(ENV, "JULIA_GRPCCLIENT_TEST_START_SERVER") if ENV["JULIA_GRPCCLIENT_TEST_START_SERVER"] == "go" pipe = Pipe() - process = run(pipeline(`./go/grpc_test_server`; stdout = pipe, stderr = pipe), wait = false) + process = run( + pipeline(`./go/grpc_test_server`; stdout = pipe, stderr = pipe), + wait = false, + ) finalizer(process) do x kill(x) end - + # Display the prints from the server and # wait until it is properly launched before proceeding with requests t1 = time() @@ -26,15 +29,22 @@ if haskey(ENV, "JULIA_GRPCCLIENT_TEST_START_SERVER") line = readline(pipe) # blocking println(line) contains(line, "gRPC server started") && break - contains(lowercase(line), "error") && throw(ErrorException("Failed to start gRPC test server")) - contains(lowercase(line), "failed") && throw(ErrorException("Failed to start gRPC test server")) - time() > t1 + 10 && throw(ErrorException("Failed to start gRPC test server due to time-out")) + contains(lowercase(line), "error") && + throw(ErrorException("Failed to start gRPC test server")) + contains(lowercase(line), "failed") && + throw(ErrorException("Failed to start gRPC test server")) + time() > t1 + 10 && + throw(ErrorException("Failed to start gRPC test server due to time-out")) end sleep(0.01) elseif ENV["JULIA_GRPCCLIENT_TEST_START_SERVER"] == "false" nothing else - throw(ErrorException("Unsupported option for JULIA_GRPCCLIENT_TEST_START_SERVER: $(ENV["JULIA_GRPCCLIENT_TEST_START_SERVER"])")) + throw( + ErrorException( + "Unsupported option for JULIA_GRPCCLIENT_TEST_START_SERVER: $(ENV["JULIA_GRPCCLIENT_TEST_START_SERVER"])", + ), + ) end end @@ -76,11 +86,27 @@ include("gen/test/test_pb.jl") @test contains(generated, "TestService_TestServerStreamRPC_Client(") @test contains(generated, "TestService_TestClientStreamRPC_Client(") @test contains(generated, "TestService_TestBidirectionalStreamRPC_Client(") + # Message types default via overridable TRequest/TResponse kwargs, + # so the construction uses the type-parameter names (raw-buffer support). + @test contains(generated, "TRequest=TestRequest,") + @test contains(generated, "TResponse=TestResponse,") # Correct streaming type parameters for each RPC variant - @test contains(generated, "gRPCClient.gRPCServiceClient{TestRequest, false, TestResponse, false}") - @test contains(generated, "gRPCClient.gRPCServiceClient{TestRequest, false, TestResponse, true}") - @test contains(generated, "gRPCClient.gRPCServiceClient{TestRequest, true, TestResponse, false}") - @test contains(generated, "gRPCClient.gRPCServiceClient{TestRequest, true, TestResponse, true}") + @test contains( + generated, + "gRPCClient.gRPCServiceClient{TRequest, false, TResponse, false}", + ) + @test contains( + generated, + "gRPCClient.gRPCServiceClient{TRequest, false, TResponse, true}", + ) + @test contains( + generated, + "gRPCClient.gRPCServiceClient{TRequest, true, TResponse, false}", + ) + @test contains( + generated, + "gRPCClient.gRPCServiceClient{TRequest, true, TResponse, true}", + ) # Correct fully-qualified RPC paths @test contains(generated, "/test.TestService/TestRPC") @test contains(generated, "/test.TestService/TestServerStreamRPC") @@ -90,7 +116,10 @@ include("gen/test/test_pb.jl") @test contains(generated, "export TestService_TestRPC_Client") @test contains(generated, "export TestService_TestServerStreamRPC_Client") @test contains(generated, "export TestService_TestClientStreamRPC_Client") - @test contains(generated, "export TestService_TestBidirectionalStreamRPC_Client") + @test contains( + generated, + "export TestService_TestBidirectionalStreamRPC_Client", + ) end # Test that request/response type package_namespace is correctly applied when types @@ -98,15 +127,23 @@ include("gen/test/test_pb.jl") # checked rpc.package_namespace instead of rpc.request_type.package_namespace and # rpc.response_type.package_namespace. mktempdir() do tmpdir - @test isnothing(protojl("ext_service.proto", joinpath(@__DIR__, "proto"), tmpdir)) + @test isnothing( + protojl("ext_service.proto", joinpath(@__DIR__, "proto"), tmpdir), + ) generated = read(joinpath(tmpdir, "ext_service", "ext_service_pb.jl"), String) # Request type from ext_types package must be prefixed with package namespace - @test contains(generated, "ext_types.ExtRequest") + @test contains(generated, "TRequest=ext_types.ExtRequest,") # Response type from ext_types package must be prefixed with package namespace - @test contains(generated, "ext_types.ExtResponse") - # Full type parameter string with both namespaced types - @test contains(generated, "gRPCClient.gRPCServiceClient{ext_types.ExtRequest, false, ext_types.ExtResponse, false}") - @test contains(generated, "gRPCClient.gRPCServiceClient{ext_types.ExtRequest, false, ext_types.ExtResponse, true}") + @test contains(generated, "TResponse=ext_types.ExtResponse,") + # Streaming flags differ per RPC; message types come through the kwargs above + @test contains( + generated, + "gRPCClient.gRPCServiceClient{TRequest, false, TResponse, false}", + ) + @test contains( + generated, + "gRPCClient.gRPCServiceClient{TRequest, false, TResponse, true}", + ) # Service client constructors are present @test contains(generated, "ExtService_ExtRPC_Client(") @test contains(generated, "ExtService_ExtStreamRPC_Client(") @@ -288,13 +325,13 @@ include("gen/test/test_pb.jl") req = grpc_async_request(client, TestRequest(N, zeros(UInt64, 1)), response_c) i = 1 - try + try while i <= N + 1 response = take!(response_c) i += 1 end @test false - catch ex + catch ex @test isa(ex, InvalidStateException) @test i == N + 1 end @@ -302,7 +339,11 @@ include("gen/test/test_pb.jl") end @testset "Deadline Exceeded" begin - client = TestService_TestClientStreamRPC_Client(_TEST_HOST, _TEST_PORT; deadline=0.001) + client = TestService_TestClientStreamRPC_Client( + _TEST_HOST, + _TEST_PORT; + deadline = 0.001, + ) request_c = Channel{TestRequest}(1) request = grpc_async_request(client, request_c) diff --git a/utils/gRPCClientUtils.jl/Project.toml b/utils/gRPCClientUtils.jl/Project.toml index 850f251..9ebcb11 100644 --- a/utils/gRPCClientUtils.jl/Project.toml +++ b/utils/gRPCClientUtils.jl/Project.toml @@ -21,5 +21,5 @@ PProf = "3.2.0" PrettyTables = "~3.1.0" Profile = "1.11.0" ProgressBars = "~1.5.0" -ProtoBuf = "~1.2" +ProtoBuf = "^1.2" gRPCClient = "~1.0.0" diff --git a/utils/gRPCClientUtils.jl/src/Workloads.jl b/utils/gRPCClientUtils.jl/src/Workloads.jl index e144779..7cd3866 100644 --- a/utils/gRPCClientUtils.jl/src/Workloads.jl +++ b/utils/gRPCClientUtils.jl/src/Workloads.jl @@ -1,5 +1,8 @@ +# Port of the TestService benchmark server. Override with GRPC_BENCH_PORT. +_bench_port() = parse(Int, get(ENV, "GRPC_BENCH_PORT", "8001")) + function workload_32_224_224_uint8(n = 100) - client = TestService_TestRPC_Client("localhost", 8001) + client = TestService_TestRPC_Client("localhost", _bench_port()) reqs = Vector{gRPCRequest}() @@ -19,7 +22,7 @@ function workload_32_224_224_uint8(n = 100) end function workload_smol(n = 1_000) - client = TestService_TestRPC_Client("localhost", 8001) + client = TestService_TestRPC_Client("localhost", _bench_port()) # Since requests are lightweight, use async / await pattern to avoid creating an extra task per request reqs = Vector{gRPCRequest}() @@ -36,7 +39,7 @@ function workload_smol(n = 1_000) end function workload_streaming_request(n = 1_000) - client = TestService_TestClientStreamRPC_Client("localhost", 8001) + client = TestService_TestClientStreamRPC_Client("localhost", _bench_port()) requests_c = Channel{TestRequest}(16) @sync begin @@ -55,7 +58,7 @@ function workload_streaming_request(n = 1_000) end function workload_streaming_response(n = 1_000) - client = TestService_TestServerStreamRPC_Client("localhost", 8001) + client = TestService_TestServerStreamRPC_Client("localhost", _bench_port()) response_c = Channel{TestResponse}(16) req = grpc_async_request(client, TestRequest(n, zeros(UInt64, 1)), response_c) @@ -70,7 +73,7 @@ end function workload_streaming_bidirectional(n = 1_000) - client = TestService_TestBidirectionalStreamRPC_Client("localhost", 8001) + client = TestService_TestBidirectionalStreamRPC_Client("localhost", _bench_port()) requests_c = Channel{TestRequest}(16) response_c = Channel{TestResponse}(16)