From eda4f5841154371a3d7be8429875f71c1e6a399a Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Mon, 7 Feb 2022 21:32:57 -0800 Subject: [PATCH] buffer: use libp2p buffer pool implementation as global buffer pool with 8k aligned --- gateway/buffer_pool.go | 19 ++++++------------- go.mod | 1 + multiplexer/pipe.go | 16 +++++----------- 3 files changed, 12 insertions(+), 24 deletions(-) diff --git a/gateway/buffer_pool.go b/gateway/buffer_pool.go index efbee1b..daf823d 100644 --- a/gateway/buffer_pool.go +++ b/gateway/buffer_pool.go @@ -2,34 +2,27 @@ package gateway import ( "net/http/httputil" - "sync" + + pool "github.com/libp2p/go-buffer-pool" ) const ( - bufferSize = 16 * 1024 + bufferSize = 8 * 1024 ) type bufferPool struct { - pool *sync.Pool } func newBufferPool() *bufferPool { - return &bufferPool{ - pool: &sync.Pool{ - New: func() interface{} { - b := make([]byte, bufferSize) - return &b - }, - }, - } + return &bufferPool{} } var _ httputil.BufferPool = &bufferPool{} func (b *bufferPool) Get() []byte { - return *b.pool.Get().(*[]byte) + return pool.Get(bufferSize) } func (b *bufferPool) Put(buf []byte) { - b.pool.Put(&buf) + pool.Put(buf) } diff --git a/go.mod b/go.mod index 8028351..09d68c7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/memberlist v0.3.1 + github.com/libp2p/go-buffer-pool v0.0.2 // indirect github.com/libp2p/go-mplex v0.4.0 github.com/libp2p/go-yamux/v3 v3.0.2 github.com/lucas-clemente/quic-go v0.25.0 diff --git a/multiplexer/pipe.go b/multiplexer/pipe.go index 397567e..93c72f8 100644 --- a/multiplexer/pipe.go +++ b/multiplexer/pipe.go @@ -11,21 +11,15 @@ import ( "github.com/zllovesuki/t/profiler" "github.com/zllovesuki/t/util" + pool "github.com/libp2p/go-buffer-pool" "github.com/libp2p/go-yamux/v3" "github.com/pkg/errors" ) const ( - bufferSize = 16 * 1024 + bufferSize = 8 * 1024 ) -var copyBufPool = sync.Pool{ - New: func() interface{} { - b := make([]byte, bufferSize) - return &b - }, -} - func IsTimeout(err error) bool { t := errors.Is(err, context.DeadlineExceeded) || errors.Is(err, yamux.ErrTimeout) if t { @@ -56,14 +50,14 @@ func Connect(ctx context.Context, dst, src net.Conn) <-chan error { func pipe(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, dir string, dst, src net.Conn) { defer wg.Done() - pBuf := copyBufPool.Get().(*[]byte) - defer copyBufPool.Put(pBuf) + pBuf := pool.Get(bufferSize) + defer pool.Put(pBuf) // io.Copy yeet the EOF from reader and turns it into nil. // therefore, in the previous iteration, one side of pipe // never returns and therefore keeping the pipe from closing. // Here we will forcibly close both ends as soon as io.Copy returns. - n, err := io.CopyBuffer(dst, util.NewCtxReader(ctx, src), *pBuf) + n, err := io.CopyBuffer(dst, util.NewCtxReader(ctx, src), pBuf) log.Printf("CopyBuffer: (%s <-> %s) %s", src.LocalAddr(), dst.RemoteAddr(), err) src.Close()