diff --git a/Makefile b/Makefile index a0102f3..488aee1 100644 --- a/Makefile +++ b/Makefile @@ -135,6 +135,7 @@ check: fixer-check rector-check composer-validate composer-normalize-check deps- compile-test-stub: docker run --rm \ + --pull always \ --user $(CONTAINER_USER) \ -v $(PWD):/workspace \ -w /workspace \ diff --git a/README.md b/README.md index 044751d..7821d1f 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,9 @@ composer require thesis/grpc - [Implementing the Server](#implementing-the-server) - [Starting the Server](#starting-the-server) - [Using the Client](#using-the-client) +- [Target Addressing](#target-addressing) +- [Load Balancing](#load-balancing) +- [Endpoint Resolution](#endpoint-resolution) - [Error handling](#error-handling) - [Compression](#compression) - [Interceptors](#interceptors) @@ -163,11 +166,169 @@ Just like the server builder, the client builder lets you customize the connecti ```php $client = new AuthenticationServiceClient( new Client\Builder() - ->withHost('http://127.0.0.1:8080') + ->withHost('dns:///my-grpc-server:8080') ->build(), ); ``` +### Target Addressing + +The client resolves server addresses using the [gRPC Name Resolution](https://github.com/grpc/grpc/blob/master/doc/naming.md) specification. +The target string passed to `withHost()` follows the format `scheme:endpoint`, where the scheme determines how the address is interpreted and resolved. + +Supported schemes: + +| Scheme | Format | Description | +|---------------|---------------------------------------------------|------------------------------------------------------------------------------| +| `dns` | `dns:///host:port` or `dns://authority/host:port` | Resolves the hostname via DNS. Supports periodic re-resolution based on TTL. | +| `ipv4` | `ipv4:addr1:port1,addr2:port2` | A comma-separated list of IPv4 addresses. No DNS lookup is performed. | +| `ipv6` | `ipv6:[addr1]:port1,[addr2]:port2` | A comma-separated list of IPv6 addresses in bracket notation. | +| `unix` | `unix:///path/to/socket` | Connects via a Unix domain socket. | +| `passthrough` | `passthrough:///address` | Passes the address through as-is, without any resolution. | + +If no scheme is specified, `dns` is assumed: + +```php +// These two are equivalent: +->withHost('my-grpc-server:50051') +->withHost('dns:///my-grpc-server:50051') +``` + +The `dns` scheme supports an optional authority section that specifies a custom DNS server: + +```php +// Resolve via a specific DNS server on port 53 +->withHost('dns://10.0.0.1:53/my-grpc-server:50051') +``` + +Multiple addresses can be specified for `ipv4` and `ipv6` schemes, enabling client-side load balancing without DNS: + +```php +->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051,10.0.0.3:50051') +``` + +### Load Balancing + +When endpoint resolution returns multiple addresses — either from DNS or from a multi-address target — the client uses a load balancer to distribute requests across them. The load balancing policy is configured via `withLoadBalancer()`: + +```php +use Thesis\Grpc\Client\Builder; +use Thesis\Grpc\Client\LoadBalancer\RoundRobinFactory; + +$client = new AuthenticationServiceClient( + new Builder() + ->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051') + ->withLoadBalancer(new RoundRobinFactory()) + ->build(), +); +``` + +The library ships with two built-in policies: + +- **`PickFirstFactory`** (default) — shuffles the endpoint list, picks one, and sticks with it for all subsequent requests. If a re-resolution returns an updated endpoint list that still contains the current endpoint, it remains selected; otherwise a new one is chosen. +- **`RoundRobinFactory`** — distributes requests evenly across all available endpoints in a cyclic order. + +When DNS returns records with a TTL, the `dns` resolver will periodically re-resolve the hostname and update the load balancer with the new set of endpoints. The re-resolution interval is clamped between a configurable minimum and maximum (30 and 300 seconds by default). + +#### Implementing a Custom Load Balancer + +To implement a custom load balancing strategy, you need two classes: a `LoadBalancer` and a `LoadBalancerFactory`. + +The `LoadBalancer` interface has two methods: +- `pick()` selects an endpoint for each RPC call. +- `refresh()` is called when endpoint resolution produces an updated list of addresses. + +Here is an example of a random load balancer that picks a random endpoint for every request: + +```php +use Random\Randomizer; +use Thesis\Grpc\Client\Endpoint; +use Thesis\Grpc\Client\LoadBalancer; +use Thesis\Grpc\Client\LoadBalancerFactory; +use Thesis\Grpc\Client\PickContext; + +final class RandomBalancer implements LoadBalancer +{ + /** + * @param non-empty-list $endpoints + */ + public function __construct( + private array $endpoints, + private readonly Randomizer $randomizer = new Randomizer(), + ) {} + + #[\Override] + public function refresh(array $endpoints): void + { + $this->endpoints = $endpoints; + } + + #[\Override] + public function pick(PickContext $context): Endpoint + { + return $this->endpoints[$this->randomizer->getInt(0, \count($this->endpoints) - 1)]; + } +} + +final readonly class RandomBalancerFactory implements LoadBalancerFactory +{ + #[\Override] + public function name(): string + { + return 'random'; + } + + #[\Override] + public function create(array $endpoints): LoadBalancer + { + return new RandomBalancer($endpoints); + } +} +``` + +Register it via the builder: + +```php +$client = new AuthenticationServiceClient( + new Client\Builder() + ->withHost('ipv4:10.0.0.1:50051,10.0.0.2:50051') + ->withLoadBalancer(new RandomBalancerFactory()) + ->build(), +); +``` + +### Endpoint Resolution + +The client selects an endpoint resolver based on the target scheme. For most use cases, the built-in resolvers work out of the box. However, if you need to customize resolver behavior for a specific scheme — for example, to configure the DNS resolver with a cache or adjust re-resolution intervals — you can register a custom resolver instance: + +```php +use Amp\Cache\LocalCache; +use Thesis\Grpc\Client\Builder; +use Thesis\Grpc\Client\EndpointResolver\DnsResolver; +use Thesis\Grpc\Client\Scheme; + +$client = new AuthenticationServiceClient( + new Builder() + ->withHost('dns:///my-grpc-server:50051') + ->withEndpointResolver(Scheme::Dns, new DnsResolver( + cache: new LocalCache(), + minResolveInterval: 60, + maxResolveInterval: 600, + )) + ->build(), +); +``` + +The resolver registered for a given scheme takes precedence over the default. If no custom resolver is registered, the client falls back to the built-in one: + +| Scheme | Default Resolver | +|------------------------|----------------------------------------------------------------------------| +| `dns` | `DnsResolver` — resolves via DNS, re-resolves periodically based on TTL | +| `ipv4`, `ipv6`, `unix` | `StaticResolver` — converts target addresses directly into endpoints | +| `passthrough` | `PassthroughResolver` — passes the raw target address as a single endpoint | + +You can also implement the `EndpointResolver` interface to build a fully custom resolver — for example, one backed by a service registry such as Consul or etcd. + ### Error Handling If the server returns an error, you can handle it with a standard `try/catch` block: diff --git a/composer.json b/composer.json index b1fdb29..0b199bb 100644 --- a/composer.json +++ b/composer.json @@ -21,15 +21,18 @@ "php": "^8.4", "amphp/amp": "^3.1", "amphp/byte-stream": "^2.1", + "amphp/cache": "^2.0", + "amphp/dns": "^2.4", "amphp/http": "^2.1", "amphp/http-client": "^5.3", "amphp/http-server": "^3.4", "amphp/pipeline": "^1.2", "amphp/socket": "^2.3", "amphp/sync": "^2.3", + "league/uri": "^7.8", "psr/log": "^3.0", "revolt/event-loop": "1.0.8", - "thesis/endian": "^0.3.0", + "thesis/endian": ">= 0.3.2", "thesis/googleapis-rpc-types": "^0.1.6", "thesis/package-version": "^0.1.2", "thesis/protobuf": "^0.1.8", diff --git a/src/Client.php b/src/Client.php index 46a2e98..57a1adc 100644 --- a/src/Client.php +++ b/src/Client.php @@ -40,4 +40,6 @@ public function createStream( Metadata $md = new Metadata(), Cancellation $cancellation = new NullCancellation(), ): ClientStream; + + public function close(): void; } diff --git a/src/Client/Address.php b/src/Client/Address.php new file mode 100644 index 0000000..6f06dee --- /dev/null +++ b/src/Client/Address.php @@ -0,0 +1,27 @@ +value; + } +} diff --git a/src/Client/Builder.php b/src/Client/Builder.php index 09518a9..e78af49 100644 --- a/src/Client/Builder.php +++ b/src/Client/Builder.php @@ -11,6 +11,8 @@ use Amp\Socket\ConnectContext; use Amp\Socket\SocketConnector; use Thesis\Grpc\Client; +use Thesis\Grpc\Client\Internal\Connection; +use Thesis\Grpc\Client\Internal\Http2; use Thesis\Grpc\Compression\Compressor; use Thesis\Grpc\Compression\IdentityCompressor; use Thesis\Grpc\Encoding\Encoder; @@ -22,7 +24,7 @@ */ final class Builder { - private const string DEFAULT_HOST = 'http://localhost:50051'; + private const string DEFAULT_HOST = '127.0.0.1:50051'; private const float DEFAULT_CONNECT_TIMEOUT = 10; private const int DEFAULT_CONNECTION_LIMIT = \PHP_INT_MAX; @@ -52,6 +54,16 @@ final class Builder */ private ?Decoder $protobuf = null; + private ?LoadBalancerFactory $loadBalancerFactory = null; + + /** @var \SplObjectStorage */ + private \SplObjectStorage $endpointResolvers; + + public function __construct() + { + $this->endpointResolvers = new \SplObjectStorage(); + } + public function withProtobuf(Decoder $decoder): self { $builder = clone $this; @@ -144,6 +156,22 @@ public function withSocketConnector(SocketConnector $connector): self return $builder; } + public function withLoadBalancer(LoadBalancerFactory $factory): self + { + $builder = clone $this; + $builder->loadBalancerFactory = $factory; + + return $builder; + } + + public function withEndpointResolver(Scheme $scheme, EndpointResolver $resolver): self + { + $builder = clone $this; + $builder->endpointResolvers[$scheme] = $resolver; + + return $builder; + } + public static function buildDefault(): Client { return new self()->build(); @@ -151,26 +179,59 @@ public static function buildDefault(): Client public function build(): Client { + $target = Target::parse($this->host ?? self::DEFAULT_HOST); + + $encoder = $this->encoder ?? ProtobufEncoder::default(); + $compressor = $this->compressor ?? IdentityCompressor::Compressor; + $protobuf = $this->protobuf ?? Decoder\Builder::buildDefault(); + $loadBalancerFactory = $this->loadBalancerFactory ?? new LoadBalancer\PickFirstFactory(); + $uriFactory = new Http2\UriFactory($this->credentials !== null ? Internal\HttpScheme::Https : Internal\HttpScheme::Http); + + $resolver = $this->endpointResolvers[$target->scheme] ?? match ($target->scheme) { + Scheme::Dns => new EndpointResolver\DnsResolver(), + Scheme::Passthrough => new EndpointResolver\PassthroughResolver(), + Scheme::Ipv4, Scheme::Ipv6, Scheme::Unix => new EndpointResolver\StaticResolver(), + }; + + $interceptor = new Http2\InterceptorComposer([ + ...$this->interceptors, + new Http2\AppendControlMetadataInterceptor( + $encoder->name(), + $compressor->name(), + ), + ]); + + $httpclient = $this->httpclient ?? new HttpClientBuilder() + ->usingPool(ConnectionLimitingPool::byAuthority( + $this->connectionLimit, + new DefaultConnectionFactory( + $this->connector, + new ConnectContext() + ->withConnectTimeout($this->connectTimeout) + ->withTlsContext($this->credentials?->createClientContext()), + ), + )) + ->skipDefaultUserAgent() + ->skipAutomaticCompression() + ->skipDefaultAcceptHeader() + ->build(); + return new Internal\AmphpHttpClient( - host: $this->host ?? self::DEFAULT_HOST, - client: $this->httpclient ?? new HttpClientBuilder() - ->usingPool(ConnectionLimitingPool::byAuthority( - $this->connectionLimit, - new DefaultConnectionFactory( - $this->connector, - new ConnectContext() - ->withConnectTimeout($this->connectTimeout) - ->withTlsContext($this->credentials?->createClientContext()), + new Connection\LazyConnection( + static fn() => new Connection\DefaultConnection( + target: $target, + resolver: $resolver, + loadBalancerFactory: $loadBalancerFactory, + interceptor: $interceptor, + streams: new Http2\StreamFactory( + http: $httpclient, + uri: $uriFactory, + errors: new Http2\ErrorHandler($protobuf), + encoder: $encoder, + compressor: $compressor, ), - )) - ->skipDefaultUserAgent() - ->skipAutomaticCompression() - ->skipDefaultAcceptHeader() - ->build(), - encoder: $this->encoder ?? ProtobufEncoder::default(), - compressor: $this->compressor ?? IdentityCompressor::Compressor, - protobuf: $this->protobuf ?? Decoder\Builder::buildDefault(), - interceptors: $this->interceptors, + ), + ), ); } } diff --git a/src/Client/Endpoint.php b/src/Client/Endpoint.php new file mode 100644 index 0000000..1075517 --- /dev/null +++ b/src/Client/Endpoint.php @@ -0,0 +1,20 @@ +address->value === $other->address->value; + } +} diff --git a/src/Client/EndpointResolver.php b/src/Client/EndpointResolver.php new file mode 100644 index 0000000..9d3d77b --- /dev/null +++ b/src/Client/EndpointResolver.php @@ -0,0 +1,19 @@ + $cache + */ + public function __construct( + private ?AmphpDnsResolver $dns = null, + private ?Cache $cache = null, + private float $minResolveInterval = self::MIN_RESOLVE_INTERVAL, + private float $maxResolveInterval = self::MAX_RESOLVE_INTERVAL, + ) {} + + #[\Override] + public function resolve( + Target $target, + EndpointResolverListener $listener, + Cancellation $cancellation, + ): Resolution { + $resolver = $this->dns ?? $this->configureResolver($target); + + $result = $this->resolveNow( + $resolver, + $target, + $cancellation, + ); + + $resolveTTL = $this->computeResolveTTL($result); + + if ($resolveTTL > 0) { + EventLoop::queue( + $this->resolveLater(...), + $resolver, + $listener, + $target, + $resolveTTL, + $cancellation, + ); + } + + return $result->resolution; + } + + private function resolveNow( + AmphpDnsResolver $resolver, + Target $target, + Cancellation $cancellation, + ): ResolveResult { + $endpoints = []; + $ttl = null; + + foreach ($target->addresses as $address) { + $records = $resolver->resolve($address->host, cancellation: $cancellation); + + foreach ($records as $record) { + $ip = $record->getValue(); + + if ($record->getType() === DnsRecord::AAAA) { + $ip = "[{$ip}]"; + } + + $endpoints[] = new Endpoint( + new Address("{$ip}:{$address->port}"), + ); + + if ($record->getTtl() !== null) { + $ttl = min($record->getTtl(), $ttl ?? \PHP_INT_MAX); + } + } + } + + return new ResolveResult( + new Resolution($endpoints), + $ttl, + ); + } + + private function resolveLater( + AmphpDnsResolver $resolver, + EndpointResolverListener $listener, + Target $target, + float $ttl, + Cancellation $cancellation, + ): void { + while (true) { + $suspension = EventLoop::getSuspension(); + $timerId = EventLoop::delay($ttl, $suspension->resume(...)); + $cancellationId = $cancellation->subscribe($suspension->throw(...)); + + try { + $suspension->suspend(); + + $result = $this->resolveNow($resolver, $target, $cancellation); + $listener->onResolve($result->resolution); + + $ttl = $this->computeResolveTTL($result); + + if ($ttl <= 0) { + return; + } + } catch (CancelledException) { + return; + } catch (\Throwable $e) { + $listener->onResolve($e); + } finally { + EventLoop::cancel($timerId); + $cancellation->unsubscribe($cancellationId); + } + } + } + + private function configureResolver(Target $target): AmphpDnsResolver + { + $configLoader = null; + if ($target->authority !== null) { + $configLoader = new StaticDnsConfigLoader( + new DnsConfig([$target->authority]), + ); + } + + return new Rfc1035StubDnsResolver( + $this->cache, + $configLoader, + ); + } + + private function computeResolveTTL(ResolveResult $result): float + { + return max( + $this->minResolveInterval, + min($this->maxResolveInterval, $result->ttl ?? $this->minResolveInterval), + ); + } +} + +final readonly class ResolveResult +{ + public function __construct( + public Resolution $resolution, + public ?float $ttl = null, + ) {} +} diff --git a/src/Client/EndpointResolver/PassthroughResolver.php b/src/Client/EndpointResolver/PassthroughResolver.php new file mode 100644 index 0000000..d9246ff --- /dev/null +++ b/src/Client/EndpointResolver/PassthroughResolver.php @@ -0,0 +1,25 @@ +opaque))]); + } +} diff --git a/src/Client/EndpointResolver/StaticResolver.php b/src/Client/EndpointResolver/StaticResolver.php new file mode 100644 index 0000000..9a94be6 --- /dev/null +++ b/src/Client/EndpointResolver/StaticResolver.php @@ -0,0 +1,31 @@ + new Endpoint(new Address((string) $address)), + $target->addresses, + ), + ); + } +} diff --git a/src/Client/EndpointResolverListener.php b/src/Client/EndpointResolverListener.php new file mode 100644 index 0000000..cfaf0ba --- /dev/null +++ b/src/Client/EndpointResolverListener.php @@ -0,0 +1,13 @@ + $interceptors - */ public function __construct( - string $host, - DelegateHttpClient $client, - Encoding\Encoder $encoder, - Compression\Compressor $compressor, - Decoder $protobuf, - array $interceptors = [], - ) { - $this->interceptor = new Http2\InterceptorComposer([ - ...$interceptors, - new Http2\AppendControlMetadataInterceptor( - $encoder->name(), - $compressor->name(), - ), - ]); - $this->streams = new Http2\StreamFactory( - http: $client, - uri: new Http2\UriFactory($host), - errors: new Http2\ErrorHandler($protobuf), - encoder: $encoder, - compressor: $compressor, - ); - } + private Connection $connection, + ) {} #[\Override] public function invoke( @@ -58,7 +26,7 @@ public function invoke( Metadata $md = new Metadata(), Cancellation $cancellation = new NullCancellation(), ): object { - $stream = $this->createStream( + $stream = $this->connection->createStream( $invoke, $md, $cancellation, @@ -76,11 +44,16 @@ public function createStream( Metadata $md = new Metadata(), Cancellation $cancellation = new NullCancellation(), ): ClientStream { - return $this->interceptor->intercept( // @phpstan-ignore return.type + return $this->connection->createStream( $invoke, $md, $cancellation, - $this->streams->create(...), ); } + + #[\Override] + public function close(): void + { + $this->connection->close(); + } } diff --git a/src/Client/Internal/Connection.php b/src/Client/Internal/Connection.php new file mode 100644 index 0000000..884fb26 --- /dev/null +++ b/src/Client/Internal/Connection.php @@ -0,0 +1,31 @@ + $invoke + * @return ClientStream + */ + public function createStream( + Invoke $invoke, + Metadata $md = new Metadata(), + Cancellation $cancellation = new NullCancellation(), + ): ClientStream; + + public function close(Cancellation $cancellation = new NullCancellation()): void; +} diff --git a/src/Client/Internal/Connection/DefaultConnection.php b/src/Client/Internal/Connection/DefaultConnection.php new file mode 100644 index 0000000..61da9fb --- /dev/null +++ b/src/Client/Internal/Connection/DefaultConnection.php @@ -0,0 +1,87 @@ +deferredCancellation = new DeferredCancellation(); + + $resolution = $resolver->resolve( + $target, + $this, + $this->deferredCancellation->getCancellation(), + ); + + $this->balancer = $loadBalancerFactory->create($resolution->endpoints); + } + + #[\Override] + public function createStream( + Invoke $invoke, + Metadata $md = new Metadata(), + Cancellation $cancellation = new NullCancellation(), + ): ClientStream { + $endpoint = $this->balancer->pick(new PickContext($invoke->method, $md)); + + return $this->interceptor->intercept( // @phpstan-ignore return.type + $invoke, + $md, + $cancellation, + fn(Invoke $invoke, Metadata $md, Cancellation $cancellation) => $this->streams->create( + $invoke, + $endpoint->address, + $md, + $cancellation, + ), + ); + } + + #[\Override] + public function close(Cancellation $cancellation = new NullCancellation()): void + { + $this->deferredCancellation->cancel(); + } + + #[\Override] + public function onResolve(Resolution|\Throwable $result): void + { + if ($result instanceof Resolution) { + $this->balancer->refresh($result->endpoints); + } + } +} diff --git a/src/Client/Internal/Connection/LazyConnection.php b/src/Client/Internal/Connection/LazyConnection.php new file mode 100644 index 0000000..5607c82 --- /dev/null +++ b/src/Client/Internal/Connection/LazyConnection.php @@ -0,0 +1,51 @@ + */ + private ?Future $future = null; + + /** + * @param \Closure(): Connection $factory + */ + public function __construct( + private readonly \Closure $factory, + ) {} + + #[\Override] + public function createStream( + Invoke $invoke, + Metadata $md = new Metadata(), + Cancellation $cancellation = new NullCancellation(), + ): ClientStream { + $this->future ??= async($this->factory); + $connection = $this->future->await($cancellation); + + return $connection->createStream($invoke, $md, $cancellation); + } + + #[\Override] + public function close(Cancellation $cancellation = new NullCancellation()): void + { + $future = $this->future; + $this->future = null; + + $future?->await($cancellation)->close($cancellation); + } +} diff --git a/src/Client/Internal/Http2/StreamFactory.php b/src/Client/Internal/Http2/StreamFactory.php index 4d04292..e5d221d 100644 --- a/src/Client/Internal/Http2/StreamFactory.php +++ b/src/Client/Internal/Http2/StreamFactory.php @@ -14,6 +14,7 @@ use Amp\Http\Client\StreamedContent; use Amp\NullCancellation; use Amp\Pipeline; +use Thesis\Grpc\Client\Address; use Thesis\Grpc\Client\Invoke; use Thesis\Grpc\ClientStream; use Thesis\Grpc\Compression\Compressor; @@ -50,6 +51,7 @@ public function __construct( */ public function create( Invoke $invoke, + Address $address, Metadata $md = new Metadata(), Cancellation $cancellation = new NullCancellation(), ): ClientStream { @@ -60,7 +62,7 @@ public function create( $deferred = new DeferredFuture(); $request = new Request( - uri: $this->uri->create($invoke->method), + uri: $this->uri->create($address, $invoke), method: 'POST', body: StreamedContent::fromStream( new ReadableIterableStream($this->codec->encode($send->iterate(), $cancellation)), diff --git a/src/Client/Internal/Http2/UriFactory.php b/src/Client/Internal/Http2/UriFactory.php index 51cb260..2623c7b 100644 --- a/src/Client/Internal/Http2/UriFactory.php +++ b/src/Client/Internal/Http2/UriFactory.php @@ -4,26 +4,24 @@ namespace Thesis\Grpc\Client\Internal\Http2; +use Thesis\Grpc\Client\Address; +use Thesis\Grpc\Client\Internal\HttpScheme; +use Thesis\Grpc\Client\Invoke; + /** * @internal */ final readonly class UriFactory { - /** - * @param non-empty-string $host - */ public function __construct( - private string $host, + private HttpScheme $scheme, ) {} /** - * @param non-empty-string $method * @return non-empty-string */ - public function create(string $method): string + public function create(Address $address, Invoke $invoke): string { - $clear = static fn(string $path) => trim($path, '/'); - - return \sprintf('%s/%s', $clear($this->host), $clear($method)); + return \sprintf('%s://%s/%s', $this->scheme->value, $address->value, ltrim($invoke->method, '/')); } } diff --git a/src/Client/Internal/HttpScheme.php b/src/Client/Internal/HttpScheme.php new file mode 100644 index 0000000..4f1a92d --- /dev/null +++ b/src/Client/Internal/HttpScheme.php @@ -0,0 +1,14 @@ + $endpoints + */ + public function refresh(array $endpoints): void; + + public function pick(PickContext $context): Endpoint; +} diff --git a/src/Client/LoadBalancer/PickFirst.php b/src/Client/LoadBalancer/PickFirst.php new file mode 100644 index 0000000..7ff7591 --- /dev/null +++ b/src/Client/LoadBalancer/PickFirst.php @@ -0,0 +1,55 @@ + $endpoints + */ + public function __construct( + array $endpoints, + private readonly Randomizer $randomizer, + ) { + $this->current = $this->doPick($endpoints); + } + + #[\Override] + public function refresh(array $endpoints): void + { + $this->current = $this->doPick($endpoints, $this->current); + } + + #[\Override] + public function pick(PickContext $context): Endpoint + { + return $this->current; + } + + /** + * @param non-empty-list $endpoints + */ + private function doPick(array $endpoints, ?Endpoint $current = null): Endpoint + { + /** @var non-empty-list $endpoints */ + $endpoints = $this->randomizer->shuffleArray($endpoints); + + if ($current === null || !array_any($endpoints, $current->equals(...))) { + $current = $endpoints[0]; + } + + return $current; + } +} diff --git a/src/Client/LoadBalancer/PickFirstFactory.php b/src/Client/LoadBalancer/PickFirstFactory.php new file mode 100644 index 0000000..12bedc0 --- /dev/null +++ b/src/Client/LoadBalancer/PickFirstFactory.php @@ -0,0 +1,31 @@ +randomizer); + } +} diff --git a/src/Client/LoadBalancer/RoundRobin.php b/src/Client/LoadBalancer/RoundRobin.php new file mode 100644 index 0000000..c08dea0 --- /dev/null +++ b/src/Client/LoadBalancer/RoundRobin.php @@ -0,0 +1,42 @@ + $endpoints + */ + public function __construct( + private array $endpoints, + ) { + $this->count = \count($endpoints); + } + + #[\Override] + public function refresh(array $endpoints): void + { + $this->endpoints = $endpoints; + $this->count = \count($endpoints); + } + + #[\Override] + public function pick(PickContext $context): Endpoint + { + return $this->endpoints[$this->cursor++ % $this->count]; // @phpstan-ignore offsetAccess.notFound + } +} diff --git a/src/Client/LoadBalancer/RoundRobinFactory.php b/src/Client/LoadBalancer/RoundRobinFactory.php new file mode 100644 index 0000000..a87a6cd --- /dev/null +++ b/src/Client/LoadBalancer/RoundRobinFactory.php @@ -0,0 +1,26 @@ + $endpoints + */ + public function create(array $endpoints): LoadBalancer; +} diff --git a/src/Client/PickContext.php b/src/Client/PickContext.php new file mode 100644 index 0000000..e6666d9 --- /dev/null +++ b/src/Client/PickContext.php @@ -0,0 +1,21 @@ + $endpoints + */ + public function __construct( + public array $endpoints, + ) {} +} diff --git a/src/Client/Scheme.php b/src/Client/Scheme.php new file mode 100644 index 0000000..e0a6026 --- /dev/null +++ b/src/Client/Scheme.php @@ -0,0 +1,17 @@ +value}:"; + + if (str_starts_with($target, $prefix)) { + $addr = substr($target, \strlen($prefix)); + if ($addr === '') { + throw new InvalidTarget($target); + } + + return match ($scheme) { + Scheme::Dns => self::parseDns($addr, $target, $scheme), + Scheme::Passthrough => self::parsePassthrough($addr, $target), + Scheme::Ipv4, Scheme::Ipv6 => new self($scheme, self::parseAddresses($addr, $target), opaque: $addr), + Scheme::Unix => new self($scheme, [self::parseUnix($addr, $target)], opaque: $addr), + }; + } + } + + return new self(Scheme::Dns, self::parseAddresses($target), opaque: $target); + } + + /** + * @internal use {@see Target::parse()} instead + * @param non-empty-list $addresses + * @param non-empty-string $opaque Raw value after scheme prefix + * @param ?non-empty-string $authority DNS server address (only for dns://authority/host form) + */ + public function __construct( + public Scheme $scheme, + public array $addresses, + public string $opaque, + public ?string $authority = null, + ) {} + + /** + * @param non-empty-string $addr + * @param non-empty-string $target + * @throws InvalidTarget + */ + private static function parseDns(string $addr, string $target, Scheme $scheme): self + { + $opaque = $addr; + $authority = null; + + if (str_starts_with($addr, '//')) { + $addr = substr($addr, 2); + $slash = strpos($addr, '/'); + if ($slash === false) { + throw new InvalidTarget($target); + } + + $auth = substr($addr, 0, $slash); + if ($auth !== '') { + $authority = $auth; + } + + $addr = substr($addr, $slash + 1); + if ($addr === '') { + throw new InvalidTarget($target); + } + } + + return new self( + $scheme, + self::parseAddresses($addr, $target), + $opaque, + $authority, + ); + } + + /** + * @param non-empty-string $addr + * @param non-empty-string $target + * @throws InvalidTarget + */ + private static function parsePassthrough(string $addr, string $target): self + { + if (!str_starts_with($addr, '//')) { + throw new InvalidTarget($target); + } + + $slash = strpos($addr, '/', 2); + if ($slash === false) { + throw new InvalidTarget($target); + } + + $addr = substr($addr, $slash + 1); + + if ($addr === '') { + throw new InvalidTarget($target); + } + + return new self(Scheme::Passthrough, [new TargetAddress($addr, 0)], $addr); + } + + /** + * @param non-empty-string $addr + * @param non-empty-string $target + * @throws InvalidTarget + */ + private static function parseUnix(string $addr, string $target): TargetAddress + { + if (str_starts_with($addr, '//')) { + $addr = substr($addr, 2); + } + + if ($addr === '' || $addr[0] !== '/') { + throw new InvalidTarget($target); + } + + return new TargetAddress($addr, 0); + } + + /** + * @param non-empty-string $addr + * @param ?non-empty-string $target + * @return non-empty-list + * @throws InvalidTarget + */ + private static function parseAddresses(string $addr, ?string $target = null): array + { + return array_map( + static fn(string $address) => self::parseAddress(trim($address), $target ?? $addr), + explode(',', $addr), + ); + } + + /** + * @param non-empty-string $target + * @throws InvalidTarget + */ + private static function parseAddress(string $addr, string $target): TargetAddress + { + $addr = urldecode($addr); + $uri = Uri::parse("tcp://{$addr}") ?? throw new InvalidTarget($target); + + $host = $uri->getHost() ?? ''; + + if ($host === '') { + throw new InvalidTarget($target); + } + + $port = $uri->getPort() ?? 0; + + if ($port < 1 || $port > 65_535 || $uri->getPath() !== '') { + throw new InvalidTarget($target); + } + + return new TargetAddress($host, $port); + } +} diff --git a/src/Client/TargetAddress.php b/src/Client/TargetAddress.php new file mode 100644 index 0000000..e932e89 --- /dev/null +++ b/src/Client/TargetAddress.php @@ -0,0 +1,35 @@ + $port + */ + public function __construct( + public string $host, + public int $port, + ) {} + + /** + * @return non-empty-string + */ + #[\Override] + public function __toString(): string + { + $host = $this->host; + + if ($this->port > 0) { + $host = "{$host}:{$this->port}"; + } + + return $host; + } +} diff --git a/src/Internal/Protocol/Frame.php b/src/Internal/Protocol/Frame.php index 58d5995..b33e758 100644 --- a/src/Internal/Protocol/Frame.php +++ b/src/Internal/Protocol/Frame.php @@ -18,7 +18,7 @@ public function __construct( ) {} } -const byteOrder = Endian\Order::big; +const byteOrder = Endian\Order::Big; /** * @internal diff --git a/tests/Client/EndpointResolver/DnsResolverTest.php b/tests/Client/EndpointResolver/DnsResolverTest.php new file mode 100644 index 0000000..8ea4e94 --- /dev/null +++ b/tests/Client/EndpointResolver/DnsResolverTest.php @@ -0,0 +1,176 @@ + $records + * @param non-empty-list $endpoints + */ + #[DataProvider('provideResolveCases')] + public function testResolve(Target $target, array $records, array $endpoints): void + { + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = $this->createMock(AmphpDnsResolver::class); + $dnsResolver + ->expects(self::once()) + ->method('resolve') + ->willReturn($records); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::never()) + ->method('onResolve'); + + $resolver = new DnsResolver($dnsResolver); + $resolution = $resolver->resolve($target, $listener, $deferredCancellation->getCancellation()); + $deferredCancellation->cancel(); + + self::assertEquals($endpoints, $resolution->endpoints); + } + + /** + * @return iterable, non-empty-list}> + */ + public static function provideResolveCases(): iterable + { + yield 'single A record' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + [new DnsRecord('192.168.0.1', DnsRecord::A, 300)], + [new Endpoint(new Address('192.168.0.1:50051'))], + ]; + + yield 'multiple A records' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + [ + new DnsRecord('192.168.0.1', DnsRecord::A, 300), + new DnsRecord('192.168.0.2', DnsRecord::A, 300), + ], + [ + new Endpoint(new Address('192.168.0.1:50051')), + new Endpoint(new Address('192.168.0.2:50051')), + ], + ]; + + yield 'AAAA record wraps in brackets' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + [new DnsRecord('::1', DnsRecord::AAAA, 300)], + [new Endpoint(new Address('[::1]:50051'))], + ]; + + yield 'mixed A and AAAA records' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + [ + new DnsRecord('192.168.0.1', DnsRecord::A, 300), + new DnsRecord('::1', DnsRecord::AAAA, 300), + ], + [ + new Endpoint(new Address('192.168.0.1:50051')), + new Endpoint(new Address('[::1]:50051')), + ], + ]; + } + + public function testResolveListener(): void + { + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = $this->createMock(AmphpDnsResolver::class); + $dnsResolver + ->expects(self::atLeastOnce()) + ->method('resolve') + ->willReturn( + [new DnsRecord('192.168.0.1', DnsRecord::A, 1)], + ); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::atLeastOnce()) + ->method('onResolve') + ->with( + self::isInstanceOf(Resolution::class), + ); + + $resolver = new DnsResolver($dnsResolver, minResolveInterval: 0.1, maxResolveInterval: 0.1); + $resolver->resolve( + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + $listener, + $deferredCancellation->getCancellation(), + ); + + delay(0.25); + $deferredCancellation->cancel(); + } + + public function testResolveStopOnCancellation(): void + { + $target = new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'); + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = self::createStub(AmphpDnsResolver::class); + $dnsResolver->method('resolve')->willReturn( + [new DnsRecord('192.168.0.1', DnsRecord::A, 300)], + ); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::never()) + ->method('onResolve'); + + $resolver = new DnsResolver($dnsResolver, minResolveInterval: 0.5, maxResolveInterval: 0.5); + $resolver->resolve($target, $listener, $deferredCancellation->getCancellation()); + + $deferredCancellation->cancel(); + delay(0.7); + } + + public function testResolveThrows(): void + { + $target = new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'); + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = $this->createMock(AmphpDnsResolver::class); + $dnsResolver + ->expects(self::atLeastOnce()) + ->method('resolve') + ->willReturnOnConsecutiveCalls( + [new DnsRecord('192.168.0.1', DnsRecord::A, 1)], + self::throwException(new DnsException('DNS failed')), + ); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::atLeastOnce()) + ->method('onResolve') + ->with( + self::isInstanceOf(\Throwable::class), + ); + + $resolver = new DnsResolver($dnsResolver, minResolveInterval: 0.1, maxResolveInterval: 0.1); + $resolver->resolve($target, $listener, $deferredCancellation->getCancellation()); + + delay(0.25); + $deferredCancellation->cancel(); + } +} diff --git a/tests/Client/EndpointResolver/StaticResolverTest.php b/tests/Client/EndpointResolver/StaticResolverTest.php new file mode 100644 index 0000000..366fa65 --- /dev/null +++ b/tests/Client/EndpointResolver/StaticResolverTest.php @@ -0,0 +1,69 @@ + $endpoints + */ + #[DataProvider('provideResolveCases')] + public function testResolve(Target $target, array $endpoints): void + { + $resolver = new StaticResolver(); + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::never()) + ->method('onResolve'); + + $resolution = $resolver->resolve($target, $listener, new NullCancellation()); + + self::assertEquals($endpoints, $resolution->endpoints); + } + + /** + * @return iterable}> + */ + public static function provideResolveCases(): iterable + { + yield 'ipv4: single address' => [ + new Target(Scheme::Ipv4, [new TargetAddress('192.168.0.1', 50_051)], '192.168.0.1:50051'), + [new Endpoint(new Address('192.168.0.1:50051'))], + ]; + + yield 'ipv4: multiple addresses' => [ + new Target(Scheme::Ipv4, [ + new TargetAddress('192.168.0.1', 50_051), + new TargetAddress('192.168.0.2', 50_052), + ], '192.168.0.1:50051,192.168.0.2:50052'), + [ + new Endpoint(new Address('192.168.0.1:50051')), + new Endpoint(new Address('192.168.0.2:50052')), + ], + ]; + + yield 'ipv6: single address' => [ + new Target(Scheme::Ipv6, [new TargetAddress('[::1]', 50_051)], '[::1]:50051'), + [new Endpoint(new Address('[::1]:50051'))], + ]; + + yield 'unix: socket path' => [ + new Target(Scheme::Unix, [new TargetAddress('/var/run/grpc.sock', 0)], '///var/run/grpc.sock'), + [new Endpoint(new Address('/var/run/grpc.sock'))], + ]; + } +} diff --git a/tests/Client/LoadBalancer/PickFirstTest.php b/tests/Client/LoadBalancer/PickFirstTest.php new file mode 100644 index 0000000..d1a48d4 --- /dev/null +++ b/tests/Client/LoadBalancer/PickFirstTest.php @@ -0,0 +1,125 @@ + $endpoints + */ + #[DataProvider('providePickAlwaysReturnsSameEndpointCases')] + public function testPickAlwaysReturnsSameEndpoint(array $endpoints): void + { + $balancer = new PickFirstFactory()->create($endpoints); + $first = $balancer->pick(self::context()); + + for ($i = 0; $i < 10; ++$i) { + self::assertTrue($first->equals($balancer->pick(self::context()))); + } + } + + /** + * @return iterable}> + */ + public static function providePickAlwaysReturnsSameEndpointCases(): iterable + { + yield 'single endpoint' => [ + [new Endpoint(new Address('10.0.0.1:50051'))], + ]; + + yield 'multiple endpoints' => [ + [ + new Endpoint(new Address('10.0.0.1:50051')), + new Endpoint(new Address('10.0.0.2:50051')), + new Endpoint(new Address('10.0.0.3:50051')), + ], + ]; + } + + /** + * @param non-empty-list $initial + * @param non-empty-list $refreshed + */ + #[DataProvider('provideRefreshKeepsPinnedEndpointCases')] + public function testRefreshKeepsPinnedEndpoint(array $initial, array $refreshed): void + { + $balancer = new PickFirstFactory()->create($initial); + $pinned = $balancer->pick(self::context()); + + $balancer->refresh($refreshed); + + self::assertTrue($pinned->equals($balancer->pick(self::context()))); + } + + /** + * @return iterable, non-empty-list}> + */ + public static function provideRefreshKeepsPinnedEndpointCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'same list' => [ + [$a, $b], [$a, $b], + ]; + + yield 'new endpoint added' => [ + [$a, $b], [$a, $b, $c], + ]; + + yield 'order changed but pinned still present' => [ + [$a, $b, $c], [$c, $b, $a], + ]; + } + + /** + * @param non-empty-list $initial + * @param non-empty-list $refreshed + */ + #[DataProvider('provideRefreshSwitchesPinnedEndpointCases')] + public function testRefreshSwitchesPinnedEndpoint(array $initial, array $refreshed): void + { + $balancer = new PickFirstFactory()->create($initial); + $pinned = $balancer->pick(self::context()); + + $balancer->refresh($refreshed); + + self::assertFalse($pinned->equals($balancer->pick(self::context()))); + } + + /** + * @return iterable, non-empty-list}> + */ + public static function provideRefreshSwitchesPinnedEndpointCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'pinned removed, new endpoints' => [ + [$a], [$b, $c], + ]; + + yield 'completely different list' => [ + [$a, $b], [$c], + ]; + } + + private static function context(): PickContext + { + return new PickContext('/test.Service/Method', new Metadata()); + } +} diff --git a/tests/Client/LoadBalancer/RoundRobinTest.php b/tests/Client/LoadBalancer/RoundRobinTest.php new file mode 100644 index 0000000..1c0fd4d --- /dev/null +++ b/tests/Client/LoadBalancer/RoundRobinTest.php @@ -0,0 +1,111 @@ + $endpoints + * @param list $expectedPicks + */ + #[DataProvider('providePickCyclesCases')] + public function testPickCycles(array $endpoints, array $expectedPicks): void + { + $balancer = new RoundRobinFactory()->create($endpoints); + + foreach ($expectedPicks as $expected) { + self::assertTrue($expected->equals($balancer->pick(self::context()))); + } + } + + /** + * @return iterable, list}> + */ + public static function providePickCyclesCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'single endpoint' => [ + [$a], + [$a, $a, $a], + ]; + + yield 'two endpoints' => [ + [$a, $b], + [$a, $b, $a, $b], + ]; + + yield 'three endpoints, full cycle twice' => [ + [$a, $b, $c], + [$a, $b, $c, $a, $b, $c], + ]; + } + + /** + * @param non-empty-list $initial + * @param non-empty-list $refreshed + * @param list $expectedAfterRefresh + */ + #[DataProvider('provideRefreshCases')] + public function testRefresh(array $initial, int $picksBeforeRefresh, array $refreshed, array $expectedAfterRefresh): void + { + $balancer = new RoundRobinFactory()->create($initial); + + for ($i = 0; $i < $picksBeforeRefresh; ++$i) { + $balancer->pick(self::context()); + } + + $balancer->refresh($refreshed); + + foreach ($expectedAfterRefresh as $expected) { + self::assertTrue($expected->equals($balancer->pick(self::context()))); + } + } + + /** + * @return iterable, int, non-empty-list, list}> + */ + public static function provideRefreshCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'same list, continues rotation' => [ + [$a, $b, $c], 1, + [$a, $b, $c], + [$b, $c, $a], + ]; + + yield 'completely new list' => [ + [$a, $b], 0, + [$c], + [$c, $c, $c], + ]; + + yield 'list grew' => [ + [$a], 0, + [$a, $b, $c], + [$a, $b, $c], + ]; + } + + private static function context(): PickContext + { + return new PickContext('/test.Service/Method', new Metadata()); + } +} diff --git a/tests/Client/TargetTest.php b/tests/Client/TargetTest.php new file mode 100644 index 0000000..f08ebe8 --- /dev/null +++ b/tests/Client/TargetTest.php @@ -0,0 +1,175 @@ + + */ + public static function provideParseTargetCases(): iterable + { + yield 'dns:host:port' => [ + 'dns:myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + ]; + + yield 'dns:///host:port' => [ + 'dns:///myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], '///myhost:50051'), + ]; + + yield 'dns://authority/host:port' => [ + 'dns://authority:53/myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], '//authority:53/myhost:50051', 'authority:53'), + ]; + + yield 'dns://authority/host:port without authority port' => [ + 'dns://authority/myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], '//authority/myhost:50051', 'authority'), + ]; + + yield 'dns:///ipv6 with brackets' => [ + 'dns:///[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443', + new Target(Scheme::Dns, [new TargetAddress('[2001:db8:85a3:8d3:1319:8a2e:370:7348]', 443)], '///[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443'), + ]; + + yield 'dns:///ipv6 percent-encoded brackets' => [ + 'dns:///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443', + new Target(Scheme::Dns, [new TargetAddress('[2001:db8:85a3:8d3:1319:8a2e:370:7348]', 443)], '///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443'), + ]; + + yield 'ipv4:single address' => [ + 'ipv4:192.168.0.1:50051', + new Target(Scheme::Ipv4, [new TargetAddress('192.168.0.1', 50_051)], '192.168.0.1:50051'), + ]; + + yield 'ipv4:multiple addresses' => [ + 'ipv4:192.168.0.1:50051,192.168.0.2:50052', + new Target(Scheme::Ipv4, [ + new TargetAddress('192.168.0.1', 50_051), + new TargetAddress('192.168.0.2', 50_052), + ], '192.168.0.1:50051,192.168.0.2:50052'), + ]; + + yield 'ipv4:multiple addresses with spaces around comma' => [ + 'ipv4:192.168.0.1:50051, 192.168.0.2:50052', + new Target(Scheme::Ipv4, [ + new TargetAddress('192.168.0.1', 50_051), + new TargetAddress('192.168.0.2', 50_052), + ], '192.168.0.1:50051, 192.168.0.2:50052'), + ]; + + yield 'ipv6:single address with port' => [ + 'ipv6:[::1]:50051', + new Target(Scheme::Ipv6, [new TargetAddress('[::1]', 50_051)], '[::1]:50051'), + ]; + + yield 'ipv6:multiple addresses' => [ + 'ipv6:[::1]:50051,[::2]:50052', + new Target(Scheme::Ipv6, [ + new TargetAddress('[::1]', 50_051), + new TargetAddress('[::2]', 50_052), + ], '[::1]:50051,[::2]:50052'), + ]; + + yield 'bare host:port' => [ + 'myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'myhost:50051'), + ]; + + yield 'bare localhost:port' => [ + 'localhost:50051', + new Target(Scheme::Dns, [new TargetAddress('localhost', 50_051)], 'localhost:50051'), + ]; + + yield 'unix:///path' => [ + 'unix:///var/run/grpc.sock', + new Target(Scheme::Unix, [new TargetAddress('/var/run/grpc.sock', 0)], '///var/run/grpc.sock'), + ]; + + yield 'unix:/path' => [ + 'unix:/var/run/grpc.sock', + new Target(Scheme::Unix, [new TargetAddress('/var/run/grpc.sock', 0)], '/var/run/grpc.sock'), + ]; + + yield 'unix:///tmp/test.sock' => [ + 'unix:///tmp/test.sock', + new Target(Scheme::Unix, [new TargetAddress('/tmp/test.sock', 0)], '///tmp/test.sock'), + ]; + + yield 'passthrough:///host:port' => [ + 'passthrough:///myhost:50051', + new Target(Scheme::Passthrough, [new TargetAddress('myhost:50051', 0)], 'myhost:50051'), + ]; + + yield 'bare bracketed ipv6' => [ + '[::1]:50051', + new Target(Scheme::Dns, [new TargetAddress('[::1]', 50_051)], '[::1]:50051'), + ]; + } + + /** + * @param non-empty-string $input + */ + #[DataProvider('provideParseTargetThrowsCases')] + public function testParseTargetThrows(string $input): void + { + $this->expectException(InvalidTarget::class); + Target::parse($input); + } + + /** + * @return iterable + */ + public static function provideParseTargetThrowsCases(): iterable + { + yield 'empty string' => ['']; + yield 'uppercase dns scheme is invalid' => ['DNS:myhost:50051']; + yield 'uppercase ipv4 scheme is invalid' => ['IPV4:192.168.0.1:50051']; + yield 'uppercase ipv6 scheme is invalid' => ['IPV6:[::1]:50051']; + yield 'ipv4: no address' => ['ipv4:']; + yield 'ipv6: no address' => ['ipv6:']; + yield 'dns:/// empty host' => ['dns:///']; + yield 'unknown scheme' => ['etcd:myhost:50051']; + yield 'ipv4: trailing comma' => ['ipv4:192.168.0.1:50051,']; + yield 'ipv4: leading comma' => ['ipv4:,192.168.0.1:50051']; + yield 'http scheme' => ['http://localhost:50051']; + yield 'https scheme' => ['https://example.com:443']; + yield 'dns://host without slash' => ['dns://myhost']; + yield 'dns:///host/extra' => ['dns:///myhost:50051/extra']; + yield 'dns: endpoint with spaces' => ['dns:my host:50051']; + yield 'bare host with spaces' => ['my host:50051']; + yield 'unix: no path' => ['unix:']; + yield 'unix: relative path' => ['unix:relative/path.sock']; + yield 'unix:// without absolute path' => ['unix://relative']; + yield 'passthrough:/// empty host' => ['passthrough:///']; + yield 'passthrough: no address' => ['passthrough:']; + yield 'dns: host without port' => ['dns:myhost']; + yield 'dns:///host without port' => ['dns:///myhost']; + yield 'ipv4: address without port' => ['ipv4:10.0.0.1']; + yield 'ipv6: address without port' => ['ipv6:::1']; + yield 'ipv6: bracketed without port' => ['ipv6:[::1]']; + yield 'bare host without port' => ['myhost']; + yield 'passthrough: without slashes' => ['passthrough:myhost:50051']; + yield 'passthrough: host without port' => ['passthrough:myhost']; + yield 'port out of range' => ['dns:myhost:99999']; + } +} diff --git a/tests/GracefulShutdownTest.php b/tests/GracefulShutdownTest.php index 7f09a0a..163810d 100644 --- a/tests/GracefulShutdownTest.php +++ b/tests/GracefulShutdownTest.php @@ -52,7 +52,7 @@ public function testGracefulShutdown(): void $this->server->stop(); $failFuture = async($client->echo(...), new EchoRequest('Hello, gRPC')); self::assertEquals(new EchoResponse('Hello, gRPC'), $successFuture->await()); - $this->expectExceptionMessage("Connection to 'localhost:50051' failed"); + $this->expectExceptionMessage("Connection to '127.0.0.1:50051' failed"); $failFuture->await(); } } diff --git a/tests/Internal/Http2/UriFactoryTest.php b/tests/Internal/Http2/UriFactoryTest.php index 1eca138..7fc5ca4 100644 --- a/tests/Internal/Http2/UriFactoryTest.php +++ b/tests/Internal/Http2/UriFactoryTest.php @@ -7,41 +7,52 @@ use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\Attributes\TestWith; use PHPUnit\Framework\TestCase; +use Thesis\Grpc\Client\Address; use Thesis\Grpc\Client\Internal\Http2\UriFactory; +use Thesis\Grpc\Client\Internal\HttpScheme; +use Thesis\Grpc\Client\Invoke; #[CoversClass(UriFactory::class)] final class UriFactoryTest extends TestCase { /** - * @param non-empty-string $host + * @param non-empty-string $address * @param non-empty-string $method * @param non-empty-string $uri */ #[TestWith([ + 'http', 'localhost:50051', 'test.api.v1.EchoController/Echo', - 'localhost:50051/test.api.v1.EchoController/Echo', + 'http://localhost:50051/test.api.v1.EchoController/Echo', ])] #[TestWith([ + 'http', 'localhost:50051', '/test.api.v1.EchoController/Echo', - 'localhost:50051/test.api.v1.EchoController/Echo', + 'http://localhost:50051/test.api.v1.EchoController/Echo', ])] #[TestWith([ - 'localhost:50051/', - '/test.api.v1.EchoController/Echo', - 'localhost:50051/test.api.v1.EchoController/Echo', + 'https', + 'localhost:50051', + 'test.api.v1.EchoController/Echo', + 'https://localhost:50051/test.api.v1.EchoController/Echo', ])] #[TestWith([ - 'localhost:50051/', + 'http', + '192.168.0.1:50051', 'test.api.v1.EchoController/Echo', - 'localhost:50051/test.api.v1.EchoController/Echo', + 'http://192.168.0.1:50051/test.api.v1.EchoController/Echo', ])] public function testCreate( - string $host, + string $scheme, + string $address, string $method, string $uri, ): void { - self::assertSame($uri, new UriFactory($host)->create($method)); + $factory = new UriFactory(HttpScheme::from($scheme)); + $invoke = new Invoke($method, \stdClass::class); + + self::assertSame($uri, $factory->create(new Address($address), $invoke)); } } diff --git a/tests/LoadBalancingTest.php b/tests/LoadBalancingTest.php new file mode 100644 index 0000000..e053523 --- /dev/null +++ b/tests/LoadBalancingTest.php @@ -0,0 +1,84 @@ +server1 = new Server\Builder() + ->withAddresses('0.0.0.0:50051') + ->withServices(new EchoServiceServerRegistry(new IdentifyingEchoServer('server-1'))) + ->build(); + + $this->server2 = new Server\Builder() + ->withAddresses('0.0.0.0:50052') + ->withServices(new EchoServiceServerRegistry(new IdentifyingEchoServer('server-2'))) + ->build(); + + $this->server1->start(); + $this->server2->start(); + } + + protected function tearDown(): void + { + $this->server1->stop(); + $this->server2->stop(); + } + + public function testRoundRobinDistributesRequests(): void + { + $client = new EchoServiceClient( + new Client\Builder() + ->withHost('ipv4:127.0.0.1:50051,127.0.0.1:50052') + ->withLoadBalancer(new RoundRobinFactory()) + ->build(), + ); + + $responses = []; + for ($i = 0; $i < 10; ++$i) { + $responses[] = $client->echo(new EchoRequest('ping'))->sentence; + } + + self::assertCount(5, array_filter($responses, static fn(string $s) => str_contains($s, 'server-1'))); + self::assertCount(5, array_filter($responses, static fn(string $s) => str_contains($s, 'server-2'))); + } +} + +/** + * @internal + */ +final readonly class IdentifyingEchoServer implements EchoServiceServer +{ + public function __construct( + private string $serverId, + ) {} + + #[\Override] + public function echo(EchoRequest $request, Metadata $md, Cancellation $cancellation): EchoResponse + { + return new EchoResponse("[{$this->serverId}] {$request->sentence}"); + } +}