Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ check: fixer-check rector-check composer-validate composer-normalize-check deps-

compile-test-stub:
docker run --rm \
--pull always \
--user $(CONTAINER_USER) \
-v $(PWD):/workspace \
-w /workspace \
Expand Down
163 changes: 162 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ composer require thesis/grpc
- [Implementing the Server](#implementing-the-server)
- [Starting the Server](#starting-the-server)
- [Using the Client](#using-the-client)
- [Target Addressing](#target-addressing)
- [Load Balancing](#load-balancing)
- [Endpoint Resolution](#endpoint-resolution)
- [Error handling](#error-handling)
- [Compression](#compression)
- [Interceptors](#interceptors)
Expand Down Expand Up @@ -163,11 +166,169 @@ Just like the server builder, the client builder lets you customize the connecti
```php
$client = new AuthenticationServiceClient(
new Client\Builder()
->withHost('http://127.0.0.1:8080')
->withHost('dns:///my-grpc-server:8080')
->build(),
);
```

### Target Addressing

The client resolves server addresses using the [gRPC Name Resolution](https://github.com/grpc/grpc/blob/master/doc/naming.md) specification.
The target string passed to `withHost()` follows the format `scheme:endpoint`, where the scheme determines how the address is interpreted and resolved.

Supported schemes:

| Scheme | Format | Description |
|---------------|---------------------------------------------------|------------------------------------------------------------------------------|
| `dns` | `dns:///host:port` or `dns://authority/host:port` | Resolves the hostname via DNS. Supports periodic re-resolution based on TTL. |
| `ipv4` | `ipv4:addr1:port1,addr2:port2` | A comma-separated list of IPv4 addresses. No DNS lookup is performed. |
| `ipv6` | `ipv6:[addr1]:port1,[addr2]:port2` | A comma-separated list of IPv6 addresses in bracket notation. |
| `unix` | `unix:///path/to/socket` | Connects via a Unix domain socket. |
| `passthrough` | `passthrough:///address` | Passes the address through as-is, without any resolution. |

If no scheme is specified, `dns` is assumed:

```php
// These two are equivalent:
->withHost('my-grpc-server:50051')
->withHost('dns:///my-grpc-server:50051')
```

The `dns` scheme supports an optional authority section that specifies a custom DNS server:

```php
// Resolve via a specific DNS server on port 53
->withHost('dns://10.0.0.1:53/my-grpc-server:50051')
```

Multiple addresses can be specified for `ipv4` and `ipv6` schemes, enabling client-side load balancing without DNS:

```php
->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051,10.0.0.3:50051')
```

### Load Balancing

When endpoint resolution returns multiple addresses — either from DNS or from a multi-address target — the client uses a load balancer to distribute requests across them. The load balancing policy is configured via `withLoadBalancer()`:

```php
use Thesis\Grpc\Client\Builder;
use Thesis\Grpc\Client\LoadBalancer\RoundRobinFactory;

$client = new AuthenticationServiceClient(
new Builder()
->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051')
->withLoadBalancer(new RoundRobinFactory())
->build(),
);
```

The library ships with two built-in policies:

- **`PickFirstFactory`** (default) — shuffles the endpoint list, picks one, and sticks with it for all subsequent requests. If a re-resolution returns an updated endpoint list that still contains the current endpoint, it remains selected; otherwise a new one is chosen.
- **`RoundRobinFactory`** — distributes requests evenly across all available endpoints in a cyclic order.

When DNS returns records with a TTL, the `dns` resolver will periodically re-resolve the hostname and update the load balancer with the new set of endpoints. The re-resolution interval is clamped between a configurable minimum and maximum (30 and 300 seconds by default).

#### Implementing a Custom Load Balancer

To implement a custom load balancing strategy, you need two classes: a `LoadBalancer` and a `LoadBalancerFactory`.

The `LoadBalancer` interface has two methods:
- `pick()` selects an endpoint for each RPC call.
- `refresh()` is called when endpoint resolution produces an updated list of addresses.

Here is an example of a random load balancer that picks a random endpoint for every request:

```php
use Random\Randomizer;
use Thesis\Grpc\Client\Endpoint;
use Thesis\Grpc\Client\LoadBalancer;
use Thesis\Grpc\Client\LoadBalancerFactory;
use Thesis\Grpc\Client\PickContext;

final class RandomBalancer implements LoadBalancer
{
/**
* @param non-empty-list<Endpoint> $endpoints
*/
public function __construct(
private array $endpoints,
private readonly Randomizer $randomizer = new Randomizer(),
) {}

#[\Override]
public function refresh(array $endpoints): void
{
$this->endpoints = $endpoints;
}

#[\Override]
public function pick(PickContext $context): Endpoint
{
return $this->endpoints[$this->randomizer->getInt(0, \count($this->endpoints) - 1)];
}
}

final readonly class RandomBalancerFactory implements LoadBalancerFactory
{
#[\Override]
public function name(): string
{
return 'random';
}

#[\Override]
public function create(array $endpoints): LoadBalancer
{
return new RandomBalancer($endpoints);
}
}
```

Register it via the builder:

```php
$client = new AuthenticationServiceClient(
new Client\Builder()
->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051')
->withLoadBalancer(new RandomBalancerFactory())
->build(),
);
```

### Endpoint Resolution

The client selects an endpoint resolver based on the target scheme. For most use cases, the built-in resolvers work out of the box. However, if you need to customize resolver behavior for a specific scheme — for example, to configure the DNS resolver with a cache or adjust re-resolution intervals — you can register a custom resolver instance:

```php
use Amp\Cache\LocalCache;
use Thesis\Grpc\Client\Builder;
use Thesis\Grpc\Client\EndpointResolver\DnsResolver;
use Thesis\Grpc\Client\Scheme;

$client = new AuthenticationServiceClient(
new Builder()
->withHost('dns:///my-grpc-server:50051')
->withEndpointResolver(Scheme::Dns, new DnsResolver(
cache: new LocalCache(),
minResolveInterval: 60,
maxResolveInterval: 600,
))
->build(),
);
```

The resolver registered for a given scheme takes precedence over the default. If no custom resolver is registered, the client falls back to the built-in one:

| Scheme | Default Resolver |
|------------------------|----------------------------------------------------------------------------|
| `dns` | `DnsResolver` — resolves via DNS, re-resolves periodically based on TTL |
| `ipv4`, `ipv6`, `unix` | `StaticResolver` — converts target addresses directly into endpoints |
| `passthrough` | `PassthroughResolver` — passes the raw target address as a single endpoint |

You can also implement the `EndpointResolver` interface to build a fully custom resolver — for example, one backed by a service registry such as Consul or etcd.

### Error Handling

If the server returns an error, you can handle it with a standard `try/catch` block:
Expand Down
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@
"php": "^8.4",
"amphp/amp": "^3.1",
"amphp/byte-stream": "^2.1",
"amphp/cache": "^2.0",
"amphp/dns": "^2.4",
"amphp/http": "^2.1",
"amphp/http-client": "^5.3",
"amphp/http-server": "^3.4",
"amphp/pipeline": "^1.2",
"amphp/socket": "^2.3",
"amphp/sync": "^2.3",
"league/uri": "^7.8",
"psr/log": "^3.0",
"revolt/event-loop": "1.0.8",
"thesis/endian": "^0.3.0",
"thesis/endian": ">= 0.3.2",
"thesis/googleapis-rpc-types": "^0.1.6",
"thesis/package-version": "^0.1.2",
"thesis/protobuf": "^0.1.8",
Expand Down
2 changes: 2 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ public function createStream(
Metadata $md = new Metadata(),
Cancellation $cancellation = new NullCancellation(),
): ClientStream;

public function close(): void;
}
27 changes: 27 additions & 0 deletions src/Client/Address.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

/**
* @api
*/
final readonly class Address implements \Stringable
{
/**
* @param non-empty-string $value
*/
public function __construct(
public string $value,
) {}

/**
* @return non-empty-string
*/
#[\Override]
public function __toString(): string
{
return $this->value;
}
}
99 changes: 80 additions & 19 deletions src/Client/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use Amp\Socket\ConnectContext;
use Amp\Socket\SocketConnector;
use Thesis\Grpc\Client;
use Thesis\Grpc\Client\Internal\Connection;
use Thesis\Grpc\Client\Internal\Http2;
use Thesis\Grpc\Compression\Compressor;
use Thesis\Grpc\Compression\IdentityCompressor;
use Thesis\Grpc\Encoding\Encoder;
Expand All @@ -22,7 +24,7 @@
*/
final class Builder
{
private const string DEFAULT_HOST = 'http://localhost:50051';
private const string DEFAULT_HOST = '127.0.0.1:50051';
private const float DEFAULT_CONNECT_TIMEOUT = 10;
private const int DEFAULT_CONNECTION_LIMIT = \PHP_INT_MAX;

Expand Down Expand Up @@ -52,6 +54,16 @@ final class Builder
*/
private ?Decoder $protobuf = null;

private ?LoadBalancerFactory $loadBalancerFactory = null;

/** @var \SplObjectStorage<Scheme, EndpointResolver> */
private \SplObjectStorage $endpointResolvers;

public function __construct()
{
$this->endpointResolvers = new \SplObjectStorage();
}

public function withProtobuf(Decoder $decoder): self
{
$builder = clone $this;
Expand Down Expand Up @@ -144,33 +156,82 @@ public function withSocketConnector(SocketConnector $connector): self
return $builder;
}

public function withLoadBalancer(LoadBalancerFactory $factory): self
{
$builder = clone $this;
$builder->loadBalancerFactory = $factory;

return $builder;
}

public function withEndpointResolver(Scheme $scheme, EndpointResolver $resolver): self
{
$builder = clone $this;
$builder->endpointResolvers[$scheme] = $resolver;

return $builder;
}

public static function buildDefault(): Client
{
return new self()->build();
}

public function build(): Client
{
$target = Target::parse($this->host ?? self::DEFAULT_HOST);

$encoder = $this->encoder ?? ProtobufEncoder::default();
$compressor = $this->compressor ?? IdentityCompressor::Compressor;
$protobuf = $this->protobuf ?? Decoder\Builder::buildDefault();
$loadBalancerFactory = $this->loadBalancerFactory ?? new LoadBalancer\PickFirstFactory();
$uriFactory = new Http2\UriFactory($this->credentials !== null ? Internal\HttpScheme::Https : Internal\HttpScheme::Http);

$resolver = $this->endpointResolvers[$target->scheme] ?? match ($target->scheme) {
Scheme::Dns => new EndpointResolver\DnsResolver(),
Scheme::Passthrough => new EndpointResolver\PassthroughResolver(),
Scheme::Ipv4, Scheme::Ipv6, Scheme::Unix => new EndpointResolver\StaticResolver(),
};

$interceptor = new Http2\InterceptorComposer([
...$this->interceptors,
new Http2\AppendControlMetadataInterceptor(
$encoder->name(),
$compressor->name(),
),
]);

$httpclient = $this->httpclient ?? new HttpClientBuilder()
->usingPool(ConnectionLimitingPool::byAuthority(
$this->connectionLimit,
new DefaultConnectionFactory(
$this->connector,
new ConnectContext()
->withConnectTimeout($this->connectTimeout)
->withTlsContext($this->credentials?->createClientContext()),
),
))
->skipDefaultUserAgent()
->skipAutomaticCompression()
->skipDefaultAcceptHeader()
->build();

return new Internal\AmphpHttpClient(
host: $this->host ?? self::DEFAULT_HOST,
client: $this->httpclient ?? new HttpClientBuilder()
->usingPool(ConnectionLimitingPool::byAuthority(
$this->connectionLimit,
new DefaultConnectionFactory(
$this->connector,
new ConnectContext()
->withConnectTimeout($this->connectTimeout)
->withTlsContext($this->credentials?->createClientContext()),
new Connection\LazyConnection(
static fn() => new Connection\DefaultConnection(
target: $target,
resolver: $resolver,
loadBalancerFactory: $loadBalancerFactory,
interceptor: $interceptor,
streams: new Http2\StreamFactory(
http: $httpclient,
uri: $uriFactory,
errors: new Http2\ErrorHandler($protobuf),
encoder: $encoder,
compressor: $compressor,
),
))
->skipDefaultUserAgent()
->skipAutomaticCompression()
->skipDefaultAcceptHeader()
->build(),
encoder: $this->encoder ?? ProtobufEncoder::default(),
compressor: $this->compressor ?? IdentityCompressor::Compressor,
protobuf: $this->protobuf ?? Decoder\Builder::buildDefault(),
interceptors: $this->interceptors,
),
),
);
}
}
Loading