Skip to content

Commit

Permalink
Refact cmdTable & Improve code coverage (#19)
Browse files Browse the repository at this point in the history
* add test coverage

* add server and client test

* fix concurrent close db

* wrap Decoder

* fix decoder

* fix parser

* add coverage

* need to refact load-switch

* wrap hooks in cmdTable

* wrap hooks and writer in cmdTable

* stash

* fix settx timcarry

* reordered cmdTable

* comments

* add hkeys

---------

Co-authored-by: guangzhixu <[email protected]>
  • Loading branch information
xgzlucario and satoshi-099 authored Oct 30, 2023
1 parent f9aa275 commit 370c21f
Show file tree
Hide file tree
Showing 13 changed files with 657 additions and 482 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ cpu: 13th Gen Intel(R) Core(TM) i5-13600KF

### Rotom

200 clients inserting a total of 1 million data, completed in 628ms, reaching a qps of 1.59 million, p99 latency is 1.1ms.
200 clients inserting a total of 1 million data, completed in 556ms, reaching a qps of 1.79 million, p99 latency is 1.1ms.

```bash
$ go run client/*.go
1000000 requests cost: 628.747032ms
[qps] 1590415.82 req/sec
[latency] avg: 115.852µs | min: 3.837µs | p50: 49.395µs | p95: 216.273µs | p99: 1.155185ms | max: 15.9881ms
1000000 requests cost: 556.955696ms
[qps] 1795418.85 req/sec
[latency] avg: 98.74µs | min: 3.632µs | p50: 40.903µs | p95: 175.456µs | p99: 1.09595ms | max: 13.305872ms
```

### Redis
Expand Down
8 changes: 4 additions & 4 deletions README_ZN.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ cpu: 13th Gen Intel(R) Core(TM) i5-13600KF

### Rotom

使用 200 个 clients 插入共 100 万数据,663ms 完成,qps 达到 150 万,p99 延迟 1.2ms
使用 200 个 clients 插入共 100 万数据,556ms 完成,qps 达到 179 万,p99 延迟 1.1ms

```bash
$ go run client/*.go
1000000 requests cost: 663.97797ms
[qps] 1506028.48 req/sec
[latency] avg: 119.645µs | min: 4.052µs | p50: 49.464µs | p95: 425.006µs | p99: 1.195428ms | max: 17.713702ms
1000000 requests cost: 556.955696ms
[qps] 1795418.85 req/sec
[latency] avg: 98.74µs | min: 3.632µs | p50: 40.903µs | p95: 175.456µs | p99: 1.09595ms | max: 13.305872ms
```

### Redis
Expand Down
2 changes: 2 additions & 0 deletions base/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
ErrNotNumberic = errors.New("value is not numberic")

ErrParseRecordLine = errors.New("parse record line error, db file is broken")
ErrCheckSum = errors.New("crc checksum error, record line is invalid")

ErrInvalidArgs = errors.New("invalid args")
ErrInvalidResponse = errors.New("invalid response")

Expand Down
6 changes: 6 additions & 0 deletions base/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type Gober interface {
GobDecode([]byte) error
}

// Writer
type Writer interface {
Write([]byte) (int, error)
WriteByte(byte) error
}

// SyncPolicy represents how often data is synced to disk.
type SyncPolicy byte

Expand Down
97 changes: 71 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package rotom

import (
"errors"
"net"
"time"

"github.com/bytedance/sonic"
cache "github.com/xgzlucario/GigaCache"
"github.com/xgzlucario/rotom/base"
"golang.org/x/exp/slices"
)

var (
// Since there is a limit to the number of concurrent clients,
// which is usually not very large,
// use bpool to reuse the buffer.
bpool = cache.NewBytePoolCap(1000, 512, 512)
)

// Client defines the client that connects to the server.
Expand All @@ -26,12 +20,13 @@ type Client struct {
func NewClient(addr string) (c *Client, err error) {
c = &Client{}
c.c, err = net.Dial("tcp", addr)
c.b = make([]byte, 512)
return
}

// Set
func (c *Client) Set(key string, val []byte) ([]byte, error) {
return c.SetTx(key, val, NoTTL)
return c.SetTx(key, val, noTTL)
}

// SetEx
Expand All @@ -41,41 +36,79 @@ func (c *Client) SetEx(key string, val []byte, ttl time.Duration) ([]byte, error

// SetTx
func (c *Client) SetTx(key string, val []byte, ts int64) ([]byte, error) {
return c.do(NewCodec(OpSetTx).Type(TypeString).Str(key).Int(ts).Bytes(val))
return c.do(NewCodec(OpSetTx).Type(TypeString).Str(key).Int(ts / timeCarry).Bytes(val))
}

// Remove
func (c *Client) Remove(key string) ([]byte, error) {
return c.do(NewCodec(OpRemove).Str(key))
func (c *Client) Remove(key string) (bool, error) {
args, err := c.do(NewCodec(OpRemove).Str(key))
if err != nil {
return false, err
}
return args[0] == _true, nil
}

// Rename
func (c *Client) Rename(key, newKey string) ([]byte, error) {
return c.do(NewCodec(OpRename).Str(key).Str(newKey))
func (c *Client) Rename(key, newKey string) (bool, error) {
args, err := c.do(NewCodec(OpRename).Str(key).Str(newKey))
if err != nil {
return false, err
}
return args[0] == _true, nil
}

// Get
func (c *Client) Get(key string) ([]byte, error) {
return c.do(NewCodec(ReqGet).Str(key))
return c.do(NewCodec(OpGet).Str(key))
}

// Len
func (c *Client) Len() (int, error) {
bytes, err := c.do(NewCodec(ReqLen))
func (c *Client) Len() (uint64, error) {
args, err := c.do(NewCodec(OpLen))
if err != nil {
return 0, err
}
return base.ParseInt[int](bytes), nil
return base.ParseInt[uint64](args), nil
}

// HSet
func (c *Client) HSet(key, field string, val []byte) ([]byte, error) {
return c.do(NewCodec(OpHSet).Str(key).Str(field).Bytes(val))
func (c *Client) HSet(key, field string, val []byte) error {
_, err := c.do(NewCodec(OpHSet).Str(key).Str(field).Bytes(val))
return err
}

// HGet
func (c *Client) HGet(key, field string) ([]byte, error) {
return c.do(NewCodec(OpHGet).Str(key).Str(field))
}

// HLen
func (c *Client) HLen(key string) (int, error) {
args, err := c.do(NewCodec(OpHLen).Str(key))
if err != nil {
return 0, err
}
return base.ParseInt[int](args), nil
}

// HKeys
func (c *Client) HKeys(key string) ([]string, error) {
args, err := c.do(NewCodec(OpHKeys).Str(key))
if err != nil {
return nil, err
}
var keys []string
err = sonic.Unmarshal(args, &keys)
return keys, err
}

// HRemove
func (c *Client) HRemove(key, field string) ([]byte, error) {
return c.do(NewCodec(OpHRemove).Str(key).Str(field))
func (c *Client) HRemove(key, field string) (bool, error) {
args, err := c.do(NewCodec(OpHRemove).Str(key).Str(field))
if err != nil {
return false, err
}
return args[0] == _true, nil
}

// Close
Expand All @@ -87,17 +120,29 @@ func (c *Client) Close() error {
func (c *Client) do(cd *Codec) ([]byte, error) {
_, err := c.c.Write(cd.B)
cd.Recycle()

if err != nil {
return nil, err
}
c.b = bpool.Get()
defer bpool.Put(c.b)

// read response.
n, err := c.c.Read(c.b)
if err != nil {
return nil, err
}

return slices.Clone(c.b[:n]), nil
// parse data.
op, args, err := NewDecoder(c.b[:n]).ParseRecord()
if err != nil {
return nil, err
}
if op != Response {
return nil, base.ErrInvalidResponse
}

// the first args is response code.
if int64(args[0][0]) == RES_ERROR {
return nil, errors.New(*b2s(args[1]))
}

return args[1], nil
}
66 changes: 62 additions & 4 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rotom

import (
"bytes"
"fmt"
"reflect"
"strconv"
Expand All @@ -16,9 +17,7 @@ const (
)

var codecPool = sync.Pool{
New: func() any {
return &Codec{B: make([]byte, 0, 16)}
},
New: func() any { return &Codec{B: make([]byte, 0, 16)} },
}

// Codec is the primary type for encoding data into a specific format.
Expand Down Expand Up @@ -72,7 +71,7 @@ func (s *Codec) Float(f float64) *Codec {
// format encodes a byte slice into the Coder's buffer as a record.
func (s *Codec) format(v []byte) *Codec {
s.B = append(s.B, base.FormatInt(len(v))...)
s.B = append(s.B, SepChar)
s.B = append(s.B, sepChar)
s.B = append(s.B, v...)
return s
}
Expand Down Expand Up @@ -114,3 +113,62 @@ func s2b(str *string) []byte {
func b2s(buf []byte) *string {
return (*string)(unsafe.Pointer(&buf))
}

// bool2byte
func bool2byte(b bool) byte {
if b {
return _true
}
return _false
}

type Decoder struct {
b []byte
}

func NewDecoder(buf []byte) *Decoder {
return &Decoder{b: buf}
}

// ParseRecord parse one operation record line.
func (s *Decoder) ParseRecord() (op Operation, res [][]byte, err error) {
if s.Done() {
return 0, nil, base.ErrParseRecordLine
}
op = Operation(s.b[0])
line := s.b[1:]

// bound check.
if int(op) >= len(cmdTable) {
return 0, nil, base.ErrParseRecordLine
}

argsNum := cmdTable[op].argsNum
res = make([][]byte, 0, argsNum)

// parses args.
for j := 0; j < int(argsNum); j++ {
i := bytes.IndexByte(line, sepChar)
if i <= 0 {
return 0, nil, base.ErrParseRecordLine
}

klen := base.ParseInt[int](line[:i])
i++

// bound check.
if i+klen > len(line) {
return 0, nil, base.ErrParseRecordLine
}
res = append(res, line[i:i+klen])

line = line[i+klen:]
}
s.b = line

return
}

func (s *Decoder) Done() bool {
return len(s.b) == 0
}
7 changes: 2 additions & 5 deletions examples/client/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package main

import (
"bytes"
"fmt"
"strconv"
"time"

"github.com/sourcegraph/conc/pool"
cache "github.com/xgzlucario/GigaCache"
"github.com/xgzlucario/rotom"
"github.com/xgzlucario/rotom/base"
)

const (
Expand All @@ -27,7 +25,6 @@ func cmd() {
start := time.Now()
p := pool.New()

validator := rotom.NewCodec(rotom.Response).Int(int64(rotom.RES_SUCCESS)).Str("ok").B
delays := cache.NewPercentile()

for i := 0; i < CLIENT_NUM; i++ {
Expand All @@ -48,8 +45,8 @@ func cmd() {
if err != nil {
panic(err)
}
if !bytes.Equal(res, validator) {
panic(base.ErrInvalidResponse)
if len(res) > 0 {
panic("error res")
}

// stat
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/sakeven/RbTree v0.0.0-20220710124251-94e35f9fed6c
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.8.4
github.com/xgzlucario/GigaCache v0.0.0-20231020075600-61001d347387
github.com/xgzlucario/GigaCache v0.0.0-20231022133204-b0a967d404cb
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/xgzlucario/GigaCache v0.0.0-20231020075600-61001d347387 h1:1B7fhItxnyPHVI95u1uyXL7DkqT7F22RAN+LdirTl0M=
github.com/xgzlucario/GigaCache v0.0.0-20231020075600-61001d347387/go.mod h1:n0gu6svrq5UYwUWv8RRYgt06u8e5E3AMNg5eqflP74Y=
github.com/xgzlucario/GigaCache v0.0.0-20231022133204-b0a967d404cb h1:HqKnS65YqDtZtDNwKDRMIiZKfV3eFWYuomKban0KY+k=
github.com/xgzlucario/GigaCache v0.0.0-20231022133204-b0a967d404cb/go.mod h1:n0gu6svrq5UYwUWv8RRYgt06u8e5E3AMNg5eqflP74Y=
github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
Expand Down
Loading

0 comments on commit 370c21f

Please sign in to comment.