author    Antoine Tollenaere <atollena@gmail.com>    2024-06-12 00:23:38 +0200
committer GitHub <noreply@github.com>    2024-06-11 15:23:38 -0700
commit    4dd7f552b8a49203d4c3aa5b1471cd60a6f38b85 (patch)
tree      cb9be1d7e63540fa7e89fa3d17db234d1f4fb5b1
parent    de51a630c1d05844f1a70543b87c174234fdf7df (diff)
ringhash: port e2e tests from c-core (#7271)
-rw-r--r--  internal/testutils/blocking_context_dialer.go                              |  56
-rw-r--r--  internal/testutils/xds/e2e/clientresources.go                              |  14
-rw-r--r--  xds/internal/balancer/cdsbalancer/cdsbalancer_test.go                      |   2
-rw-r--r--  xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go   |  22
-rw-r--r--  xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go               | 870
5 files changed, 910 insertions, 54 deletions
diff --git a/internal/testutils/blocking_context_dialer.go b/internal/testutils/blocking_context_dialer.go
new file mode 100644
index 00000000..ea7a8519
--- /dev/null
+++ b/internal/testutils/blocking_context_dialer.go
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package testutils
+
+import (
+ "context"
+ "net"
+)
+
+// BlockingDialer is a dialer that waits for Resume() to be called before
+// dialing.
+type BlockingDialer struct {
+ dialer *net.Dialer
+ blockCh chan struct{}
+}
+
+// NewBlockingDialer returns a dialer that waits for Resume() to be called
+// before dialing.
+func NewBlockingDialer() *BlockingDialer {
+ return &BlockingDialer{
+ dialer: &net.Dialer{},
+ blockCh: make(chan struct{}),
+ }
+}
+
+// DialContext implements a context dialer for use with the
+// grpc.WithContextDialer dial option. It blocks until Resume is called or the
+// context expires.
+func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
+ select {
+ case <-d.blockCh:
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ return d.dialer.DialContext(ctx, "tcp", addr)
+}
+
+// Resume unblocks the dialer. It panics if called more than once.
+func (d *BlockingDialer) Resume() {
+ close(d.blockCh)
+}
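
For context, a minimal usage sketch of this dialer, assuming it runs inside a test (t available) with a placeholder target address; the actual usage appears in the ring hash tests later in this patch:

    dialer := testutils.NewBlockingDialer()
    cc, err := grpc.NewClient("localhost:50051", // placeholder target
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithContextDialer(dialer.DialContext), // every dial blocks here
    )
    if err != nil {
        t.Fatalf("Failed to create client: %v", err)
    }
    defer cc.Close()
    // Start RPCs on separate goroutines; they queue while dials are blocked.
    dialer.Resume() // unblock all pending dial attempts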
diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go
index 175490e9..d511b473 100644
--- a/internal/testutils/xds/e2e/clientresources.go
+++ b/internal/testutils/xds/e2e/clientresources.go
@@ -675,6 +675,8 @@ type LocalityOptions struct {
Weight uint32
// Backends is a set of backends belonging to this locality.
Backends []BackendOptions
+ // Priority is the priority of the locality. Defaults to 0.
+ Priority uint32
}
// BackendOptions contains options to configure individual backends in a
@@ -686,6 +688,8 @@ type BackendOptions struct {
// Health status of the backend. Default is UNKNOWN which is treated the
// same as HEALTHY.
HealthStatus v3corepb.HealthStatus
+ // Weight sets the backend weight. Defaults to 1.
+ Weight uint32
}
// EndpointOptions contains options to configure an Endpoint (or
@@ -708,7 +712,7 @@ type EndpointOptions struct {
func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpointpb.ClusterLoadAssignment {
var bOpts []BackendOptions
for _, p := range ports {
- bOpts = append(bOpts, BackendOptions{Port: p})
+ bOpts = append(bOpts, BackendOptions{Port: p, Weight: 1})
}
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
@@ -729,6 +733,10 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, b := range locality.Backends {
+ // Weight defaults to 1.
+ if b.Weight == 0 {
+ b.Weight = 1
+ }
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
@@ -740,7 +748,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
}},
}},
HealthStatus: b.HealthStatus,
- LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
+ LoadBalancingWeight: &wrapperspb.UInt32Value{Value: b.Weight},
})
}
@@ -752,7 +760,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight},
- Priority: 0,
+ Priority: locality.Priority,
})
}
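
The new Priority and Weight fields let a test describe unequal localities and backends directly in its EDS resource. A minimal sketch, assuming a placeholder cluster name and ports:

    endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
        ClusterName: "example-cluster", // placeholder name
        Localities: []e2e.LocalityOptions{
            {
                Name:   "locality0",
                Weight: 1,
                // Placeholder ports; the second backend is weighted to receive
                // roughly twice the traffic of the first (exact behavior
                // depends on the LB policy consuming the resource).
                Backends: []e2e.BackendOptions{
                    {Port: 8080, Weight: 1},
                    {Port: 8081, Weight: 2},
                },
                Priority: 0, // preferred locality
            },
            {
                Name:     "locality1",
                Weight:   1,
                Backends: []e2e.BackendOptions{{Port: 8082, Weight: 1}},
                Priority: 1, // used only when priority 0 is unreachable
            },
        },
    })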
diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
index 20588bbc..82f4fb5f 100644
--- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -196,7 +196,7 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
// - the nodeID expected by the management server
// - the grpc channel to the test backend service
// - the manual resolver configured on the channel
-// - the xDS cient used the grpc channel
+// - the xDS client used by the grpc channel
// - a channel on which requested cluster resource names are sent
// - a channel used to signal that previously requested cluster resources are
// no longer requested
diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
index 1fb8ea73..88659318 100644
--- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
+++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go
@@ -79,26 +79,18 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
// Returns the following:
// - a channel onto which the DNS target being resolved is written to by the
// mock DNS resolver
-// - a channel to notify close of the DNS resolver
-// - a channel to notify re-resolution requests to the DNS resolver
// - a manual resolver which is used to mock the actual DNS resolution
-// - a cleanup function which re-registers the original DNS resolver
-func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) {
+func setupDNS(t *testing.T) (chan resolver.Target, *manual.Resolver) {
targetCh := make(chan resolver.Target, 1)
- closeCh := make(chan struct{}, 1)
- resolveNowCh := make(chan resolver.ResolveNowOptions, 1)
mr := manual.NewBuilderWithScheme("dns")
- mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) {
- targetCh <- target
- }
- mr.CloseCallback = func() { closeCh <- struct{}{} }
- mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }
+ mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { targetCh <- target }
dnsResolverBuilder := resolver.Get("dns")
resolver.Register(mr)
- return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
+ t.Cleanup(func() { resolver.Register(dnsResolverBuilder) })
+ return targetCh, mr
}
// TestAggregateCluster_WithTwoEDSClusters tests the case where the top-level
@@ -471,8 +463,7 @@ func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
// cluster. The test verifies that RPCs fail until both clusters are resolved to
// endpoints, and RPCs are routed to the higher priority EDS cluster.
func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
- dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
- defer cleanup1()
+ dnsTargetCh, dnsR := setupDNS(t)
// Start an xDS management server that pushes the name of the requested EDS
// resource onto a channel.
@@ -661,8 +652,7 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
// still successful. This is the expected behavior because the cluster resolver
// policy eats errors from DNS Resolver after it has returned an error.
func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
- dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
- defer cleanup1()
+ dnsTargetCh, dnsR := setupDNS(t)
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
index 5eb8ffd1..2ca3b5ad 100644
--- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
+++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -20,21 +20,40 @@ package ringhash_test
import (
"context"
+ "fmt"
+ "math"
+ "net"
"testing"
"time"
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc"
+ "google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
+ "google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
+ "google.golang.org/grpc/internal/testutils/xds/e2e"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
+ v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+ v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+ v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+ v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+ v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+ v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
+ v3matcherpb "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
+ "google.golang.org/protobuf/types/known/wrapperspb"
- _ "google.golang.org/grpc/xds/internal/balancer/ringhash" // Register the ring_hash_experimental LB policy.
+ _ "google.golang.org/grpc/xds"
)
type s struct {
@@ -46,22 +65,16 @@ func Test(t *testing.T) {
}
const (
- defaultTestTimeout = 10 * time.Second
- defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
-)
+ defaultTestTimeout = 10 * time.Second
-type testService struct {
- testgrpc.TestServiceServer
-}
+ errorTolerance = .05 // For tests that rely on statistical significance.
-func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
- return &testpb.Empty{}, nil
-}
+ virtualHostName = "test.server"
+)
-// TestRingHash_ReconnectToMoveOutOfTransientFailure tests the case where the
-// ring contains a single subConn, and verifies that when the server goes down,
-// the LB policy on the client automatically reconnects until the subChannel
-// moves out of TRANSIENT_FAILURE.
+// Tests the case where the ring contains a single subConn, and verifies that
+// when the server goes down, the LB policy on the client automatically
+// reconnects until the subChannel moves out of TRANSIENT_FAILURE.
func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
// Create a restartable listener to simulate server being down.
l, err := testutils.LocalTCPListener()
@@ -69,16 +82,11 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)
-
- // Start a server backend exposing the test service.
- server := grpc.NewServer()
- defer server.Stop()
- testgrpc.RegisterTestServiceServer(server, &testService{})
- go func() {
- if err := server.Serve(lis); err != nil {
- t.Errorf("Serve() failed: %v", err)
- }
- }()
+ srv := stubserver.StartTestService(t, &stubserver.StubServer{
+ Listener: lis,
+ EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
+ })
+ defer srv.Stop()
// Create a clientConn with a manual resolver (which is used to push the
// address of the test backend), and a default service config pointing to
@@ -90,14 +98,14 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(ringHashServiceConfig),
}
- cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+ cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
- t.Fatalf("failed to dial local test server: %v", err)
+ t.Fatalf("Failed to dial local test server: %v", err)
}
defer cc.Close()
// Push the address of the test backend through the manual resolver.
- r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
+ r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@@ -128,17 +136,811 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
// attempt to reconnect on its own and come out of TRANSIENT_FAILURE, even
// without an RPC attempt.
lis.Restart()
- for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
- if cc.GetState() == connectivity.Ready {
+ testutils.AwaitState(ctx, t, cc, connectivity.Ready)
+
+ // An RPC at this point is expected to succeed.
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ t.Fatalf("rpc EmptyCall() failed: %v", err)
+ }
+}
+
+// startTestServiceBackends starts num stub servers and returns them, along
+// with a function that stops them all.
+func startTestServiceBackends(t *testing.T, num int) ([]*stubserver.StubServer, func()) {
+ t.Helper()
+
+ var servers []*stubserver.StubServer
+ for i := 0; i < num; i++ {
+ servers = append(servers, stubserver.StartTestService(t, nil))
+ }
+
+ return servers, func() {
+ for _, server := range servers {
+ server.Stop()
+ }
+ }
+}
+
+// backendOptions returns a slice of e2e.BackendOptions for the given stub
+// servers.
+func backendOptions(t *testing.T, servers []*stubserver.StubServer) []e2e.BackendOptions {
+ t.Helper()
+
+ var backendOpts []e2e.BackendOptions
+ for _, server := range servers {
+ backendOpts = append(backendOpts, e2e.BackendOptions{
+ Port: testutils.ParsePort(t, server.Address),
+ Weight: 1,
+ })
+ }
+ return backendOpts
+}
+
+// channelIDHashRoute returns a RouteConfiguration with a hash policy that
+// hashes based on the channel ID.
+func channelIDHashRoute(routeName, virtualHostDomain, clusterName string) *v3routepb.RouteConfiguration {
+ route := e2e.DefaultRouteConfig(routeName, virtualHostDomain, clusterName)
+ hashPolicy := v3routepb.RouteAction_HashPolicy{
+ PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{
+ FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{
+ Key: "io.grpc.channel_id",
+ },
+ },
+ }
+ action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route)
+ action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&hashPolicy}
+ return route
+}
+
+// checkRPCSendOK sends num RPCs using the given client. It returns a map from
+// backend address to the number of RPCs routed to that backend, and aborts
+// the test if any RPC fails.
+func checkRPCSendOK(t *testing.T, ctx context.Context, client testpb.TestServiceClient, num int) map[string]int {
+ t.Helper()
+
+ backendCount := make(map[string]int)
+ for i := 0; i < num; i++ {
+ var remote peer.Peer
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&remote)); err != nil {
+ t.Fatalf("rpc EmptyCall() failed: %v", err)
+ }
+ backendCount[remote.Addr.String()]++
+ }
+ return backendCount
+}
+
+// makeNonExistentBackends returns a slice of e2e.BackendOptions with num
+// listeners, each of which is closed immediately. Useful to simulate servers
+// that are unreachable.
+func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions {
+ closedListeners := make([]net.Listener, 0, num)
+ for i := 0; i < num; i++ {
+ lis, err := testutils.LocalTCPListener()
+ if err != nil {
+ t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
+ }
+ closedListeners = append(closedListeners, lis)
+ }
+
+ // Record the listener addresses and then close the listeners, so that the
+ // addresses are guaranteed to be unreachable.
+ backendOptions := make([]e2e.BackendOptions, 0, num)
+ for _, lis := range closedListeners {
+ backendOptions = append(backendOptions, e2e.BackendOptions{
+ Port: testutils.ParsePort(t, lis.Addr().String()),
+ Weight: 1,
+ })
+ lis.Close()
+ }
+ return backendOptions
+}
+
+// Tests that when an aggregate cluster is configured with a ring hash policy
+// and its first child cluster is in transient failure, all RPCs are sent to
+// the second cluster using the ring hash policy.
+func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T) {
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+
+ servers, stop := startTestServiceBackends(t, 2)
+ defer stop()
+
+ const primaryClusterName = "new_cluster_1"
+ const primaryServiceName = "new_eds_service_1"
+ const secondaryClusterName = "new_cluster_2"
+ const secondaryServiceName = "new_eds_service_2"
+ const clusterName = "aggregate_cluster"
+
+ ep1 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: primaryServiceName,
+ Localities: []e2e.LocalityOptions{{
+ Name: "locality0",
+ Weight: 1,
+ Backends: makeNonExistentBackends(t, 2),
+ }},
+ })
+ ep2 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: secondaryServiceName,
+ Localities: []e2e.LocalityOptions{{
+ Name: "locality0",
+ Weight: 1,
+ Backends: backendOptions(t, servers),
+ }},
+ })
+ primaryCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: primaryClusterName,
+ ServiceName: primaryServiceName,
+ })
+ secondaryCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: secondaryClusterName,
+ ServiceName: secondaryServiceName,
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ Type: e2e.ClusterTypeAggregate,
+ // TODO: when "A75: xDS Aggregate Cluster Behavior Fixes" is implemented, the
+ // policy will have to be set on the child clusters.
+ Policy: e2e.LoadBalancingPolicyRingHash,
+ ChildNames: []string{primaryClusterName, secondaryClusterName},
+ })
+ route := channelIDHashRoute("new_route", virtualHostName, clusterName)
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err := xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{ep1, ep2},
+ Clusters: []*v3clusterpb.Cluster{cluster, primaryCluster, secondaryCluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ gotPerBackend := checkRPCSendOK(t, ctx, client, 100)
+
+ // Since this is using ring hash with the channel ID as the key, all RPCs
+ // are routed to the same backend in the secondary cluster's locality.
+ if len(gotPerBackend) != 1 {
+ t.Errorf("Got RPCs routed to %v backends, want %v", len(gotPerBackend), 1)
+ }
+
+ var backend string
+ var got int
+ for backend, got = range gotPerBackend {
+ }
+ found := false
+ for _, server := range servers {
+ if backend == server.Address {
+ found = true
break
}
}
- if err := ctx.Err(); err != nil {
- t.Fatalf("Timeout waiting for channel to reach READT after server restart: %v", err)
+ if !found {
+ t.Errorf("Got RPCs routed to an unexpected backend: %v, want one of %v", backend, servers)
+ }
+ if got != 100 {
+ t.Errorf("Got %v RPCs routed to a backend, want %v", got, 100)
}
+}
- // An RPC at this point is expected to fail.
- if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("rpc EmptyCall() failed: %v", err)
+func replaceDNSResolver(t *testing.T) *manual.Resolver {
+ mr := manual.NewBuilderWithScheme("dns")
+
+ dnsResolverBuilder := resolver.Get("dns")
+ resolver.Register(mr)
+
+ t.Cleanup(func() { resolver.Register(dnsResolverBuilder) })
+ return mr
+}
+
+// Tests that when an aggregate cluster is configured with a ring hash policy,
+// its first child is an EDS cluster in transient failure, and the fallback is
+// a logical DNS cluster, all RPCs are sent to the logical DNS cluster using
+// the ring hash policy.
+func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup(t *testing.T) {
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+
+ const edsClusterName = "eds_cluster"
+ const logicalDNSClusterName = "logical_dns_cluster"
+ const clusterName = "aggregate_cluster"
+
+ backends, stop := startTestServiceBackends(t, 1)
+ defer stop()
+
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: edsClusterName,
+ Localities: []e2e.LocalityOptions{{
+ Name: "locality0",
+ Weight: 1,
+ Backends: makeNonExistentBackends(t, 1),
+ Priority: 0,
+ }},
+ })
+ edsCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: edsClusterName,
+ ServiceName: edsClusterName,
+ })
+
+ logicalDNSCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ Type: e2e.ClusterTypeLogicalDNS,
+ ClusterName: logicalDNSClusterName,
+ // The DNS values are not used because we fake DNS later on, but they
+ // are required to be present for the resource to be valid.
+ DNSHostName: "server.example.com",
+ DNSPort: 443,
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ Type: e2e.ClusterTypeAggregate,
+ // TODO: when "A75: xDS Aggregate Cluster Behavior Fixes" is merged, the
+ // policy will have to be set on the child clusters.
+ Policy: e2e.LoadBalancingPolicyRingHash,
+ ChildNames: []string{edsClusterName, logicalDNSClusterName},
+ })
+ route := channelIDHashRoute("new_route", virtualHostName, clusterName)
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err := xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster, edsCluster, logicalDNSCluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ dnsR := replaceDNSResolver(t)
+ dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ gotPerBackend := checkRPCSendOK(t, ctx, client, 1)
+ var got string
+ for got = range gotPerBackend {
+ }
+ if want := backends[0].Address; got != want {
+ t.Errorf("Got RPCs routed to an unexpected got: %v, want %v", got, want)
+ }
+}
+
+// Tests that when an aggregate cluster is configured with a ring hash policy,
+// its first child is in transient failure, and the fallback is a logical DNS
+// cluster, the latter recovers from transient failure once its backend
+// becomes available.
+func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRPCs(t *testing.T) {
+ // https://github.com/grpc/grpc/blob/083bbee4805c14ce62e6c9535fe936f68b854c4f/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc#L225
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+
+ const edsClusterName = "eds_cluster"
+ const logicalDNSClusterName = "logical_dns_cluster"
+ const clusterName = "aggregate_cluster"
+
+ backends, stop := startTestServiceBackends(t, 1)
+ defer stop()
+
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: edsClusterName,
+ Localities: []e2e.LocalityOptions{{
+ Name: "locality0",
+ Weight: 1,
+ Backends: makeNonExistentBackends(t, 1),
+ Priority: 0,
+ }},
+ })
+ edsCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: edsClusterName,
+ ServiceName: edsClusterName,
+ })
+
+ logicalDNSCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ Type: e2e.ClusterTypeLogicalDNS,
+ ClusterName: logicalDNSClusterName,
+ // The DNS values are not used because we fake DNS later on, but they
+ // are required to be present for the resource to be valid.
+ DNSHostName: "server.example.com",
+ DNSPort: 443,
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ Type: e2e.ClusterTypeAggregate,
+ // TODO: when "A75: xDS Aggregate Cluster Behavior Fixes" is merged, the
+ // policy will have to be set on the child clusters.
+ Policy: e2e.LoadBalancingPolicyRingHash,
+ ChildNames: []string{edsClusterName, logicalDNSClusterName},
+ })
+ route := channelIDHashRoute("new_route", virtualHostName, clusterName)
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err := xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster, edsCluster, logicalDNSCluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ dnsR := replaceDNSResolver(t)
+ dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
+
+ dialer := testutils.NewBlockingDialer()
+ cp := grpc.ConnectParams{
+ // Increase backoff time, so that subconns stay in TRANSIENT_FAILURE
+ // for long enough to trigger potential problems.
+ Backoff: backoff.Config{
+ BaseDelay: defaultTestTimeout,
+ },
+ MinConnectTimeout: 0,
+ }
+ opts := []grpc.DialOption{
+ grpc.WithResolvers(xdsResolver),
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithContextDialer(dialer.DialContext),
+ grpc.WithConnectParams(cp)}
+ conn, err := grpc.NewClient("xds:///test.server", opts...)
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ errCh := make(chan error, 2)
+ go func() {
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ errCh <- fmt.Errorf("first rpc UnaryCall() failed: %v", err)
+ return
+ }
+ errCh <- nil
+ }()
+
+ testutils.AwaitState(ctx, t, conn, connectivity.Connecting)
+
+ go func() {
+ // Start a second RPC at this point, which should be queued as well.
+ // This will fail if the priority policy fails to update the picker to
+ // point to the LOGICAL_DNS child; if it leaves it pointing to the EDS
+ // priority 1, then the RPC will fail, because all subchannels are in
+ // transient failure.
+ //
+ // Note that sending only the first RPC does not catch this case,
+ // because if the priority policy fails to update the picker, then the
+ // pick for the first RPC will not be retried.
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+ errCh <- fmt.Errorf("second UnaryCall() failed: %v", err)
+ return
+ }
+ errCh <- nil
+ }()
+
+ // Allow the connection attempts to complete.
+ dialer.Resume()
+
+ // RPCs should complete successfully.
+ for range []int{0, 1} {
+ select {
+ case err := <-errCh:
+ if err != nil {
+ t.Errorf("Expected 2 rpc to succeed, but failed: %v", err)
+ }
+ case <-ctx.Done():
+ t.Fatalf("Timed out waiting for RPCs to complete")
+ }
+ }
+}
+
+// Tests that a ring hash policy that hashes using the channel ID sends all
+// RPCs to one particular backend.
+func (s) TestRingHash_ChannelIdHashing(t *testing.T) {
+ backends, stop := startTestServiceBackends(t, 4)
+ defer stop()
+
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+ const clusterName = "cluster"
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: clusterName,
+ Localities: []e2e.LocalityOptions{{
+ Backends: backendOptions(t, backends),
+ Weight: 1,
+ }},
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ ServiceName: clusterName,
+ Policy: e2e.LoadBalancingPolicyRingHash,
+ })
+ route := channelIDHashRoute("new_route", virtualHostName, clusterName)
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err := xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ received := checkRPCSendOK(t, ctx, client, 100)
+ if len(received) != 1 {
+ t.Errorf("Got RPCs routed to %v backends, want %v", len(received), 1)
+ }
+ var count int
+ for _, count = range received {
+ }
+ if count != 100 {
+ t.Errorf("Got %v RPCs routed to a backend, want %v", count, 100)
+ }
+}
+
+// headerHashRoute creates a RouteConfiguration with a hash policy that uses the
+// provided header.
+func headerHashRoute(routeName, virtualHostName, clusterName, header string) *v3routepb.RouteConfiguration {
+ route := e2e.DefaultRouteConfig(routeName, virtualHostName, clusterName)
+ hashPolicy := v3routepb.RouteAction_HashPolicy{
+ PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
+ Header: &v3routepb.RouteAction_HashPolicy_Header{
+ HeaderName: header,
+ },
+ },
+ }
+ action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route)
+ action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&hashPolicy}
+ return route
+}
+
+// Tests that a ring hash policy that hashes using a header value spreads RPCs
+// across all the backends.
+func (s) TestRingHash_HeaderHashing(t *testing.T) {
+ backends, stop := startTestServiceBackends(t, 4)
+ defer stop()
+
+ // We must set the actual backend host name in the EDS resource, as the
+ // ring hash policy uses the endpoint addresses to construct the ring.
+ host, _, err := net.SplitHostPort(backends[0].Address)
+ if err != nil {
+ t.Fatalf("Failed to split host and port from stubserver: %v", err)
+ }
+
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+ const clusterName = "cluster"
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: clusterName,
+ Host: host,
+ Localities: []e2e.LocalityOptions{{
+ Backends: backendOptions(t, backends),
+ Weight: 1,
+ }},
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ ServiceName: clusterName,
+ Policy: e2e.LoadBalancingPolicyRingHash,
+ })
+ route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err = xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ // Note that each RPC sends a header value that hashes to a specific
+ // backend, since the header value matches the value used to create that
+ // backend's entry in the ring.
+ for _, backend := range backends {
+ ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend.Address+"_0"))
+ reqPerBackend := checkRPCSendOK(t, ctx, client, 1)
+ if reqPerBackend[backend.Address] != 1 {
+ t.Errorf("Got RPC routed to backend %v, want %v", reqPerBackend, backend.Address)
+ }
+ }
+}
+
+// Tests that a ring hash policy that hashes using a header value with a regex
+// rewrite aggregates RPCs onto one backend.
+func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) {
+ backends, stop := startTestServiceBackends(t, 4)
+ defer stop()
+
+ // We must set the actual backend host name in the EDS resource, as the
+ // ring hash policy uses the endpoint addresses to construct the ring.
+ host, _, err := net.SplitHostPort(backends[0].Address)
+ if err != nil {
+ t.Fatalf("Failed to split host and port from stubserver: %v", err)
+ }
+
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+ clusterName := "cluster"
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: clusterName,
+ Host: host,
+ Localities: []e2e.LocalityOptions{{
+ Backends: backendOptions(t, backends),
+ Weight: 1,
+ }},
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ ServiceName: clusterName,
+ Policy: e2e.LoadBalancingPolicyRingHash,
+ })
+ route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+ action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route)
+ action.Route.HashPolicy[0].GetHeader().RegexRewrite = &v3matcherpb.RegexMatchAndSubstitute{
+ Pattern: &v3matcherpb.RegexMatcher{
+ EngineType: &v3matcherpb.RegexMatcher_GoogleRe2{},
+ Regex: "[0-9]+",
+ },
+ Substitution: "foo",
+ }
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err = xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ // Note that each RPC sends a header value that would normally hash to a
+ // specific backend, since the header value matches the value used to
+ // create that backend's entry in the ring. However, the regex rewrites all
+ // numbers to "foo", and the header values only differ by numbers, so they
+ // all end up hashing to the same value.
+ gotPerBackend := make(map[string]int)
+ for _, backend := range backends {
+ ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend.Address+"_0"))
+ res := checkRPCSendOK(t, ctx, client, 100)
+ for addr, count := range res {
+ gotPerBackend[addr] += count
+ }
+ }
+ if want := 1; len(gotPerBackend) != want {
+ t.Errorf("Got RPCs routed to %v backends, want %v", len(gotPerBackend), want)
+ }
+ var got int
+ for _, got = range gotPerBackend {
+ }
+ if want := 400; got != want {
+ t.Errorf("Got %v RPCs routed to a backend, want %v", got, want)
+ }
+}
+
+// computeIdealNumberOfRPCs computes the ideal number of RPCs to send so that,
+// for an event that occurs with probability p, the observed fraction of RPCs
+// hitting that event stays within the given error tolerance of p.
+//
+// See https://github.com/grpc/grpc/blob/4f6e13bdda9e8c26d6027af97db4b368ca2b3069/test/cpp/end2end/xds/xds_end2end_test_lib.h#L941
+// for an explanation of the formula.
+func computeIdealNumberOfRPCs(t *testing.T, p, errorTolerance float64) int {
+ if p < 0 || p > 1 {
+ t.Fatal("p must be in (0, 1)")
+ }
+ numRPCs := math.Ceil(p * (1 - p) * 5. * 5. / errorTolerance / errorTolerance)
+ return int(numRPCs + 1000.) // add 1k as a buffer to avoid flakiness.
+}
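
As a quick check of this formula, using the errorTolerance constant defined above and the p = .5 case exercised later in this file:

    // p(1-p) * 5^2 / errorTolerance^2 = 0.5 * 0.5 * 25 / (0.05 * 0.05) = 2500,
    // so computeIdealNumberOfRPCs(t, .5, errorTolerance) returns 2500 + 1000 = 3500.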
+
+// setRingHashLBPolicyWithHighMinRingSize sets the ring hash policy with a high
+// minimum ring size to ensure that the ring is large enough to distribute
+// requests more uniformly across endpoints when a random hash is used.
+func setRingHashLBPolicyWithHighMinRingSize(t *testing.T, cluster *v3clusterpb.Cluster) {
+ minRingSize := uint64(100000)
+ oldVal := envconfig.RingHashCap
+ envconfig.RingHashCap = minRingSize
+ t.Cleanup(func() {
+ envconfig.RingHashCap = oldVal
+ })
+ // Increasing min ring size for random distribution.
+ config := testutils.MarshalAny(t, &v3ringhashpb.RingHash{
+ HashFunction: v3ringhashpb.RingHash_XX_HASH,
+ MinimumRingSize: &wrapperspb.UInt64Value{Value: minRingSize},
+ })
+ cluster.LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{
+ Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{{
+ TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
+ Name: "envoy.load_balancing_policies.ring_hash",
+ TypedConfig: config,
+ },
+ }},
+ }
+}
+
+// Tests that a ring hash policy with no hash policy specified in the route
+// hashes using a random value, spreading RPCs across the backends.
+func (s) TestRingHash_NoHashPolicy(t *testing.T) {
+ backends, stop := startTestServiceBackends(t, 2)
+ defer stop()
+ numRPCs := computeIdealNumberOfRPCs(t, .5, errorTolerance)
+
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+ const clusterName = "cluster"
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: clusterName,
+ Localities: []e2e.LocalityOptions{{
+ Backends: backendOptions(t, backends),
+ Weight: 1,
+ }},
+ })
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ ServiceName: clusterName,
+ })
+ setRingHashLBPolicyWithHighMinRingSize(t, cluster)
+ route := e2e.DefaultRouteConfig("new_route", virtualHostName, clusterName)
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err := xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ // Send a large number of RPCs and check that they are distributed randomly.
+ gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs)
+ for _, backend := range backends {
+ got := float64(gotPerBackend[backend.Address]) / float64(numRPCs)
+ want := .5
+ if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) {
+ t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2].Address, got, want, errorTolerance)
+ }
+ }
+}
+
+// Tests that the ring hash policy distributes RPCs in proportion to the
+// endpoint weights.
+func (s) TestRingHash_EndpointWeights(t *testing.T) {
+ backends, stop := startTestServiceBackends(t, 3)
+ defer stop()
+ xdsServer, nodeID, _, xdsResolver, stop := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+ defer stop()
+ const clusterName = "cluster"
+ backendOpts := []e2e.BackendOptions{
+ {Port: testutils.ParsePort(t, backends[0].Address)},
+ {Port: testutils.ParsePort(t, backends[1].Address)},
+ {Port: testutils.ParsePort(t, backends[2].Address), Weight: 2},
+ }
+
+ endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+ ClusterName: clusterName,
+ Localities: []e2e.LocalityOptions{{
+ Backends: backendOpts,
+ Weight: 1,
+ }},
+ })
+ endpoints.Endpoints[0].LbEndpoints[0].LoadBalancingWeight = wrapperspb.UInt32(uint32(1))
+ endpoints.Endpoints[0].LbEndpoints[1].LoadBalancingWeight = wrapperspb.UInt32(uint32(1))
+ endpoints.Endpoints[0].LbEndpoints[2].LoadBalancingWeight = wrapperspb.UInt32(uint32(2))
+ cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+ ClusterName: clusterName,
+ ServiceName: clusterName,
+ })
+ // Increasing min ring size for random distribution.
+ setRingHashLBPolicyWithHighMinRingSize(t, cluster)
+ route := e2e.DefaultRouteConfig("new_route", virtualHostName, clusterName)
+ listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+
+ err := xdsServer.Update(ctx, e2e.UpdateOptions{
+ NodeID: nodeID,
+ Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints},
+ Clusters: []*v3clusterpb.Cluster{cluster},
+ Routes: []*v3routepb.RouteConfiguration{route},
+ Listeners: []*v3listenerpb.Listener{listener},
+ })
+ if err != nil {
+ t.Fatalf("Failed to update xDS resources: %v", err)
+ }
+
+ conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Failed to create client: %s", err)
+ }
+ defer conn.Close()
+ client := testgrpc.NewTestServiceClient(conn)
+
+ // Send a large number of RPCs and check that they are distributed in
+ // proportion to the endpoint weights.
+ numRPCs := computeIdealNumberOfRPCs(t, .25, errorTolerance)
+ gotPerBackend := checkRPCSendOK(t, ctx, client, numRPCs)
+
+ got := float64(gotPerBackend[backends[0].Address]) / float64(numRPCs)
+ want := .25
+ if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) {
+ t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[0].Address, got, want, errorTolerance)
+ }
+ got = float64(gotPerBackend[backends[1].Address]) / float64(numRPCs)
+ if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) {
+ t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[1].Address, got, want, errorTolerance)
+ }
+ got = float64(gotPerBackend[backends[2].Address]) / float64(numRPCs)
+ want = .50
+ if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) {
+ t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2].Address, got, want, errorTolerance)
}
}