Skip to content

Commit

Permalink
opt: Optimize RESP parse mem usage, use mmap to read aof file
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Jun 4, 2024
1 parent dc431e3 commit 40df7d3
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 47 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ test-cover:
go tool cover -html=coverage.txt -o coverage.html

pprof:
go tool pprof -http=:18081 "http://192.168.1.6:6060/debug/pprof/profile?seconds=30"
go tool pprof -http=:18081 "http://172.17.21.2:6060/debug/pprof/profile?seconds=30"

heap:
go tool pprof http://localhost:6060/debug/pprof/heap
31 changes: 21 additions & 10 deletions aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,33 @@ import (
"os"
"sync"
"time"

"github.com/tidwall/mmap"
)

const (
KB = 1024
MB = 1024 * KB
)

// Aof manages an append-only file system for storing data.
type Aof struct {
file *os.File
buf *bytes.Buffer
mu sync.Mutex
filePath string
file *os.File
buf *bytes.Buffer
mu sync.Mutex
}

func NewAof(path string) (*Aof, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}

aof := &Aof{
file: f,
buf: bytes.NewBuffer(make([]byte, 0, 1024)),
file: fd,
filePath: path,
buf: bytes.NewBuffer(make([]byte, 0, MB)),
}

go func() {
Expand Down Expand Up @@ -57,13 +66,15 @@ func (aof *Aof) Read(fn func(value Value)) error {
aof.mu.Lock()
defer aof.mu.Unlock()

// Ensure the file pointer is at the start.
_, err := aof.file.Seek(0, io.SeekStart)
// Read file data by mmap.
data, err := mmap.Open(aof.filePath, false)
if len(data) == 0 {
return nil
}
if err != nil {
return err
}

reader := NewResp(aof.file)
reader := NewResp(data)

// Iterate over the records in the file, applying the function to each.
for {
Expand Down
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ var (
ErrWrongType = errors.New("WRONGTYPE Operation against a key holding the wrong kind of value")

ErrUnknownType = errors.New("ERR unknown value type")

ErrCRLFNotFound = errors.New("ERR CRLF not found in line")
)

func ErrWrongArgs(cmd string) error {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/redis/go-redis/v9 v9.5.2
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980
github.com/stretchr/testify v1.9.0
github.com/tidwall/mmap v0.3.0
github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b
golang.org/x/sys v0.20.0
Expand All @@ -18,6 +19,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80N
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -35,12 +37,15 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/mmap v0.3.0 h1:XXt1YsiXCF5/UAu3pLbu6g7iulJ9jsbs6vt7UpiV0sY=
github.com/tidwall/mmap v0.3.0/go.mod h1:2/dNzF5zA+te/JVHfrqNLcRkb8LjdH3c80vYHFQEZRk=
github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731 h1:frRQxMZFCPWfoiWau4bPcYmNDGNVPLqM9nqnsp6Uakg=
github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731/go.mod h1:sPwGPAuvd9WdiONTmusXGNocqcY5L/J7+st1upAMlX8=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b h1:C/+nN/kFJ6yrmEhIu+5Ra2jx/W8w+Ayu8pTiZfuU5Xc=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1ZgyZNk91XIllYdOPpwP+9L2RCw6QGSy6alTYF+Z0iU=
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc h1:O9NuF4s+E/PvMIy+9IUZB9znFwUIXEWSstNjek6VpVg=
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
45 changes: 22 additions & 23 deletions resp.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bufio"
"bytes"
"fmt"
"io"
Expand Down Expand Up @@ -36,12 +35,12 @@ type Value struct {

// Resp is a parser for RESP encoded data.
type Resp struct {
reader *bufio.Reader
b []byte
}

// NewResp creates a new Resp object with a buffered reader.
func NewResp(rd io.Reader) *Resp {
return &Resp{reader: bufio.NewReader(rd)}
func NewResp(b []byte) *Resp {
return &Resp{b: b}
}

func newErrValue(err error) Value {
Expand All @@ -64,19 +63,13 @@ func newArrayValue(value []Value) Value {
}

// readLine reads a line ending with CRLF from the reader.
func (r *Resp) readLine() (line []byte, n int, err error) {
for {
b, err := r.reader.ReadByte()
if err != nil {
return nil, 0, err
}
n += 1
line = append(line, b)
if len(line) >= 2 && bytes.HasSuffix(line, CRLF) {
break
}
func (r *Resp) readLine() ([]byte, int, error) {
before, after, found := bytes.Cut(r.b, CRLF)
if found {
r.b = after
return before, len(before) + 2, nil
}
return line[:len(line)-2], n, nil // Trim the CRLF at the end
return nil, 0, ErrCRLFNotFound
}

// readInteger reads an integer value following the ':' prefix.
Expand All @@ -92,9 +85,18 @@ func (r *Resp) readInteger() (x int, n int, err error) {
return int(i64), n, nil
}

func (r *Resp) readByte() (byte, error) {
if len(r.b) == 0 {
return 0, io.EOF
}
b := r.b[0]
r.b = r.b[1:]
return b, nil
}

// Read parses the next RESP value from the stream.
func (r *Resp) Read() (Value, error) {
_type, err := r.reader.ReadByte()
_type, err := r.readByte()
if err != nil {
return Value{}, err
}
Expand Down Expand Up @@ -146,12 +148,9 @@ func (r *Resp) readBulk() (Value, error) {
return Value{typ: NULL}, nil
}

bulk := make([]byte, len)
_, err = io.ReadFull(r.reader, bulk) // Use ReadFull to ensure we read exactly 'len' bytes
if err != nil {
return v, err
}
v.bulk = bulk
v.bulk = r.b[:len]
r.b = r.b[len:]

r.readLine() // Read the trailing CRLF

return v, nil
Expand Down
19 changes: 9 additions & 10 deletions resp_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"errors"
"strconv"
"strings"
Expand All @@ -19,7 +18,7 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "+OK\r\n")

_, err := NewResp(bytes.NewReader(data)).Read()
_, err := NewResp(data).Read()
assert.NotNil(err)
})

Expand All @@ -28,7 +27,7 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "-err message\r\n")

_, err := NewResp(bytes.NewReader(data)).Read()
_, err := NewResp(data).Read()
assert.NotNil(err)
})

Expand All @@ -37,7 +36,7 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "$5\r\nhello\r\n")

value2, err := NewResp(bytes.NewReader(data)).Read()
value2, err := NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)

Expand All @@ -46,7 +45,7 @@ func TestValue(t *testing.T) {
data = value.Marshal()
assert.Equal(string(data), "$0\r\n\r\n")

value2, err = NewResp(bytes.NewReader(data)).Read()
value2, err = NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)

Expand All @@ -55,7 +54,7 @@ func TestValue(t *testing.T) {
data = value.Marshal()
assert.Equal(string(data), "$-1\r\n")

value2, err = NewResp(bytes.NewReader(data)).Read()
value2, err = NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)
})
Expand All @@ -65,7 +64,7 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), ":1\r\n")

value2, err := NewResp(bytes.NewReader(data)).Read()
value2, err := NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)
})
Expand All @@ -81,19 +80,19 @@ func TestValue(t *testing.T) {
data := value.Marshal()
assert.Equal(string(data), "*5\r\n:1\r\n:2\r\n:3\r\n$5\r\nhello\r\n$5\r\nworld\r\n")

value2, err := NewResp(bytes.NewReader(data)).Read()
value2, err := NewResp(data).Read()
assert.Nil(err)
assert.Equal(value, value2)
})

t.Run("error-value", func(t *testing.T) {
// read nil
_, err := NewResp(bytes.NewReader(nil)).Read()
_, err := NewResp(nil).Read()
assert.NotNil(err)

for _, prefix := range []byte{BULK, INTEGER, ARRAY} {
data := append([]byte{prefix}, "an error message"...)
_, err := NewResp(bytes.NewReader(data)).Read()
_, err := NewResp(data).Read()
assert.NotNil(err)
}

Expand Down
9 changes: 6 additions & 3 deletions rotom.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -100,7 +99,7 @@ func InitDB(config *Config) (err error) {
log.Printf("start loading aof file...")

// Load the initial data into memory by processing each stored command.
db.aof.Read(func(value Value) {
err = db.aof.Read(func(value Value) {
command := value.array[0].bulk
args := value.array[1:]

Expand All @@ -109,6 +108,10 @@ func InitDB(config *Config) (err error) {
cmd.processCommand(args)
}
})
if err != nil {
log.Println("read appendonly file error:", err)
return
}
}

return nil
Expand Down Expand Up @@ -172,7 +175,7 @@ func (server *Server) RunServe() {
}

func (server *Server) handleConnection(buf []byte, conn net.Conn) {
resp := NewResp(bytes.NewReader(buf))
resp := NewResp(buf)
for {
value, err := resp.Read()
if err != nil {
Expand Down

0 comments on commit 40df7d3

Please sign in to comment.