Skip to content

Commit

Permalink
wrap Decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Oct 24, 2023
1 parent 0398e41 commit 507e338
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 56 deletions.
2 changes: 2 additions & 0 deletions base/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
ErrNotNumberic = errors.New("value is not numberic")

ErrParseRecordLine = errors.New("parse record line error, db file is broken")
ErrCheckSum = errors.New("crc checksum error, record line is invalid")

ErrInvalidArgs = errors.New("invalid args")
ErrInvalidResponse = errors.New("invalid response")

Expand Down
63 changes: 60 additions & 3 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package rotom

import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"strconv"
"sync"
"unsafe"

"github.com/xgzlucario/rotom/base"
"github.com/zeebo/xxh3"
)

const (
Expand All @@ -16,9 +19,7 @@ const (
)

var codecPool = sync.Pool{
New: func() any {
return &Codec{B: make([]byte, 0, 16)}
},
New: func() any { return &Codec{B: make([]byte, 0, 16)} },
}

// Codec is the primary type for encoding data into a specific format.
Expand Down Expand Up @@ -77,6 +78,11 @@ func (s *Codec) format(v []byte) *Codec {
return s
}

// crc
func (s *Codec) crc() {
s.B = binary.LittleEndian.AppendUint32(s.B, uint32(xxh3.Hash(s.B)))
}

func (s *Codec) Any(v any) (*Codec, error) {
buf, err := s.encode(v)
if err != nil {
Expand Down Expand Up @@ -114,3 +120,54 @@ func s2b(str *string) []byte {
func b2s(buf []byte) *string {
return (*string)(unsafe.Pointer(&buf))
}

type Decoder struct {
b []byte
}

func NewDecoder(buf []byte) *Decoder {
return &Decoder{b: buf}
}

// ParseRecord parse one operation record line.
func (s *Decoder) ParseRecord() (op Operation, res [][]byte, err error) {
if s.Done() {
return 0, nil, base.ErrParseRecordLine
}
op = Operation(s.b[0])
line := s.b[1:]

// bound check.
if int(op) >= len(cmdTable) {
return 0, nil, base.ErrParseRecordLine
}

argsNum := cmdTable[op].ArgsNum
res = make([][]byte, 0, argsNum)

// parses args.
for j := 0; j < argsNum; j++ {
i := bytes.IndexByte(line, SepChar)
if i <= 0 {
return 0, nil, base.ErrParseRecordLine
}

klen := base.ParseInt[int](line[:i])
i++

// bound check.
if i+klen > len(line) {
return 0, nil, base.ErrParseRecordLine
}
res = append(res, line[i:i+klen])

line = line[i+klen:]
}
s.b = line

return
}

func (s *Decoder) Done() bool {
return len(s.b) == 0
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.8.4
github.com/xgzlucario/GigaCache v0.0.0-20231022133204-b0a967d404cb
github.com/zeebo/xxh3 v1.0.2
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
)

Expand All @@ -26,7 +27,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/arch v0.5.0 // indirect
Expand Down
53 changes: 5 additions & 48 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,25 +784,11 @@ func (e *Engine) load() error {
return err
}

e.logInfo("start to load e size %s", formatSize(len(line)))
e.logInfo("loading db file size %s", formatSize(len(line)))

var args [][]byte

// record line is like:
// <OP><argsNum><args...>
for len(line) > 2 {
op := Operation(line[0])

// if operation valid
if int(op) >= len(cmdTable) {
return base.ErrParseRecordLine
}

argsNum := cmdTable[op].ArgsNum
line = line[1:]

// parse args by operation
args, line, err = parseLine(line, argsNum)
decoder := NewDecoder(line)
for !decoder.Done() {
op, args, err := decoder.ParseRecord()
if err != nil {
return err
}
Expand Down Expand Up @@ -1045,35 +1031,6 @@ func (e *Engine) shrink() {
e.logInfo("rotom rewrite done")
}

// parseLine parse file content to record lines.
// exp:
// input: <key_len>SEP<key_value><somthing...>
// return: key_value, somthing..., error
func parseLine(line []byte, argsNum int) ([][]byte, []byte, error) {
res := make([][]byte, 0, argsNum)

for flag := 0; flag < argsNum; flag++ {
i := bytes.IndexByte(line, SepChar)
if i <= 0 {
return nil, nil, base.ErrParseRecordLine
}

key_len := base.ParseInt[int](line[:i])
i++

// valid
if len(line) < i+key_len {
return nil, nil, base.ErrParseRecordLine
}

res = append(res, line[i:i+key_len])

line = line[i+key_len:]
}

return res, line, nil
}

// fetchMap
func (e *Engine) fetchMap(key string, setWhenNotExist ...bool) (m Map, err error) {
return fetch(e, key, func() Map {
Expand Down Expand Up @@ -1171,7 +1128,7 @@ func (e *Engine) printRuntimeStats() {
With("alloc", formatSize(memStats.Alloc)).
With("sys", formatSize(memStats.Sys)).
With("gctime", stats.NumGC).
With("heapObj", memStats.HeapObjects).
With("heapObjects", memStats.HeapObjects).
With("gcpause", stats.PauseTotal/time.Duration(stats.NumGC)).
Info("[Runtime]")
}
Expand Down
6 changes: 2 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ func (e *RotomEngine) OnTraffic(conn gnet.Conn) gnet.Action {

// handleEvent
func (e *Engine) handleEvent(line []byte) (msg []byte, err error) {
op := Operation(line[0])

// parse args by operation
args, _, err := parseLine(line[1:], cmdTable[op].ArgsNum)
decoder := NewDecoder(line)
op, args, err := decoder.ParseRecord()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 507e338

Please sign in to comment.