Skip to content

Commit

Permalink
[query/tests] Use grpc.NewClient (#5391)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro committed May 12, 2024
1 parent 84ea40e commit a2a9188
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 85 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ jobs:
go-version: 1.22.x
cache-dependency-path: ./go.sum

# download dependencies separately to keep unit test step's output cleaner
- name: go mod download
run: go mod download

- name: Install test deps
# even though the same target runs from test-ci, running it separately makes for cleaner log in GH workflow
run: make install-test-tools
Expand Down
5 changes: 1 addition & 4 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,7 @@ func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQu
}

func newGRPCClient(t *testing.T, addr string) *grpcClient {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

return &grpcClient{
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (s *Server) Start() error {
if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) {
s.logger.Error("Could not start HTTP server", zap.Error(err))
}

s.logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort))
s.healthCheck.Set(healthcheck.Unavailable)
s.bgFinished.Done()
}()
Expand All @@ -321,6 +321,7 @@ func (s *Server) Start() error {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort))
s.healthCheck.Set(healthcheck.Unavailable)
s.bgFinished.Done()
}()
Expand Down
170 changes: 90 additions & 80 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
}

_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg},
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.Error(t, err)
Expand All @@ -80,7 +81,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg},
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.Error(t, err)
Expand All @@ -94,7 +95,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg},
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.Error(t, err)
Expand Down Expand Up @@ -283,6 +284,27 @@ var testCases = []struct {
},
}

type fakeQueryService struct {
qs *querysvc.QueryService
spanReader *spanstoremocks.Reader
dependencyReader *depsmocks.Reader
expectedServices []string
}

func makeQuerySvc() *fakeQueryService {
spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
qs := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
return &fakeQueryService{
qs: qs,
spanReader: spanReader,
dependencyReader: dependencyReader,
expectedServices: expectedServices,
}
}

func TestServerHTTPTLS(t *testing.T) {
testlen := len(testCases)

Expand Down Expand Up @@ -331,29 +353,25 @@ func TestServerHTTPTLS(t *testing.T) {
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
flagsSvc.Logger = zaptest.NewLogger(t)

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs,
nil, serverOptions, tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

var clientError error
var clientClose func() error
var clientTLSCfg *tls.Config

if serverOptions.TLSHTTP.Enabled {

var err0 error

clientTLSCfg, err0 = test.clientTLS.Config(zap.NewNop())
clientTLSCfg, err0 = test.clientTLS.Config(flagsSvc.Logger)
defer test.clientTLS.Close()

require.NoError(t, err0)
Expand Down Expand Up @@ -390,8 +408,7 @@ func TestServerHTTPTLS(t *testing.T) {
TLSClientConfig: clientTLSCfg,
},
}
readMock := spanReader
readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).Return([]*model.Trace{mockTrace}, nil).Once()
querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once()
queryString := "/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms"
req, err := http.NewRequest(http.MethodGet, "https://localhost:"+fmt.Sprintf("%d", ports.QueryHTTP)+queryString, nil)
require.NoError(t, err)
Expand All @@ -408,24 +425,18 @@ func TestServerHTTPTLS(t *testing.T) {
require.NoError(t, err2)
}
}
server.Close()
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
})
}
}

func newGRPCClientWithTLS(t *testing.T, addr string, creds credentials.TransportCredentials) *grpcClient {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
var conn *grpc.ClientConn
var err error

if creds != nil {
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(creds))
conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
} else {
// TODO: Need to replace grpc.DialContext with grpc.NewClient and pass test
conn, err = grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

require.NoError(t, err)
Expand Down Expand Up @@ -481,69 +492,67 @@ func TestServerGRPCTLS(t *testing.T) {
},
}
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)
flagsSvc.Logger = zaptest.NewLogger(t)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs,
nil, serverOptions, tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

var clientError error
var client *grpcClient

if serverOptions.TLSGRPC.Enabled {
clientTLSCfg, err0 := test.clientTLS.Config(zap.NewNop())
clientTLSCfg, err0 := test.clientTLS.Config(flagsSvc.Logger)
require.NoError(t, err0)
defer test.clientTLS.Close()
creds := credentials.NewTLS(clientTLSCfg)
client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), creds)

} else {
client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), nil)
}
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

flagsSvc.Logger.Info("calling client.GetServices()")
res, clientError := client.GetServices(ctx, &api_v2.GetServicesRequest{})
flagsSvc.Logger.Info("returned from GetServices()")

if test.expectClientError {
require.Error(t, clientError)
} else {
require.NoError(t, clientError)
assert.Equal(t, expectedServices, res.Services)
assert.Equal(t, querySvc.expectedServices, res.Services)
}
require.NoError(t, client.conn.Close())
server.Close()
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
})
}
}

func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,
_, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{
HTTPHostPort: "8080",
HTTPHostPort: "8080", // bad string, not :port
GRPCHostPort: "127.0.0.1:8081",
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
},
tenancy.NewManager(&tenancy.Options{}),
jtracer.NoOp())

require.Error(t, err)
_, err = NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil,

_, err = NewServer(zaptest.NewLogger(t), healthcheck.New(), &querysvc.QueryService{}, nil,
&QueryOptions{
HTTPHostPort: "127.0.0.1:8081",
GRPCHostPort: "9123",
GRPCHostPort: "9123", // bad string, not :port
QueryOptionsBase: QueryOptionsBase{
BearerTokenPropagation: true,
},
Expand Down Expand Up @@ -571,7 +580,7 @@ func TestServerInUseHostPort(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server, err := NewServer(
zap.NewNop(),
zaptest.NewLogger(t),
healthcheck.New(),
&querysvc.QueryService{},
nil,
Expand All @@ -586,26 +595,18 @@ func TestServerInUseHostPort(t *testing.T) {
jtracer.NoOp(),
)
require.NoError(t, err)

err = server.Start()
require.Error(t, err)

require.Error(t, server.Start())
server.Close()
})
}
}

func TestServerSinglePort(t *testing.T) {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zap.NewNop()
flagsSvc.Logger = zaptest.NewLogger(t)
hostPort := ports.GetAddressFromCLIOptions(ports.QueryHTTP, "")
spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}
expectedServices := []string{"test"}
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, nil,
&QueryOptions{
GRPCHostPort: hostPort,
HTTPHostPort: hostPort,
Expand All @@ -617,19 +618,22 @@ func TestServerSinglePort(t *testing.T) {
jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

client := newGRPCClient(t, hostPort)
defer client.conn.Close()
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
require.NoError(t, err)
assert.Equal(t, expectedServices, res.Services)

server.Close()
assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get())
assert.Equal(t, querySvc.expectedServices, res.Services)
}

func TestServerGracefulExit(t *testing.T) {
Expand All @@ -641,20 +645,27 @@ func TestServerGracefulExit(t *testing.T) {
flagsSvc.Logger = zap.New(zapCore)
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
tracer := jtracer.NoOp()

server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil,
querySvc := makeQuerySvc()
server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc.qs, nil,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort},
tenancy.NewManager(&tenancy.Options{}), tracer)
tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())

// Wait for servers to come up before we can call .Close()
// TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests.
time.Sleep(1 * time.Second)
server.Close()
{
client := newGRPCClient(t, hostPort)
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})
// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
require.NoError(t, err)
}

server.Close()
for _, logEntry := range logs.All() {
assert.NotEqual(t, zap.ErrorLevel, logEntry.Level,
"Error log found on server exit: %v", logEntry)
Expand Down Expand Up @@ -724,15 +735,15 @@ func TestServerHTTPTenancy(t *testing.T) {
},
}
tenancyMgr := tenancy.NewManager(&serverOptions.Tenancy)

spanReader := &spanstoremocks.Reader{}
dependencyReader := &depsmocks.Reader{}

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(zap.NewNop(), healthcheck.New(), querySvc,
querySvc := makeQuerySvc()
querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once()
server, err := NewServer(zaptest.NewLogger(t), healthcheck.New(), querySvc.qs,
nil, serverOptions, tenancyMgr, jtracer.NoOp())
require.NoError(t, err)
require.NoError(t, server.Start())
t.Cleanup(func() {
require.NoError(t, server.Close())
})

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -766,5 +777,4 @@ func TestServerHTTPTenancy(t *testing.T) {
}
})
}
server.Close()
}

0 comments on commit a2a9188

Please sign in to comment.