Merged
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,6 +1,6 @@
name = "gRPCClient"
uuid = "aaca4a50-36af-4a1d-b878-4c443f2061ad"
version = "1.0.2"
version = "1.0.3"
authors = ["Carroll Vance <cs.vance@icloud.com>"]

[deps]
2 changes: 1 addition & 1 deletion README.md
@@ -22,7 +22,7 @@ Fetch [`llms.txt`](https://juliaio.github.io/gRPCClient.jl/llms.txt) before proc
### Naive Baseline: `julia`

By default, Julia 1.12 starts with just one thread. In most cases, performance improves the closer we get to `@async`.
However, it is unlikely Julia will be used this way in the real world.
However, it is unlikely Julia will be used this way in the real world. The concurrency model is selected per handle with `gRPCCURL(; sticky = ...)`: `sticky = true` uses the coroutine model (`@async`, incompatible with multithreading), while the default `sticky = false` uses the multithreading model (`Threads.@spawn`).

```
╭──────────────────────────────────┬─────────────┬────────────────┬────────────┬──────────────┬─────────┬──────┬──────╮
10 changes: 2 additions & 8 deletions docs/make.jl
@@ -1,15 +1,9 @@
using Documenter
using gRPCClient

makedocs(
sitename = "gRPCClient.jl",
format = Documenter.HTML(),
modules = [gRPCClient]
)
makedocs(sitename = "gRPCClient.jl", format = Documenter.HTML(), modules = [gRPCClient])

# Documenter can also automatically deploy documentation to gh-pages.
# See "Hosting Documentation" and deploydocs() in the Documenter manual
# for more information.
deploydocs(
repo = "github.com/JuliaIO/gRPCClient.jl.git"
)
deploydocs(repo = "github.com/JuliaIO/gRPCClient.jl.git")
91 changes: 91 additions & 0 deletions docs/src/index.md
@@ -49,6 +49,27 @@ protojl("test/proto/test.proto", ".", "test/gen")

See [here](#RPC) for examples covering all provided interfaces for both unary and streaming gRPC calls.

## Concurrency Model

gRPCClient.jl runs background tasks for socket I/O, streaming request and response pumps, and asynchronous unary fan-out. The `gRPCCURL` handle decides how those tasks are scheduled through its `sticky` property, so a single setting controls the model for every request made through that handle.

Two models are available:

- **Non-sticky** (`sticky = false`, the default) uses `Threads.@spawn`. Tasks are migratable and can run on any thread, so the client scales across threads when Julia is started with more than one (for example `julia -t auto`). This is the right choice when you run multithreaded or have CPU-bound encode and decode work.
- **Sticky** (`sticky = true`) uses `@async`. Tasks are pinned to the thread that spawned them and run under cooperative, single-threaded scheduling. This model is incompatible with multithreading: it does not parallelize across threads even when more than one is available. It has lower scheduling overhead and avoids cross-thread data movement, which suits purely I/O-bound workloads and single-threaded deployments.

The property is set when the handle is constructed and applies to every client that uses it:

```julia
# Coroutine model, pinned to the calling thread
h = gRPCCURL(sticky = true)
grpc_init(h)

client = MyService_MyRPC_Client("localhost", 50051; grpc = h)
```

The global handle returned by `grpc_global_handle()` uses the default `sticky = false`.
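
The difference between the two models can be observed with plain Julia tasks. The sketch below is illustrative only: `spawn_with_policy` is a hypothetical helper, not part of the gRPCClient.jl API, and the `sticky` field it inspects on `Task` is a Base implementation detail rather than a documented interface.

```julia
# Hypothetical helper: choose the task flavor from a boolean policy.
function spawn_with_policy(f; sticky::Bool = false)
    if sticky
        return @async f()          # pinned to the spawning thread
    else
        return Threads.@spawn f()  # may run on, and migrate between, any thread
    end
end

pinned = spawn_with_policy(() -> Threads.threadid(); sticky = true)
migratable = spawn_with_policy(() -> Threads.threadid())

# Both tasks report the thread they ran on; only the pinned one is
# guaranteed to stay on the thread that spawned it.
println("sticky task ran on thread ", fetch(pinned), ", sticky = ", pinned.sticky)
println("migratable task ran on thread ", fetch(migratable), ", sticky = ", migratable.sticky)
```

With a single thread both tasks necessarily run on the same thread, so the distinction only becomes visible when Julia is started with several threads.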

## API

### Package Initialization / Shutdown
@@ -123,6 +144,76 @@ grpc_async_request(client::gRPCServiceClient{TRequest,true,TResponse,true},reque
grpc_async_await(client::gRPCServiceClient{TRequest,true,TResponse,false},request::gRPCRequest) where {TRequest<:Any,TResponse<:Any}
```

### Raw Encoded Buffers (Partial Decoding)

By default the client encodes a typed message before sending and decodes the
response into a typed message before returning it. You can bypass either step
and work directly with the raw protobuf payload by declaring the relevant
message type parameter as `Vector{UInt8}`. This is useful when you want to
forward bytes you already hold, or partially decode a large response and read
only the fields you care about.

A normal protobuf message type is always a generated struct, so `Vector{UInt8}`
is unambiguous as a "raw buffer" marker for the message type parameter. The raw
buffer is the serialized protobuf message body only; the 5-byte gRPC framing is
still added and stripped by the library, so you never handle it yourself.
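
For orientation, the 5-byte prefix defined by the gRPC-over-HTTP/2 protocol is a 1-byte compressed flag followed by a 4-byte big-endian payload length. The sketch below illustrates that layout only; `frame_message` and `unframe_message` are hypothetical names and do not reflect how gRPCClient.jl implements framing internally.

```julia
# Hypothetical helper: prefix a serialized protobuf message with the
# 5-byte gRPC header (compressed flag + big-endian length).
function frame_message(payload::Vector{UInt8}; compressed::Bool = false)
    io = IOBuffer()
    write(io, UInt8(compressed))              # 0x00 = uncompressed, 0x01 = compressed
    write(io, hton(UInt32(length(payload))))  # length in network (big-endian) byte order
    write(io, payload)
    return take!(io)
end

# Hypothetical helper: split a single complete frame back into
# (compressed, payload).
function unframe_message(frame::Vector{UInt8})
    length(frame) >= 5 || error("frame is shorter than the 5-byte gRPC header")
    compressed = frame[1] != 0x00
    len = (UInt32(frame[2]) << 24) | (UInt32(frame[3]) << 16) |
          (UInt32(frame[4]) << 8) | UInt32(frame[5])
    length(frame) == 5 + len || error("header length does not match payload size")
    return compressed, frame[6:end]
end

payload = UInt8[0x08, 0x2a]   # an arbitrary two-byte protobuf body
frame = frame_message(payload)
```

The raw buffers you pass to or receive from the client correspond to `payload` here, never to `frame`.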

Each generated `*_Client` constructor accepts `TRequest` and `TResponse`
keyword arguments that default to the proto message types. The request and
response sides are independent: override either one, or both, with
`Vector{UInt8}` to make that side raw.

Send a raw request and receive a raw response:

```julia
using ProtoBuf

# Build the request bytes yourself (here, by encoding a typed message)
io = IOBuffer()
encode(ProtoEncoder(io), MyRequest(42, zeros(UInt64, 10)))
raw_request = take!(io)

client = MyService_MyRPC_Client(
"localhost", 50051;
TRequest = Vector{UInt8}, TResponse = Vector{UInt8},
)
raw_response = grpc_sync_request(client, raw_request) # raw_response::Vector{UInt8}

# Decode the payload yourself; this decodes the whole message, but you could read only the fields you need
response = decode(ProtoDecoder(IOBuffer(raw_response)), MyResponse)
```
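
To read a single field without decoding the whole message, you can walk the protobuf wire format directly. The following is a minimal sketch in plain Julia, not part of gRPCClient.jl or ProtoBuf.jl: the helper names and the field numbers in the example are hypothetical, and it handles only wire types 0, 1, 2, and 5 (no groups, no packed repeated fields).

```julia
# Read one base-128 varint starting at index i; returns (value, next_index).
function read_varint(buf::Vector{UInt8}, i::Int)
    value = UInt64(0)
    shift = 0
    while true
        byte = buf[i]
        i += 1
        value |= UInt64(byte & 0x7f) << shift
        if (byte & 0x80) == 0
            return (value, i)
        end
        shift += 7
    end
end

# Return the first varint value stored under `field_number`, or `nothing`
# if the field is absent. Other fields are skipped without being decoded.
function find_varint_field(buf::Vector{UInt8}, field_number::Int)
    i = 1
    while i <= length(buf)
        tag, i = read_varint(buf, i)
        number, wire_type = Int(tag >> 3), Int(tag & 0x07)
        if wire_type == 0                    # varint
            value, i = read_varint(buf, i)
            number == field_number && return value
        elseif wire_type == 1                # fixed 64-bit
            i += 8
        elseif wire_type == 2                # length-delimited (string, bytes, submessage)
            len, i = read_varint(buf, i)
            i += Int(len)
        elseif wire_type == 5                # fixed 32-bit
            i += 4
        else
            error("unsupported wire type $wire_type")
        end
    end
    return nothing
end

# Hypothetical payload: field 2 is the string "testing", field 3 is the varint 42.
example = UInt8[0x12, 0x07, codeunits("testing")..., 0x18, 0x2a]
wanted = find_varint_field(example, 3)
```

In practice `example` would be the `raw_response` returned by a client whose `TResponse` is `Vector{UInt8}`.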

Mixed combinations work too. To send a typed request but receive the response
as raw bytes, override only `TResponse`:

```julia
client = MyService_MyRPC_Client("localhost", 50051; TResponse = Vector{UInt8})
raw_response = grpc_sync_request(client, MyRequest(42, UInt64[]))
```

Raw buffers apply to streaming as well: override the streaming side's type with
`Vector{UInt8}` and use a `Channel{Vector{UInt8}}`. For example, a
server-streaming call that receives each response as raw bytes:

```julia
client = MyService_MyServerStreamRPC_Client(
"localhost", 50051;
TResponse = Vector{UInt8},
)
response_c = Channel{Vector{UInt8}}(16)
req = grpc_async_request(client, raw_request, response_c)
for raw in response_c
response = decode(ProtoDecoder(IOBuffer(raw)), MyResponse)
# process response
end
grpc_async_await(req)
```

If you do not have a generated constructor, you can get the same behavior by
constructing a [`gRPCServiceClient`](#Generated-ServiceClient-Constructors)
directly with `Vector{UInt8}` type parameters and the RPC path, for example
`gRPCClient.gRPCServiceClient{Vector{UInt8}, false, Vector{UInt8}, false}("localhost", 50051, "/foo.MyService/MyRPC")`.

### Exceptions

```@docs
53 changes: 53 additions & 0 deletions docs/src/llms.txt
@@ -119,6 +119,8 @@ RPC variant → type parameters:
- Client streaming: `{TReq, true, TResp, false}`
- Bidirectional: `{TReq, true, TResp, true}`

Setting `TRequest` and/or `TResponse` to `Vector{UInt8}` sends/receives that side as a raw protobuf payload instead of a typed message. See "Raw Encoded Buffers" under API Reference.

### Cross-Package Types

When a service in `pkg_a` uses message types from `pkg_b` (via proto `import`), the generated constructor uses the package-namespaced type name:
@@ -167,6 +169,19 @@ client = MyService_MyRPC_Client("host", 50051; grpc=h)
grpc_shutdown(h)
```

### Concurrency Model

The `gRPCCURL` handle's `sticky` property selects how all background tasks (socket I/O, streaming pumps, async unary fan-out) are scheduled. It is set per handle and applies to every client constructed with `grpc=h`.

```julia
h = gRPCCURL(sticky = true) # default is sticky = false
```

- `sticky = false` (default): `Threads.@spawn`, migratable tasks, scales across threads with `julia -t auto`. Use when multithreaded or for CPU-bound encode/decode work.
- `sticky = true`: `@async`, tasks pinned to the spawning thread, cooperative single-threaded scheduling. Incompatible with multithreading (no cross-thread parallelism). Lower overhead for purely I/O-bound or single-threaded use.

The global handle from `grpc_global_handle()` uses `sticky = false`.

### Client Constructor Parameters

All generated constructors share the same keyword parameters:
@@ -243,6 +258,44 @@ grpc_async_await(req)

Note: For streaming variants, `grpc_async_await` does **not** return the response — it only raises exceptions. The response data flows through the channel.

### Raw Encoded Buffers (Partial Decoding)

Setting a message type to `Vector{UInt8}` bypasses ProtoBuf encode/decode for that side and sends/receives the raw protobuf payload directly. Use it to forward bytes you already hold, or to partially decode a large response. A generated message type is always a struct, so `Vector{UInt8}` is an unambiguous "raw buffer" marker. The raw buffer is the protobuf message body only; the 5-byte gRPC framing is still handled by the library.

Each generated `*_Client` constructor has `TRequest`/`TResponse` keyword arguments defaulting to the proto types. The two sides are independent: override either or both with `Vector{UInt8}` to make that side raw.

```julia
using ProtoBuf

# Raw request + raw response
io = IOBuffer(); encode(ProtoEncoder(io), MyRequest(42, UInt64[])); raw_request = take!(io)
client = MyService_MyRPC_Client("localhost", 50051;
TRequest=Vector{UInt8}, TResponse=Vector{UInt8})
raw_response = grpc_sync_request(client, raw_request) # ::Vector{UInt8}
response = decode(ProtoDecoder(IOBuffer(raw_response)), MyResponse)

# Mixed: typed request, raw response (override only one side)
client = MyService_MyRPC_Client("localhost", 50051; TResponse=Vector{UInt8})
raw_response = grpc_sync_request(client, MyRequest(42, UInt64[]))
```

Streaming works the same way: override the streaming side with `Vector{UInt8}` and use a `Channel{Vector{UInt8}}`.

```julia
# Server streaming with raw responses
client = MyService_MyServerStreamRPC_Client("localhost", 50051; TResponse=Vector{UInt8})
response_c = Channel{Vector{UInt8}}(16)
req = grpc_async_request(client, raw_request, response_c)
for raw in response_c
response = decode(ProtoDecoder(IOBuffer(raw)), MyResponse)
end
grpc_async_await(req)
```

Without a generated constructor, build the client directly: `gRPCClient.gRPCServiceClient{Vector{UInt8}, false, Vector{UInt8}, false}("localhost", 50051, "/foo.MyService/MyRPC")`.

Note: `Vector{UInt8}` here is the whole-message type parameter, not a `bytes` field inside a message. A `bytes` field still maps to `Vector{UInt8}` within a normal generated struct and is unaffected.

### Exceptions

```julia
20 changes: 17 additions & 3 deletions src/Curl.jl
@@ -610,7 +610,7 @@ function socket_callback(

isnothing(watcher) && return 0

task = @async begin
task = _spawn(grpc) do
while watcher.running && grpc.running
# Watcher configuration might be changed, wait until it's safe to wait on the watcher
wait(watcher.ready)
@@ -705,8 +705,17 @@ mutable struct gRPCCURL
requests::Vector{gRPCRequest}
# Allows for controlling the maximum number of concurrent gRPC requests/streams
sem::Channel{Event}

function gRPCCURL(; max_streams::Int = GRPC_MAX_STREAMS, running = true)
# Selects the concurrency model for tasks spawned by this handle. When true,
# tasks are sticky (`@async`, a coroutine model incompatible with
# multithreading). When false (the default), tasks are migratable
# (`Threads.@spawn`, a multithreading model).
sticky::Bool

function gRPCCURL(;
max_streams::Int = GRPC_MAX_STREAMS,
running = true,
sticky::Bool = false,
)
grpc = new(
Ptr{Cvoid}(0),
ReentrantLock(),
@@ -716,6 +725,7 @@ mutable struct gRPCCURL
running,
Vector{gRPCRequest}(),
Channel{Event}(max_streams),
sticky,
)

finalizer((x) -> close(x), grpc)
@@ -730,6 +740,10 @@ mutable struct gRPCCURL
end
end

# Spawn a task using the concurrency model configured on the handle. Supports
# do-block syntax: `_spawn(grpc) do ... end`.
_spawn(f, grpc::gRPCCURL) = _spawn(f; sticky = grpc.sticky)

function Base.close(grpc::gRPCCURL)
grpc.running = false

7 changes: 6 additions & 1 deletion src/ProtoBuf.jl
@@ -19,6 +19,11 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte

println(io, "$(export_name)(")
println(io, "\thost, port;")
# TRequest / TResponse default to the generated proto types. Override
# either (or both) with Vector{UInt8} to send / receive that side as a
# raw, already-encoded protobuf payload (partial decoding).
println(io, "\tTRequest=$request_type,")
println(io, "\tTResponse=$response_type,")
println(io, "\tsecure=false,")
println(io, "\tgrpc=gRPCClient.grpc_global_handle(),")
println(io, "\tdeadline=10,")
@@ -27,7 +32,7 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte
println(io, "\tmax_recieve_message_length = 4*1024*1024,")
println(
io,
") = gRPCClient.gRPCServiceClient{$request_type, $(rpc.request_stream), $response_type, $(rpc.response_stream)}(",
") = gRPCClient.gRPCServiceClient{TRequest, $(rpc.request_stream), TResponse, $(rpc.response_stream)}(",
)
println(io, "\thost, port, \"$rpc_path\";")
println(io, "\tsecure=secure,")
10 changes: 5 additions & 5 deletions src/Streaming.jl
@@ -91,7 +91,7 @@ function grpc_async_stream_response(
if response_buf === nothing
continue
end
response = decode(ProtoDecoder(response_buf), TResponse)
response = _decode_message(response_buf, TResponse)
put!(channel, response)
end
catch ex
@@ -158,7 +158,7 @@ function grpc_async_request(
max_recieve_message_length = max_recieve_message_length,
)

request_task = Threads.@spawn grpc_async_stream_request(req, request)
request_task = _spawn(() -> grpc_async_stream_request(req, request), client)
errormonitor(request_task)

req
@@ -222,7 +222,7 @@ function grpc_async_request(
max_recieve_message_length = max_recieve_message_length,
)

response_task = Threads.@spawn grpc_async_stream_response(req, response)
response_task = _spawn(() -> grpc_async_stream_response(req, response), client)
errormonitor(response_task)

req
@@ -286,10 +286,10 @@ function grpc_async_request(
max_recieve_message_length = max_recieve_message_length,
)

request_task = Threads.@spawn grpc_async_stream_request(req, request)
request_task = _spawn(() -> grpc_async_stream_request(req, request), client)
errormonitor(request_task)

response_task = Threads.@spawn grpc_async_stream_response(req, response)
response_task = _spawn(() -> grpc_async_stream_response(req, response), client)
errormonitor(response_task)

req
2 changes: 1 addition & 1 deletion src/Unary.jl
@@ -129,7 +129,7 @@ function grpc_async_request(
max_recieve_message_length = max_recieve_message_length,
)

Threads.@spawn begin
_spawn(client) do
try
response = grpc_async_await(client, req)
put!(channel, gRPCAsyncChannelResponse{TResponse}(index, response, nothing))
14 changes: 14 additions & 0 deletions src/Utils.jl
@@ -4,6 +4,20 @@ function nullstring(x::Vector{UInt8})
String(x[1:(first_zero_idx-1)])
end

# Spawn a task either sticky (pinned to the spawning thread, like `@async`,
# a coroutine model good for IO-bound work and compatible with a single thread)
# or migratable (`Threads.@spawn`, enabling multithreading for CPU-bound work).
# This is the single seam the `gRPCCURL` `sticky` policy flows through. The
# handle- and client-typed methods (defined alongside those types) source
# `sticky` from `grpc.sticky`, so call sites need not reach into the handle.
function _spawn(f; sticky::Bool = false)
if sticky
return @async f()
else
return Threads.@spawn f()
end
end

# On Windows x64 OS_HANDLE does not like curl_sock_t (Int32)
function CROSS_PLATFORM_OS_HANDLE(sock::curl_socket_t)
fd = @static if Sys.iswindows()