From 8971f9ab3e016f5420109880baf4b8d544c39d77 Mon Sep 17 00:00:00 2001 From: xgzlucario <912156837@qq.com> Date: Sun, 2 Jun 2024 16:11:06 +0800 Subject: [PATCH] rotom v2 --- .gitignore | 2 +- v2/aof.go => aof.go | 0 batch.go | 78 --- batch_test.go | 125 ---- bench_test.go | 43 ++ codeman/codec.go | 107 ---- codeman/reader.go | 91 --- v2/config.go => config.go | 0 v2/config.json => config.json | 2 +- db.go | 1016 --------------------------------- db_test.go | 880 ---------------------------- v2/epoll.go => epoll.go | 22 +- errors.go | 41 +- example/main.go | 28 - go.mod | 16 +- go.sum | 28 +- v2/handler.go => handler.go | 5 - main.go | 32 ++ options.go | 33 -- v2/resp.go => resp.go | 0 v2/rotom.go => rotom.go | 77 ++- structx/bitmap.go | 58 +- structx/map.go | 1 + structx/zset.go | 16 - utils.go | 2 +- v2/main.go | 78 --- v2/utils.go | 7 - 27 files changed, 184 insertions(+), 2604 deletions(-) rename v2/aof.go => aof.go (100%) delete mode 100644 batch.go delete mode 100644 batch_test.go create mode 100644 bench_test.go delete mode 100644 codeman/codec.go delete mode 100644 codeman/reader.go rename v2/config.go => config.go (100%) rename v2/config.json => config.json (67%) delete mode 100644 db.go delete mode 100644 db_test.go rename v2/epoll.go => epoll.go (75%) delete mode 100644 example/main.go rename v2/handler.go => handler.go (91%) create mode 100644 main.go delete mode 100644 options.go rename v2/resp.go => resp.go (100%) rename v2/rotom.go => rotom.go (67%) delete mode 100644 v2/main.go delete mode 100644 v2/utils.go diff --git a/.gitignore b/.gitignore index 1bf5648..3eadea3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ tmp-* coverage.* -rotom \ No newline at end of file +*.aof \ No newline at end of file diff --git a/v2/aof.go b/aof.go similarity index 100% rename from v2/aof.go rename to aof.go diff --git a/batch.go b/batch.go deleted file mode 100644 index 79e438c..0000000 --- a/batch.go +++ /dev/null @@ -1,78 +0,0 @@ -package rotom - -import ( - "fmt" - - "github.com/xgzlucario/rotom/codeman" -) - -type ( - // For Set, HSet - Batch struct { - Key string - Val []byte - Timestamp int64 - } - - // For ZSet - ZSBatch struct { - Key string - Score int64 - } -) - -func checkts(batches []*Batch) { - for _, b := range batches { - if b.Timestamp < 0 { - panic(fmt.Sprintf("error: batch key `%s` timestamp is negetive", b.Key)) - } - } -} - -// BatchSet -func (db *DB) BatchSet(batches ...*Batch) { - checkts(batches) - codec := codeman.NewCodec() - - for _, b := range batches { - codec = codec.Byte(byte(OpSetTx)).Str(b.Key).Int(b.Timestamp).Bytes(b.Val) - db.m.SetTx(b.Key, b.Val, b.Timestamp) - } - db.encode(codec) -} - -// BatchHSet -func (db *DB) BatchHSet(key string, batches ...*Batch) error { - checkts(batches) - - m, err := db.fetchMap(key, true) - if err != nil { - return err - } - codec := newCodec(OpHSetTx).Str(key).Int(int64(len(batches))) - - for _, b := range batches { - codec = codec.Str(b.Key).Bytes(b.Val).Int(b.Timestamp) - m.Set(b.Key, b.Val) - } - db.encode(codec) - - return nil -} - -// BatchZSet -func (db *DB) BatchZSet(key string, batches ...*ZSBatch) error { - m, err := db.fetchZSet(key, true) - if err != nil { - return err - } - codec := newCodec(OpZSet).Str(key).Int(int64(len(batches))) - - for _, b := range batches { - codec = codec.Str(b.Key).Int(b.Score) - m.Set(b.Key, b.Score) - } - db.encode(codec) - - return nil -} diff --git a/batch_test.go b/batch_test.go deleted file mode 100644 index fdb8de6..0000000 --- a/batch_test.go +++ /dev/null @@ -1,125 +0,0 
@@ -package rotom - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -func genKV(i int) (string, []byte) { - k := fmt.Sprintf("k-%08x", i) - v := fmt.Sprintf("v-%08x", i) - return k, []byte(v) -} - -func TestBatchSet(t *testing.T) { - assert := assert.New(t) - const N = 1000 - db, _ := createDB() - - batch := make([]*Batch, 0) - for i := 0; i < N; i++ { - k, v := genKV(i) - batch = append(batch, &Batch{ - Key: k, - Val: []byte(v), - }) - } - db.BatchSet(batch...) - - // get - for i := 0; i < N; i++ { - k, v := genKV(i) - val, ts, err := db.Get(k) - assert.Equal(val, v) - assert.Equal(ts, int64(0)) - assert.Nil(err) - } - - db.Sync() - db.Shrink() - db.Close() - - // reopen - db2, _ := Open(db.GetOptions()) - for i := 0; i < N; i++ { - k, v := genKV(i) - val, ts, err := db2.Get(k) - assert.Equal(val, v) - assert.Equal(ts, int64(0)) - assert.Nil(err) - } -} - -func TestBatchHSet(t *testing.T) { - assert := assert.New(t) - const N = 1000 - db, _ := createDB() - - batches := make([]*Batch, 0, N) - for i := 0; i < N; i++ { - k, v := genKV(i) - batches = append(batches, &Batch{ - Key: k, - Val: []byte(v), - }) - } - db.BatchHSet("map", batches...) - - // get - for i := 0; i < N; i++ { - k, v := genKV(i) - val, err := db.HGet("map", k) - assert.Equal(val, v) - assert.Nil(err) - } - - db.Shrink() - db.Close() - - // reopen - db2, _ := Open(db.GetOptions()) - for i := 0; i < N; i++ { - k, v := genKV(i) - val, err := db2.HGet("map", k) - assert.Equal(val, v) - assert.Nil(err) - } -} - -func TestBatchZSet(t *testing.T) { - assert := assert.New(t) - const N = 1000 - db, _ := createDB() - - batches := make([]*ZSBatch, 0, N) - for i := 0; i < N; i++ { - k, _ := genKV(i) - batches = append(batches, &ZSBatch{ - Key: k, - Score: int64(i), - }) - } - db.BatchZSet("zs", batches...) - - // get - for i := 0; i < N; i++ { - k, _ := genKV(i) - score, err := db.ZGet("zs", k) - assert.Equal(score, int64(i)) - assert.Nil(err) - } - - db.Shrink() - db.Close() - - // reopen - db2, _ := Open(db.GetOptions()) - for i := 0; i < N; i++ { - k, _ := genKV(i) - score, err := db2.ZGet("zs", k) - assert.Equal(score, int64(i)) - assert.Nil(err) - } -} diff --git a/bench_test.go b/bench_test.go new file mode 100644 index 0000000..7354fbe --- /dev/null +++ b/bench_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "testing" + + "github.com/cockroachdb/swiss" +) + +func BenchmarkMap(b *testing.B) { + const N = 100 + + b.Run("stdmap/set", func(b *testing.B) { + m := make(map[int]int, N) + for i := 0; i < b.N; i++ { + m[i%N] = i % N + } + }) + b.Run("swiss/set", func(b *testing.B) { + m := swiss.New[int, int](N) + for i := 0; i < b.N; i++ { + m.Put(i%N, i%N) + } + }) + + b.Run("stdmap/get", func(b *testing.B) { + m := make(map[int]int, N) + for i := 0; i < N; i++ { + m[i] = i + } + for i := 0; i < b.N; i++ { + _ = m[i%N] + } + }) + b.Run("swiss/get", func(b *testing.B) { + m := swiss.New[int, int](N) + for i := 0; i < N; i++ { + m.Put(i, i) + } + for i := 0; i < b.N; i++ { + m.Get(i % N) + } + }) +} diff --git a/codeman/codec.go b/codeman/codec.go deleted file mode 100644 index 26a946a..0000000 --- a/codeman/codec.go +++ /dev/null @@ -1,107 +0,0 @@ -package codeman - -import ( - "encoding/binary" - "unsafe" - - "golang.org/x/exp/constraints" -) - -const ( - _true = 1 - _false = 0 -) - -// Codec is the primary type for encoding data into a specific format. 
-type Codec struct { - b []byte -} - -type Integer constraints.Integer - -// NewCodec -func NewCodec() *Codec { - return &Codec{b: make([]byte, 0, 16)} -} - -func (s *Codec) Recycle() { -} - -func (s *Codec) Content() []byte { - return s.b -} - -func (s *Codec) Str(v string) *Codec { - return s.formatString(v) -} - -func (s *Codec) Byte(v byte) *Codec { - s.b = append(s.b, v) - return s -} - -func (s *Codec) Bytes(v []byte) *Codec { - return s.format(v) -} - -func (s *Codec) Bool(v bool) *Codec { - if v { - s.b = append(s.b, _true) - } else { - s.b = append(s.b, _false) - } - return s -} - -func (s *Codec) Uint32(v uint32) *Codec { - s.b = binary.AppendUvarint(s.b, uint64(v)) - return s -} - -func (s *Codec) Int(v int64) *Codec { - s.b = binary.AppendVarint(s.b, v) - return s -} - -func (s *Codec) StrSlice(v []string) *Codec { - s.b = append(s.b, formatStrSlice(v)...) - return s -} - -func (s *Codec) Uint32Slice(v []uint32) *Codec { - s.b = append(s.b, formatNumberSlice(v)...) - return s -} - -func (s *Codec) format(v []byte) *Codec { - return s.formatString(b2s(v)) -} - -func (s *Codec) formatString(v string) *Codec { - s.b = binary.AppendUvarint(s.b, uint64(len(v))) - s.b = append(s.b, v...) - return s -} - -func formatStrSlice(s []string) []byte { - data := make([]byte, 0, len(s)*2+1) - data = binary.AppendUvarint(data, uint64(len(s))) - for _, v := range s { - data = binary.AppendUvarint(data, uint64(len(v))) - data = append(data, v...) - } - return data -} - -func formatNumberSlice[T Integer](s []T) []byte { - data := make([]byte, 0, len(s)+1) - data = binary.AppendUvarint(data, uint64(len(s))) - for _, v := range s { - data = binary.AppendUvarint(data, uint64(v)) - } - return data -} - -func b2s(b []byte) string { - return *(*string)(unsafe.Pointer(&b)) -} diff --git a/codeman/reader.go b/codeman/reader.go deleted file mode 100644 index 1d390b5..0000000 --- a/codeman/reader.go +++ /dev/null @@ -1,91 +0,0 @@ -package codeman - -import ( - "encoding/binary" -) - -type Reader struct { - b []byte -} - -func NewReader(buf []byte) *Reader { - return &Reader{b: buf} -} - -func (s *Reader) read() []byte { - num, i := binary.Uvarint(s.b) - if i == 0 { - panic("codeman/bug: reader is done") - } - end := i + int(num) - - // bound check. 
- _ = s.b[end-1] - - res := s.b[i:end] - s.b = s.b[end:] - - return res -} - -func (s *Reader) readUvarint() uint64 { - num, i := binary.Uvarint(s.b) - if i == 0 { - panic("codeman/bug: reader is done") - } - s.b = s.b[i:] - return num -} - -func (s *Reader) readVarint() int64 { - num, i := binary.Varint(s.b) - if i == 0 { - panic("codeman/bug: reader is done") - } - s.b = s.b[i:] - return num -} - -func (s *Reader) Done() bool { - return len(s.b) == 0 -} - -func (s *Reader) RawBytes() []byte { - return s.read() -} - -func (s *Reader) Str() string { - return string(s.read()) -} - -func (s *Reader) StrSlice() []string { - data := make([]string, s.readUvarint()) - for i := range data { - data[i] = s.Str() - } - return data -} - -func (s *Reader) Uint32Slice() []uint32 { - data := make([]uint32, s.readUvarint()) - for i := range data { - data[i] = s.Uint32() - } - return data -} - -func (s *Reader) Uint32() uint32 { - return uint32(s.readUvarint()) -} - -func (s *Reader) Int64() int64 { - return int64(s.readVarint()) -} - -func (s *Reader) Bool() bool { - return s.readUvarint() == _true -} - -func (s *Reader) Byte() byte { - return byte(s.readUvarint()) -} diff --git a/v2/config.go b/config.go similarity index 100% rename from v2/config.go rename to config.go diff --git a/v2/config.json b/config.json similarity index 67% rename from v2/config.json rename to config.json index 5748f95..e9e210c 100644 --- a/v2/config.json +++ b/config.json @@ -1,5 +1,5 @@ { "port": 6767, - "appendonly": true, + "appendonly": false, "appendfilename": "appendonly.aof" } diff --git a/db.go b/db.go deleted file mode 100644 index f1f5a92..0000000 --- a/db.go +++ /dev/null @@ -1,1016 +0,0 @@ -// Package rotom provides an in-memory key-value database. -package rotom - -import ( - "fmt" - "io" - "os" - "path/filepath" - "strconv" - "sync" - "time" - - cmap "github.com/orcaman/concurrent-map/v2" - "github.com/rosedblabs/wal" - cache "github.com/xgzlucario/GigaCache" - "github.com/xgzlucario/rotom/codeman" - "github.com/xgzlucario/rotom/structx" -) - -const ( - noTTL = 0 - - mergeTypeAnd byte = iota + 1 - mergeTypeOr - mergeTypeXOr -) - -// Operation needs redo. -type Operation byte - -const ( - OpSetTx Operation = iota // for string - OpSetObject // for data structure - OpSetTTL - OpRemove - // map - OpHSetTx - OpHRemove - // set - OpSAdd - OpSRemove - OpSMerge - // list - OpLPush - OpRPush - OpLPop - OpRPop - OpLSet - // bitmap - OpBitSet - OpBitFlip - OpBitMerge - // zset - OpZSet - OpZIncr - OpZRemove -) - -type Cmd struct { - op Operation - hook func(*DB, *codeman.Reader) error -} - -// cmdTable defines how each command recover database from redo log. -var cmdTable = []Cmd{ - {OpSetTx, func(db *DB, reader *codeman.Reader) error { - // key, ts, val - key := reader.Str() - ts := reader.Int64() - val := reader.RawBytes() - // check ttl. 
- if ts > cache.GetNanoSec() || ts == noTTL { - db.SetTx(key, val, ts) - } - return nil - }}, - {OpSetObject, func(db *DB, reader *codeman.Reader) error { - // type, key, val - tp := reader.Int64() - key := reader.Str() - val := reader.RawBytes() - - switch tp { - case TypeList: - ls := structx.NewList() - if err := ls.UnmarshalBinary(val); err != nil { - return err - } - db.cm.Set(key, ls) - - case TypeSet: - s := structx.NewSet() - if err := s.UnmarshalJSON(val); err != nil { - return err - } - db.cm.Set(key, s) - - case TypeMap: - m := structx.NewMap() - if err := m.UnmarshalJSON(val); err != nil { - return err - } - db.cm.Set(key, m) - - case TypeBitmap: - m := structx.NewBitmap() - if err := m.UnmarshalBinary(val); err != nil { - return err - } - db.cm.Set(key, m) - - case TypeZSet: - m := structx.NewZSet() - if err := m.UnmarshalJSON(val); err != nil { - return err - } - db.cm.Set(key, m) - } - return nil - }}, - {OpSetTTL, func(db *DB, reader *codeman.Reader) error { - // key, ts - db.SetTTL(reader.Str(), reader.Int64()) - return nil - }}, - {OpRemove, func(db *DB, reader *codeman.Reader) error { - // keys - db.Remove(reader.StrSlice()...) - return nil - }}, - {OpHSetTx, func(db *DB, reader *codeman.Reader) (err error) { - // key, count - key := reader.Str() - count := reader.Int64() - for i := int64(0); i < count; i++ { - // field, val, ts - err = db.HSetTx(key, reader.Str(), reader.RawBytes(), reader.Int64()) - } - return - }}, - {OpHRemove, func(db *DB, reader *codeman.Reader) error { - // key, fields - _, err := db.HRemove(reader.Str(), reader.StrSlice()...) - return err - }}, - {OpSAdd, func(db *DB, reader *codeman.Reader) error { - // key, items - _, err := db.SAdd(reader.Str(), reader.StrSlice()...) - return err - }}, - {OpSRemove, func(db *DB, reader *codeman.Reader) error { - // key, items - return db.SRemove(reader.Str(), reader.StrSlice()...) - }}, - {OpSMerge, func(db *DB, reader *codeman.Reader) error { - // op, key, items - op := reader.Byte() - key := reader.Str() - items := reader.StrSlice() - - switch op { - case mergeTypeAnd: - return db.SInter(key, items...) - case mergeTypeOr: - return db.SUnion(key, items...) - default: - return db.SDiff(key, items...) - } - }}, - {OpLPush, func(db *DB, reader *codeman.Reader) error { - // key, items - return db.LPush(reader.Str(), reader.StrSlice()...) - }}, - {OpRPush, func(db *DB, reader *codeman.Reader) error { - // key, items - return db.RPush(reader.Str(), reader.StrSlice()...) - }}, - {OpLPop, func(db *DB, reader *codeman.Reader) error { - // key - _, err := db.LPop(reader.Str()) - return err - }}, - {OpRPop, func(db *DB, reader *codeman.Reader) error { - // key - _, err := db.RPop(reader.Str()) - return err - }}, - {OpLSet, func(db *DB, reader *codeman.Reader) error { - // key, index, item - _, err := db.LSet(reader.Str(), int(reader.Int64()), reader.Str()) - return err - }}, - {OpBitSet, func(db *DB, reader *codeman.Reader) error { - // key, val, offsets - _, err := db.BitSet(reader.Str(), reader.Bool(), reader.Uint32Slice()...) - return err - }}, - {OpBitFlip, func(db *DB, reader *codeman.Reader) error { - // key, offset - return db.BitFlip(reader.Str(), reader.Uint32(), reader.Uint32()) - }}, - {OpBitMerge, func(db *DB, reader *codeman.Reader) error { - // op, key, items - op := reader.Byte() - key := reader.Str() - items := reader.StrSlice() - - switch op { - case mergeTypeAnd: - return db.BitAnd(key, items...) - case mergeTypeOr: - return db.BitOr(key, items...) - default: - return db.BitXor(key, items...) 
- } - }}, - {OpZSet, func(db *DB, reader *codeman.Reader) (err error) { - // key, count - key := reader.Str() - count := reader.Int64() - for i := int64(0); i < count; i++ { - // field, score - err = db.ZAdd(key, reader.Str(), reader.Int64()) - } - return - }}, - {OpZIncr, func(db *DB, reader *codeman.Reader) error { - // key, field, score - _, err := db.ZIncr(reader.Str(), reader.Str(), reader.Int64()) - return err - }}, - {OpZRemove, func(db *DB, reader *codeman.Reader) error { - // key, field - return db.ZRemove(reader.Str(), reader.Str()) - }}, -} - -// Type is the data type for Rotom. -type Type = int64 - -const ( - TypeString Type = iota + 1 - TypeMap - TypeSet - TypeList - TypeZSet - TypeBitmap -) - -// Type aliases for built-in types. -type ( - Map = *structx.Map - Set = *structx.Set - List = *structx.List - ZSet = *structx.ZSet - BitMap = *structx.Bitmap -) - -// DB represents a rotom database. -type DB struct { - mu sync.Mutex - options *Options - wal *wal.WAL - loading bool // is loading finished from wal. - closed bool - m *cache.GigaCache // data for strings. - cm cmap.ConcurrentMap[string, any] // data for built-in types. -} - -// Open create a new db instance by options. -func Open(options Options) (*DB, error) { - if err := checkOptions(options); err != nil { - return nil, err - } - - // create wal. - walOptions := wal.DefaultOptions - walOptions.DirPath = options.DirPath - wal, err := wal.Open(walOptions) - if err != nil { - return nil, err - } - - // init db instance. - cacheOptions := cache.DefaultOptions - cacheOptions.ShardCount = options.ShardCount - db := &DB{ - options: &options, - loading: true, - wal: wal, - m: cache.New(cacheOptions), - cm: cmap.New[any](), - } - - // load db from wal. - if err := db.loadFromWal(); err != nil { - return nil, err - } - db.loading = false - - return db, nil -} - -// Close the database, close all data files and release file lock. -func (db *DB) Close() error { - db.mu.Lock() - defer db.mu.Unlock() - - if db.closed { - return ErrDatabaseClosed - } - db.closed = true - - return db.wal.Close() -} - -// GetOptions -func (db *DB) GetOptions() Options { - return *db.options -} - -func (db *DB) encode(cd *codeman.Codec) { - if db.loading { - return - } - db.wal.Write(cd.Content()) - cd.Recycle() -} - -// Sync -func (db *DB) Sync() error { - return db.wal.Sync() -} - -func newCodec(op Operation) *codeman.Codec { - return codeman.NewCodec().Byte(byte(op)) -} - -// Get -func (db *DB) Get(key string) ([]byte, int64, error) { - val, ts, ok := db.m.Get(key) - if !ok { - return nil, 0, ErrKeyNotFound - } - return val, ts, nil -} - -// Set store key-value pair. -func (db *DB) Set(key string, val []byte) { - db.SetTx(key, val, noTTL) -} - -// SetEx store key-value pair with ttl. -func (db *DB) SetEx(key string, val []byte, ttl time.Duration) { - db.SetTx(key, val, cache.GetNanoSec()+int64(ttl)) -} - -// SetTx store key-value pair with deadline. -func (db *DB) SetTx(key string, val []byte, ts int64) { - db.BatchSet(&Batch{key, val, ts}) -} - -// SetNx stoe key-value if not exist. -func (db *DB) SetNx(key string, val []byte, ts int64) bool { - _, _, ok := db.m.Get(key) - if ok { - return false - } - db.SetTx(key, val, ts) - return true -} - -// SetTTL set expired time of key-value. -func (db *DB) SetTTL(key string, ts int64) bool { - if ts < 0 { - return false - } - db.encode(newCodec(OpSetTTL).Str(key).Int(ts)) - return db.m.SetTTL(key, ts) -} - -// Incr increase number to key. 
-func (db *DB) Incr(key string, incr int64) (n int64, err error) { - val, ts, ok := db.m.Get(key) - if ok { - n, err = strconv.ParseInt(b2s(val), 10, 64) - if err != nil { - return - } - } - n += incr - valNew := strconv.FormatInt(n, 10) - db.m.SetTx(key, s2b(&valNew), ts) - db.encode(newCodec(OpSetTx).Str(key).Int(ts).Str(valNew)) - return -} - -// Remove -func (db *DB) Remove(keys ...string) (n int) { - db.encode(newCodec(OpRemove).StrSlice(keys)) - for _, key := range keys { - if db.m.Remove(key) { - n++ - } else if db.cm.Has(key) { - db.cm.Remove(key) - n++ - } - } - return -} - -// Len -func (db *DB) Len() int { - return db.m.GetStats().Len + db.cm.Count() -} - -// GC triggers the garbage collection to evict expired kv datas. -func (db *DB) GC() { - db.m.Migrate() -} - -// Scan -func (db *DB) Scan(f func([]byte, []byte, int64) bool) { - db.m.Scan(func(key, val []byte, ttl int64) bool { - return f(key, val, ttl) - }) -} - -// HGet -func (db *DB) HGet(key, field string) ([]byte, error) { - m, err := db.fetchMap(key) - if err != nil { - return nil, err - } - res, _, ok := m.Get(field) - if !ok { - return nil, ErrFieldNotFound - } - return res, nil -} - -// HLen -func (db *DB) HLen(key string) (int, error) { - m, err := db.fetchMap(key) - if err != nil { - return 0, err - } - return m.Len(), nil -} - -// HSet -func (db *DB) HSet(key, field string, val []byte) error { - return db.BatchHSet(key, &Batch{Key: field, Val: val}) -} - -// HSetTx -func (db *DB) HSetTx(key, field string, val []byte, ts int64) error { - return db.BatchHSet(key, &Batch{Key: field, Val: val, Timestamp: ts}) -} - -// HRemove -func (db *DB) HRemove(key string, fields ...string) (n int, err error) { - m, err := db.fetchMap(key) - if err != nil { - return 0, err - } - db.encode(newCodec(OpHRemove).Str(key).StrSlice(fields)) - for _, k := range fields { - if m.Remove(k) { - n++ - } - } - return -} - -// SAdd -func (db *DB) SAdd(key string, items ...string) (int, error) { - s, err := db.fetchSet(key, true) - if err != nil { - return 0, err - } - db.encode(newCodec(OpSAdd).Str(key).StrSlice(items)) - return s.Append(items...), nil -} - -// SRemove -func (db *DB) SRemove(key string, items ...string) error { - s, err := db.fetchSet(key) - if err != nil { - return err - } - db.encode(newCodec(OpSRemove).Str(key).StrSlice(items)) - s.RemoveAll(items...) - return nil -} - -// SHas returns whether the given items are all in the set. 
-func (db *DB) SHas(key string, items ...string) (bool, error) { - s, err := db.fetchSet(key) - if err != nil { - return false, err - } - return s.Contains(items...), nil -} - -// SCard -func (db *DB) SCard(key string) (int, error) { - s, err := db.fetchSet(key) - if err != nil { - return 0, err - } - return s.Cardinality(), nil -} - -// SMembers -func (db *DB) SMembers(key string) ([]string, error) { - s, err := db.fetchSet(key) - if err != nil { - return nil, err - } - return s.ToSlice(), nil -} - -// SUnion -func (db *DB) SUnion(dst string, src ...string) error { - srcSet, err := db.fetchSet(src[0]) - if err != nil { - return err - } - s := srcSet.Clone() - - for _, key := range src[1:] { - ts, err := db.fetchSet(key) - if err != nil { - return err - } - s.Union(ts) - } - db.encode(newCodec(OpSMerge).Byte(mergeTypeOr).Str(dst).StrSlice(src)) - db.cm.Set(dst, s) - - return nil -} - -// SInter -func (db *DB) SInter(dst string, src ...string) error { - srcSet, err := db.fetchSet(src[0]) - if err != nil { - return err - } - s := srcSet.Clone() - - for _, key := range src[1:] { - ts, err := db.fetchSet(key) - if err != nil { - return err - } - s.Intersect(ts) - } - db.encode(newCodec(OpSMerge).Byte(mergeTypeAnd).Str(dst).StrSlice(src)) - db.cm.Set(dst, s) - - return nil -} - -// SDiff -func (db *DB) SDiff(dst string, src ...string) error { - srcSet, err := db.fetchSet(src[0]) - if err != nil { - return err - } - s := srcSet.Clone() - - for _, key := range src[1:] { - ts, err := db.fetchSet(key) - if err != nil { - return err - } - s.Difference(ts) - } - db.encode(newCodec(OpSMerge).Byte(mergeTypeXOr).Str(dst).StrSlice(src)) - db.cm.Set(dst, s) - - return nil -} - -// LPush -func (db *DB) LPush(key string, items ...string) error { - ls, err := db.fetchList(key, true) - if err != nil { - return err - } - db.encode(newCodec(OpLPush).Str(key).StrSlice(items)) - ls.LPush(items...) - - return nil -} - -// RPush -func (db *DB) RPush(key string, items ...string) error { - ls, err := db.fetchList(key, true) - if err != nil { - return err - } - db.encode(newCodec(OpRPush).Str(key).StrSlice(items)) - ls.RPush(items...) 
- - return nil -} - -// LIndex -func (db *DB) LIndex(key string, i int) (string, error) { - ls, err := db.fetchList(key) - if err != nil { - return "", err - } - res, ok := ls.Index(i) - if !ok { - return "", ErrIndexOutOfRange - } - return res, nil -} - -// LPop -func (db *DB) LPop(key string) (string, error) { - ls, err := db.fetchList(key) - if err != nil { - return "", err - } - res, ok := ls.LPop() - if !ok { - return "", ErrEmptyList - } - db.encode(newCodec(OpLPop).Str(key)) - - return res, nil -} - -// RPop -func (db *DB) RPop(key string) (string, error) { - ls, err := db.fetchList(key) - if err != nil { - return "", err - } - res, ok := ls.RPop() - if !ok { - return "", ErrEmptyList - } - db.encode(newCodec(OpRPop).Str(key)) - - return res, nil -} - -// LLen -func (db *DB) LLen(key string) (int, error) { - ls, err := db.fetchList(key) - if err != nil { - return 0, err - } - return ls.Size(), nil -} - -// LSet -func (db *DB) LSet(key string, index int, item string) (bool, error) { - ls, err := db.fetchList(key) - if err != nil { - return false, err - } - db.encode(newCodec(OpLSet).Str(key).Int(int64(index)).Str(item)) - return ls.Set(index, item), nil -} - -// LRange -func (db *DB) LRange(key string, start, end int, f func(string) (stop bool)) error { - ls, err := db.fetchList(key) - if err != nil { - return err - } - ls.Range(start, end, func(data []byte) bool { - return f(string(data)) - }) - return nil -} - -// BitTest -func (db *DB) BitTest(key string, offset uint32) (bool, error) { - bm, err := db.fetchBitMap(key) - if err != nil { - return false, err - } - return bm.Test(offset), nil -} - -// BitSet -func (db *DB) BitSet(key string, val bool, offsets ...uint32) (int, error) { - bm, err := db.fetchBitMap(key, true) - if err != nil { - return 0, err - } - db.encode(newCodec(OpBitSet).Str(key).Bool(val).Uint32Slice(offsets)) - - var n int - if val { - n = bm.Add(offsets...) - } else { - n = bm.Remove(offsets...) 
- } - - return n, nil -} - -// BitFlip -func (db *DB) BitFlip(key string, start, end uint32) error { - bm, err := db.fetchBitMap(key) - if err != nil { - return err - } - db.encode(newCodec(OpBitFlip).Str(key).Uint32(start).Uint32(end)) - bm.Flip(uint64(start), uint64(end)) - - return nil -} - -// BitAnd -func (db *DB) BitAnd(dst string, src ...string) error { - bm, err := db.fetchBitMap(src[0]) - if err != nil { - return err - } - bm = bm.Clone() - - for _, key := range src[1:] { - tbm, err := db.fetchBitMap(key) - if err != nil { - return err - } - bm.And(tbm) - } - db.encode(newCodec(OpBitMerge).Byte(mergeTypeAnd).Str(dst).StrSlice(src)) - db.cm.Set(dst, bm) - - return nil -} - -// BitOr -func (db *DB) BitOr(dst string, src ...string) error { - bm, err := db.fetchBitMap(src[0]) - if err != nil { - return err - } - bm = bm.Clone() - - for _, key := range src[1:] { - tbm, err := db.fetchBitMap(key) - if err != nil { - return err - } - bm.Or(tbm) - } - db.encode(newCodec(OpBitMerge).Byte(mergeTypeOr).Str(dst).StrSlice(src)) - db.cm.Set(dst, bm) - - return nil -} - -// BitXor -func (db *DB) BitXor(dst string, src ...string) error { - bm, err := db.fetchBitMap(src[0]) - if err != nil { - return err - } - bm = bm.Clone() - - for _, key := range src[1:] { - tbm, err := db.fetchBitMap(key) - if err != nil { - return err - } - bm.Xor(tbm) - } - db.encode(newCodec(OpBitMerge).Byte(mergeTypeXOr).Str(dst).StrSlice(src)) - db.cm.Set(dst, bm) - - return nil -} - -// BitArray -func (db *DB) BitArray(key string) ([]uint32, error) { - bm, err := db.fetchBitMap(key) - if err != nil { - return nil, err - } - return bm.ToArray(), nil -} - -// BitCount -func (db *DB) BitCount(key string) (uint64, error) { - bm, err := db.fetchBitMap(key) - if err != nil { - return 0, err - } - return bm.Len(), err -} - -// ZAdd -func (db *DB) ZAdd(zset, key string, score int64) error { - return db.BatchZSet(zset, &ZSBatch{Key: key, Score: score}) -} - -// ZGet -func (db *DB) ZGet(zset, key string) (int64, error) { - zs, err := db.fetchZSet(zset) - if err != nil { - return 0, err - } - score, ok := zs.Get(key) - if !ok { - return 0, ErrKeyNotFound - } - return score, nil -} - -// ZCard -func (db *DB) ZCard(zset string) (int, error) { - zs, err := db.fetchZSet(zset) - if err != nil { - return 0, err - } - return zs.Len(), nil -} - -// ZIter -func (db *DB) ZIter(zset string, f func(string, int64) bool) error { - zs, err := db.fetchZSet(zset) - if err != nil { - return err - } - zs.Iter(func(k string, s int64) bool { - return f(k, s) - }) - return nil -} - -// ZIncr -func (db *DB) ZIncr(zset, key string, incr int64) (int64, error) { - zs, err := db.fetchZSet(zset, true) - if err != nil { - return 0, err - } - db.encode(newCodec(OpZIncr).Str(zset).Str(key).Int(incr)) - - return zs.Incr(key, incr), nil -} - -// ZRemove -func (db *DB) ZRemove(zset string, key string) error { - zs, err := db.fetchZSet(zset) - if err != nil { - return err - } - db.encode(newCodec(OpZRemove).Str(zset).Str(key)) - zs.Delete(key) - - return nil -} - -// loadFromWal load data to mem from wal. -func (db *DB) loadFromWal() error { - reader := db.wal.NewReader() - for { - data, _, err := reader.Next() - if err == io.EOF { - break - } - - // read all records. - for rd := codeman.NewReader(data); !rd.Done(); { - op := Operation(rd.Byte()) - if err := cmdTable[op].hook(db, rd); err != nil { - return err - } - } - } - return nil -} - -// Shrink uses `RDB` to create database snapshots to disk. 
-func (db *DB) Shrink() error { - db.mu.Lock() - defer db.mu.Unlock() - - // create new segment file. - segmentId := db.wal.ActiveSegmentID() - if err := db.wal.OpenNewActiveSegment(); err != nil { - return err - } - - var pendWriteSize int - // writeAll write all wal pending data to disk. - writeAll := func() { - db.wal.WriteAll() - pendWriteSize = 0 - } - // write write to wal buffer pending. - write := func(b []byte) { - db.wal.PendingWrites(b) - pendWriteSize += len(b) - if pendWriteSize >= wal.MB { - writeAll() - } - } - - // marshal string datas. - db.m.Scan(func(key, val []byte, ts int64) bool { - cd := newCodec(OpSetTx).Bytes(key).Int(ts).Bytes(val) - write(cd.Content()) - return true - }) - writeAll() - - // marshal built-in types. - var types Type - var data []byte - var err error - - for t := range db.cm.IterBuffered() { - switch item := t.Val.(type) { - case Map: - types = TypeMap - data, err = item.MarshalJSON() - case BitMap: - types = TypeBitmap - data, err = item.MarshalBinary() - case List: - types = TypeList - data, err = item.MarshalBinary() - case Set: - types = TypeSet - data, err = item.MarshalJSON() - case ZSet: - types = TypeZSet - data, err = item.MarshalJSON() - } - if err != nil { - return err - } - cd := newCodec(OpSetObject).Int(types).Str(t.Key).Bytes(data) - write(cd.Content()) - } - writeAll() - - if err := db.wal.Sync(); err != nil { - return err - } - - // remove all old segment files. - return db.removeOldSegments(segmentId) -} - -func (db *DB) removeOldSegments(maxSegmentId uint32) error { - segmentName := fmt.Sprintf("%09d.SEG", maxSegmentId) - - return filepath.WalkDir(db.options.DirPath, func(path string, file os.DirEntry, err error) error { - if file.Name() <= segmentName { - return os.Remove(path) - } - return err - }) -} - -func (db *DB) fetchMap(key string, setnx ...bool) (m Map, err error) { - return fetch(db, key, func() Map { - return structx.NewMap() - }, setnx...) -} - -func (db *DB) fetchSet(key string, setnx ...bool) (s Set, err error) { - return fetch(db, key, func() Set { - return structx.NewSet() - }, setnx...) -} - -func (db *DB) fetchList(key string, setnx ...bool) (m List, err error) { - return fetch(db, key, func() List { - return structx.NewList() - }, setnx...) -} - -func (db *DB) fetchBitMap(key string, setnx ...bool) (bm BitMap, err error) { - return fetch(db, key, func() BitMap { - return structx.NewBitmap() - }, setnx...) -} - -func (db *DB) fetchZSet(key string, setnx ...bool) (z ZSet, err error) { - return fetch(db, key, func() ZSet { - return structx.NewZSet() - }, setnx...) 
-} - -func fetch[T any](db *DB, key string, new func() T, setnx ...bool) (v T, err error) { - item, ok := db.cm.Get(key) - if ok { - v, ok := item.(T) - if ok { - return v, nil - } - return v, fmt.Errorf("%w: %T->%T", ErrWrongType, item, v) - } - - v = new() - if len(setnx) > 0 && setnx[0] { - db.cm.Set(key, v) - } - return v, nil -} diff --git a/db_test.go b/db_test.go deleted file mode 100644 index 324640c..0000000 --- a/db_test.go +++ /dev/null @@ -1,880 +0,0 @@ -package rotom - -import ( - "errors" - "fmt" - "math/rand/v2" - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/xgzlucario/rotom/codeman" -) - -var ( - nilBytes []byte - nilStrings []string -) - -func createDB() (*DB, error) { - options := DefaultOptions - options.ShardCount = 4 - options.DirPath = fmt.Sprintf("tmp-%x", time.Now().UnixNano()) - return Open(options) -} - -func TestDB(t *testing.T) { - t.Parallel() - assert := assert.New(t) - const N = 5000 - db, err := createDB() - assert.Nil(err) - - // Test db operations - for i := 0; i < N; i++ { - key := strconv.Itoa(i) - val := []byte(strconv.Itoa(i)) - db.Set("set-"+key, val) - db.SetEx("ttl-"+key, val, time.Minute) - db.SetEx("expired-"+key, val, time.Second) - } - time.Sleep(time.Second * 2) - - // Test get. - var ts int64 - now := time.Now().UnixNano() - - for i := 0; i < N; i++ { - key := strconv.Itoa(i) - // set - val, _, err := db.Get("set-" + key) - assert.Nil(err) - assert.Equal(val, []byte(key)) - // ttl - val, ts, err = db.Get("ttl-" + key) - assert.Nil(err) - assert.Equal(val, []byte(key)) - assert.True(ts > now) - // expired - val, ts, err = db.Get("expired-" + key) - assert.Equal(val, nilBytes) - assert.Equal(err, ErrKeyNotFound) - assert.Equal(ts, int64(0)) - // invalid - val, ts, err = db.Get("invalid-" + key) - assert.Equal(val, nilBytes) - assert.Equal(err, ErrKeyNotFound) - assert.Equal(ts, int64(0)) - } - - // Scan - var count int - db.Scan(func(key, val []byte, ts int64) bool { - count++ - return true - }) - assert.Equal(count, N*2) - assert.Equal(int(db.Len()), N*3) - - // GC - db.GC() - count = 0 - db.Scan(func(key, val []byte, ts int64) bool { - count++ - return true - }) - assert.Equal(count, N*2) - assert.Equal(int(db.Len()), N*2) - - // Error - val, _, err := db.Get("map") - assert.Equal(val, nilBytes) - assert.Equal(err, ErrKeyNotFound) - - val, _, err = db.Get("none") - assert.Equal(val, nilBytes) - assert.Equal(err, ErrKeyNotFound) - - // Remove - n := db.Remove("set-1", "set-2", "set-3") - assert.Equal(n, 3) - - // close - assert.Nil(db.Close()) - assert.Equal(db.Close(), ErrDatabaseClosed) - - // Load Success - _, err = Open(db.GetOptions()) - assert.Nil(err) - - t.Run("setTTL", func(t *testing.T) { - db, _ := createDB() - - db.HSet("hmap", "k", []byte("v")) - n := db.Remove("hmap") - assert.Equal(n, 1) - - assert.False(db.SetTTL("h", -1)) - - ts := time.Now().Add(time.Minute).UnixNano() - for i := 0; i < 100; i++ { - k := fmt.Sprintf("%08d", i) - db.SetTx(k, []byte(k), ts) - } - // set ttl - for i := 0; i < 100; i++ { - k := fmt.Sprintf("%08d", i) - assert.True(db.SetTTL(k, 0)) - } - - db.Close() - db, _ = Open(db.GetOptions()) - - // check ttl - for i := 0; i < 100; i++ { - k := fmt.Sprintf("%08d", i) - v, ts, err := db.Get(k) - assert.Equal(string(v), k) - assert.Equal(int64(0), ts) - assert.Nil(err) - } - }) - - t.Run("error", func(t *testing.T) { - db, _ := createDB() - - assert.Panics(func() { - db.SetTx("k", []byte("a"), -1) - }) - }) -} - -func TestHmap(t *testing.T) { - t.Parallel() - assert := 
assert.New(t) - db, err := createDB() - assert.Nil(err) - defer db.Close() - - check := func() { - for i := 0; i < 8000; i++ { - mapkey := "map" + strconv.Itoa(i%100) - key := "key" + strconv.Itoa(i) - val := []byte(strconv.Itoa(i)) - - // HGet - res, err := db.HGet(mapkey, key) - assert.Nil(err) - assert.Equal(res, val) - } - } - - for i := 0; i < 10000; i++ { - mapkey := "map" + strconv.Itoa(i%100) - key := "key" + strconv.Itoa(i) - val := []byte(strconv.Itoa(i)) - - // HSet - err := db.HSet(mapkey, key, val) - assert.Nil(err) - - if i > 8000 { - // HRemove - n, err := db.HRemove(mapkey, key) - assert.Nil(err) - assert.Equal(n, 1) - } - } - - check() - - // reload - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - check() - - // shrink and reload - db.Shrink() - db.Close() - _, err = Open(db.GetOptions()) - assert.Nil(err) - - check() - - // Error - db.LPush("fake", "123") - - err = db.HSet("fake", "a", []byte("b")) - assert.ErrorContains(err, ErrWrongType.Error()) - - res, err := db.HLen("fake") - assert.Equal(res, 0) - assert.ErrorContains(err, ErrWrongType.Error()) - - n, err := db.HRemove("fake", "foo") - assert.Equal(n, 0) - assert.ErrorContains(err, ErrWrongType.Error()) - - db.HSet("map", "m1", []byte("m2")) - { - res, err := db.HGet("fake", "none") - assert.Nil(res) - assert.ErrorContains(err, ErrWrongType.Error()) - } - { - res, err := db.HGet("map", "none") - assert.Nil(res) - assert.Equal(err, ErrFieldNotFound) - } -} - -func randString() string { - return fmt.Sprintf("%08x", rand.Uint32()) -} - -func TestList(t *testing.T) { - t.Parallel() - assert := assert.New(t) - db, err := createDB() - assert.Nil(err) - - for i := 0; i < 5000; i++ { - key := "list" + strconv.Itoa(i/1000) - val := randString() - - switch i % 3 { - case 0: - assert.Nil(db.RPush(key, val)) - case 1: - assert.Nil(db.LPush(key, val)) - // check - res, err := db.LIndex(key, 0) - assert.Nil(err) - assert.Equal(res, val) - case 2: - newKey := fmt.Sprintf("reset%d", i) - ok, _ := db.LSet(key, 0, newKey) - // check - res, err := db.LIndex(key, 0) - if errors.Is(err, ErrIndexOutOfRange) { - assert.Equal(res, "") - assert.False(ok) - } else { - assert.Nil(err) - assert.Equal(res, newKey) - assert.True(ok) - } - } - - if i > 4000 { - switch i % 3 { - case 0: - res, err := db.RPop(key) - assert.Nil(err) - assert.Equal(res, val) - case 1: - res, err := db.LPop(key) - assert.Nil(err) - assert.Equal(res, val) - } - } - - num, err := db.LLen(key) - assert.Nil(err) - var count int - err = db.LRange(key, 0, -1, func(s string) (stop bool) { - count++ - return false - }) - assert.Nil(err) - assert.Equal(count, num) - } - - // Error - db.HSet("map", "key", []byte("value")) - - err = db.LPush("map", "1") - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.RPush("map", "1") - assert.ErrorContains(err, ErrWrongType.Error()) - - _, err = db.LSet("map", 1, "newKey") - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.LRange("map", 0, -1, func(s string) (stop bool) { - return false - }) - assert.ErrorContains(err, ErrWrongType.Error()) - - res, err := db.LPop("map") - assert.Equal(res, "") - assert.ErrorContains(err, ErrWrongType.Error()) - - res, err = db.RPop("map") - assert.Equal(res, "") - assert.ErrorContains(err, ErrWrongType.Error()) - - s, err := db.LIndex("map", 1) - assert.Equal(s, "") - assert.ErrorContains(err, ErrWrongType.Error()) - - n, err := db.LLen("map") - assert.Equal(n, 0) - assert.ErrorContains(err, ErrWrongType.Error()) - - // empty list - { - db.RPush("list", "1") - 
db.RPop("list") - - res, err = db.LPop("list") - assert.Equal(res, "") - assert.Equal(err, ErrEmptyList) - - res, err = db.RPop("list") - assert.Equal(res, "") - assert.Equal(err, ErrEmptyList) - - res, err = db.LIndex("list", 9) - assert.Equal(res, "") - assert.Equal(err, ErrIndexOutOfRange) - - for i := 0; i < 100; i++ { - db.RPush("list", randString()) - } - } - - // reload - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - // shrink and reload - db.Shrink() - db.Close() - _, err = Open(db.GetOptions()) - assert.Nil(err) - - t.Run("lpop", func(t *testing.T) { - db, err := createDB() - assert.Nil(err) - - for i := 0; i < 1000; i++ { - db.RPush("ls", strconv.Itoa(i)) - } - for i := 0; i < 1000; i++ { - v, err := db.LPop("ls") - assert.Equal(v, strconv.Itoa(i)) - assert.Nil(err) - } - }) - - t.Run("rpop", func(t *testing.T) { - db, err := createDB() - assert.Nil(err) - - for i := 0; i < 1000; i++ { - db.LPush("ls", strconv.Itoa(i)) - } - for i := 0; i < 1000; i++ { - v, err := db.RPop("ls") - assert.Equal(v, strconv.Itoa(i)) - assert.Nil(err) - } - }) -} - -func TestSet(t *testing.T) { - t.Parallel() - assert := assert.New(t) - db, err := createDB() - assert.Nil(err) - - // SAdd - for i := 0; i < 1000; i++ { - n, err := db.SAdd("set"+strconv.Itoa(i/100), strconv.Itoa(i)) - assert.Equal(n, 1) - assert.Nil(err) - } - - // SHas - for i := 500; i < 1500; i++ { - ok, err := db.SHas("set"+strconv.Itoa(i/100), strconv.Itoa(i)) - assert.Nil(err) - assert.Equal(ok, i < 1000) - } - - // SRemove - for i := 0; i < 1000; i++ { - key := "set" + strconv.Itoa(i/100) - - if i%2 == 0 { - assert.Nil(db.SRemove(key, strconv.Itoa(i))) - } - - err = db.SRemove(key, "none") - assert.Nil(err) - - // SCard SMembers - n, err1 := db.SCard(key) - m, err2 := db.SMembers(key) - assert.Nil(err1) - assert.Nil(err2) - assert.Equal(n, len(m)) - } - - // Union - for i := 0; i < 1000; i++ { - // Add random data - for i := 0; i < 20; i++ { - stri := strconv.Itoa(i) - db.SAdd("a"+stri, randString()) - db.SAdd("b"+stri, randString()) - } - stri := strconv.Itoa(i) - - err = db.SUnion("union"+stri, "a"+stri, "b"+stri) - assert.Nil(err) - - err = db.SInter("inter"+stri, "a"+stri, "b"+stri) - assert.Nil(err) - - err = db.SDiff("diff"+stri, "a"+stri, "b"+stri) - assert.Nil(err) - - // diff + inter = union - db.SUnion("res"+stri, "inter"+stri, "diff"+stri) - - m1, err1 := db.SMembers("union" + stri) - assert.Nil(err1) - m2, err2 := db.SMembers("res" + stri) - assert.Nil(err2) - assert.ElementsMatch(m1, m2) - } - - // Error - db.SAdd("set", "1") - - db.HSet("map", "key", []byte("1")) - n, err := db.SAdd("map", "1") - assert.Equal(n, 0) - assert.ErrorContains(err, ErrWrongType.Error()) - - ok, err := db.SHas("map", "1") - assert.False(ok) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.SRemove("map", "1") - assert.ErrorContains(err, ErrWrongType.Error()) - - n, err = db.SCard("map") - assert.Equal(n, 0) - assert.ErrorContains(err, ErrWrongType.Error()) - - m, err := db.SMembers("map") - assert.Equal(m, nilStrings) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.SUnion("", "map", "set") - assert.ErrorContains(err, ErrWrongType.Error()) - err = db.SUnion("", "set", "map") - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.SDiff("", "map", "set") - assert.ErrorContains(err, ErrWrongType.Error()) - err = db.SDiff("", "set", "map") - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.SInter("", "map", "set") - assert.ErrorContains(err, ErrWrongType.Error()) - err = 
db.SInter("", "set", "map") - assert.ErrorContains(err, ErrWrongType.Error()) - - // reload - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - // shrink and reload - db.Shrink() - db.Close() - _, err = Open(db.GetOptions()) - assert.Nil(err) -} - -func TestBitmap(t *testing.T) { - t.Parallel() - assert := assert.New(t) - db, err := createDB() - assert.Nil(err) - - for i := 0; i < 1000; i++ { - key := strconv.Itoa(i / 100) - - n, err := db.BitSet(key, true, uint32(i)) - assert.Nil(err) - assert.Equal(n, 1) - - n, err = db.BitSet(key, false, uint32(i)) - assert.Nil(err) - assert.Equal(n, 1) - - n, err = db.BitSet(key, true, uint32(i)) - assert.Nil(err) - assert.Equal(n, 1) - - ok, err := db.BitTest(key, uint32(i)) - assert.True(ok) - assert.Nil(err) - - // Error - db.BitSet("my-bitset", true, 1) - db.LPush("none", "123") - - n, err = db.BitSet("none", true, uint32(i)) - assert.Equal(n, 0) - assert.ErrorContains(err, ErrWrongType.Error()) - - ok, err = db.BitTest("none", uint32(i)) - assert.False(ok) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.BitFlip("none", uint32(i), uint32(i+1)) - assert.ErrorContains(err, ErrWrongType.Error()) - - m, err := db.BitArray("none") - assert.Nil(m) - assert.ErrorContains(err, ErrWrongType.Error()) - - num, err := db.BitCount("none") - assert.Equal(num, uint64(0)) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.BitAnd("", "none", "my-bitset") - assert.ErrorContains(err, ErrWrongType.Error()) - err = db.BitAnd("", "my-bitset", "none") - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.BitOr("", "none", "my-bitset") - assert.ErrorContains(err, ErrWrongType.Error()) - err = db.BitOr("", "my-bitset", "none") - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.BitXor("", "none", "my-bitset") - assert.ErrorContains(err, ErrWrongType.Error()) - err = db.BitXor("", "my-bitset", "none") - assert.ErrorContains(err, ErrWrongType.Error()) - } - - for i := 0; i < 1000; i++ { - // Add random data - for i := 0; i < 20; i++ { - stri := strconv.Itoa(i) - db.BitSet("a"+stri, true, rand.Uint32()) - db.BitSet("b"+stri, true, rand.Uint32()) - } - stri := strconv.Itoa(i) - - err = db.BitOr("or"+stri, "a"+stri, "b"+stri) - assert.Nil(err) - - err = db.BitAnd("and"+stri, "a"+stri, "b"+stri) - assert.Nil(err) - - err = db.BitXor("xor"+stri, "a"+stri, "b"+stri) - assert.Nil(err) - - // xor + and = or - db.BitOr("res"+stri, "and"+stri, "xor"+stri) - - m1, err1 := db.BitArray("or" + stri) - assert.Nil(err1) - n1, errn1 := db.BitCount("or" + stri) - assert.Nil(errn1) - assert.Equal(uint64(len(m1)), n1) - - m2, err2 := db.BitArray("res" + stri) - assert.Nil(err2) - assert.ElementsMatch(m1, m2) - } - - // flip - for i := 0; i < 1000; i++ { - db.BitSet("bf", true, uint32(i)) - } - db.BitFlip("bf", 500, 1500) - for i := 500; i < 1000; i++ { - ok, _ := db.BitTest("bf", uint32(i)) - assert.False(ok) - } - for i := 1000; i < 1500; i++ { - ok, _ := db.BitTest("bf", uint32(i)) - assert.True(ok) - } - - // reload - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - // shrink and reload - db.Shrink() - db.Close() - _, err = Open(db.GetOptions()) - assert.Nil(err) -} - -func TestZSet(t *testing.T) { - t.Parallel() - assert := assert.New(t) - db, err := createDB() - assert.Nil(err) - - genKey := func(i int) string { - return fmt.Sprintf("k-%06x", i) - } - - // ZAdd - for i := 0; i < 1000; i++ { - err := db.ZAdd("zset", genKey(i), int64(i)) - assert.Nil(err) - } - { - // card - n, err := db.ZCard("zset") - 
assert.Nil(err) - assert.Equal(n, 1000) - } - - check := func() { - // exist - for i := 0; i < 1000; i++ { - // card - n, err := db.ZCard("zset") - assert.Nil(err) - assert.Equal(n, 1000) - - // zget - score, err := db.ZGet("zset", genKey(i)) - assert.Nil(err) - assert.Equal(score, int64(i)) - } - - // not exist - for i := 1000; i < 2000; i++ { - score, err := db.ZGet("zset", genKey(i)) - assert.Equal(err, ErrKeyNotFound) - assert.Equal(score, int64(0)) - } - - // iter - count := 0 - err = db.ZIter("zset", func(key string, score int64) bool { - count++ - return count >= 1000 - }) - assert.Nil(err) - assert.Equal(count, 1000) - } - - check() - - // Reload - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - check() - - // ZIncr - for i := 0; i < 1000; i++ { - num, err := db.ZIncr("zset", genKey(i), 3) - assert.Nil(err) - assert.Equal(num, int64(i+3)) - } - for i := 3000; i < 4000; i++ { - num, err := db.ZIncr("zset", genKey(i), 3) - assert.Nil(err) - assert.Equal(num, int64(3)) - } - - // ZRemove - for i := 0; i < 800; i++ { - err := db.ZRemove("zset", genKey(i)) - assert.Nil(err) - } - - for i := 5000; i < 6000; i++ { - err := db.ZRemove("zset", genKey(i)) - assert.Nil(err) - } - - // reload - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - // shrink and reload - db.Shrink() - db.Close() - db, err = Open(db.GetOptions()) - assert.Nil(err) - - // Test error - db.SAdd("set", "1") - - n, err := db.ZGet("set", "1") - assert.Equal(n, int64(0)) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.ZIter("set", func(key string, score int64) bool { - return false - }) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.ZAdd("set", "key", 1) - assert.ErrorContains(err, ErrWrongType.Error()) - - _, err = db.ZIncr("set", "key", 1) - assert.ErrorContains(err, ErrWrongType.Error()) - - err = db.ZRemove("set", "key") - assert.ErrorContains(err, ErrWrongType.Error()) - - _, err = db.ZCard("set") - assert.ErrorContains(err, ErrWrongType.Error()) -} - -func TestInvalidCodec(t *testing.T) { - t.Parallel() - assert := assert.New(t) - - // read args. - codec := newCodec(OpSetTx).Bool(true) - reader := codeman.NewReader(codec.Content()) - - n := reader.Uint32() - assert.Equal(uint64(n), uint64(OpSetTx)) - - assert.Equal(true, reader.Bool()) - - assert.Panics(func() { - reader.StrSlice() - }) - assert.Panics(func() { - reader.Byte() - }) - assert.Panics(func() { - reader.Int64() - }) - assert.Panics(func() { - reader.RawBytes() - }) -} - -func TestCheckOption(t *testing.T) { - t.Parallel() - assert := assert.New(t) - - t.Run("checkOptions", func(t *testing.T) { - options := DefaultOptions - options.DirPath = "" - _, err := Open(options) - assert.NotNil(err) - - options.DirPath = "README.md" - _, err = Open(options) - assert.NotNil(err) - - options.DirPath = "test1" - options.ShardCount = 0 - _, err = Open(options) - assert.NotNil(err) - }) -} - -func TestUnmarshalError(t *testing.T) { - t.Parallel() - assert := assert.New(t) - - for _, types := range []int64{TypeMap, TypeList, TypeSet, TypeZSet, TypeBitmap} { - db, err := createDB() - assert.Nil(err) - - // unmarshal error. 
- db.encode(newCodec(OpSetObject).Int(types).Str("key").Bytes([]byte("error"))) - db.Close() - _, err = Open(db.GetOptions()) - assert.NotNil(err) - } -} - -func TestIncr(t *testing.T) { - t.Parallel() - assert := assert.New(t) - db, _ := createDB() - - for i := 0; i < 1000; i++ { - n, err := db.Incr(fmt.Sprintf("key-%d", i), 1) - assert.Equal(n, int64(1)) - assert.Nil(err) - } - - for i := 0; i < 1000; i++ { - n, err := db.Incr(fmt.Sprintf("key-%d", i), 1) - assert.Equal(n, int64(2)) - assert.Nil(err) - } - - // Error - db.Set("ss", []byte("abcde")) - n, err := db.Incr("ss", 1) - assert.Equal(n, int64(0)) - assert.NotNil(err) - - // Reopen - db.Close() - db, _ = Open(db.GetOptions()) - - // Get - for i := 0; i < 1000; i++ { - val, ts, err := db.Get(fmt.Sprintf("key-%d", i)) - assert.Equal(val, []byte("2")) - assert.Equal(ts, int64(0)) - assert.Nil(err) - } - - // Shrink - err = db.Shrink() - assert.Nil(err) -} - -func TestSetNx(t *testing.T) { - t.Parallel() - assert := assert.New(t) - db, _ := createDB() - - ok := db.SetNx("key", []byte("val"), time.Now().Add(time.Second).UnixNano()) - assert.True(ok) - - for i := 0; i < 100; i++ { - ok := db.SetNx("key", []byte("val"), 0) - assert.False(ok) - } - - // Reopen - db.Close() - db, _ = Open(db.GetOptions()) - - ok = db.SetNx("key", []byte("val"), 0) - assert.False(ok) - - time.Sleep(time.Second * 2) - - ok = db.SetNx("key", []byte("val"), 0) - assert.True(ok) -} diff --git a/v2/epoll.go b/epoll.go similarity index 75% rename from v2/epoll.go rename to epoll.go index 086d85a..6d6046c 100644 --- a/v2/epoll.go +++ b/epoll.go @@ -8,12 +8,13 @@ import ( "sync" "syscall" + "github.com/cockroachdb/swiss" "golang.org/x/sys/unix" ) type epoll struct { fd int - connections map[int]net.Conn + connections *swiss.Map[int, net.Conn] lock *sync.RWMutex } @@ -25,7 +26,7 @@ func MkEpoll() (*epoll, error) { return &epoll{ fd: fd, lock: &sync.RWMutex{}, - connections: make(map[int]net.Conn), + connections: swiss.New[int, net.Conn](100), }, nil } @@ -38,7 +39,7 @@ func (e *epoll) Add(conn net.Conn) error { } e.lock.Lock() - e.connections[fd] = conn + e.connections.Put(fd, conn) e.lock.Unlock() return nil @@ -52,7 +53,7 @@ func (e *epoll) Remove(conn net.Conn) error { } e.lock.Lock() - delete(e.connections, fd) + e.connections.Delete(fd) e.lock.Unlock() return nil @@ -60,6 +61,7 @@ func (e *epoll) Remove(conn net.Conn) error { func (e *epoll) Wait() ([]net.Conn, error) { events := make([]unix.EpollEvent, 100) + retry: n, err := unix.EpollWait(e.fd, events, 0) if err != nil { @@ -76,21 +78,17 @@ retry: e.lock.RLock() defer e.lock.RUnlock() for i := 0; i < n; i++ { - conn := e.connections[int(events[i].Fd)] - connections = append(connections, conn) + conn, ok := e.connections.Get(int(events[i].Fd)) + if ok { + connections = append(connections, conn) + } } return connections, nil } func socketFD(conn net.Conn) int { - //tls := reflect.TypeOf(conn.UnderlyingConn()) == reflect.TypeOf(&tls.Conn{}) - // Extract the file descriptor associated with the connection - //connVal := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn").Elem() tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn") - //if tls { - // tcpConn = reflect.Indirect(tcpConn.Elem()) - //} fdVal := tcpConn.FieldByName("fd") pfdVal := reflect.Indirect(fdVal).FieldByName("pfd") diff --git a/errors.go b/errors.go index a5ddb72..9397159 100644 --- a/errors.go +++ b/errors.go @@ -1,41 +1,14 @@ -package rotom +package main import ( "errors" + "fmt" ) var ( - ErrKeyNotFound = 
errors.New("key not found") - - ErrFieldNotFound = errors.New("field not found") - - ErrTypeAssert = errors.New("type assert error") - - ErrOutOfBounds = errors.New("index out of bounds") - - ErrWrongType = errors.New("wrong data type") - - ErrWrongBitValue = errors.New("wrong bit value") - - ErrUnSupportDataType = errors.New("unsupport data type") - - ErrUnknownOperationType = errors.New("unknown operation type") - - ErrNotString = errors.New("value is not string") - - ErrNotNumberic = errors.New("value is not numberic") - - ErrInvalidArgs = errors.New("invalid args") - - ErrInvalidResponse = errors.New("invalid response") - - ErrDatabaseClosed = errors.New("database closed") - - ErrUnSupportOperation = errors.New("unsupport operation") - - ErrIndexOutOfRange = errors.New("index out of range") - - ErrEmptyList = errors.New("list is empty") - - ErrShrinkRunning = errors.New("shrink is running") + ErrWrongType = errors.New("WRONGTYPE Operation against a key holding the wrong kind of value") ) + +func ErrWrongArgs(cmd string) error { + return fmt.Errorf("ERR wrong number of arguments for '%s' command", cmd) +} diff --git a/example/main.go b/example/main.go deleted file mode 100644 index 40738b5..0000000 --- a/example/main.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "fmt" - "net/http" - _ "net/http/pprof" - "time" - - "github.com/xgzlucario/rotom" -) - -func main() { - go http.ListenAndServe("localhost:6060", nil) - - db, err := rotom.Open(rotom.DefaultOptions) - if err != nil { - panic(err) - } - defer db.Close() - - for i := 0; ; i++ { - if i%10000 == 0 { - fmt.Println(i/10000, "w") - } - k := fmt.Sprintf("%09x", i) - db.SetEx(k, []byte(k), time.Second*10) - } -} diff --git a/go.mod b/go.mod index fc25d81..18470bd 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,12 @@ go 1.22 require ( github.com/RoaringBitmap/roaring v1.9.3 github.com/bytedance/sonic v1.11.8 + github.com/cockroachdb/swiss v0.0.0-20240303172742-c161743eb608 github.com/deckarep/golang-set/v2 v2.6.0 - github.com/orcaman/concurrent-map/v2 v2.0.1 - github.com/rosedblabs/wal v1.3.6 github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980 - github.com/stretchr/testify v1.9.0 + github.com/sourcegraph/conc v0.3.0 github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731 github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b - golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc golang.org/x/sys v0.20.0 ) @@ -21,14 +19,12 @@ require ( github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect - github.com/kr/text v0.2.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.9.0 // indirect golang.org/x/arch v0.8.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect ) diff --git a/go.sum b/go.sum index 8820d03..d616d56 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM= github.com/RoaringBitmap/roaring v1.9.3/go.mod 
h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo= github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE= github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= @@ -11,37 +13,29 @@ github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/ github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cockroachdb/swiss v0.0.0-20240303172742-c161743eb608 h1:GXGrLNSC5+LGrwVgtczB6JCITxB9WGaLv7XilPkBDvc= +github.com/cockroachdb/swiss v0.0.0-20240303172742-c161743eb608/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= -github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rosedblabs/wal v1.3.6 h1:oxZYTPX/u4JuGDW98wQ1YamWqerlrlSUFKhgP6Gd/Ao= -github.com/rosedblabs/wal v1.3.6/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM= github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980 
h1:t5uAkycj8WepkboiZvJzHB+FvkNj+P6Z2dEN4pFajU4= github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980/go.mod h1:zwEumjdcK6Q/ky/gFPqMviw1p7ZUb+B3pU4ybgOHvns= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -50,12 +44,14 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731 h1:frRQxMZFCPWfoiWau4bPcYmNDGNVPLqM9nqnsp6Uakg= github.com/xgzlucario/GigaCache v0.0.0-20240531152919-576765cef731/go.mod h1:sPwGPAuvd9WdiONTmusXGNocqcY5L/J7+st1upAMlX8= github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b h1:C/+nN/kFJ6yrmEhIu+5Ra2jx/W8w+Ayu8pTiZfuU5Xc= github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1ZgyZNk91XIllYdOPpwP+9L2RCw6QGSy6alTYF+Z0iU= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= @@ -65,8 +61,6 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/v2/handler.go b/handler.go similarity index 91% rename from v2/handler.go rename to handler.go index 25f6188..63072ef 100644 
--- a/v2/handler.go +++ b/handler.go @@ -1,7 +1,6 @@ package main import ( - "errors" "fmt" "strconv" "time" @@ -10,10 +9,6 @@ import ( "github.com/xgzlucario/rotom/structx" ) -var ( - ErrWrongType = errors.New("WRONGTYPE Operation against a key holding the wrong kind of value") -) - func pingCommand(_ []Value) Value { return Value{typ: STRING, str: "PONG"} } diff --git a/main.go b/main.go new file mode 100644 index 0000000..ee26f26 --- /dev/null +++ b/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "log" + "syscall" +) + +func main() { + var err error + config, err = LoadConfig("config.json") + if err != nil { + log.Panicf("load config error: %v\n", err) + } + if err = InitDB(); err != nil { + log.Panicf("init db error: %v\n", err) + } + setLimit() + server.RunServe() +} + +func setLimit() { + var rLimit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + panic(err) + } + rLimit.Cur = rLimit.Max + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + panic(err) + } + + log.Printf("set cur socketFD limit: %d", rLimit.Cur) +} diff --git a/options.go b/options.go deleted file mode 100644 index a9c9962..0000000 --- a/options.go +++ /dev/null @@ -1,33 +0,0 @@ -package rotom - -import ( - "errors" -) - -var DefaultOptions = Options{ - DirPath: "rotom", - ShardCount: 1024, - RunSkipLoadError: true, -} - -// Options represents the configuration for rotom. -type Options struct { - // Dir path if the db storage path. - DirPath string - - // ShardCount is the shard numbers for underlying hashmap. - ShardCount uint32 - - // Skip error when loading db file when startup. - RunSkipLoadError bool -} - -func checkOptions(option Options) error { - if option.ShardCount == 0 { - return errors.New("invalid shard count") - } - if option.DirPath == "" { - return errors.New("invalid dir path") - } - return nil -} diff --git a/v2/resp.go b/resp.go similarity index 100% rename from v2/resp.go rename to resp.go diff --git a/v2/rotom.go b/rotom.go similarity index 67% rename from v2/rotom.go rename to rotom.go index 7cfdb78..f7031ac 100644 --- a/v2/rotom.go +++ b/rotom.go @@ -7,11 +7,11 @@ import ( "net" "os" + "github.com/sourcegraph/conc/pool" cache "github.com/xgzlucario/GigaCache" "github.com/xgzlucario/rotom/structx" ) -// Type aliases for built-in types. type ( Map = *structx.Map Set = *structx.Set @@ -26,7 +26,10 @@ type DB struct { aof *Aof } -type Server struct{} +type Server struct { + epoller *epoll + workerpool *pool.Pool +} type Command struct { name string @@ -105,12 +108,8 @@ func InitDB() (err error) { return nil } -func (s *Server) Run() { - epoller, err := MkEpoll() - if err != nil { - panic(err) - } - +func (s *Server) RunServe() { + // Start tcp listener. listener, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port)) if err != nil { fmt.Println("Error creating listener:", err) @@ -118,7 +117,17 @@ func (s *Server) Run() { } defer listener.Close() - // epoll waiter + // Start epoll waiter. + epoller, err := MkEpoll() + if err != nil { + fmt.Println("Error creating epoller:", err) + os.Exit(1) + } + s.epoller = epoller + + // Start goroutine workerpool. 
+ s.workerpool = pool.New().WithMaxGoroutines(100 * 10000) + go func() { var buf = make([]byte, 512) for { @@ -140,7 +149,12 @@ func (s *Server) Run() { conn.Close() } else { - handleConnection(buf[:n], conn) + // bench test + // _ = n + // pool.Go(func() { + // conn.Write(ValueOK.Marshal()) + // }) + s.handleConnection(buf[:n], conn) } } } @@ -158,6 +172,45 @@ func (s *Server) Run() { } } -func ErrWrongArgs(cmd string) error { - return fmt.Errorf("ERR wrong number of arguments for '%s' command", cmd) +func (s *Server) handleConnection(buf []byte, conn net.Conn) { + resp := NewResp(bytes.NewReader(buf)) + for { + value, err := resp.Read() + if err != nil { + return + } + + if value.typ != ARRAY || len(value.array) == 0 { + log.Println("invalid request, expected non-empty array") + continue + } + + value.array[0].bulk = bytes.ToLower(value.array[0].bulk) + command := value.array[0].bulk + args := value.array[1:] + + var res Value + + // Lookup for command. + cmd, err := lookupCommand(b2s(command)) + if err != nil { + res = NewErrValue(err) + + } else { + // Write aof file if needed. + if config.AppendOnly { + cmd.writeAofFile(db.aof, value.array) + } + + // Process command. + res = cmd.processCommand(args) + } + + // Async write result. + s.workerpool.Go(func() { + if _, err = conn.Write(res.Marshal()); err != nil { + log.Printf("write reply error: %v", err) + } + }) + } } diff --git a/structx/bitmap.go b/structx/bitmap.go index e1d7a24..5653c3c 100644 --- a/structx/bitmap.go +++ b/structx/bitmap.go @@ -1,14 +1,10 @@ package structx import ( - "sync" - "github.com/RoaringBitmap/roaring" ) -// Bitmap type Bitmap struct { - sync.RWMutex bm *roaring.Bitmap } @@ -16,105 +12,63 @@ func NewBitmap() *Bitmap { return &Bitmap{bm: roaring.New()} } -// Add func (b *Bitmap) Add(items ...uint32) (n int) { - b.Lock() for _, item := range items { if b.bm.CheckedAdd(item) { n++ } } - b.Unlock() return } -// Remove func (b *Bitmap) Remove(items ...uint32) (n int) { - b.Lock() for _, item := range items { if b.bm.CheckedRemove(item) { n++ } } - b.Unlock() return } -// Test func (b *Bitmap) Test(i uint32) bool { - b.Lock() - ok := b.bm.Contains(i) - b.Unlock() - return ok + return b.bm.Contains(i) } -// Flip func (b *Bitmap) Flip(start, end uint64) { - b.Lock() b.bm.Flip(start, end) - b.Unlock() } -// ToArray func (b *Bitmap) ToArray() []uint32 { - b.Lock() - arr := b.bm.ToArray() - b.Unlock() - return arr + return b.bm.ToArray() } -// Len func (b *Bitmap) Len() uint64 { - b.RLock() - len := b.bm.Stats().Cardinality - b.RUnlock() - return len + return b.bm.Stats().Cardinality } -// Or func (b *Bitmap) Or(b2 *Bitmap) *Bitmap { - b.Lock() b.bm.Or(b2.bm) - b.Unlock() return b } -// And func (b *Bitmap) And(b2 *Bitmap) *Bitmap { - b.Lock() b.bm.And(b2.bm) - b.Unlock() return b } -// Xor func (b *Bitmap) Xor(b2 *Bitmap) *Bitmap { - b.Lock() b.bm.Xor(b2.bm) - b.Unlock() return b } -// Clone func (b *Bitmap) Clone() *Bitmap { - b.RLock() - b2 := &Bitmap{bm: b.bm.Clone()} - b.RUnlock() - return b2 + return &Bitmap{bm: b.bm.Clone()} } -// MarshalBinary func (b *Bitmap) MarshalBinary() ([]byte, error) { - b.RLock() - src, err := b.bm.MarshalBinary() - b.RUnlock() - return src, err + return b.bm.MarshalBinary() } -// UnmarshalBinary func (b *Bitmap) UnmarshalBinary(data []byte) error { - b.Lock() - err := b.bm.UnmarshalBinary(data) - b.Unlock() - return err + return b.bm.UnmarshalBinary(data) } diff --git a/structx/map.go b/structx/map.go index 93005f2..6e0e2ae 100644 --- a/structx/map.go +++ b/structx/map.go @@ -15,6 
+15,7 @@ type Map struct { func defaultOptions() cache.Options { options := cache.DefaultOptions + options.ConcurrencySafe = false options.ShardCount = 4 options.IndexSize = 8 options.BufferSize = 32 diff --git a/structx/zset.go b/structx/zset.go index a5317ba..7f9f707 100644 --- a/structx/zset.go +++ b/structx/zset.go @@ -1,15 +1,12 @@ package structx import ( - "sync" - "github.com/bytedance/sonic" rbtree "github.com/sakeven/RbTree" ) // ZSet type ZSet struct { - sync.RWMutex m map[string]int64 tree *rbtree.Tree[int64, string] } @@ -24,17 +21,13 @@ func NewZSet() *ZSet { // Get func (z *ZSet) Get(key string) (int64, bool) { - z.RLock() - defer z.RUnlock() s, ok := z.m[key] return s, ok } // Set upsert value by key. func (z *ZSet) Set(key string, score int64) { - z.Lock() z.set(key, score) - z.Unlock() } func (z *ZSet) set(key string, score int64) { @@ -59,7 +52,6 @@ func (z *ZSet) deleteNode(score int64, key string) bool { // Incr func (z *ZSet) Incr(key string, incr int64) int64 { - z.Lock() score, ok := z.m[key] if ok { z.deleteNode(score, key) @@ -67,34 +59,26 @@ func (z *ZSet) Incr(key string, incr int64) int64 { score += incr z.m[key] = score z.tree.Insert(score, key) - z.Unlock() return score } // Delete func (z *ZSet) Delete(key string) (s int64, ok bool) { - z.Lock() score, ok := z.m[key] if ok { delete(z.m, key) z.deleteNode(score, key) } - z.Unlock() return score, ok } // Len func (z *ZSet) Len() int { - z.RLock() - defer z.RUnlock() return len(z.m) } // Iter iterate all elements by scores. func (z *ZSet) Iter(f func(k string, s int64) bool) { - z.RLock() - defer z.RUnlock() - for it := z.tree.Iterator(); it != nil; it = it.Next() { if f(it.Value, it.Key) { return diff --git a/utils.go b/utils.go index 511d041..58d4ecf 100644 --- a/utils.go +++ b/utils.go @@ -1,4 +1,4 @@ -package rotom +package main import "unsafe" diff --git a/v2/main.go b/v2/main.go deleted file mode 100644 index 02d78ea..0000000 --- a/v2/main.go +++ /dev/null @@ -1,78 +0,0 @@ -package main - -import ( - "bytes" - "log" - "net" - "syscall" -) - -func main() { - var err error - config, err = LoadConfig("config.json") - if err != nil { - log.Panicf("load config error: %v\n", err) - } - if err = InitDB(); err != nil { - log.Panicf("init db error: %v\n", err) - } - setLimit() - server.Run() -} - -func handleConnection(buf []byte, conn net.Conn) { - resp := NewResp(bytes.NewReader(buf)) - for { - value, err := resp.Read() - if err != nil { - return - } - - if value.typ != ARRAY || len(value.array) == 0 { - log.Println("invalid request, expected non-empty array") - continue - } - - value.array[0].bulk = bytes.ToLower(value.array[0].bulk) - command := value.array[0].bulk - args := value.array[1:] - - var res Value - - // Lookup for command. - cmd, err := lookupCommand(b2s(command)) - if err != nil { - log.Printf("%v", err) - res = NewErrValue(err) - - } else { - // Write aof file if needed. - if config.AppendOnly { - cmd.writeAofFile(db.aof, value.array) - } - - // Process command. - res = cmd.processCommand(args) - } - - // Async write result. 
- go func() { - if _, err = conn.Write(res.Marshal()); err != nil { - log.Printf("write reply error: %v", err) - } - }() - } -} - -func setLimit() { - var rLimit syscall.Rlimit - if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { - panic(err) - } - rLimit.Cur = rLimit.Max - if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { - panic(err) - } - - log.Printf("set cur limit: %d", rLimit.Cur) -} diff --git a/v2/utils.go b/v2/utils.go deleted file mode 100644 index 1a16964..0000000 --- a/v2/utils.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "unsafe" - -func b2s(b []byte) string { - return *(*string)(unsafe.Pointer(&b)) -}
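The rewritten server hands reply writes to a sourcegraph/conc pool capped by WithMaxGoroutines(100 * 10000) instead of spawning an unbounded goroutine per reply, as the deleted v2/main.go did. A minimal, self-contained sketch of that bounded-write pattern follows; it assumes only the conc pool API used in rotom.go (New, WithMaxGoroutines, Go, Wait) plus a hypothetical writeReply helper standing in for conn.Write(res.Marshal()):

package main

import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

// writeReply is a hypothetical stand-in for conn.Write(res.Marshal()).
func writeReply(id int) error {
	fmt.Printf("reply %d written\n", id)
	return nil
}

func main() {
	// Cap the number of in-flight reply writers, mirroring
	// pool.New().WithMaxGoroutines(100 * 10000) in rotom.go.
	p := pool.New().WithMaxGoroutines(8)
	for i := 0; i < 32; i++ {
		id := i
		p.Go(func() {
			if err := writeReply(id); err != nil {
				fmt.Println("write reply error:", err)
			}
		})
	}
	p.Wait() // drain outstanding writers before exiting
}

Unlike this sketch, the server never calls Wait: in rotom.go the pool lives for the lifetime of the process, so the goroutine cap is what matters, not draining.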
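structx.Bitmap and structx.ZSet lose their internal sync.RWMutex in this patch, and the GigaCache-backed Map is created with ConcurrencySafe = false, so all data-structure access is expected to happen on the single epoll event loop. If one of these structures is ever shared with other goroutines, locking has to move to the caller. A minimal sketch of caller-side locking, assuming only the Bitmap methods shown in the diff (NewBitmap, Add, Test):

package main

import (
	"fmt"
	"sync"

	"github.com/xgzlucario/rotom/structx"
)

// lockedBitmap is a hypothetical wrapper for callers that share a Bitmap
// outside the single-threaded event loop; the type does not exist in rotom.
type lockedBitmap struct {
	mu sync.RWMutex
	bm *structx.Bitmap
}

func (l *lockedBitmap) Add(items ...uint32) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.bm.Add(items...)
}

func (l *lockedBitmap) Test(i uint32) bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.bm.Test(i)
}

func main() {
	lb := &lockedBitmap{bm: structx.NewBitmap()}
	lb.Add(1, 2, 3)
	fmt.Println(lb.Test(2)) // true
}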
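utils.go keeps the zero-copy b2s conversion, now under package main: the returned string aliases the byte slice's backing array instead of copying it, so it is only valid while that buffer is left untouched. This is why handleConnection normalizes the command name with bytes.ToLower and then uses b2s(command) only for the command lookup. A short sketch of the aliasing behaviour, using a hypothetical buf variable:

package main

import (
	"fmt"
	"unsafe"
)

// b2s aliases b's memory as a string without copying, as in utils.go.
func b2s(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	buf := []byte("get")
	cmd := b2s(buf)       // zero-copy view over buf
	fmt.Println(cmd)      // get
	buf[0] = 's'          // mutating the buffer mutates the "string" too
	fmt.Println(cmd)      // set — the result must not be retained past the read loop
}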