Skip to content

Commit

Permalink
Use custom codec for vtproto within connect-go (#3310)
Browse files Browse the repository at this point in the history
This implements a custom coded within connect-go.
  • Loading branch information
simonswine committed May 24, 2024
1 parent b79c573 commit 616e81a
Show file tree
Hide file tree
Showing 24 changed files with 205 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ $(BIN)/protoc-gen-go: Makefile go.mod

$(BIN)/protoc-gen-connect-go: Makefile go.mod
@mkdir -p $(@D)
GOBIN=$(abspath $(@D)) $(GO) install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.14.0
GOBIN=$(abspath $(@D)) $(GO) install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.16.2

$(BIN)/protoc-gen-connect-go-mux: Makefile go.mod
@mkdir -p $(@D)
Expand Down
2 changes: 1 addition & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/grafana/pyroscope/api
go 1.21

require (
connectrpc.com/connect v1.14.0
connectrpc.com/connect v1.16.2
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0
github.com/planetscale/vtprotobuf v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions api/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA=
connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s=
connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE=
connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand Down
4 changes: 4 additions & 0 deletions cmd/profilecli/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect"
"github.com/grafana/pyroscope/api/gen/proto/go/storegateway/v1/storegatewayv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
"github.com/grafana/pyroscope/pkg/operations"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
Expand All @@ -37,20 +38,23 @@ func (c *phlareClient) queryClient() querierv1connect.QuerierServiceClient {
return querierv1connect.NewQuerierServiceClient(
c.httpClient(),
c.URL,
connectapi.DefaultClientOptions()...,
)
}

func (c *phlareClient) storeGatewayClient() storegatewayv1connect.StoreGatewayServiceClient {
return storegatewayv1connect.NewStoreGatewayServiceClient(
c.httpClient(),
c.URL,
connectapi.DefaultClientOptions()...,
)
}

func (c *phlareClient) ingesterClient() ingesterv1connect.IngesterServiceClient {
return ingesterv1connect.NewIngesterServiceClient(
c.httpClient(),
c.URL,
connectapi.DefaultClientOptions()...,
)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/profilecli/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
"github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/pprof"
)
Expand All @@ -19,6 +20,7 @@ func (c *phlareClient) pusherClient() pushv1connect.PusherServiceClient {
return pushv1connect.NewPusherServiceClient(
c.httpClient(),
c.URL,
connectapi.DefaultClientOptions()...,
)
}

Expand Down
6 changes: 3 additions & 3 deletions ebpf/cmd/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,30 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"os"
"strconv"
"strings"
"time"

"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/pyroscope/ebpf/cpp/demangle"
ebpfmetrics "github.com/grafana/pyroscope/ebpf/metrics"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
commonconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"

"github.com/go-kit/log/level"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/push/v1/pushv1connect"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
ebpfspy "github.com/grafana/pyroscope/ebpf"
"github.com/grafana/pyroscope/ebpf/pprof"
"github.com/grafana/pyroscope/ebpf/sd"
"github.com/grafana/pyroscope/ebpf/symtab"
commonconfig "github.com/prometheus/common/config"
)

var configFile = flag.String("config", "", "config file path")
Expand Down
2 changes: 1 addition & 1 deletion ebpf/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/grafana/pyroscope/ebpf
go 1.21

require (
connectrpc.com/connect v1.14.0
connectrpc.com/connect v1.16.2
github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270
github.com/cespare/xxhash/v2 v2.2.0
github.com/cilium/ebpf v0.11.0
Expand Down
4 changes: 2 additions & 2 deletions ebpf/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA=
connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s=
connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE=
connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc=
github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270 h1:JIxGEMs4E5Zb6R7z2C5IgecI0mkqS97WAEF31wUbYTM=
github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270/go.mod h1:2XtVRGCw/HthOLxU0Qw6o6jSJrcEoOb2OCCl8gQYvGw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/grafana/pyroscope
go 1.21

require (
connectrpc.com/connect v1.14.0
connectrpc.com/connect v1.16.2
connectrpc.com/grpchealth v1.3.0
github.com/PuerkitoBio/goquery v1.8.1
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8=
cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8=
connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA=
connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s=
connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE=
connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc=
connectrpc.com/grpchealth v1.3.0 h1:FA3OIwAvuMokQIXQrY5LbIy8IenftksTP/lG4PbYN+E=
connectrpc.com/grpchealth v1.3.0/go.mod h1:3vpqmX25/ir0gVgW6RdnCPPZRcR6HvqtXX5RNPmDXHM=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down
6 changes: 4 additions & 2 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ cloud.google.com/go/workflows v1.12.3 h1:qocsqETmLAl34mSa01hKZjcqAvt699gaoFbooGG
cloud.google.com/go/workflows v1.12.3/go.mod h1:fmOUeeqEwPzIU81foMjTRQIdwQHADi/vEr1cx9R1m5g=
cloud.google.com/go/workflows v1.12.4 h1:uHNmUiatTbPQ4H1pabwfzpfEYD4BBnqDHqMm2IesOh4=
cloud.google.com/go/workflows v1.12.4/go.mod h1:yQ7HUqOkdJK4duVtMeBCAOPiN1ZF1E9pAMX51vpwB/w=
connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE=
connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9 h1:VpgP7xuJadIUuKccphEpTJnWhS2jkQyMt6Y7pJCD7fY=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1/go.mod h1:RKUqNu35KJYcVG/fqTRqmuXJZYNhYkBrnC/hX7yGbTA=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI=
Expand Down Expand Up @@ -1122,6 +1124,7 @@ golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
Expand Down Expand Up @@ -1317,8 +1320,7 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
Expand Down
35 changes: 24 additions & 11 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/grafana/pyroscope/api/gen/proto/go/version/v1/versionv1connect"
"github.com/grafana/pyroscope/api/openapiv2"
"github.com/grafana/pyroscope/pkg/adhocprofiles"
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
"github.com/grafana/pyroscope/pkg/compactor"
"github.com/grafana/pyroscope/pkg/distributor"
"github.com/grafana/pyroscope/pkg/frontend"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL
}

func (a *API) RegisterTenantSettings(ts *settings.TenantSettings) {
settingsv1connect.RegisterSettingsServiceHandler(a.server.HTTP, ts, a.grpcAuthMiddleware, a.recoveryMiddleware)
settingsv1connect.RegisterSettingsServiceHandler(a.server.HTTP, ts, a.connectOptionsAuthRecovery()...)
}

// RegisterOverridesExporter registers the endpoints associated with the overrides exporter.
Expand All @@ -214,7 +215,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor) {
pyroscopeHandler := pyroscope.NewPyroscopeIngestHandler(d, a.logger)
a.RegisterRoute("/ingest", pyroscopeHandler, true, true, "POST")
a.RegisterRoute("/pyroscope/ingest", pyroscopeHandler, true, true, "POST")
pushv1connect.RegisterPusherServiceHandler(a.server.HTTP, d, a.grpcAuthMiddleware, a.recoveryMiddleware)
pushv1connect.RegisterPusherServiceHandler(a.server.HTTP, d, a.connectOptionsAuthRecovery()...)
a.RegisterRoute("/distributor/ring", d, false, true, "GET", "POST")
a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down Expand Up @@ -244,8 +245,8 @@ type QuerierSvc interface {

// RegisterQuerier registers the endpoints associated with the querier.
func (a *API) RegisterQuerier(svc QuerierSvc) {
querierv1connect.RegisterQuerierServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.grpcLogMiddleware, a.recoveryMiddleware)
vcsv1connect.RegisterVCSServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.grpcLogMiddleware, a.recoveryMiddleware)
querierv1connect.RegisterQuerierServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthLogRecovery()...)
vcsv1connect.RegisterVCSServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthLogRecovery()...)
}

func (a *API) RegisterPyroscopeHandlers(client querierv1connect.QuerierServiceClient) {
Expand All @@ -257,11 +258,11 @@ func (a *API) RegisterPyroscopeHandlers(client querierv1connect.QuerierServiceCl

// RegisterIngester registers the endpoints associated with the ingester.
func (a *API) RegisterIngester(svc *ingester.Ingester) {
ingesterv1connect.RegisterIngesterServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.recoveryMiddleware)
ingesterv1connect.RegisterIngesterServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthRecovery()...)
}

func (a *API) RegisterStoreGateway(svc *storegateway.StoreGateway) {
storegatewayv1connect.RegisterStoreGatewayServiceHandler(a.server.HTTP, svc, a.grpcAuthMiddleware, a.recoveryMiddleware)
storegatewayv1connect.RegisterStoreGatewayServiceHandler(a.server.HTTP, svc, a.connectOptionsAuthRecovery()...)

a.indexPage.AddLinks(defaultWeight, "Store-gateway", []IndexPageLink{
{Desc: "Ring status", Path: "/store-gateway/ring"},
Expand All @@ -282,18 +283,18 @@ func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) {

// RegisterQueryFrontend registers the endpoints associated with the query frontend.
func (a *API) RegisterQueryFrontend(frontendSvc *frontend.Frontend) {
frontendpbconnect.RegisterFrontendForQuerierHandler(a.server.HTTP, frontendSvc, a.grpcAuthMiddleware, a.recoveryMiddleware)
frontendpbconnect.RegisterFrontendForQuerierHandler(a.server.HTTP, frontendSvc, a.connectOptionsAuthRecovery()...)
}

// RegisterVersion registers the endpoints associated with the versions service.
func (a *API) RegisterVersion(svc versionv1connect.VersionHandler) {
versionv1connect.RegisterVersionHandler(a.server.HTTP, svc, a.recoveryMiddleware)
versionv1connect.RegisterVersionHandler(a.server.HTTP, svc, a.connectOptionsRecovery()...)
}

// RegisterQueryScheduler registers the endpoints associated with the query scheduler.
func (a *API) RegisterQueryScheduler(s *scheduler.Scheduler) {
schedulerpbconnect.RegisterSchedulerForFrontendHandler(a.server.HTTP, s, a.recoveryMiddleware)
schedulerpbconnect.RegisterSchedulerForQuerierHandler(a.server.HTTP, s, a.recoveryMiddleware)
schedulerpbconnect.RegisterSchedulerForFrontendHandler(a.server.HTTP, s, a.connectOptionsRecovery()...)
schedulerpbconnect.RegisterSchedulerForQuerierHandler(a.server.HTTP, s, a.connectOptionsRecovery()...)
}

// RegisterFlags registers api-related flags.
Expand All @@ -312,5 +313,17 @@ func (a *API) RegisterAdmin(ad *operations.Admin) {
}

func (a *API) RegisterAdHocProfiles(ahp *adhocprofiles.AdHocProfiles) {
adhocprofilesv1connect.RegisterAdHocProfileServiceHandler(a.server.HTTP, ahp, a.grpcAuthMiddleware, a.recoveryMiddleware)
adhocprofilesv1connect.RegisterAdHocProfileServiceHandler(a.server.HTTP, ahp, a.connectOptionsAuthRecovery()...)
}

func (a *API) connectOptionsRecovery() []connect.HandlerOption {
return append(connectapi.DefaultHandlerOptions(), a.recoveryMiddleware)
}

func (a *API) connectOptionsAuthRecovery() []connect.HandlerOption {
return append(connectapi.DefaultHandlerOptions(), []connect.HandlerOption{a.grpcAuthMiddleware, a.recoveryMiddleware}...)
}

func (a *API) connectOptionsAuthLogRecovery() []connect.HandlerOption {
return append(connectapi.DefaultHandlerOptions(), []connect.HandlerOption{a.grpcAuthMiddleware, a.grpcLogMiddleware, a.recoveryMiddleware}...)
}
99 changes: 99 additions & 0 deletions pkg/api/connect/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package connectapi

import (
"fmt"

"connectrpc.com/connect"
"google.golang.org/protobuf/proto"
)

// Name is the name registered for the proto compressor.
const Name = "proto"

var ProtoCodec connect.Codec = vtprotoCodec{}

type vtprotoCodec struct{}

// growcap scales up the capacity of a slice.
//
// Given a slice with a current capacity of oldcap and a desired
// capacity of wantcap, growcap returns a new capacity >= wantcap.
//
// The algorithm is mostly identical to the one used by append as of Go 1.14.
func growcap(oldcap, wantcap int) (newcap int) {
if wantcap > oldcap*2 {
newcap = wantcap
} else if oldcap < 1024 {
// The Go 1.14 runtime takes this case when len(s) < 1024,
// not when cap(s) < 1024. The difference doesn't seem
// significant here.
newcap = oldcap * 2
} else {
newcap = oldcap
for 0 < newcap && newcap < wantcap {
newcap += newcap / 4
}
if newcap <= 0 {
newcap = wantcap
}
}
return newcap
}

type vtprotoMessage interface {
MarshalVT() ([]byte, error)
MarshalToSizedBufferVT([]byte) (int, error)
SizeVT() (n int)
UnmarshalVT([]byte) error
}

func (vtprotoCodec) Marshal(v any) ([]byte, error) {
switch v := v.(type) {
case vtprotoMessage:
return v.MarshalVT()
case proto.Message:
return proto.Marshal(v)
default:
return nil, fmt.Errorf("failed to marshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
}
}

func (vtprotoCodec) MarshalAppend(data []byte, v any) ([]byte, error) {
switch v := v.(type) {
case vtprotoMessage:
if v == nil {
return data, nil
}

n := v.SizeVT()
if cap(data) < len(data)+n {
ndata := make([]byte, len(data), growcap(cap(data), len(data)+n))
copy(ndata, data)
data = ndata
}
_, err := v.MarshalToSizedBufferVT(data[len(data) : len(data)+n])
if err != nil {
return nil, err
}
return data[:len(data)+n], nil
case proto.Message:
return proto.MarshalOptions{}.MarshalAppend(data, v)
default:
return nil, fmt.Errorf("failed to marshalAppend, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
}
}

func (vtprotoCodec) Unmarshal(data []byte, v any) error {
switch v := v.(type) {
case vtprotoMessage:
return v.UnmarshalVT(data)
case proto.Message:
return proto.Unmarshal(data, v)
default:
return fmt.Errorf("failed to unmarshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
}
}

func (vtprotoCodec) Name() string {
return Name
}
17 changes: 17 additions & 0 deletions pkg/api/connect/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package connectapi

import (
"connectrpc.com/connect"
)

func DefaultClientOptions() []connect.ClientOption {
return []connect.ClientOption{
connect.WithCodec(ProtoCodec),
}
}

func DefaultHandlerOptions() []connect.HandlerOption {
return []connect.HandlerOption{
connect.WithCodec(ProtoCodec),
}
}
2 changes: 1 addition & 1 deletion pkg/clientpool/store_gateway_client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/grafana/pyroscope/pkg/util"
)

func NeStoreGatewayPool(ring ring.ReadRing, factory ring_client.PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger, options ...connect.ClientOption) *ring_client.Pool {
func NewStoreGatewayPool(ring ring.ReadRing, factory ring_client.PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger, options ...connect.ClientOption) *ring_client.Pool {
if factory == nil {
factory = newStoreGatewayPoolFactory(options...)
}
Expand Down
Loading

0 comments on commit 616e81a

Please sign in to comment.