Skip to content

Commit

Permalink
grpc reflection metadata
Browse files Browse the repository at this point in the history
No description

---
3c551d4d5499fff815a67cd15bf452956057e256

Pull Request resolved: #188
  • Loading branch information
notanonymousenough authored and robot-piglet committed Apr 27, 2024
1 parent 57b3522 commit 32f96c6
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
18 changes: 9 additions & 9 deletions components/guns/grpc/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ type GrpcDialOptions struct {
}

type GunConfig struct {
Target string `validate:"required"`
ReflectPort int64 `config:"reflect_port"`
Timeout time.Duration `config:"timeout"` // grpc request timeout
TLS bool `config:"tls"`
DialOptions GrpcDialOptions `config:"dial_options"`
AnswLog AnswLogConfig `config:"answlog"`
SharedClient struct {
Target string `validate:"required"`
ReflectPort int64 `config:"reflect_port"`
ReflectMetadata metadata.MD `config:"reflect_metadata"`
Timeout time.Duration `config:"timeout"` // grpc request timeout
TLS bool `config:"tls"`
DialOptions GrpcDialOptions `config:"dial_options"`
AnswLog AnswLogConfig `config:"answlog"`
SharedClient struct {
ClientNumber int `config:"client-number,omitempty"`
Enabled bool `config:"enabled"`
} `config:"shared-client,omitempty"`
Expand Down Expand Up @@ -110,8 +111,7 @@ func (g *Gun) prepareMethodList(opts *warmup.Options) (map[string]desc.MethodDes
}
defer conn.Close()

meta := make(metadata.MD)
refCtx := metadata.NewOutgoingContext(context.Background(), meta)
refCtx := metadata.NewOutgoingContext(context.Background(), g.Conf.ReflectMetadata)
refClient := grpcreflect.NewClientAuto(refCtx, conn)
listServices, err := refClient.ListServices()
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions components/guns/grpc/scenario/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
const defaultTimeout = time.Second * 15

type GunConfig struct {
Target string `validate:"required"`
ReflectPort int64 `config:"reflect_port"`
Timeout time.Duration `config:"timeout"` // grpc request timeout
TLS bool `config:"tls"`
DialOptions GrpcDialOptions `config:"dial_options"`
AnswLog AnswLogConfig `config:"answlog"`
Target string `validate:"required"`
ReflectPort int64 `config:"reflect_port"`
ReflectMetadata metadata.MD `config:"reflect_metadata"`
Timeout time.Duration `config:"timeout"` // grpc request timeout
TLS bool `config:"tls"`
DialOptions GrpcDialOptions `config:"dial_options"`
AnswLog AnswLogConfig `config:"answlog"`
}

type GrpcDialOptions struct {
Expand Down Expand Up @@ -57,10 +58,11 @@ func NewGun(conf GunConfig) *Gun {
return &Gun{
templ: NewTextTemplater(),
gun: &grpcgun.Gun{Conf: grpcgun.GunConfig{
Target: conf.Target,
ReflectPort: conf.ReflectPort,
Timeout: conf.Timeout,
TLS: conf.TLS,
Target: conf.Target,
ReflectPort: conf.ReflectPort,
ReflectMetadata: conf.ReflectMetadata,
Timeout: conf.Timeout,
TLS: conf.TLS,
DialOptions: grpcgun.GrpcDialOptions{
Authority: conf.DialOptions.Authority,
Timeout: conf.DialOptions.Timeout,
Expand Down
13 changes: 8 additions & 5 deletions tests/acceptance/config_model.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package acceptance

import "google.golang.org/grpc/metadata"

type PandoraConfigLog struct {
Level string `yaml:"level"`
}
Expand All @@ -11,11 +13,12 @@ type PandoraConfigMonitoring struct {
ExpVar PandoraConfigMonitoringExpVar `yaml:"expvar"`
}
type PandoraConfigGRPCGun struct {
Type string `yaml:"type"`
Target string `yaml:"target"`
TLS bool `yaml:"tls"`
ReflectPort *int64 `yaml:"reflect_port,omitempty"`
SharedClient struct {
Type string `yaml:"type"`
Target string `yaml:"target"`
TLS bool `yaml:"tls"`
ReflectPort *int64 `yaml:"reflect_port,omitempty"`
ReflectMetadata *metadata.MD `yaml:"reflect_metadata,omitempty"`
SharedClient struct {
ClientNumber int `yaml:"client-number,omitempty"`
Enabled bool `yaml:"enabled"`
} `yaml:"shared-client,omitempty"`
Expand Down
107 changes: 107 additions & 0 deletions tests/acceptance/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package acceptance

import (
"context"
"fmt"
"log/slog"
"net"
"os"
"testing"
"time"

"github.com/pkg/errors"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -21,6 +23,7 @@ import (
"github.com/yandex/pandora/lib/testutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -116,6 +119,89 @@ func TestCheckGRPCReflectServer(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(8), st.Hello)
})

t.Run("reflect with custom metadata", func(t *testing.T) {
metadataKey := "testKey"
metadataValue := "testValue"
wrongMDValuesLengthError := errors.New("wrong metadata values length")
wrongMDValueError := errors.New("wrong metadata value")
metadataChecker := func(ctx context.Context) (context.Context, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, wrongMDValuesLengthError
}
vals := md.Get(metadataKey)
if len(vals) != 1 {
return nil, wrongMDValuesLengthError
}
if vals[0] != metadataValue {
return nil, wrongMDValueError
}
return ctx, nil
}
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(MetadataServerInterceptor(metadataChecker)),
grpc.StreamInterceptor(MetadataServerStreamInterceptor(metadataChecker)))
srv := server.NewServer(logger, time.Now().UnixNano())
server.RegisterTargetServiceServer(grpcServer, srv)
grpcAddress := "localhost:18888"
reflection.Register(grpcServer)
l, err := net.Listen("tcp", grpcAddress)
require.NoError(t, err)
go func() {
err = grpcServer.Serve(l)
require.NoError(t, err)
}()

defer func() {
grpcServer.Stop()
}()

cases := []struct {
name string
conf *cli.CliConfig
err error
}{
{
name: "success",
conf: parseFileContentToCliConfig(t, baseFile, func(c *PandoraConfigGRPC) {
md := metadata.New(map[string]string{metadataKey: metadataValue})
c.Pools[0].Gun.ReflectMetadata = &md
}),
},
{
name: "no metadata",
conf: parseFileContentToCliConfig(t, baseFile, nil),
err: wrongMDValuesLengthError,
},
{
name: "wrong metadata value",
conf: parseFileContentToCliConfig(t, baseFile, func(c *PandoraConfigGRPC) {
md := metadata.New(map[string]string{metadataKey: "wrong-value"})
c.Pools[0].Gun.ReflectMetadata = &md
}),
err: wrongMDValueError,
},
}

for _, cc := range cases {
t.Run(cc.name, func(t *testing.T) {
require.Equal(t, 1, len(cc.conf.Engine.Pools))
aggr := &aggregator{}
cc.conf.Engine.Pools[0].Aggregator = aggr

pandora := engine.New(pandoraLogger, pandoraMetrics, cc.conf.Engine)
err = pandora.Run(context.Background())

if cc.err == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), cc.err.Error())
}
})
}
})
}

func TestGrpcGunSuite(t *testing.T) {
Expand Down Expand Up @@ -211,3 +297,24 @@ func parseFileContentToCliConfig(t *testing.T, baseFile []byte, overwrite func(c

return decodeConfig(t, mapCfg)
}

func MetadataServerInterceptor(metadataChecker func(ctx context.Context) (context.Context, error)) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, err = metadataChecker(ctx)
if err != nil {
return nil, fmt.Errorf("metadata checker: %w", err)
}
return handler(ctx, req)
}
}

func MetadataServerStreamInterceptor(metadataChecker func(ctx context.Context) (context.Context, error)) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
ctx := ss.Context()
ctx, err = metadataChecker(ctx)
if err != nil {
return fmt.Errorf("metadata checker: %w", err)
}
return handler(srv, ss)
}
}

0 comments on commit 32f96c6

Please sign in to comment.