From 371dddbc4bc20ee051adcabf24036aa195ab335c Mon Sep 17 00:00:00 2001 From: John Beisley Date: Wed, 13 Nov 2024 18:45:03 +0000 Subject: [PATCH] Optional specification of stream content type (#4926) * feat: `runtime.Marshaler` can support distinct content type for streamed responses * test: Distinct stream content type on `runtime.Marshaler`. * doc: document the Delimited and StreamContentType interfaces --- docs/docs/mapping/custom_marshalers.md | 72 +++++++++++++++++++ docs/docs/mapping/customizing_your_gateway.md | 5 +- runtime/handler.go | 8 ++- runtime/handler_test.go | 61 ++++++++++++---- runtime/marshaler.go | 8 +++ 5 files changed, 137 insertions(+), 17 deletions(-) create mode 100644 docs/docs/mapping/custom_marshalers.md diff --git a/docs/docs/mapping/custom_marshalers.md b/docs/docs/mapping/custom_marshalers.md new file mode 100644 index 00000000000..4a469638a9e --- /dev/null +++ b/docs/docs/mapping/custom_marshalers.md @@ -0,0 +1,72 @@ +--- +layout: default +title: Custom marshalers +nav_order: 6 +parent: Mapping +--- + +# Custom marshalers + +[`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler) +implementations can implement optional additional methods to customize their +behaviour beyond the methods required by the core interface. + +## Stream delimiters + +By default, a streamed response delimits each response body with a single +newline (`"\n"`). You can change this delimiter by having your marshaler +implement +[`Delimited`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#Delimited). + +For example, to separate each entry with a pipe (`"|"`) instead: + +```go +type YourMarshaler struct { + // ... +} + +// ... + +func (*YourMarshaler) Delimited() []byte { + return []byte("|") +} +``` + +## Stream content type + +By default, a streamed response emits a `Content-Type` header that is the same +for a unary response, from the `ContentType()` method of the +[`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler) +interface. + +If you require the server to declare a distinct content type for stream +responses versus unary responses, the marshaler must implement +[`StreamContentType`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#StreamContentType). +This provides the MIME type when specifically responding to a streaming +response. + +For example, by default the +[`JSONPb`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#JSONPb) +marshaler results in `application/json` for its `Content-Type` response header, +irrespective of unary versus streaming. This can be changed for streaming +endpoints by wrapping the marshaler with a custom marshaler that implements +[`StreamContentType`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime#StreamContentType) +to return the [NDJSON](https://github.com/ndjson/ndjson-spec) MIME type for +streaming response endpoints: + +```go +type CustomJSONPb struct { + runtime.JSONPb +} + +func (*CustomJSONPb) Delimited() []byte { + // Strictly speaking this is already the default delimiter for JSONPb, but + // providing it here for completeness with an NDJSON marshaler all in one + // place. + return []byte("\n") +} + +func (*CustomJSONPb) StreamContentType(interface{}) string { + return "application/x-ndjson" +} +``` diff --git a/docs/docs/mapping/customizing_your_gateway.md b/docs/docs/mapping/customizing_your_gateway.md index b5c0f08def0..97f0b77c3a1 100644 --- a/docs/docs/mapping/customizing_your_gateway.md +++ b/docs/docs/mapping/customizing_your_gateway.md @@ -13,7 +13,10 @@ parent: Mapping You might want to serialize request/response messages in MessagePack instead of JSON, for example: -1. Write a custom implementation of [`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler). +1. Write a custom implementation of + [`Marshaler`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#Marshaler). + See [Custom marshalers](custom_marshalers.md) for some additional + customization options. 2. Register your marshaler with [`WithMarshalerOption`](https://pkg.go.dev/github.com/grpc-ecosystem/grpc-gateway/runtime?tab=doc#WithMarshalerOption). diff --git a/runtime/handler.go b/runtime/handler.go index 036c3ff7e4e..0fa90765661 100644 --- a/runtime/handler.go +++ b/runtime/handler.go @@ -64,7 +64,13 @@ func ForwardResponseStream(ctx context.Context, mux *ServeMux, marshaler Marshal } if !wroteHeader { - w.Header().Set("Content-Type", marshaler.ContentType(respRw)) + var contentType string + if sct, ok := marshaler.(StreamContentType); ok { + contentType = sct.StreamContentType(respRw) + } else { + contentType = marshaler.ContentType(respRw) + } + w.Header().Set("Content-Type", contentType) } var buf []byte diff --git a/runtime/handler_test.go b/runtime/handler_test.go index 4ef78e7f0ef..34d70539f6e 100644 --- a/runtime/handler_test.go +++ b/runtime/handler_test.go @@ -178,36 +178,68 @@ func (c *CustomMarshaler) NewDecoder(r io.Reader) runtime.Decoder { return c func (c *CustomMarshaler) NewEncoder(w io.Writer) runtime.Encoder { return c.m.NewEncoder(w) } func (c *CustomMarshaler) ContentType(v interface{}) string { return "Custom-Content-Type" } +// marshalerStreamContentType implements Marshaler, but with the addition of a custom StreamContentType. +type marshalerStreamContentType struct { + runtime.Marshaler + CustomStreamContentType string +} + +func (m marshalerStreamContentType) StreamContentType(interface{}) string { + return m.CustomStreamContentType +} + func TestForwardResponseStreamCustomMarshaler(t *testing.T) { type msg struct { pb proto.Message err error } + marshaler := &CustomMarshaler{&runtime.JSONPb{}} + tests := []struct { - name string - msgs []msg - statusCode int + name string + marshaler runtime.Marshaler + msgs []msg + statusCode int + wantContentType string }{{ - name: "encoding", + name: "encoding", + marshaler: marshaler, msgs: []msg{ {&pb.SimpleMessage{Id: "One"}, nil}, {&pb.SimpleMessage{Id: "Two"}, nil}, }, - statusCode: http.StatusOK, + statusCode: http.StatusOK, + wantContentType: "Custom-Content-Type", }, { name: "empty", + marshaler: marshaler, statusCode: http.StatusOK, }, { - name: "error", - msgs: []msg{{nil, status.Errorf(codes.OutOfRange, "400")}}, - statusCode: http.StatusBadRequest, + name: "error", + marshaler: marshaler, + msgs: []msg{{nil, status.Errorf(codes.OutOfRange, "400")}}, + statusCode: http.StatusBadRequest, + wantContentType: "Custom-Content-Type", }, { - name: "stream_error", + name: "stream_error", + marshaler: marshaler, msgs: []msg{ {&pb.SimpleMessage{Id: "One"}, nil}, {nil, status.Errorf(codes.OutOfRange, "400")}, }, - statusCode: http.StatusOK, + statusCode: http.StatusOK, + wantContentType: "Custom-Content-Type", + }, { + name: "stream_content_type", + marshaler: marshalerStreamContentType{ + Marshaler: marshaler, + CustomStreamContentType: "Stream-Content-Type", + }, + msgs: []msg{ + {&pb.SimpleMessage{Id: "One"}, nil}, + }, + statusCode: http.StatusOK, + wantContentType: "Stream-Content-Type", }} newTestRecv := func(t *testing.T, msgs []msg) func() (proto.Message, error) { @@ -224,14 +256,13 @@ func TestForwardResponseStreamCustomMarshaler(t *testing.T) { } } ctx := runtime.NewServerMetadataContext(context.Background(), runtime.ServerMetadata{}) - marshaler := &CustomMarshaler{&runtime.JSONPb{}} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { recv := newTestRecv(t, tt.msgs) req := httptest.NewRequest("GET", "http://example.com/foo", nil) resp := httptest.NewRecorder() - runtime.ForwardResponseStream(ctx, runtime.NewServeMux(), marshaler, resp, req, recv) + runtime.ForwardResponseStream(ctx, runtime.NewServeMux(), tt.marshaler, resp, req, recv) w := resp.Result() if w.StatusCode != tt.statusCode { @@ -245,8 +276,8 @@ func TestForwardResponseStreamCustomMarshaler(t *testing.T) { t.Errorf("Failed to read response body with %v", err) } w.Body.Close() - if len(body) > 0 && w.Header.Get("Content-Type") != "Custom-Content-Type" { - t.Errorf("Content-Type %s want Custom-Content-Type", w.Header.Get("Content-Type")) + if w.Header.Get("Content-Type") != tt.wantContentType { + t.Errorf("Content-Type %q want %q", w.Header.Get("Content-Type"), tt.wantContentType) } var want []byte @@ -254,7 +285,7 @@ func TestForwardResponseStreamCustomMarshaler(t *testing.T) { if msg.err != nil { t.Skip("checking error encodings") } - b, err := marshaler.Marshal(map[string]proto.Message{"result": msg.pb}) + b, err := tt.marshaler.Marshal(map[string]proto.Message{"result": msg.pb}) if err != nil { t.Errorf("marshaler.Marshal() failed %v", err) } diff --git a/runtime/marshaler.go b/runtime/marshaler.go index 2c0d25ff493..b1dfc37af9b 100644 --- a/runtime/marshaler.go +++ b/runtime/marshaler.go @@ -48,3 +48,11 @@ type Delimited interface { // Delimiter returns the record separator for the stream. Delimiter() []byte } + +// StreamContentType defines the streaming content type. +type StreamContentType interface { + // StreamContentType returns the content type for a stream. This shares the + // same behaviour as for `Marshaler.ContentType`, but is called, if present, + // in the case of a streamed response. + StreamContentType(v interface{}) string +}