Skip to content

Commit

Permalink
perf: RESP & aeloop performance optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Aug 4, 2024
1 parent 5483c91 commit 5c7a712
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 75 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ English | [中文](README_CN.md)

## Introduction

This is rotom, a tiny Redis Server written in Go. It replicates the core event loop mechanism AeLoop in Redis based on I/O multiplexing.
This is rotom, a high performance, low latency tiny Redis Server written in Go. It replicates the core event loop mechanism AeLoop in Redis based on I/O multiplexing.

## Features

Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## 介绍

这里是 rotom,一个使用 Go 编写的 tiny Redis Server。基于 IO 多路复用还原了 Redis 中的 AeLoop 核心事件循环机制。
这里是 rotom,一个使用 Go 编写的高性能,低延迟的 tiny Redis Server。基于 IO 多路复用还原了 Redis 中的 AeLoop 核心事件循环机制。

## 特性

Expand Down
5 changes: 4 additions & 1 deletion ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type AeLoop struct {
fileEventFd int
timeEventNextId int
stop bool

_fevents []*AeFileEvent // fes cache
}

func (loop *AeLoop) AddRead(fd int, proc FileProc, extra interface{}) {
Expand Down Expand Up @@ -139,6 +141,7 @@ func AeLoopCreate() (*AeLoop, error) {
fileEventFd: epollFd,
timeEventNextId: 1,
stop: false,
_fevents: make([]*AeFileEvent, 128), // pre alloc
}, nil
}

Expand Down Expand Up @@ -171,7 +174,7 @@ retry:
}

// collect file events
fes = make([]*AeFileEvent, 0, n)
fes = loop._fevents[:0]
for _, ev := range events[:n] {
if ev.Events&unix.EPOLLIN != 0 {
fe := loop.FileEvents[int(ev.Fd)]
Expand Down
28 changes: 10 additions & 18 deletions command.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"fmt"
"strconv"
"strings"
Expand All @@ -13,7 +12,7 @@ import (
)

var (
WITH_SCORES = []byte("WITHSCORES")
WITH_SCORES = "WITHSCORES"
)

type Command struct {
Expand Down Expand Up @@ -65,10 +64,6 @@ func equalFold(a, b string) bool {
return len(a) == len(b) && strings.EqualFold(a, b)
}

func equalFoldBytes(a, b []byte) bool {
return len(a) == len(b) && bytes.EqualFold(a, b)
}

func lookupCommand(name string) (*Command, error) {
for _, c := range cmdTable {
if equalFold(name, c.name) {
Expand Down Expand Up @@ -98,11 +93,11 @@ func setCommand(writer *RESPWriter, args []RESP) {
}

func incrCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()
key := args[0].ToStringUnsafe()

object, ttl := db.dict.Get(key)
if ttl == dict.KEY_NOT_EXIST {
db.dict.Set(key, 1)
db.dict.Set(strings.Clone(key), 1)
writer.WriteInteger(1)
return
}
Expand All @@ -114,6 +109,7 @@ func incrCommand(writer *RESPWriter, args []RESP) {
writer.WriteInteger(num)

case dict.TypeString:
// conv to integer
bytes := object.Data().([]byte)
num, err := RESP(bytes).ToInt()
if err != nil {
Expand Down Expand Up @@ -345,14 +341,12 @@ func saddCommand(writer *RESPWriter, args []RESP) {
}

func sremCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()

key := args[0].ToStringUnsafe()
set, err := fetchSet(key)
if err != nil {
writer.WriteError(err)
return
}

var count int
for _, arg := range args[1:] {
if set.Remove(arg.ToStringUnsafe()) {
Expand All @@ -363,17 +357,15 @@ func sremCommand(writer *RESPWriter, args []RESP) {
}

func spopCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()

key := args[0].ToStringUnsafe()
set, err := fetchSet(key)
if err != nil {
writer.WriteError(err)
return
}

item, ok := set.Pop()
member, ok := set.Pop()
if ok {
writer.WriteBulkString(item)
writer.WriteBulkString(member)
} else {
writer.WriteNull()
}
Expand Down Expand Up @@ -461,7 +453,7 @@ func zrangeCommand(writer *RESPWriter, args []RESP) {
}
start = min(start, stop)

withScores := len(args) == 4 && equalFoldBytes(args[3], WITH_SCORES)
withScores := len(args) == 4 && equalFold(args[3].ToStringUnsafe(), WITH_SCORES)
if withScores {
writer.WriteArrayHead((stop - start) * 2)
zset.Range(start, stop, func(key string, score float64) {
Expand All @@ -478,7 +470,7 @@ func zrangeCommand(writer *RESPWriter, args []RESP) {
}

func zpopminCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()
key := args[0].ToStringUnsafe()
count := 1
var err error
if len(args) > 1 {
Expand Down
57 changes: 25 additions & 32 deletions resp.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package main

import (
"bytes"
"io"
"slices"
"strconv"
"unsafe"
)
Expand All @@ -28,20 +28,23 @@ func NewReader(input []byte) *RESPReader {
return &RESPReader{b: input}
}

// cutByCRLF splits the buffer by the first occurrence of CRLF.
func cutByCRLF(buf []byte) (before, after []byte, found bool) {
n := len(buf)
if n <= 2 {
return
}
for i, b := range buf[:n-1] {
// parseInt parse first integer from buf.
// input "3\r\nHELLO" -> (3, "HELLO", nil).
func parseInt(buf []byte) (n int, after []byte, err error) {
for i, b := range buf {
if b >= '0' && b <= '9' {
n = n*10 + int(b-'0')
continue
}
if b == '\r' {
if buf[i+1] == '\n' {
return buf[:i], buf[i+2:], true
if len(buf) > i+1 && buf[i+1] == '\n' {
return n, buf[i+2:], nil
}
break
}
return 0, nil, errParseInteger
}
return
return 0, nil, errCRLFNotFound
}

// ReadNextCommand reads the next RESP command from the RESPReader.
Expand All @@ -55,45 +58,37 @@ func (r *RESPReader) ReadNextCommand(argsBuf []RESP) (args []RESP, err error) {
switch r.b[0] {
case ARRAY:
// command_bulk format
before, after, ok := cutByCRLF(r.b[1:])
if !ok {
return nil, errCRLFNotFound
}
count, err := strconv.Atoi(b2s(before))
num, after, err := parseInt(r.b[1:])
if err != nil {
return nil, err
}
r.b = after

// read bulk strings for range
for i := 0; i < count; i++ {
for i := 0; i < num; i++ {
if len(r.b) == 0 || r.b[0] != BULK {
return nil, errInvalidArguments
}

// read CRLF
before, after, ok := cutByCRLF(r.b[1:])
if !ok {
return nil, errCRLFNotFound
}
count, err := strconv.Atoi(b2s(before))
num, after, err := parseInt(r.b[1:])
if err != nil {
return nil, err
}
r.b = after

// bound check
if count < 0 || count+2 > len(r.b) {
if num < 0 || num+2 > len(after) {
return nil, errInvalidArguments
}

args = append(args, r.b[:count])
r.b = r.b[count+2:]
args = append(args, after[:num])

// skip CRLF
r.b = after[num+2:]
}

default:
// command_inline format
before, after, ok := cutByCRLF(r.b)
before, after, ok := bytes.Cut(r.b, CRLF)
if !ok {
return nil, errInvalidArguments
}
Expand Down Expand Up @@ -179,8 +174,6 @@ func (r RESP) ToInt() (int, error) { return strconv.Atoi(b2s(r)) }

func (r RESP) ToFloat() (float64, error) { return strconv.ParseFloat(b2s(r), 64) }

func (r RESP) Clone() []byte { return slices.Clone(r) }
func (r RESP) Clone() []byte { return bytes.Clone(r) }

func b2s(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
func b2s(b []byte) string { return *(*string)(unsafe.Pointer(&b)) }
44 changes: 22 additions & 22 deletions resp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,26 @@ func TestReader(t *testing.T) {
}
})

t.Run("cutByCRLF", func(t *testing.T) {
before, after, ok := cutByCRLF([]byte("123\r\n456"))
assert.Equal(string(before), "123")
assert.Equal(string(after), "456")
assert.True(ok)

before, after, ok = cutByCRLF([]byte("1234\r\n5678"))
assert.Equal(string(before), "1234")
assert.Equal(string(after), "5678")
assert.True(ok)

// error cases
_, _, ok = cutByCRLF([]byte("A"))
assert.False(ok)

_, _, ok = cutByCRLF([]byte("ABC"))
assert.False(ok)

_, _, ok = cutByCRLF([]byte("1234\r"))
assert.False(ok)
t.Run("parseInt", func(t *testing.T) {
n, after, err := parseInt([]byte("3\r\nHELLO"))
assert.Equal(n, 3)
assert.Equal(after, []byte("HELLO"))
assert.Nil(err)

n, after, err = parseInt([]byte("003\r\nHELLO"))
assert.Equal(n, 3)
assert.Equal(after, []byte("HELLO"))
assert.Nil(err)

// errors
_, _, err = parseInt([]byte("ABC\r\nHELLO"))
assert.ErrorIs(err, errParseInteger)

_, _, err = parseInt([]byte("1234567\r"))
assert.ErrorIs(err, errCRLFNotFound)

_, _, err = parseInt([]byte("1234567"))
assert.ErrorIs(err, errCRLFNotFound)
})

t.Run("command-bulk", func(t *testing.T) {
Expand All @@ -93,11 +93,11 @@ func TestReader(t *testing.T) {
// error
args, err = NewReader([]byte("*A\r\n$3\r\nGET\r\n$3\r\nfoo\r\n")).ReadNextCommand(nil)
assert.Equal(len(args), 0)
assert.NotNil(err)
assert.ErrorIs(err, errParseInteger)

args, err = NewReader([]byte("*3\r\n$A\r\nGET\r\n$3\r\nfoo\r\n")).ReadNextCommand(nil)
assert.Equal(len(args), 0)
assert.NotNil(err)
assert.ErrorIs(err, errParseInteger)

args, err = NewReader([]byte("*3\r\n+PING")).ReadNextCommand(nil)
assert.Equal(len(args), 0)
Expand Down

0 comments on commit 5c7a712

Please sign in to comment.