Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update implementation to use non-deprecated api calls #36

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,48 @@ is a generic gRPC reverse proxy handler.
## Proxy Handler

The package [`proxy`](proxy/) contains a generic gRPC reverse proxy handler that allows a gRPC server to
not know about registered handlers or their data types. Please consult the docs, here's an exaple usage.
not know about registered handlers or their data types. Please consult the docs, here's an example usage.

Defining a `StreamDirector` that decides where (if at all) to send the request
```go
director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromContext(ctx)
if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
conn, err := grpc.DialContext(
ctx,
"api-service.staging.svc.local",
grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())),
)
return outCtx, conn, err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
conn, err := grpc.DialContext(
ctx,
"api-service.prod.svc.local",
grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())),
)
return outCtx, conn, err
}
}
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
```
Then you need to register it with a `grpc.Server`. The server may have other handlers that will be served
Then you need to register it with a `grpc.Server`. The proxy codec is automatically registered by importing the codec package. The server may have other handlers that will be served
locally:

```go

import codec "github.com/mwitkow/grpc-proxy/proxy/codec"

...

server := grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
pb_test.RegisterTestServiceServer(server, &testImpl{})
```
Expand Down
70 changes: 0 additions & 70 deletions proxy/codec.go

This file was deleted.

89 changes: 89 additions & 0 deletions proxy/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package codec

import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/encoding"
)

// Name is the name by which the proxy codec is registered in the encoding codec registry
// We have to say that we are the "proto" codec otherwise marshaling will fail!
const Name = "proto"

func init() {
Register()
}

// Register manually registers the codec
func Register() {
encoding.RegisterCodec(codec())
}

// codec returns a proxying grpc.codec with the default protobuf codec as parent.
//
// See CodecWithParent.
func codec() encoding.Codec {
// since we have registered the default codec by importing it,
// we can fetch it from the registry and use it as our parent
// and overwrite the existing codec in the registry
return codecWithParent(&protoCodec{})
}

// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent.
//
// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious
// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes.
// However, if the server handler, or the client caller are not proxy-internal functions it will fall back
// to trying to decode the message using a fallback codec.
func codecWithParent(fallback encoding.Codec) encoding.Codec {
return &Proxy{parentCodec: fallback}
}

// Proxy satisfies the encoding.Codec interface
type Proxy struct {
parentCodec encoding.Codec
}

// Frame holds the proxy transported data
type Frame struct {
payload []byte
}

// Marshal implents the encoding.Codec interface method
func (p *Proxy) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*Frame)
if !ok {
return p.parentCodec.Marshal(v)
}
return out.payload, nil

}

// Unmarshal implents the encoding.Codec interface method
func (p *Proxy) Unmarshal(data []byte, v interface{}) error {
dst, ok := v.(*Frame)
if !ok {
return p.parentCodec.Unmarshal(data, v)
}
dst.payload = data
return nil
}

// Name implents the encoding.Codec interface method
func (*Proxy) Name() string {
return Name
}

// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
type protoCodec struct{}

func (*protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}

func (*protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}

func (*protoCodec) Name() string {
return "proxy>proto"
}
52 changes: 52 additions & 0 deletions proxy/codec/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package codec_test

import (
"testing"

_ "github.com/gogo/protobuf/proto"
codec "github.com/mwitkow/grpc-proxy/proxy/codec"
pb "github.com/mwitkow/grpc-proxy/testservice"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/encoding"
)

func TestCodec_ReadYourWrites(t *testing.T) {
framePtr := &codec.Frame{}
data := []byte{0xDE, 0xAD, 0xBE, 0xEF}
codec.Register()
codec := encoding.GetCodec((&codec.Proxy{}).Name())
require.NotNil(t, codec, "codec must be registered")
require.NoError(t, codec.Unmarshal(data, framePtr), "unmarshalling must go ok")
out, err := codec.Marshal(framePtr)
require.NoError(t, err, "no marshal error")
require.Equal(t, data, out, "output and data must be the same")

// reuse
require.NoError(t, codec.Unmarshal([]byte{0x55}, framePtr), "unmarshalling must go ok")
out, err = codec.Marshal(framePtr)
require.NoError(t, err, "no marshal error")
require.Equal(t, []byte{0x55}, out, "output and data must be the same")

}

func TestProtoCodec_ReadYourWrites(t *testing.T) {
p1 := &pb.PingRequest{
Value: "test-ping",
}
proxyCd := encoding.GetCodec((&codec.Proxy{}).Name())

require.NotNil(t, proxyCd, "proxy codec must not be nil")

out1p1, err := proxyCd.Marshal(p1)
require.NoError(t, err, "marshalling must go ok")
out2p1, err := proxyCd.Marshal(p1)
require.NoError(t, err, "marshalling must go ok")

p2 := &pb.PingRequest{}
err = proxyCd.Unmarshal(out1p1, p2)
require.NoError(t, err, "unmarshalling must go ok")
err = proxyCd.Unmarshal(out2p1, p2)
require.NoError(t, err, "unmarshalling must go ok")

require.Equal(t, *p1, *p2)
}
24 changes: 0 additions & 24 deletions proxy/codec_test.go

This file was deleted.

13 changes: 7 additions & 6 deletions proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"strings"

"github.com/mwitkow/grpc-proxy/proxy"
codec "github.com/mwitkow/grpc-proxy/proxy/codec"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var (
Expand All @@ -19,7 +21,7 @@ var (

func ExampleRegisterService() {
// A gRPC server with the proxying codec enabled.
server := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))
server := grpc.NewServer()
// Register a TestService with 4 of its methods explicitly.
proxy.RegisterService(server, director,
"mwitkow.testproto.TestService",
Expand All @@ -28,7 +30,6 @@ func ExampleRegisterService() {

func ExampleTransparentHandler() {
grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
}

Expand All @@ -38,7 +39,7 @@ func ExampleStreamDirector() {
director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
// Copy the inbound metadata explicitly.
Expand All @@ -48,13 +49,13 @@ func ExampleStreamDirector() {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())))
return outCtx, conn, err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.CallContentSubtype((&codec.Proxy{}).Name())))
return outCtx, conn, err
}
}
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
}
Loading