Skip to content

Commit

Permalink
buffer: use libp2p buffer pool implementation as global buffer pool with
Browse files Browse the repository at this point in the history
8k aligned
  • Loading branch information
zllovesuki committed Feb 8, 2022
1 parent 3d8c615 commit eda4f58
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 24 deletions.
19 changes: 6 additions & 13 deletions gateway/buffer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,27 @@ package gateway

import (
"net/http/httputil"
"sync"

pool "github.com/libp2p/go-buffer-pool"
)

const (
bufferSize = 16 * 1024
bufferSize = 8 * 1024
)

type bufferPool struct {
pool *sync.Pool
}

func newBufferPool() *bufferPool {
return &bufferPool{
pool: &sync.Pool{
New: func() interface{} {
b := make([]byte, bufferSize)
return &b
},
},
}
return &bufferPool{}
}

var _ httputil.BufferPool = &bufferPool{}

func (b *bufferPool) Get() []byte {
return *b.pool.Get().(*[]byte)
return pool.Get(bufferSize)
}

func (b *bufferPool) Put(buf []byte) {
b.pool.Put(&buf)
pool.Put(buf)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/memberlist v0.3.1
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-mplex v0.4.0
github.com/libp2p/go-yamux/v3 v3.0.2
github.com/lucas-clemente/quic-go v0.25.0
Expand Down
16 changes: 5 additions & 11 deletions multiplexer/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,15 @@ import (
"github.com/zllovesuki/t/profiler"
"github.com/zllovesuki/t/util"

pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-yamux/v3"
"github.com/pkg/errors"
)

const (
bufferSize = 16 * 1024
bufferSize = 8 * 1024
)

var copyBufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, bufferSize)
return &b
},
}

func IsTimeout(err error) bool {
t := errors.Is(err, context.DeadlineExceeded) || errors.Is(err, yamux.ErrTimeout)
if t {
Expand Down Expand Up @@ -56,14 +50,14 @@ func Connect(ctx context.Context, dst, src net.Conn) <-chan error {

func pipe(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, dir string, dst, src net.Conn) {
defer wg.Done()
pBuf := copyBufPool.Get().(*[]byte)
defer copyBufPool.Put(pBuf)
pBuf := pool.Get(bufferSize)
defer pool.Put(pBuf)

// io.Copy yeet the EOF from reader and turns it into nil.
// therefore, in the previous iteration, one side of pipe
// never returns and therefore keeping the pipe from closing.
// Here we will forcibly close both ends as soon as io.Copy returns.
n, err := io.CopyBuffer(dst, util.NewCtxReader(ctx, src), *pBuf)
n, err := io.CopyBuffer(dst, util.NewCtxReader(ctx, src), pBuf)
log.Printf("CopyBuffer: (%s <-> %s) %s", src.LocalAddr(), dst.RemoteAddr(), err)

src.Close()
Expand Down

0 comments on commit eda4f58

Please sign in to comment.