Skip to content

Commit

Permalink
List use quick list structure (#36)
Browse files Browse the repository at this point in the history
* refact list with quickList

* add fuzz test for list

* db add setTTL api

* list add range, iter function

---------

Co-authored-by: guangzhixu <[email protected]>
  • Loading branch information
xgzlucario and satoshi-099 authored Apr 4, 2024
1 parent 18bff69 commit 3718f24
Show file tree
Hide file tree
Showing 7 changed files with 569 additions and 134 deletions.
4 changes: 2 additions & 2 deletions benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func benchHSet() {
}

func benchLRPush() {
fmt.Println("========== LRPush ==========")
fmt.Println("========== RPush ==========")
fmt.Println("size: 100*10000 enties")
fmt.Println("desc: value 10 bytes")

Expand All @@ -158,7 +158,7 @@ func benchLRPush() {
for i := 0; i < N; i++ {
t1 := time.Now()
k := fmt.Sprintf("%010d", i)
db.LRPush("mylist", k)
db.RPush("mylist", k)
quant.Add(float64(time.Since(t1)))
}

Expand Down
100 changes: 55 additions & 45 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const (
mergeTypeOr
mergeTypeXOr

listDirectionLeft = 'L'
listDirectionRight = 'R'

fileLockName = "FLOCK"
)

Expand All @@ -36,6 +33,7 @@ type Operation byte

const (
OpSetTx Operation = iota
OpSetTTL
OpRemove
// map
OpHSet
Expand All @@ -45,8 +43,10 @@ const (
OpSRemove
OpSMerge // union, inter, diff
// list
OpLPush // lpush, rpush
OpLPop // lpop, rpop
OpLPush
OpRPush
OpLPop
OpRPop
// bitmap
OpBitSet
OpBitFlip
Expand Down Expand Up @@ -74,7 +74,7 @@ var cmdTable = []Cmd{
switch tp {
case TypeList:
ls := structx.NewList()
if err := ls.UnmarshalJSON(val); err != nil {
if err := ls.Unmarshal(val); err != nil {
return err
}
db.cm.Set(key, ls)
Expand Down Expand Up @@ -115,6 +115,11 @@ var cmdTable = []Cmd{
}
return nil
}},
{OpSetTTL, func(db *DB, reader *codeman.Reader) error {
// key, ts
db.SetTTL(reader.Str(), reader.Int64())
return nil
}},
{OpRemove, func(db *DB, reader *codeman.Reader) error {
// keys
db.Remove(reader.StrSlice()...)
Expand Down Expand Up @@ -154,27 +159,22 @@ var cmdTable = []Cmd{
}
}},
{OpLPush, func(db *DB, reader *codeman.Reader) error {
// direct, key, items
direct := reader.Byte()
key := reader.Str()
items := reader.StrSlice()

if direct == listDirectionLeft {
return db.LLPush(key, items...)
}
return db.LRPush(key, items...)
// key, items
return db.LPush(reader.Str(), reader.StrSlice()...)
}},
{OpLPop, func(db *DB, reader *codeman.Reader) (err error) {
// direct, key
direct := reader.Byte()
key := reader.Str()

if direct == listDirectionLeft {
_, err = db.LLPop(key)
} else {
_, err = db.LRPop(key)
}
return
{OpRPush, func(db *DB, reader *codeman.Reader) error {
// key, items
return db.RPush(reader.Str(), reader.StrSlice()...)
}},
{OpLPop, func(db *DB, reader *codeman.Reader) error {
// key
_, err := db.LPop(reader.Str())
return err
}},
{OpRPop, func(db *DB, reader *codeman.Reader) error {
// key
_, err := db.RPop(reader.Str())
return err
}},
{OpBitSet, func(db *DB, reader *codeman.Reader) error {
// key, val, offsets
Expand Down Expand Up @@ -385,7 +385,7 @@ func (db *DB) SetEx(key string, val []byte, ttl time.Duration) {
db.SetTx(key, val, cache.GetNanoSec()+int64(ttl))
}

// SetTx store key-value pair with deadlindb.
// SetTx store key-value pair with deadline.
func (db *DB) SetTx(key string, val []byte, ts int64) {
if ts < 0 {
return
Expand All @@ -394,21 +394,32 @@ func (db *DB) SetTx(key string, val []byte, ts int64) {
db.m.SetTx(key, val, ts)
}

// SetTTL set expired time of key-value.
func (db *DB) SetTTL(key string, ts int64) bool {
if ts < 0 {
return false
}
db.encode(newCodec(OpSetTTL).Str(key).Int(ts))
return db.m.SetTTL(key, ts)
}

// Remove
func (db *DB) Remove(keys ...string) (n int) {
db.encode(newCodec(OpRemove).StrSlice(keys))
for _, key := range keys {
if db.m.Remove(key) {
n++
} else if db.cm.Has(key) {
db.cm.Remove(key)
n++
}
}
return
}

// Len
func (db *DB) Len() int {
stat := db.m.Stat()
return stat.Conflict + stat.Len + db.cm.Count()
return db.m.Stat().Len + db.cm.Count()
}

// GC triggers the garbage collection to evict expired kv datas.
Expand All @@ -419,9 +430,9 @@ func (db *DB) GC() {
}

// Scan
func (db *DB) Scan(f func(string, []byte, int64) bool) {
func (db *DB) Scan(f func([]byte, []byte, int64) bool) {
db.m.Scan(func(key, value []byte, ttl int64) bool {
return f(string(key), value, ttl)
return f(key, value, ttl)
})
}

Expand Down Expand Up @@ -455,7 +466,6 @@ func (db *DB) HSet(key, field string, val []byte) error {
}
db.encode(newCodec(OpHSet).Str(key).Str(field).Bytes(val))
m.Set(field, val)

return nil
}

Expand Down Expand Up @@ -594,25 +604,25 @@ func (db *DB) SDiff(dst string, src ...string) error {
return nil
}

// LLPush
func (db *DB) LLPush(key string, items ...string) error {
// LPush
func (db *DB) LPush(key string, items ...string) error {
ls, err := db.fetchList(key, true)
if err != nil {
return err
}
db.encode(newCodec(OpLPush).Byte(listDirectionLeft).Str(key).StrSlice(items))
db.encode(newCodec(OpLPush).Str(key).StrSlice(items))
ls.LPush(items...)

return nil
}

// LRPush
func (db *DB) LRPush(key string, items ...string) error {
// RPush
func (db *DB) RPush(key string, items ...string) error {
ls, err := db.fetchList(key, true)
if err != nil {
return err
}
db.encode(newCodec(OpLPush).Byte(listDirectionRight).Str(key).StrSlice(items))
db.encode(newCodec(OpRPush).Str(key).StrSlice(items))
ls.RPush(items...)

return nil
Expand All @@ -631,8 +641,8 @@ func (db *DB) LIndex(key string, i int) (string, error) {
return res, nil
}

// LLPop
func (db *DB) LLPop(key string) (string, error) {
// LPop
func (db *DB) LPop(key string) (string, error) {
ls, err := db.fetchList(key)
if err != nil {
return "", err
Expand All @@ -641,13 +651,13 @@ func (db *DB) LLPop(key string) (string, error) {
if !ok {
return "", ErrEmptyList
}
db.encode(newCodec(OpLPop).Byte(listDirectionLeft).Str(key))
db.encode(newCodec(OpLPop).Str(key))

return res, nil
}

// LRPop
func (db *DB) LRPop(key string) (string, error) {
// RPop
func (db *DB) RPop(key string) (string, error) {
ls, err := db.fetchList(key)
if err != nil {
return "", err
Expand All @@ -656,7 +666,7 @@ func (db *DB) LRPop(key string) (string, error) {
if !ok {
return "", ErrEmptyList
}
db.encode(newCodec(OpLPop).Byte(listDirectionRight).Str(key))
db.encode(newCodec(OpRPop).Str(key))

return res, nil
}
Expand Down Expand Up @@ -923,7 +933,7 @@ func (db *DB) Shrink() error {
data, err = item.MarshalBinary()
case List:
types = TypeList
data, err = item.MarshalJSON()
data = item.Marshal()
case Set:
types = TypeSet
data, err = item.MarshalJSON()
Expand Down
Loading

0 comments on commit 3718f24

Please sign in to comment.