From f11f5f3b2164882e7d52adf1cc372eb507be61d9 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 26 Apr 2024 19:21:40 -0400 Subject: [PATCH 1/7] fix Signed-off-by: Yuri Shkuro --- cmd/agent/app/reporter/grpc/builder.go | 3 +- cmd/agent/app/reporter/grpc/builder_test.go | 110 ++++++-------------- 2 files changed, 34 insertions(+), 79 deletions(-) diff --git a/cmd/agent/app/reporter/grpc/builder.go b/cmd/agent/app/reporter/grpc/builder.go index 5853ef7912b..9290107dd3a 100644 --- a/cmd/agent/app/reporter/grpc/builder.go +++ b/cmd/agent/app/reporter/grpc/builder.go @@ -102,8 +102,7 @@ func (b *ConnBuilder) CreateConnection(ctx context.Context, logger *zap.Logger, dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(b.MaxRetry)))) dialOptions = append(dialOptions, b.AdditionalDialOptions...) - // TODO: Need to replace grpc.Dial with grpc.NewClient and pass test - conn, err := grpc.Dial(dialTarget, dialOptions...) + conn, err := grpc.NewClient(dialTarget, dialOptions...) if err != nil { return nil, err } diff --git a/cmd/agent/app/reporter/grpc/builder_test.go b/cmd/agent/app/reporter/grpc/builder_test.go index 1206cb6ab78..d241cd350e4 100644 --- a/cmd/agent/app/reporter/grpc/builder_test.go +++ b/cmd/agent/app/reporter/grpc/builder_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" yaml "gopkg.in/yaml.v2" @@ -69,78 +68,52 @@ func TestBuilderFromConfig(t *testing.T) { func TestBuilderWithCollectors(t *testing.T) { spanHandler1 := &mockSpanHandler{} - s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) { + s1, _ := initializeGRPCTestServer(t, func(s *grpc.Server) { api_v2.RegisterCollectorServiceServer(s, spanHandler1) }) defer s1.Stop() tests := []struct { - target string - name string - hostPorts []string - checkSuffixOnly bool - notifier discovery.Notifier - discoverer discovery.Discoverer - expectedError string - checkConnectionState bool - expectedState string + target string + name string + hostPorts []string + checkSuffixOnly bool + notifier discovery.Notifier + discoverer discovery.Discoverer + expectedError string }{ { - target: "///round_robin", - name: "with roundrobin schema", - hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"}, - checkSuffixOnly: true, - notifier: nil, - discoverer: nil, - checkConnectionState: false, + target: "///round_robin", + name: "with roundrobin schema", + hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"}, + checkSuffixOnly: true, + notifier: nil, + discoverer: nil, }, { - target: "127.0.0.1:9876", - name: "with single host", - hostPorts: []string{"127.0.0.1:9876"}, - checkSuffixOnly: false, - notifier: nil, - discoverer: nil, - checkConnectionState: false, + target: "127.0.0.1:9876", + name: "with single host", + hostPorts: []string{"127.0.0.1:9876"}, + checkSuffixOnly: false, + notifier: nil, + discoverer: nil, }, { - target: "///round_robin", - name: "with custom resolver and fixed discoverer", - hostPorts: []string{"dns://random_stuff"}, - checkSuffixOnly: true, - notifier: noopNotifier{}, - discoverer: discovery.FixedDiscoverer{}, - checkConnectionState: false, + target: "///round_robin", + name: "with custom resolver and fixed discoverer", + hostPorts: []string{"dns://random_stuff"}, + checkSuffixOnly: true, + notifier: noopNotifier{}, + discoverer: discovery.FixedDiscoverer{}, }, { - target: "", - name: "without collectorPorts and resolver", - hostPorts: nil, - checkSuffixOnly: false, - notifier: nil, - discoverer: nil, - expectedError: "at least one collector hostPort address is required when resolver is not available", - checkConnectionState: false, - }, - { - target: addr1.String(), - name: "with collector connection status ready", - hostPorts: []string{addr1.String()}, - checkSuffixOnly: false, - notifier: nil, - discoverer: nil, - checkConnectionState: true, - expectedState: "READY", - }, - { - target: "random_stuff", - name: "with collector connection status failure", - hostPorts: []string{"random_stuff"}, - checkSuffixOnly: false, - notifier: nil, - discoverer: nil, - checkConnectionState: true, - expectedState: "TRANSIENT_FAILURE", + target: "", + name: "without collectorPorts and resolver", + hostPorts: nil, + checkSuffixOnly: false, + notifier: nil, + discoverer: nil, + expectedError: "at least one collector hostPort address is required when resolver is not available", }, } @@ -159,9 +132,6 @@ func TestBuilderWithCollectors(t *testing.T) { require.NoError(t, err) defer conn.Close() require.NotNil(t, conn) - if test.checkConnectionState { - assertConnectionState(t, conn, test.expectedState) - } if test.checkSuffixOnly { assert.True(t, strings.HasSuffix(conn.Target(), test.target)) } else { @@ -395,20 +365,6 @@ func TestProxyClientTLS(t *testing.T) { } } -func assertConnectionState(t *testing.T, conn *grpc.ClientConn, expectedState string) { - for { - s := conn.GetState() - if s == connectivity.Ready { - assert.Equal(t, expectedState, s.String()) - break - } - if s == connectivity.TransientFailure { - assert.Equal(t, expectedState, s.String()) - break - } - } -} - type fakeInterceptor struct { isCalled bool } From b8f750de81513d85263025d917ba27eca19562b2 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 26 Apr 2024 19:24:01 -0400 Subject: [PATCH 2/7] fix Signed-off-by: Yuri Shkuro --- cmd/query/app/grpc_handler_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 72c524f79af..bec65d371b3 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -174,10 +174,7 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQu } func newGRPCClient(t *testing.T, addr string) *grpcClient { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test - conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) return &grpcClient{ From 384c6689f66d9c7609b518b4ffd985427644ef09 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 26 Apr 2024 19:25:23 -0400 Subject: [PATCH 3/7] fix Signed-off-by: Yuri Shkuro --- cmd/query/app/server_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 4a08c7b4f29..df1c8d6bc83 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -415,17 +415,13 @@ func TestServerHTTPTLS(t *testing.T) { } func newGRPCClientWithTLS(t *testing.T, addr string, creds credentials.TransportCredentials) *grpcClient { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() var conn *grpc.ClientConn var err error if creds != nil { - // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test - conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(creds)) + conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(creds)) } else { - // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test - conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) } require.NoError(t, err) From 5da003db9c6cf26ef7ef6ed0af5bb7ca0102468d Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 26 Apr 2024 19:27:15 -0400 Subject: [PATCH 4/7] fix Signed-off-by: Yuri Shkuro --- plugin/storage/grpc/config/config.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index c22e9f9d6f1..6585118b242 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -15,7 +15,6 @@ package config import ( - "context" "fmt" "os/exec" "time" @@ -109,17 +108,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout) - defer cancel() - tenancyMgr := tenancy.NewManager(&c.TenancyOpts) if tenancyMgr.Enabled { opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } var err error - // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test - c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...) + c.remoteConn, err = grpc.NewClient(c.RemoteServerAddr, opts...) if err != nil { return nil, fmt.Errorf("error connecting to remote storage: %w", err) } From 053634ebf5a51bb2f75e2606eb053d48fa3c555a Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 26 Apr 2024 21:01:43 -0400 Subject: [PATCH 5/7] fix Signed-off-by: Yuri Shkuro --- plugin/storage/grpc/config/config.go | 10 ++++++---- plugin/storage/grpc/config/config_test.go | 23 +++++++++++++++++++++++ plugin/storage/grpc/factory_test.go | 10 ++++------ 3 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 plugin/storage/grpc/config/config_test.go diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 6585118b242..fe96ee35582 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -76,7 +76,7 @@ func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerPro if c.PluginBinary != "" { return c.buildPlugin(logger, tracerProvider) } else { - return c.buildRemote(logger, tracerProvider) + return c.buildRemote(logger, tracerProvider, grpc.NewClient) } } @@ -92,7 +92,9 @@ func (c *Configuration) Close() error { return c.RemoteTLS.Close() } -func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) { +type newClientFn func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) + +func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider, newClient newClientFn) (*ClientPluginServices, error) { opts := []grpc.DialOption{ grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))), grpc.WithBlock(), @@ -114,9 +116,9 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } var err error - c.remoteConn, err = grpc.NewClient(c.RemoteServerAddr, opts...) + c.remoteConn, err = newClient(c.RemoteServerAddr, opts...) if err != nil { - return nil, fmt.Errorf("error connecting to remote storage: %w", err) + return nil, fmt.Errorf("error creating remote storage client: %w", err) } grpcClient := shared.NewGRPCClient(c.remoteConn) diff --git a/plugin/storage/grpc/config/config_test.go b/plugin/storage/grpc/config/config_test.go new file mode 100644 index 00000000000..64f54e58418 --- /dev/null +++ b/plugin/storage/grpc/config/config_test.go @@ -0,0 +1,23 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package config + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +func TestBuildRemoteNewClientError(t *testing.T) { + // this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params. + c := &Configuration{} + _, err := c.buildRemote(zap.NewNop(), nil, func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + return nil, errors.New("test error") + }) + require.Error(t, err) + require.Contains(t, err.Error(), "error creating remote storage client") +} diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 62699eb2e2a..f9033964d4f 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -148,10 +148,6 @@ func TestGRPCStorageFactory(t *testing.T) { } func TestGRPCStorageFactoryWithConfig(t *testing.T) { - cfg := grpcConfig.Configuration{} - _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) - require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage") - lis, err := net.Listen("tcp", ":0") require.NoError(t, err, "failed to listen") @@ -163,8 +159,10 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { }() defer s.Stop() - cfg.RemoteServerAddr = lis.Addr().String() - cfg.RemoteConnectTimeout = 1 * time.Second + cfg := grpcConfig.Configuration{ + RemoteServerAddr: lis.Addr().String(), + RemoteConnectTimeout: 1 * time.Second, + } f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) require.NoError(t, f.Close()) From 7b72e75258ac984af2a38a7f2ce1442aa0ab8a50 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 26 Apr 2024 21:32:51 -0400 Subject: [PATCH 6/7] fix Signed-off-by: Yuri Shkuro --- cmd/query/app/server_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index df1c8d6bc83..3e8d82097d3 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -344,6 +344,7 @@ func TestServerHTTPTLS(t *testing.T) { jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) + defer server.Close() var clientError error var clientClose func() error @@ -408,8 +409,8 @@ func TestServerHTTPTLS(t *testing.T) { require.NoError(t, err2) } } - server.Close() - assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) + // server.Close() + // assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } } @@ -490,6 +491,7 @@ func TestServerGRPCTLS(t *testing.T) { jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) + defer server.Close() var clientError error var client *grpcClient @@ -500,12 +502,11 @@ func TestServerGRPCTLS(t *testing.T) { defer test.clientTLS.Close() creds := credentials.NewTLS(clientTLSCfg) client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), creds) - } else { client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), nil) } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() res, clientError := client.GetServices(ctx, &api_v2.GetServicesRequest{}) @@ -517,8 +518,8 @@ func TestServerGRPCTLS(t *testing.T) { assert.Equal(t, expectedServices, res.Services) } require.NoError(t, client.conn.Close()) - server.Close() - assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) + // server.Close() + // assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } } From a42ba182826ce007fb488e18f3baa17bffb71e2a Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 28 Apr 2024 18:18:10 -0400 Subject: [PATCH 7/7] fix Signed-off-by: Yuri Shkuro --- cmd/query/app/grpc_handler_test.go | 5 ++++- cmd/query/app/server_test.go | 21 ++++++++++++--------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index bec65d371b3..72c524f79af 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -174,7 +174,10 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQu } func newGRPCClient(t *testing.T, addr string) *grpcClient { - conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test + conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) return &grpcClient{ diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 3e8d82097d3..4a08c7b4f29 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -344,7 +344,6 @@ func TestServerHTTPTLS(t *testing.T) { jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) - defer server.Close() var clientError error var clientClose func() error @@ -409,20 +408,24 @@ func TestServerHTTPTLS(t *testing.T) { require.NoError(t, err2) } } - // server.Close() - // assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) + server.Close() + assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } } func newGRPCClientWithTLS(t *testing.T, addr string, creds credentials.TransportCredentials) *grpcClient { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() var conn *grpc.ClientConn var err error if creds != nil { - conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(creds)) + // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test + conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(creds)) } else { - conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + // TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test + conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials())) } require.NoError(t, err) @@ -491,7 +494,6 @@ func TestServerGRPCTLS(t *testing.T) { jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) - defer server.Close() var clientError error var client *grpcClient @@ -502,11 +504,12 @@ func TestServerGRPCTLS(t *testing.T) { defer test.clientTLS.Close() creds := credentials.NewTLS(clientTLSCfg) client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), creds) + } else { client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), nil) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() res, clientError := client.GetServices(ctx, &api_v2.GetServicesRequest{}) @@ -518,8 +521,8 @@ func TestServerGRPCTLS(t *testing.T) { assert.Equal(t, expectedServices, res.Services) } require.NoError(t, client.conn.Close()) - // server.Close() - // assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) + server.Close() + assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } }