From 9baf806f287ca457dff1112e9259e0d07aad4ccf Mon Sep 17 00:00:00 2001 From: xgzlucario <912156837@qq.com> Date: Sat, 25 May 2024 00:02:11 +0800 Subject: [PATCH] v2 protocol add hdel command --- batch.go | 2 +- db.go | 9 ----- db_test.go | 13 ------ go.mod | 2 +- go.sum | 6 +++ structx/map.go | 14 +++---- v2/aof.go | 2 +- v2/handler.go | 108 +++++++++++++++++++++++++++++++++++++++++-------- v2/main.go | 29 ++++++------- v2/resp.go | 17 +++++--- v2/rotom.go | 51 +++++++++++++++-------- 11 files changed, 169 insertions(+), 84 deletions(-) diff --git a/batch.go b/batch.go index 804241a..79e438c 100644 --- a/batch.go +++ b/batch.go @@ -53,7 +53,7 @@ func (db *DB) BatchHSet(key string, batches ...*Batch) error { for _, b := range batches { codec = codec.Str(b.Key).Bytes(b.Val).Int(b.Timestamp) - m.Set(b.Key, b.Val, b.Timestamp) + m.Set(b.Key, b.Val) } db.encode(codec) diff --git a/db.go b/db.go index 5da0773..2236dc3 100644 --- a/db.go +++ b/db.go @@ -473,15 +473,6 @@ func (db *DB) HRemove(key string, fields ...string) (n int, err error) { return } -// HKeys -func (db *DB) HKeys(key string) ([]string, error) { - m, err := db.fetchMap(key) - if err != nil { - return nil, err - } - return m.Keys(), nil -} - // SAdd func (db *DB) SAdd(key string, items ...string) (int, error) { s, err := db.fetchSet(key, true) diff --git a/db_test.go b/db_test.go index f3c2683..324640c 100644 --- a/db_test.go +++ b/db_test.go @@ -167,15 +167,6 @@ func TestHmap(t *testing.T) { res, err := db.HGet(mapkey, key) assert.Nil(err) assert.Equal(res, val) - - // HLen - num, err := db.HLen(mapkey) - assert.Nil(err) - - // HKeys - keys, err := db.HKeys(mapkey) - assert.Nil(err) - assert.Equal(len(keys), num) } } @@ -223,10 +214,6 @@ func TestHmap(t *testing.T) { assert.Equal(res, 0) assert.ErrorContains(err, ErrWrongType.Error()) - m, err := db.HKeys("fake") - assert.Equal(m, nilStrings) - assert.ErrorContains(err, ErrWrongType.Error()) - n, err := db.HRemove("fake", "foo") assert.Equal(n, 0) assert.ErrorContains(err, ErrWrongType.Error()) diff --git a/go.mod b/go.mod index 79c8e83..3724df0 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/bytedance/sonic v1.11.6 github.com/deckarep/golang-set/v2 v2.6.0 github.com/orcaman/concurrent-map/v2 v2.0.1 + github.com/panjf2000/gnet/v2 v2.5.2 github.com/rosedblabs/wal v1.3.6 github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980 github.com/stretchr/testify v1.9.0 @@ -25,7 +26,6 @@ require ( github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/text v0.2.0 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/panjf2000/gnet/v2 v2.5.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect diff --git a/go.sum b/go.sum index 822b000..f579881 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM= github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE= @@ -35,8 +36,11 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= +github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0tEo= +github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I= github.com/panjf2000/gnet/v2 v2.5.2 h1:ctslzTSO77beWighLbLL9zsO7wmh5pYkomuWctO9+MY= github.com/panjf2000/gnet/v2 v2.5.2/go.mod h1:cm9AIICvZSrQllpQG5u5st56VMJCAWbB9/qdNKlrpKM= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -67,6 +71,7 @@ github.com/xgzlucario/quicklist v0.0.0-20240428064242-d453ca4cbed3/go.mod h1:1Zg github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= @@ -113,6 +118,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/structx/map.go b/structx/map.go index eb20c14..0b68948 100644 --- a/structx/map.go +++ b/structx/map.go @@ -1,6 +1,7 @@ package structx import ( + "slices" "unsafe" "github.com/bytedance/sonic" @@ -31,8 +32,8 @@ func (m *Map) Get(key string) ([]byte, int64, bool) { } // Set -func (m *Map) Set(key string, val []byte, ts int64) { - m.m.SetTx(key, val, ts) +func (m *Map) Set(key string, val []byte) { + m.m.Set(key, val) } // Remove @@ -40,14 +41,13 @@ func (m *Map) Remove(key string) bool { return m.m.Remove(key) } -// Keys -func (m *Map) Keys() (keys []string) { - keys = make([]string, 0, m.m.Stat().Len) +// Scan +func (m *Map) Scan(fn func(key, value []byte)) { m.m.Scan(func(key, val []byte, _ int64) (next bool) { - keys = append(keys, string(key)) + // return copy + fn(slices.Clone(key), slices.Clone(val)) return true }) - return } // Len diff --git a/v2/aof.go b/v2/aof.go index d002346..540c45a 100644 --- a/v2/aof.go +++ b/v2/aof.go @@ -16,7 +16,7 @@ type Aof struct { } func NewAof(path string) (*Aof, error) { - f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666) + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) if err != nil { return nil, err } diff --git a/v2/handler.go b/v2/handler.go index 668e298..c7310b4 100644 --- a/v2/handler.go +++ b/v2/handler.go @@ -1,12 +1,16 @@ package main +import ( + "fmt" + + "github.com/xgzlucario/rotom/structx" +) + var ( RespOK = []byte("OK") RespPong = []byte("PONG") ) -var HSETs = map[string]map[string]string{} - func pingCommand(args []Value) Value { if len(args) == 0 { return Value{typ: TypeString, str: RespPong} @@ -18,7 +22,7 @@ func setCommand(args []Value) Value { key := args[0].bulk value := args[1].bulk - server.db.strs.Set(b2s(key), value) + db.strs.Set(b2s(key), value) return Value{typ: TypeString, str: RespOK} } @@ -26,7 +30,7 @@ func setCommand(args []Value) Value { func getCommand(args []Value) Value { key := args[0].bulk - value, _, ok := server.db.strs.Get(b2s(key)) + value, _, ok := db.strs.Get(b2s(key)) if !ok { return Value{typ: TypeNull} } @@ -37,12 +41,13 @@ func getCommand(args []Value) Value { func hsetCommand(args []Value) Value { hash := b2s(args[0].bulk) key := b2s(args[1].bulk) - value := b2s(args[2].bulk) + value := args[2].bulk - if _, ok := HSETs[hash]; !ok { - HSETs[hash] = map[string]string{} + m, err := fetchMap(hash, true) + if err != nil { + return ErrValue(err.Error()) } - HSETs[hash][key] = value + m.Set(key, value) return Value{typ: TypeString, str: RespOK} } @@ -51,7 +56,12 @@ func hgetCommand(args []Value) Value { hash := args[0].bulk key := args[1].bulk - value, ok := HSETs[b2s(hash)][b2s(key)] + m, err := fetchMap(b2s(hash)) + if err != nil { + return ErrValue(err.Error()) + } + + value, _, ok := m.Get(b2s(key)) if !ok { return Value{typ: TypeNull} } @@ -59,19 +69,85 @@ func hgetCommand(args []Value) Value { return Value{typ: TypeBulk, bulk: []byte(value)} } +func hdelCommand(args []Value) Value { + hash := args[0].bulk + keys := args[1:] + + m, err := fetchMap(b2s(hash)) + if err != nil { + return ErrValue(err.Error()) + } + + var success int64 + for _, v := range keys { + if m.Remove(b2s(v.bulk)) { + success++ + } + } + + return Value{typ: TypeInteger, num: success} +} + func hgetallCommand(args []Value) Value { hash := args[0].bulk - value, ok := HSETs[b2s(hash)] - if !ok { - return Value{typ: TypeNull} + m, err := fetchMap(b2s(hash)) + if err != nil { + return ErrValue(err.Error()) } var values []Value - for k, v := range value { - values = append(values, Value{typ: TypeBulk, bulk: []byte(k)}) - values = append(values, Value{typ: TypeBulk, bulk: []byte(v)}) - } + m.Scan(func(key, value []byte) { + values = append(values, Value{typ: TypeBulk, bulk: key}) + values = append(values, Value{typ: TypeBulk, bulk: value}) + }) return Value{typ: TypeArray, array: values} } + +func fetchMap(key string, setnx ...bool) (Map, error) { + return fetch(key, func() Map { + return structx.NewMap() + }, setnx...) +} + +func fetchSet(key string, setnx ...bool) (Set, error) { + return fetch(key, func() Set { + return structx.NewSet() + }, setnx...) +} + +func fetchList(key string, setnx ...bool) (List, error) { + return fetch(key, func() List { + return structx.NewList() + }, setnx...) +} + +func fetchBitMap(key string, setnx ...bool) (BitMap, error) { + return fetch(key, func() BitMap { + return structx.NewBitmap() + }, setnx...) +} + +func fetchZSet(key string, setnx ...bool) (ZSet, error) { + return fetch(key, func() ZSet { + return structx.NewZSet() + }, setnx...) +} + +func fetch[T any](key string, new func() T, setnx ...bool) (v T, err error) { + item, ok := db.extras[key] + if ok { + v, ok := item.(T) + if ok { + return v, nil + } + return v, fmt.Errorf("wrong type assert: %T->%T", item, v) + } + + v = new() + if len(setnx) > 0 && setnx[0] { + db.extras[key] = v + } + return v, nil +} diff --git a/v2/main.go b/v2/main.go index 412ce80..9fe107f 100644 --- a/v2/main.go +++ b/v2/main.go @@ -4,17 +4,18 @@ import ( "bytes" "fmt" "log" + "net" "unsafe" "github.com/panjf2000/gnet/v2" ) -func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { - s.engine = eng +func (s *Server) OnBoot(engine gnet.Engine) gnet.Action { + s.engine = engine return gnet.None } -func (es *Server) OnTraffic(c gnet.Conn) gnet.Action { +func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { handleConnection(c) return gnet.None } @@ -22,17 +23,17 @@ func (es *Server) OnTraffic(c gnet.Conn) gnet.Action { func main() { config, err := LoadConfig("config.json") if err != nil { - log.Printf("config error: %v\n", err) + log.Printf("load config error: %v\n", err) } - err = initServer(config) - if err != nil { + if err = initDB(config); err != nil { + log.Printf("init db error: %v\n", err) + } + if err = RunServer(config); err != nil { log.Printf("init server error: %v\n", err) } - log.Println("rotom server is up.") - server.Run() } -func handleConnection(conn gnet.Conn) { +func handleConnection(conn net.Conn) { resp := NewResp(conn) for { value, err := resp.Read() @@ -52,7 +53,7 @@ func handleConnection(conn gnet.Conn) { } } -func processCommand(conn gnet.Conn, cmdStr []byte, args []Value) { +func processCommand(conn net.Conn, cmdStr []byte, args []Value) { writer := NewWriter(conn) cmd := lookupCommand(b2s(cmdStr)) @@ -63,7 +64,7 @@ func processCommand(conn gnet.Conn, cmdStr []byte, args []Value) { } // check command args - if len(args) < 2 { + if len(args) < cmd.arity { result := Value{ typ: TypeError, str: []byte(fmt.Sprintf("ERR wrong number of arguments for '%s' command", cmd.name)), @@ -72,12 +73,12 @@ func processCommand(conn gnet.Conn, cmdStr []byte, args []Value) { return } - if b2s(cmdStr) == "set" || b2s(cmdStr) == "hset" { - // Manually constructing the array slice to include command and args. + if b2s(cmdStr) == "set" || b2s(cmdStr) == "hset" || b2s(cmdStr) == "hdel" { + // manually constructing the array slice to include command and args values := make([]Value, len(args)+1) values[0] = Value{typ: TypeBulk, bulk: cmdStr} copy(values[1:], args) - server.db.aof.Write(Value{typ: TypeArray, array: values}) + db.aof.Write(Value{typ: TypeArray, array: values}) } result := cmd.handler(args) diff --git a/v2/resp.go b/v2/resp.go index 70243aa..f4f181f 100644 --- a/v2/resp.go +++ b/v2/resp.go @@ -28,11 +28,11 @@ const ( // Value represents the different types of RESP (Redis Serialization Protocol) values. type Value struct { - typ ValueType // Type of value ('string', 'error', 'integer', 'bulk', 'array') - str []byte // Used for string and error types - // num int // Used for integer type - bulk []byte // Used for bulk strings - array []Value // Used for arrays of nested values + typ ValueType // Type of value ('string', 'error', 'integer', 'bulk', 'array') + str []byte // Used for string and error types + num int64 // Used for integer type + bulk []byte // Used for bulk strings + array []Value // Used for arrays of nested values } // Resp is a parser for RESP encoded data. @@ -150,6 +150,8 @@ func (v Value) Marshal() []byte { return v.marshalBulk() case TypeString: return v.marshalString() + case TypeInteger: + return v.marshalInteger() case TypeNull: return v.marshallNull() case TypeError: @@ -159,6 +161,11 @@ func (v Value) Marshal() []byte { } } +func (v Value) marshalInteger() []byte { + str := strconv.FormatInt(v.num, 10) + return append([]byte{INTEGER}, append([]byte(str), '\r', '\n')...) +} + // marshalString marshals a string value into RESP format. func (v Value) marshalString() []byte { return append([]byte{STRING}, append([]byte(v.str), '\r', '\n')...) diff --git a/v2/rotom.go b/v2/rotom.go index 8d3e294..07b7219 100644 --- a/v2/rotom.go +++ b/v2/rotom.go @@ -6,6 +6,27 @@ import ( "github.com/panjf2000/gnet/v2" cache "github.com/xgzlucario/GigaCache" + "github.com/xgzlucario/rotom/structx" +) + +// DataType is the data type for Rotom. +type DataType byte + +const ( + TypeMap DataType = iota + 1 + TypeSet + TypeList + TypeZSet + TypeBitmap +) + +// Type aliases for built-in types. +type ( + Map = *structx.Map + Set = *structx.Set + List = *structx.List + ZSet = *structx.ZSet + BitMap = *structx.Bitmap ) type DB struct { @@ -17,14 +38,9 @@ type DB struct { type Server struct { gnet.BuiltinEventEngine engine gnet.Engine - port int - db *DB + config *Config } -// type Client struct { -// conn net.Conn -// } - type CommandHandler func([]Value) Value type Command struct { @@ -35,6 +51,7 @@ type Command struct { // global varibles var ( + db DB server Server cmdTable []Command = []Command{ {"ping", pingCommand, 0}, @@ -42,6 +59,7 @@ var ( {"get", getCommand, 1}, {"hset", hsetCommand, 3}, {"hget", hgetCommand, 2}, + {"hdel", hdelCommand, 2}, {"hgetall", hgetallCommand, 1}, // TODO } @@ -56,13 +74,10 @@ func lookupCommand(cmdStr string) *Command { return nil } -func initServer(config *Config) (err error) { - server.port = config.Port - server.db = &DB{ - strs: cache.New(cache.DefaultOptions), - extras: map[string]any{}, - } - server.db.aof, err = NewAof(config.AppendOnlyFileName) +func initDB(config *Config) (err error) { + db.strs = cache.New(cache.DefaultOptions) + db.extras = make(map[string]any) + db.aof, err = NewAof(config.AppendOnlyFileName) if err != nil { log.Printf("failed to initialize aof file: %v\n", err) return @@ -70,9 +85,11 @@ func initServer(config *Config) (err error) { return nil } -func (s *Server) Run() { - defer server.db.aof.Close() +func RunServer(config *Config) (err error) { + server.config = config + defer db.aof.Close() - servePath := fmt.Sprintf("tcp://:%d", s.port) - gnet.Run(&server, servePath) + servePath := fmt.Sprintf("tcp://:%d", config.Port) + log.Printf("rotom server is binding on %s\n", servePath) + return gnet.Run(&server, servePath) }