composer require thesis/grpc
- Requirements
- Unary requests
- Implementing the Server
- Starting the Server
- Using the Client
- TLS and mTLS
- Target Addressing
- Load Balancing
- Endpoint Resolution
- Error handling
- Compression
- Interceptors
- Stream decorators
- Client streaming
- Server streaming
- Bidirectional streaming
- Graceful Shutdown
This library implements non-blocking gRPC for PHP. It supports unary calls as well as client, server, and bidirectional streaming, and it can run PHP as a gRPC server, not just a client.
To use this library you will need thesis/protoc-plugin and thesis/protobuf. The former, as the name suggests, is a protoc plugin that generates a gRPC client and server in the conventional gRPC style, so it is recommended to read the plugin documentation first. Once you've done that, you're ready to start building gRPC-based communication.
Let's start with unary requests — the simplest form of gRPC communication. A unary request performs a single round trip between the client and the server: send a request, get a response. Think of it as classical HTTP. Consider the following proto schema:
syntax = "proto3";
package auth.api.v1;
message AuthenticateRequest {
string user = 1;
string pass = 2;
}
message AuthenticateResponse {
string token = 1;
}
service AuthenticationService {
rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse);
}
Assuming your .proto files live in a protos/ directory alongside src/, and you want to generate code into genproto/, run the following command (see the plugin documentation for installation instructions):
protoc \
--plugin=protoc-gen-php-plugin=/usr/local/bin/protoc-gen-php \
protos/*.proto \
--php-plugin_out=genproto
This will produce an Auth/Api/V1/ directory inside genproto/ containing the following files:
- AuthenticateRequest.php
- AuthenticateResponse.php
- AuthenticationServiceClient.php
- AuthenticationServiceServer.php
- AuthenticationServiceServerRegistry.php
- autoload.metadata.php
- ProtosAuthV1DescriptorRegistry.php
If you have read the plugin documentation, you will already know how to work with these files. Once you have registered autoload.metadata.php in your composer.json and configured a PSR-4 namespace for your generated gRPC code, you are ready to implement the server.
To implement a gRPC server, you need to provide a concrete implementation of the generated AuthenticationServiceServer interface:
use Amp\Cancellation;
use Auth\Api\V1\AuthenticateRequest;
use Auth\Api\V1\AuthenticateResponse;
use Auth\Api\V1\AuthenticationServiceServer;
use Google\Rpc\Code;
use Google\Rpc\PreconditionFailure;
use Thesis\Grpc\InvokeError;
use Thesis\Grpc\Metadata;
/**
* @api
*/
final readonly class AuthenticationServer implements AuthenticationServiceServer
{
#[\Override]
public function authenticate(AuthenticateRequest $request, Metadata $md, Cancellation $cancellation): AuthenticateResponse
{
if ($request->user === 'root' && $request->pass === 'secret') {
return new AuthenticateResponse('supertoken');
}
throw new InvokeError(Code::FAILED_PRECONDITION, 'Invalid authentication credentials', [
new PreconditionFailure([
new PreconditionFailure\Violation('auth', 'credentials', 'invalid credentials'),
]),
]);
}
}
<?php
declare(strict_types=1);
use Auth\Api\V1\AuthenticationServiceServerRegistry;
use Thesis\Grpc\Server;
use function Amp\trapSignal;
$server = new Server\Builder()
->withServices(new AuthenticationServiceServerRegistry(
new AuthenticationServer(),
))
->build();
$server->start();
trapSignal([\SIGINT, \SIGTERM]);
$server->stop();
By default, the server listens on 0.0.0.0:50051. To change the address, use the withAddresses builder method:
$server = new Server\Builder()
->withAddresses('0.0.0.0:8080')
// ...
->build();
Unlike the server, the client requires no implementation, since it is already generated for you. Simply create a connection and pass it to the generated client:
use Auth\Api\V1\AuthenticateRequest;
use Auth\Api\V1\AuthenticationServiceClient;
use Thesis\Grpc\Client;
$client = new AuthenticationServiceClient(
new Client\Builder()
->build(),
);
$response = $client->authenticate(new AuthenticateRequest('root', 'secret'));
dump($response->token); // supertoken
Just like the server builder, the client builder lets you customize the connection. For example, you can override the host if the server is not running on the default address:
$client = new AuthenticationServiceClient(
new Client\Builder()
->withHost('dns:///my-grpc-server:8080')
->build(),
);
The library supports both standard TLS (server authentication) and mTLS (mutual client/server authentication) via TransportCredentials on both the server and client sides.
For TLS, the server needs a certificate and private key, while the client needs a trusted CA and the expected server name:
use Amp\Socket\Certificate;
use Thesis\Grpc\Client;
use Thesis\Grpc\Server;
$server = new Server\Builder()
->withTransportCredentials(
new Server\TransportCredentials()
->withDefaultCertificate(new Certificate('/certs/server.crt', '/certs/server.key')),
)
->build();
$client = new Client\Builder()
->withTransportCredentials(
new Client\TransportCredentials()
->withCaCert('/certs/ca.crt')
->withPeerName('localhost'),
)
->build();
For mTLS, additionally enable client verification on the server and provide a client certificate on the client side:
use Amp\Socket\Certificate;
use Thesis\Grpc\Client;
use Thesis\Grpc\Server;
$server = new Server\Builder()
->withTransportCredentials(
new Server\TransportCredentials()
->withDefaultCertificate(new Certificate('/certs/server.crt', '/certs/server.key'))
->withCaCert('/certs/ca.crt')
->withPeerName('client')
->withVerifyPeer(),
)
->build();
$client = new Client\Builder()
->withHost('ipv4:127.0.0.1:50051')
->withTransportCredentials(
new Client\TransportCredentials()
->withCaCert('/certs/ca.crt')
->withPeerName('localhost')
->withCertificate(new Certificate('/certs/client.crt', '/certs/client.key')),
)
->build();
Practical recommendations:
- Make sure the server certificate includes SAN entries (DNS/IP) matching the value passed to withPeerName() on the client.
- For mTLS, the client certificate should include the proper extension (extendedKeyUsage=clientAuth), and the server certificate should include extendedKeyUsage=serverAuth.
- Use certificates signed by a trusted CA and modern signature algorithms (for example, SHA-256).
The client resolves server addresses using the gRPC Name Resolution specification.
The target string passed to withHost() follows the format scheme:endpoint, where the scheme determines how the address is interpreted and resolved.
Supported schemes:
| Scheme | Format | Description |
|---|---|---|
| dns | dns:///host:port or dns://authority/host:port | Resolves the hostname via DNS. Supports periodic re-resolution based on TTL. |
| ipv4 | ipv4:addr1:port1,addr2:port2 | A comma-separated list of IPv4 addresses. No DNS lookup is performed. |
| ipv6 | ipv6:[addr1]:port1,[addr2]:port2 | A comma-separated list of IPv6 addresses in bracket notation. |
| unix | unix:///path/to/socket | Connects via a Unix domain socket. |
| passthrough | passthrough:///address | Passes the address through as-is, without any resolution. |
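To make the target formats in the table concrete, here is a minimal sketch of how such a string could be split into scheme, authority, and endpoint. The parseTarget() function is hypothetical and is not part of the library; it only mirrors the formats listed above and falls back to dns when no known scheme prefix is present. The library's actual parser may behave differently.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not the library's API): splits a gRPC target string
 * into its scheme, optional authority, and endpoint parts.
 *
 * @return array{scheme: string, authority: string, endpoint: string}
 */
function parseTarget(string $target): array
{
    $schemes = ['dns', 'ipv4', 'ipv6', 'unix', 'passthrough'];

    $scheme = strstr($target, ':', true);
    if ($scheme === false || !in_array($scheme, $schemes, true)) {
        // No recognized scheme prefix, e.g. "my-grpc-server:50051": treat as DNS.
        return ['scheme' => 'dns', 'authority' => '', 'endpoint' => $target];
    }

    $rest = substr($target, strlen($scheme) + 1);

    // "scheme://authority/endpoint" form; the authority may be empty ("dns:///host").
    if (str_starts_with($rest, '//')) {
        $rest = substr($rest, 2);
        $slash = strpos($rest, '/');
        if ($slash === false) {
            return ['scheme' => $scheme, 'authority' => '', 'endpoint' => $rest];
        }

        return [
            'scheme' => $scheme,
            'authority' => substr($rest, 0, $slash),
            // For unix sockets the slash belongs to the absolute path, so keep it.
            'endpoint' => $scheme === 'unix' ? substr($rest, $slash) : substr($rest, $slash + 1),
        ];
    }

    // "scheme:endpoint" form, e.g. "ipv4:10.0.0.1:50051,10.0.0.2:50051".
    return ['scheme' => $scheme, 'authority' => '', 'endpoint' => $rest];
}
```

With this sketch, 'dns://10.0.0.1:53/my-grpc-server:50051' yields the authority '10.0.0.1:53' and the endpoint 'my-grpc-server:50051', while a bare 'my-grpc-server:50051' is treated as a dns target.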
If no scheme is specified, dns is assumed:
// These two are equivalent:
->withHost('my-grpc-server:50051')
->withHost('dns:///my-grpc-server:50051')
The dns scheme supports an optional authority section that specifies a custom DNS server:
// Resolve via a specific DNS server on port 53
->withHost('dns://10.0.0.1:53/my-grpc-server:50051')
Multiple addresses can be specified for ipv4 and ipv6 schemes, enabling client-side load balancing without DNS:
->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051,10.0.0.3:50051')
When endpoint resolution returns multiple addresses, either from DNS or from a multi-address target, the client uses a load balancer to distribute requests across them. The load balancing policy is configured via withLoadBalancer():
use Thesis\Grpc\Client\Builder;
use Thesis\Grpc\Client\LoadBalancer\RoundRobinFactory;
$client = new AuthenticationServiceClient(
new Builder()
->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051')
->withLoadBalancer(new RoundRobinFactory())
->build(),
);
The library ships with two built-in policies:
- PickFirstFactory (default): shuffles the endpoint list, picks one, and sticks with it for all subsequent requests. If a re-resolution returns an updated endpoint list that still contains the current endpoint, it remains selected; otherwise a new one is chosen.
- RoundRobinFactory: distributes requests evenly across all available endpoints in a cyclic order.
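The two policies described above can be sketched in plain PHP. The classes below are hypothetical stand-ins, not the library's implementations: endpoints are modelled as plain strings rather than Endpoint objects, and resetting the round-robin position on refresh is an assumption of this sketch.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for round-robin: cycles through endpoints in order. */
final class RoundRobinSketch
{
    private int $next = 0;

    /** @param non-empty-list<string> $endpoints */
    public function __construct(private array $endpoints) {}

    /** @param non-empty-list<string> $endpoints */
    public function refresh(array $endpoints): void
    {
        $this->endpoints = $endpoints;
        $this->next = 0; // assumption: restart the cycle on every update
    }

    public function pick(): string
    {
        $endpoint = $this->endpoints[$this->next];
        $this->next = ($this->next + 1) % count($this->endpoints);

        return $endpoint;
    }
}

/** Hypothetical stand-in for pick-first: sticks with one endpoint while it stays resolvable. */
final class PickFirstSketch
{
    private string $current;

    /** @param non-empty-list<string> $endpoints */
    public function __construct(array $endpoints)
    {
        $this->current = self::choose($endpoints);
    }

    /** @param non-empty-list<string> $endpoints */
    public function refresh(array $endpoints): void
    {
        // Keep the current endpoint if the updated list still contains it.
        if (!in_array($this->current, $endpoints, true)) {
            $this->current = self::choose($endpoints);
        }
    }

    public function pick(): string
    {
        return $this->current;
    }

    /** @param non-empty-list<string> $endpoints */
    private static function choose(array $endpoints): string
    {
        shuffle($endpoints);

        return $endpoints[0];
    }
}
```

Pick-first keeps all traffic on a single connection, which is simple and cheap, while round-robin spreads the load at the cost of maintaining a connection per endpoint.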
When DNS returns records with a TTL, the dns resolver will periodically re-resolve the hostname and update the load balancer with the new set of endpoints. The re-resolution interval is clamped between a configurable minimum and maximum (30 and 300 seconds by default).
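The clamping rule can be illustrated with a one-line helper. This is a sketch only: clampResolveInterval() is a hypothetical name, and how the library derives a single TTL from several DNS records is not covered here. The defaults of 30 and 300 seconds are the ones stated above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: clamps a DNS TTL (in seconds) into the
 * [min, max] re-resolution window.
 */
function clampResolveInterval(int $ttl, int $min = 30, int $max = 300): int
{
    return max($min, min($max, $ttl));
}
```

Under these assumptions, a record with a 5-second TTL is re-resolved after 30 seconds, a 120-second TTL is used as-is, and a one-hour TTL is capped at 300 seconds.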
To implement a custom load balancing strategy, you need two classes: a LoadBalancer and a LoadBalancerFactory.
The LoadBalancer interface has two methods:
- pick() selects an endpoint for each RPC call.
- refresh() is called when endpoint resolution produces an updated list of addresses.
Here is an example of a random load balancer that picks a random endpoint for every request:
use Random\Randomizer;
use Thesis\Grpc\Client\Endpoint;
use Thesis\Grpc\Client\LoadBalancer;
use Thesis\Grpc\Client\LoadBalancerFactory;
use Thesis\Grpc\Client\PickContext;
final class RandomBalancer implements LoadBalancer
{
/**
* @param non-empty-list<Endpoint> $endpoints
*/
public function __construct(
private array $endpoints,
private readonly Randomizer $randomizer = new Randomizer(),
) {}
#[\Override]
public function refresh(array $endpoints): void
{
$this->endpoints = $endpoints;
}
#[\Override]
public function pick(PickContext $context): Endpoint
{
return $this->endpoints[$this->randomizer->getInt(0, \count($this->endpoints) - 1)];
}
}
final readonly class RandomBalancerFactory implements LoadBalancerFactory
{
#[\Override]
public function name(): string
{
return 'random';
}
#[\Override]
public function create(array $endpoints): LoadBalancer
{
return new RandomBalancer($endpoints);
}
}
Register it via the builder:
$client = new AuthenticationServiceClient(
new Client\Builder()
->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051')
->withLoadBalancer(new RandomBalancerFactory())
->build(),
);
The client selects an endpoint resolver based on the target scheme. For most use cases, the built-in resolvers work out of the box. However, if you need to customize resolver behavior for a specific scheme (for example, to configure the DNS resolver with a cache or adjust re-resolution intervals), you can register a custom resolver instance:
use Amp\Cache\LocalCache;
use Thesis\Grpc\Client\Builder;
use Thesis\Grpc\Client\EndpointResolver\DnsResolver;
use Thesis\Grpc\Client\Scheme;
$client = new AuthenticationServiceClient(
new Builder()
->withHost('dns:///my-grpc-server:50051')
->withEndpointResolver(Scheme::Dns, new DnsResolver(
cache: new LocalCache(),
minResolveInterval: 60,
maxResolveInterval: 600,
))
->build(),
);
The resolver registered for a given scheme takes precedence over the default. If no custom resolver is registered, the client falls back to the built-in one:
| Scheme | Default Resolver |
|---|---|
| dns | DnsResolver: resolves via DNS, re-resolves periodically based on TTL |
| ipv4, ipv6, unix | StaticResolver: converts target addresses directly into endpoints |
| passthrough | PassthroughResolver: passes the raw target address as a single endpoint |
You can also implement the EndpointResolver interface to build a fully custom resolver — for example, one backed by a service registry such as Consul or etcd.
If the server returns an error, you can handle it with a standard try/catch block:
try {
$response = $client->authenticate(new AuthenticateRequest('root', 'secret'));
dump($response->token);
} catch (InvokeError $e) {
dump($e->statusCode, $e->statusMessage, $e->details);
}
You can configure compression between the client and the server. If the client compresses protobuf messages using gzip, the server must support it as well; otherwise you will receive an UNIMPLEMENTED error. Compression is configured via the respective builders.
Configuring compression on the server:
$server = new Server\Builder()
->withServices(new AuthenticationServiceServerRegistry(
new AuthenticationServer(),
))
->withCompressors(new GzipCompressor())
->build();
And on the client:
$client = new AuthenticationServiceClient(
new Client\Builder()
->withCompression(new GzipCompressor())
->build(),
);
A server can support multiple compression algorithms simultaneously, serving different clients with different configurations, while each client uses exactly one. The library ships with built-in implementations for the most popular algorithms: gzip, deflate, and snappy. Some of these may require the corresponding PHP extension to be installed. If you need a custom compression strategy, implement the Thesis\Grpc\Compression\Compressor interface.
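The compatibility rule between client and server can be expressed as a small check. This is a sketch under stated assumptions: serverAccepts() is a hypothetical function, not the library's API, and algorithms are identified by their gRPC encoding names, where "identity" stands for no compression.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical check mirroring the rule described above: a compressed
 * message is accepted only if the server was configured with the same
 * algorithm; otherwise the call would fail with UNIMPLEMENTED.
 *
 * @param list<string> $serverEncodings algorithms configured on the server
 */
function serverAccepts(string $clientEncoding, array $serverEncodings): bool
{
    // "identity" is the gRPC name for uncompressed messages.
    return $clientEncoding === 'identity' || in_array($clientEncoding, $serverEncodings, true);
}
```

For example, a server configured with gzip and deflate can serve both gzip and deflate clients at the same time, but a snappy client would be rejected until snappy is added on the server.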
The library supports interceptors for both the client and the server. For example, if you want to restrict access to authorized clients only, you can write a server-side interceptor:
use Amp\Cancellation;
use Google\Rpc\Code;
use Thesis\Grpc\InvokeError;
use Thesis\Grpc\Metadata;
use Thesis\Grpc\Server;
use Thesis\Grpc\Server\StreamInfo;
use Thesis\Grpc\ServerStream;
final readonly class ServerAuthenticationInterceptor implements Server\Interceptor
{
#[\Override]
public function intercept(
ServerStream $stream,
StreamInfo $info,
Metadata $md,
Cancellation $cancellation,
callable $next,
): void {
if (!str_ends_with($info->method, 'Authenticate') && $md->value('Authorization') !== 'supertoken') {
throw new InvokeError(Code::UNAUTHENTICATED);
}
$next($stream, $info, $md, $cancellation);
}
}
Incoming headers, through which authorization tokens are typically passed, are available via Metadata. The interceptor also has access to StreamInfo, which exposes the current RPC method name, allowing you to selectively skip authorization checks for specific methods such as the authentication endpoint itself.
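To see how the $next callable ties interceptors together, consider this simplified, library-independent sketch. The chain() helper and the three-argument interceptor signature are hypothetical; the real interface also receives the stream, StreamInfo, and Cancellation, and the order in which the library applies interceptors is assumed here to be first-registered-outermost.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: wraps $handler so that the first interceptor
 * in the list runs outermost.
 *
 * @param list<callable(string, array<string, string>, callable): void> $interceptors
 * @param callable(string, array<string, string>): void $handler
 * @return callable(string, array<string, string>): void
 */
function chain(array $interceptors, callable $handler): callable
{
    foreach (array_reverse($interceptors) as $interceptor) {
        $next = $handler;
        // Arrow functions capture $interceptor and $next by value at this point.
        $handler = static fn (string $method, array $md) => $interceptor($method, $md, $next);
    }

    return $handler;
}

$trace = [];

$logging = static function (string $method, array $md, callable $next) use (&$trace): void {
    $trace[] = 'log:before';
    $next($method, $md);
    $trace[] = 'log:after';
};

$auth = static function (string $method, array $md, callable $next) use (&$trace): void {
    if (($md['authorization'] ?? null) !== 'supertoken') {
        $trace[] = 'auth:rejected';

        return; // short-circuit: the handler is never reached
    }
    $next($method, $md);
};

$handler = static function (string $method, array $md) use (&$trace): void {
    $trace[] = "handler:{$method}";
};

$call = chain([$logging, $auth], $handler);
$call('Profile', ['authorization' => 'supertoken']); // reaches the handler
$call('Profile', []);                                // stopped by $auth
```

An interceptor that does not call $next ends the request early, which is what the authentication interceptor above does by throwing InvokeError instead of returning.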
Registering the interceptor on the server:
$server = new Server\Builder()
->withInterceptors(new ServerAuthenticationInterceptor())
->build();
The same pattern applies on the client side. Here is a client interceptor that automatically attaches an authorization token to every outgoing request:
use Amp\Cancellation;
use Thesis\Grpc\Client;
use Thesis\Grpc\Client\Invoke;
use Thesis\Grpc\ClientStream;
use Thesis\Grpc\Metadata;
final readonly class ClientAuthenticationInterceptor implements Client\Interceptor
{
#[\Override]
public function intercept(
Invoke $invoke,
Metadata $md,
Cancellation $cancellation,
callable $next,
): ClientStream {
return $next($invoke, $md->with('Authorization', 'supertoken'), $cancellation);
}
}
Registering it on the client:
$client = new AuthenticationServiceClient(
new Client\Builder()
->withInterceptors(new ClientAuthenticationInterceptor())
->build(),
);
For more advanced use cases, you can decorate streams directly. This allows you to intercept every individual message flowing through a stream, which is useful for logging, performance metrics, tracing, or any other cross-cutting concern.
Let's implement a server-side stream decorator that logs every incoming and outgoing message:
use Psr\Log\LoggerInterface;
use Thesis\Grpc\Server;
use Thesis\Grpc\ServerStream;
/**
* @api
* @template-covariant In of object
* @template Out of object
* @template-extends Server\DecoratedStream<In, Out>
*/
final class LoggingServerStream extends Server\DecoratedStream
{
public function __construct(
ServerStream $stream,
private readonly LoggerInterface $logger,
) {
parent::__construct($stream);
}
#[\Override]
public function send(object $message): void
{
$this->logger->info('message "{type}" was sent', [
'type' => $message::class,
]);
parent::send($message);
}
#[\Override]
public function receive(): object
{
$message = parent::receive();
$this->logger->info('message "{type}" was received', [
'type' => $message::class,
]);
return $message;
}
}
Now wire it up via an interceptor that substitutes the original stream with the decorated one:
use Amp\Cancellation;
use Psr\Log\LoggerInterface;
use Thesis\Grpc\Metadata;
use Thesis\Grpc\Server;
use Thesis\Grpc\Server\StreamInfo;
use Thesis\Grpc\ServerStream;
final readonly class LoggingServerInterceptor implements Server\Interceptor
{
public function __construct(
private LoggerInterface $logger,
) {}
#[\Override]
public function intercept(ServerStream $stream, StreamInfo $info, Metadata $md, Cancellation $cancellation, callable $next): void
{
$next(new LoggingServerStream($stream, $this->logger), $info, $md, $cancellation);
}
}
Register the interceptor on the server:
$server = new Server\Builder()
->withInterceptors(new LoggingServerInterceptor(/** LoggerInterface implementation */))
->build();
The same approach works on the client side. First, the stream decorator:
use Psr\Log\LoggerInterface;
use Thesis\Grpc\Client;
use Thesis\Grpc\ClientStream;
/**
* @api
* @template In of object
* @template-covariant Out of object
* @template-extends Client\DecoratedStream<In, Out>
*/
final readonly class LoggingClientStream extends Client\DecoratedStream
{
public function __construct(
ClientStream $stream,
private LoggerInterface $logger,
) {
parent::__construct($stream);
}
#[\Override]
public function send(object $message): void
{
$this->logger->info('message "{type}" was sent', [
'type' => $message::class,
]);
parent::send($message);
}
#[\Override]
public function receive(): object
{
$message = parent::receive();
$this->logger->info('message "{type}" was received', [
'type' => $message::class,
]);
return $message;
}
}
The interceptor that substitutes the stream:
use Amp\Cancellation;
use Psr\Log\LoggerInterface;
use Thesis\Grpc\Client;
use Thesis\Grpc\Client\Invoke;
use Thesis\Grpc\ClientStream;
use Thesis\Grpc\Metadata;
final readonly class LoggingClientInterceptor implements Client\Interceptor
{
public function __construct(
private LoggerInterface $logger,
) {}
#[\Override]
public function intercept(Invoke $invoke, Metadata $md, Cancellation $cancellation, callable $next): ClientStream
{
return new LoggingClientStream(
$next($invoke, $md, $cancellation),
$this->logger,
);
}
}
And register it via the client builder:
$client = new AuthenticationServiceClient(
new Client\Builder()
->withInterceptors(new LoggingClientInterceptor(/** LoggerInterface implementation */))
->build(),
);
If you have prior gRPC experience in PHP, nothing so far will have surprised you. Let's move on to what truly sets gRPC apart: streams.
We'll start with client streaming. In this pattern, the client sends a stream of messages to the server, and once the stream is complete, the server returns a single response and closes the connection. As an example, consider a proto schema where the client sends a series of words and asks the server to count the total number of bytes received:
syntax = "proto3";
package counter.api.v1;
message Word {
bytes value = 1;
}
message Info {
int32 count = 1;
}
service CounterService {
rpc Count(stream Word) returns (Info);
}
After generating the code, implement the server:
use Amp\Cancellation;
use Counter\Api\V1\CounterServiceServer;
use Counter\Api\V1\Info;
use Counter\Api\V1\Word;
use Thesis\Grpc\Metadata;
use Thesis\Grpc\Server;
/**
* @api
*/
final readonly class CounterServer implements CounterServiceServer
{
#[\Override]
public function count(Server\ClientStreamChannel $stream, Metadata $md, Cancellation $cancellation): Info
{
$bytes = 0;
foreach ($stream as $message) {
$bytes += \strlen($message->value);
}
return new Info($bytes);
}
}
And register it:
$server = new Server\Builder()
->withServices(new AuthenticationServiceServerRegistry(new AuthenticationServer()))
->withServices(new CounterServiceServerRegistry(new CounterServer()))
->build();
Note that a single server instance can host multiple service handlers.
On the client side, calling count() returns a stream with two methods: send(), which transmits messages to the server, and close(), which finalizes the stream and waits for the server's response. Everything operates in non-blocking mode — if needed, the stream can be wrapped in \Amp\async().
use Counter\Api\V1\CounterServiceClient;
use Counter\Api\V1\Word;
use Thesis\Grpc\Client;
$client = new CounterServiceClient(
new Client\Builder()
->build(),
);
$words = $client->count();
for ($i = 0; $i < 10; ++$i) {
$words->send(new Word(random_bytes(10)));
}
dump($words->close()->count); // 100
The next pattern is server streaming, the mirror image of client streaming. It is again a unidirectional flow of messages, but this time originating from the server. The client sends an initial request to establish the connection, then receives a stream it can read from until the server closes it.
As an example, let's ask the server to generate a set of random words, with the desired count specified in the initial request. We'll adapt the proto schema from the previous section:
syntax = "proto3";
package counter.api.v1;
message Word {
bytes value = 1;
}
message Info {
int32 count = 1;
}
service CounterService {
rpc Count(Info) returns (stream Word);
}
The server implementation now iterates up to the requested count, streaming each word back to the client:
use Amp\Cancellation;
use Counter\Api\V1\CounterServiceServer;
use Counter\Api\V1\Info;
use Counter\Api\V1\Word;
use Thesis\Grpc\Metadata;
use Thesis\Grpc\Server;
/**
* @api
*/
final readonly class CounterServer implements CounterServiceServer
{
#[\Override]
public function count(Info $request, Metadata $md, Cancellation $cancellation): iterable
{
for ($i = 0; $i < $request->count; ++$i) {
yield new Word(random_bytes(10));
}
}
}
On the client side, the stream is iterable, and the loop runs until the server closes the connection:
use Counter\Api\V1\CounterServiceClient;
use Counter\Api\V1\Info;
use Thesis\Grpc\Client;
$client = new CounterServiceClient(
new Client\Builder()
->build(),
);
$words = $client->count(new Info(10));
$bytes = 0;
foreach ($words as $word) {
$bytes += \strlen($word->value);
}
dump($bytes);
Finally, the most powerful and most interesting streaming pattern: bidirectional streaming. Both the client and the server can send and receive messages freely, at any time. A classic use case for this pattern is a message queue. Let's model a simple one and implement it end to end.
syntax = "proto3";
package queue.api.v1;
message FromClient {
message ReadRequest {
string topic = 1;
int32 qos = 2;
}
message AckRequest {}
oneof event {
ReadRequest read_request = 1;
AckRequest ack_request = 2;
}
}
message FromServer {
message Message {
string content = 1;
}
message WriteRequest {
Message message = 1;
}
message CloseRequest {}
oneof event {
WriteRequest write_request = 1;
CloseRequest close_request = 2;
}
}
service QueueService {
rpc Subscribe(stream FromClient) returns (stream FromServer);
}
The communication flow works as follows:
- The client sends a subscription request (FromClient\ReadRequest) specifying a topic and a QoS (quantity of messages to receive).
- The server delivers the requested number of messages (FromServer\WriteRequest), then waits for an acknowledgement (FromClient\AckRequest) from the client.
- This cycle repeats until the client decides to unsubscribe (Stream::close()).
- The stream is then closed by both sides.
Server implementation:
use Amp\Cancellation;
use Queue\Api\V1\FromClient;
use Queue\Api\V1\FromServer;
use Queue\Api\V1\QueueServiceServer;
use Thesis\Grpc\Metadata;
use Thesis\Grpc\Server;
final readonly class QueueServer implements QueueServiceServer
{
#[\Override]
public function subscribe(Server\BidirectionalStreamChannel $stream, Metadata $md, Cancellation $cancellation): void
{
foreach ($stream as $request) {
if ($request->event instanceof FromClient\EventReadRequest) {
dump("subscription for queue '{$request->event->readRequest->topic}' received");
for ($i = 0; $i < $request->event->readRequest->qos; ++$i) {
$stream->send(new FromServer(new FromServer\EventWriteRequest(
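// content is a proto3 string field and must be valid UTF-8, so hex-encode the random bytes
new FromServer\WriteRequest(new FromServer\Message(bin2hex(random_bytes(5))))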
)));
}
$request = $stream->receive();
\assert($request->event instanceof FromClient\EventAckRequest);
dump('messages acked');
}
}
$stream->send(new FromServer(new FromServer\EventCloseRequest(new FromServer\CloseRequest())));
$stream->close();
dump('stream closed');
}
}
Client implementation:
use Queue\Api\V1\FromClient;
use Queue\Api\V1\QueueServiceClient;
use Thesis\Grpc\Client;
$client = new QueueServiceClient(new Client\Builder()->build());
$queue = $client->subscribe();
$queue->send(new FromClient(
new FromClient\EventReadRequest(
new FromClient\ReadRequest(
topic: 'messages',
qos: 5,
),
),
));
for ($i = 0; $i < 5; ++$i) {
dump($queue->receive());
}
$queue->send(new FromClient(new FromClient\EventAckRequest(new FromClient\AckRequest())));
$queue->close();
dump($queue->receive()); // CloseRequest
When Server::stop() is called, the server will stop accepting new connections and requests, then wait for all active handlers to finish processing.
Once all handlers have completed, stop() returns normally. If the handlers take too long, you can pass a Cancellation to set an upper bound on how long to wait:
use Amp\TimeoutCancellation;
use Auth\Api\V1\AuthenticationServiceServerRegistry;
use Thesis\Grpc\Server;
use function Amp\trapSignal;
$server = new Server\Builder()
->withServices(new AuthenticationServiceServerRegistry(
new AuthenticationServer(),
))
->build();
$server->start();
trapSignal([\SIGINT, \SIGTERM]);
// Wait up to 30 seconds for all active handlers to finish.
// If the timeout expires, stop() throws a CancelledException.
$server->stop(new TimeoutCancellation(30));
During shutdown, the server notifies all active handlers via the Cancellation argument passed to each handler method.
This means your handler implementations should check cancellation and avoid ignoring the $cancellation argument — otherwise the server will have no way to signal them to stop, and shutdown will block until they finish on their own.