Feature [client]: add supervised connection pool with legacy mode opt-out #523
VladoPlavsic wants to merge 3 commits into elixir-grpc:master
Conversation
Introduces a built-in HTTP/2 connection pool for every `GRPC.Stub.connect/2` call. Connections are checked out before each RPC and returned afterwards, with stream counts tracked per connection. Supports configurable pool size, overflow capacity, per-connection stream limits, and optional health-check pings, all via a `:pool` option on `connect/2`. Adds a legacy mode (`config :grpc, pool_enabled: false`) that restores the pre-pool single-connection behaviour for gradual migration. Adds `nil` `max_overflow` support for unbounded overflow connections. Includes full unit and integration test coverage for the pool, overflow, and legacy paths.
Hello @VladoPlavsic, thank you for the PR. We have some open PRs for the gRPC client and we need to review them before taking a look here.
```elixir
@spec start_for_address(Channel.t(), term(), non_neg_integer(), keyword()) ::
        {:ok, Channel.t()} | {:error, any()}
def start_for_address(%Channel{} = vc, host, port, norm_opts) do
```
Previously `RPC.Client.Connection` was doing this:

```elixir
defp connect_real_channel(%Channel{scheme: "unix"} = vc, path, port, opts, adapter) do
  %Channel{vc | host: path, port: port}
  |> adapter.connect(opts[:adapter_opts])
end

defp connect_real_channel(%Channel{} = vc, host, port, opts, adapter) do
  %Channel{vc | host: host, port: port}
  |> adapter.connect(opts[:adapter_opts])
end
```

Should `GRPC.Client.Pool.start_for_address/4` also take care of the path vs host thing? I worry doing this would break for anyone who tries to do something like:

```elixir
GRPC.Stub.connect("unix:///tmp/grpc.sock")
```
```elixir
defp connect_real_channel(%Channel{scheme: "unix"} = vc, path, port, opts, adapter) do
  %Channel{vc | host: path, port: port}
  |> adapter.connect(opts[:adapter_opts])
end

defp connect_real_channel(%Channel{} = vc, host, port, opts, adapter) do
  %Channel{vc | host: host, port: port}
  |> adapter.connect(opts[:adapter_opts])
end
```

As far as I can tell, these two function clauses are identical except for the name of the second parameter. That's why I decided to remove the distinction (the resolution happens before this function call).
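To illustrate the point above: if the resolution of path vs host happens before the call, the two clauses collapse into one. A hypothetical, self-contained sketch (the `Channel` stand-in and the dropped `opts`/`adapter` plumbing are simplifications, not the library's actual code):

```elixir
defmodule PoolSketch do
  # Minimal stand-in for GRPC.Channel, for illustration only.
  defmodule Channel do
    defstruct [:scheme, :host, :port]
  end

  # Hypothetical collapsed clause: once a unix-socket path has already been
  # resolved into `host` by the caller, a single clause covers both schemes.
  def connect_real_channel(%Channel{} = vc, host, port) do
    %Channel{vc | host: host, port: port}
  end
end

ch =
  PoolSketch.connect_real_channel(
    %PoolSketch.Channel{scheme: "unix"},
    "/tmp/grpc.sock",
    0
  )

IO.puts(ch.host)
```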
```elixir
old_leases = Map.get(leases, channel.id, [])
old_lease = Enum.find(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end)

new_leases =
  Enum.reject(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end)
```
Can there be more than one matching "old" lease? If so, it returns just one and then removes all of them, instead of just the one it finds. Would that be an issue? 🤔
Suggested change:

```elixir
# Before
old_leases = Map.get(leases, channel.id, [])
old_lease = Enum.find(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end)

new_leases =
  Enum.reject(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end)

# After
old_leases = Map.get(leases, channel.id, [])
old_lease = Enum.find(old_leases, fn %State.Lease{caller_pid: pid} -> pid == caller_pid end)
new_leases = List.delete(old_leases, old_lease)
```
A gRPC call is blocking, and we lease to the PID that requires a channel. So even if you were to do something like the following from a single process:

```elixir
Task.async(fn -> execute_your_grpc end)
```

we would lease a channel to the task's process. So, to answer your question: no, there shouldn't (can't?) be a case where we lease a channel to the same PID twice.
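The reply above rests on each `Task` running in its own process, so a lease keyed by caller PID belongs to the task, not the spawning process. A small self-contained demonstration using only the standard library:

```elixir
# Task.async spawns a new process; a pool that keys leases by caller PID
# would record the task's PID, not the parent's.
parent = self()

task_pid =
  Task.async(fn -> self() end)
  |> Task.await()

# The task has its own PID, distinct from the parent's.
IO.inspect(parent == task_pid)
```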
Co-authored-by: Noah Betzen <noah@nezteb.net>
Closes: #522
Full patch notes
TL;DR: for legacy mode, see the Legacy Mode section at the bottom.
Connection Pool — Patch Notes
Summary
This PR introduces a built-in connection pool for the Elixir gRPC client. Every call to `GRPC.Stub.connect/2` (or `GRPC.Client.Connection.connect/2`) now starts a supervised pool of long-lived HTTP/2 connections instead of opening a single raw connection. Callers share those connections transparently: the client checks out a connection before each RPC and returns it afterwards, keeping stream counts accurate. Pool size, overflow capacity, and per-connection stream limits are all configurable via a single `:pool` option.

What Changed
New modules

- `GRPC.Client.Pool`: `start_for_address/4`, `stop_for_address/1`, `checkout/1`, `checkin/2`
- `GRPC.Client.Pool.Config`
- `GRPC.Client.Pool.Supervisor`
- `GRPC.Client.Pool.Server`: a `GenServer` that tracks channels, open-stream counts, and leases
- `GRPC.Client.Pool.Implementation`
- `GRPC.Client.Pool.HealthCheck.DynamicSupervisor`
- `GRPC.Client.Pool.HealthCheck.Server`

Modified modules

- `GRPC.Channel`: added a `pool: reference() | nil` field. The field is `nil` on raw channels and holds a pool reference on virtual channels returned by `connect/2`.
- `GRPC.Client.Application`: registers `GRPC.Client.Pool.Registry` at startup so pool supervisors and servers can be found by `pool_ref` without additional setup.
- `GRPC.Client.Connection`: replaced direct `adapter.connect` calls with `GRPC.Client.Pool.start_for_address/4`. The `do_disconnect` private function now calls `GRPC.Client.Pool.stop_for_address/1` instead of `adapter.disconnect/1`. Added `:pool` to the `Keyword.validate!` options list with sensible defaults.
- `GRPC.Stub`: the `call/5` implementation now brackets every RPC with `acquire_channel/2` (pool checkout) and `release_channel/2` (pool checkin), replacing the previous ad-hoc `pick_channel` + liveness check.

Benefits
- Connections are long-lived and reused across RPCs, avoiding the per-call handshake cost.
- `max_streams` caps how many concurrent requests share a single connection. When a connection is saturated, the pool opens an overflow connection instead of stacking unlimited streams.
- When a connection drops, the pool clears its leases and lets Gun re-establish the session in the background. Callers do not need to handle reconnection logic.
- When the pool and its overflow are saturated, `GRPC.Status.resource_exhausted()` is returned immediately rather than stacking requests indefinitely.
- Pool state is inspectable at any time via `:sys.get_state/1`.

Breaking Changes
1. `GRPC.Stub.connect/2` returns a virtual channel, not a raw connection

Previously the returned `%GRPC.Channel{}` had `adapter_payload` populated with the underlying connection PID (e.g. `%{conn_pid: pid}`). That field is now `nil` on the virtual channel: internal connection details belong to the pool, not the caller.
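The before/after code blocks were lost in this view; a hedged sketch of the shape change (the address and exact struct contents are illustrative, not copied from the PR):

```elixir
# Before (pre-pool): the raw connection PID leaked into the channel struct.
{:ok, %GRPC.Channel{adapter_payload: %{conn_pid: _pid}}} =
  GRPC.Stub.connect("localhost:50051")

# After (pooled): adapter_payload is nil on the virtual channel; the new
# pool field holds a reference identifying the supervised pool instead.
{:ok, %GRPC.Channel{adapter_payload: nil, pool: _pool_ref}} =
  GRPC.Stub.connect("localhost:50051")
```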
2. `GRPC.Stub.disconnect/2` return value changed

Previously the disconnected channel carried `adapter_payload: %{conn_pid: nil}`. Now it carries `pool: nil`.
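The before/after blocks here were also lost; a hedged sketch of the new return shape (`channel` stands for a previously connected channel):

```elixir
# Before (pre-pool): disconnect cleared the PID inside adapter_payload.
{:ok, %GRPC.Channel{adapter_payload: %{conn_pid: nil}}} =
  GRPC.Stub.disconnect(channel)

# After (pooled): the pool reference is cleared instead.
{:ok, %GRPC.Channel{pool: nil}} =
  GRPC.Stub.disconnect(channel)
```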
3. Connection-process crash messages are no longer forwarded to the caller

Previously, if the underlying Gun connection process crashed, an `{:EXIT, pid, reason}` message could propagate to the process that opened the connection (when it was trapping exits). The pool now owns all connections and handles those exits internally. Callers will no longer receive connection-crash messages.
New API: the `:pool` option

`GRPC.Stub.connect/2` and `GRPC.Client.Connection.connect/2` accept a new `:pool` keyword option with the keys `:size`, `:max_overflow`, and `:max_streams`.

All three keys are optional; omitting `:pool` entirely uses `%{size: 1, max_overflow: 0, max_streams: nil}`. Setting `max_overflow: nil` removes the overflow cap entirely: the pool opens new connections on demand whenever all existing connections are saturated, with no upper bound beyond what the server and OS allow.
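A hedged usage sketch (the address and key values are illustrative; the defaults map is the one quoted above). The runnable part demonstrates how omitted keys would fall back to those defaults:

```elixir
# Hypothetical connect call with an explicit pool configuration:
# {:ok, channel} =
#   GRPC.Stub.connect("localhost:50051",
#     pool: %{size: 4, max_overflow: 2, max_streams: 100}
#   )

# Omitted keys fall back to the documented defaults:
defaults = %{size: 1, max_overflow: 0, max_streams: nil}
merged = Map.merge(defaults, %{size: 4})

IO.inspect(merged.size)
IO.inspect(merged.max_overflow)
IO.inspect(merged.max_streams)
```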
Migration Guide

1. Remove `adapter_payload` pattern matches on `connect` results. If your code inspects `channel.adapter_payload.conn_pid` after connecting, remove that assertion. The pool now manages connection PIDs internally.
2. Update `disconnect` pattern matches. Replace checks for `adapter_payload: %{conn_pid: nil}` with `pool: nil`.
3. Remove manual connection-crash handling. If your process was trapping exits and handling `{:EXIT, conn_pid, _}` to detect drops, that logic is no longer needed. The pool handles reconnection automatically.
4. Tune pool size for your workload (optional). The default of one connection is conservative. For services with moderate to high RPC concurrency, consider increasing `:size` or setting `:max_overflow` to absorb traffic bursts without opening unbounded connections.

Legacy Mode (opt-out)
If you need to temporarily disable the pool and restore the pre-pool behaviour, set `config :grpc, pool_enabled: false` in your config.

With the pool disabled:

- `GRPC.Stub.connect/2` calls `adapter.connect` directly and returns a channel with `adapter_payload: %{conn_pid: pid}` and `pool: nil`, exactly as before.
- Calls use the previous `pick_channel` + `Process.alive?` liveness check instead of pool checkout/checkin.
- `GRPC.Stub.disconnect/1` calls `adapter.disconnect` directly and returns a channel with `adapter_payload: %{conn_pid: nil}`.

The default is `pool_enabled: true`. This option is intended as a temporary escape hatch while migrating, not as a permanent configuration.
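For completeness, the opt-out as it would appear in a Mix config file (the filename is an assumption; the key itself comes from the notes above):

```elixir
# config/config.exs
import Config

# Restore the pre-pool single-connection behaviour (temporary escape hatch).
config :grpc, pool_enabled: false
```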