Skip to content

Commit

Permalink
v2 protocol add hdel command
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed May 24, 2024
1 parent 72a010a commit 9baf806
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 84 deletions.
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (db *DB) BatchHSet(key string, batches ...*Batch) error {

for _, b := range batches {
codec = codec.Str(b.Key).Bytes(b.Val).Int(b.Timestamp)
m.Set(b.Key, b.Val, b.Timestamp)
m.Set(b.Key, b.Val)
}
db.encode(codec)

Expand Down
9 changes: 0 additions & 9 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,15 +473,6 @@ func (db *DB) HRemove(key string, fields ...string) (n int, err error) {
return
}

// HKeys
func (db *DB) HKeys(key string) ([]string, error) {
m, err := db.fetchMap(key)
if err != nil {
return nil, err
}
return m.Keys(), nil
}

// SAdd
func (db *DB) SAdd(key string, items ...string) (int, error) {
s, err := db.fetchSet(key, true)
Expand Down
13 changes: 0 additions & 13 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,6 @@ func TestHmap(t *testing.T) {
res, err := db.HGet(mapkey, key)
assert.Nil(err)
assert.Equal(res, val)

// HLen
num, err := db.HLen(mapkey)
assert.Nil(err)

// HKeys
keys, err := db.HKeys(mapkey)
assert.Nil(err)
assert.Equal(len(keys), num)
}
}

Expand Down Expand Up @@ -223,10 +214,6 @@ func TestHmap(t *testing.T) {
assert.Equal(res, 0)
assert.ErrorContains(err, ErrWrongType.Error())

m, err := db.HKeys("fake")
assert.Equal(m, nilStrings)
assert.ErrorContains(err, ErrWrongType.Error())

n, err := db.HRemove("fake", "foo")
assert.Equal(n, 0)
assert.ErrorContains(err, ErrWrongType.Error())
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/bytedance/sonic v1.11.6
github.com/deckarep/golang-set/v2 v2.6.0
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/panjf2000/gnet/v2 v2.5.2
github.com/rosedblabs/wal v1.3.6
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980
github.com/stretchr/testify v1.9.0
Expand All @@ -25,7 +26,6 @@ require (
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/panjf2000/gnet/v2 v2.5.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM=
github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE=
Expand Down Expand Up @@ -35,8 +36,11 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0tEo=
github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/panjf2000/gnet/v2 v2.5.2 h1:ctslzTSO77beWighLbLL9zsO7wmh5pYkomuWctO9+MY=
github.com/panjf2000/gnet/v2 v2.5.2/go.mod h1:cm9AIICvZSrQllpQG5u5st56VMJCAWbB9/qdNKlrpKM=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -67,6 +71,7 @@ github.com/xgzlucario/quicklist v0.0.0-20240428064242-d453ca4cbed3/go.mod h1:1Zg
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down Expand Up @@ -113,6 +118,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
14 changes: 7 additions & 7 deletions structx/map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package structx

import (
"slices"
"unsafe"

"github.com/bytedance/sonic"
Expand Down Expand Up @@ -31,23 +32,22 @@ func (m *Map) Get(key string) ([]byte, int64, bool) {
}

// Set
func (m *Map) Set(key string, val []byte, ts int64) {
m.m.SetTx(key, val, ts)
func (m *Map) Set(key string, val []byte) {
m.m.Set(key, val)
}

// Remove
func (m *Map) Remove(key string) bool {
return m.m.Remove(key)
}

// Keys
func (m *Map) Keys() (keys []string) {
keys = make([]string, 0, m.m.Stat().Len)
// Scan
func (m *Map) Scan(fn func(key, value []byte)) {
m.m.Scan(func(key, val []byte, _ int64) (next bool) {
keys = append(keys, string(key))
// return copy
fn(slices.Clone(key), slices.Clone(val))
return true
})
return
}

// Len
Expand Down
2 changes: 1 addition & 1 deletion v2/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Aof struct {
}

func NewAof(path string) (*Aof, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
Expand Down
108 changes: 92 additions & 16 deletions v2/handler.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package main

import (
"fmt"

"github.com/xgzlucario/rotom/structx"
)

var (
RespOK = []byte("OK")
RespPong = []byte("PONG")
)

var HSETs = map[string]map[string]string{}

func pingCommand(args []Value) Value {
if len(args) == 0 {
return Value{typ: TypeString, str: RespPong}
Expand All @@ -18,15 +22,15 @@ func setCommand(args []Value) Value {
key := args[0].bulk
value := args[1].bulk

server.db.strs.Set(b2s(key), value)
db.strs.Set(b2s(key), value)

return Value{typ: TypeString, str: RespOK}
}

func getCommand(args []Value) Value {
key := args[0].bulk

value, _, ok := server.db.strs.Get(b2s(key))
value, _, ok := db.strs.Get(b2s(key))
if !ok {
return Value{typ: TypeNull}
}
Expand All @@ -37,12 +41,13 @@ func getCommand(args []Value) Value {
func hsetCommand(args []Value) Value {
hash := b2s(args[0].bulk)
key := b2s(args[1].bulk)
value := b2s(args[2].bulk)
value := args[2].bulk

if _, ok := HSETs[hash]; !ok {
HSETs[hash] = map[string]string{}
m, err := fetchMap(hash, true)
if err != nil {
return ErrValue(err.Error())
}
HSETs[hash][key] = value
m.Set(key, value)

return Value{typ: TypeString, str: RespOK}
}
Expand All @@ -51,27 +56,98 @@ func hgetCommand(args []Value) Value {
hash := args[0].bulk
key := args[1].bulk

value, ok := HSETs[b2s(hash)][b2s(key)]
m, err := fetchMap(b2s(hash))
if err != nil {
return ErrValue(err.Error())
}

value, _, ok := m.Get(b2s(key))
if !ok {
return Value{typ: TypeNull}
}

return Value{typ: TypeBulk, bulk: []byte(value)}
}

func hdelCommand(args []Value) Value {
hash := args[0].bulk
keys := args[1:]

m, err := fetchMap(b2s(hash))
if err != nil {
return ErrValue(err.Error())
}

var success int64
for _, v := range keys {
if m.Remove(b2s(v.bulk)) {
success++
}
}

return Value{typ: TypeInteger, num: success}
}

func hgetallCommand(args []Value) Value {
hash := args[0].bulk

value, ok := HSETs[b2s(hash)]
if !ok {
return Value{typ: TypeNull}
m, err := fetchMap(b2s(hash))
if err != nil {
return ErrValue(err.Error())
}

var values []Value
for k, v := range value {
values = append(values, Value{typ: TypeBulk, bulk: []byte(k)})
values = append(values, Value{typ: TypeBulk, bulk: []byte(v)})
}
m.Scan(func(key, value []byte) {
values = append(values, Value{typ: TypeBulk, bulk: key})
values = append(values, Value{typ: TypeBulk, bulk: value})
})

return Value{typ: TypeArray, array: values}
}

func fetchMap(key string, setnx ...bool) (Map, error) {
return fetch(key, func() Map {
return structx.NewMap()
}, setnx...)
}

func fetchSet(key string, setnx ...bool) (Set, error) {
return fetch(key, func() Set {
return structx.NewSet()
}, setnx...)
}

func fetchList(key string, setnx ...bool) (List, error) {
return fetch(key, func() List {
return structx.NewList()
}, setnx...)
}

func fetchBitMap(key string, setnx ...bool) (BitMap, error) {
return fetch(key, func() BitMap {
return structx.NewBitmap()
}, setnx...)
}

func fetchZSet(key string, setnx ...bool) (ZSet, error) {
return fetch(key, func() ZSet {
return structx.NewZSet()
}, setnx...)
}

func fetch[T any](key string, new func() T, setnx ...bool) (v T, err error) {
item, ok := db.extras[key]
if ok {
v, ok := item.(T)
if ok {
return v, nil
}
return v, fmt.Errorf("wrong type assert: %T->%T", item, v)
}

v = new()
if len(setnx) > 0 && setnx[0] {
db.extras[key] = v
}
return v, nil
}
29 changes: 15 additions & 14 deletions v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,36 @@ import (
"bytes"
"fmt"
"log"
"net"
"unsafe"

"github.com/panjf2000/gnet/v2"
)

func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
s.engine = eng
func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
s.engine = engine
return gnet.None
}

func (es *Server) OnTraffic(c gnet.Conn) gnet.Action {
func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
handleConnection(c)
return gnet.None
}

func main() {
config, err := LoadConfig("config.json")
if err != nil {
log.Printf("config error: %v\n", err)
log.Printf("load config error: %v\n", err)
}
err = initServer(config)
if err != nil {
if err = initDB(config); err != nil {
log.Printf("init db error: %v\n", err)
}
if err = RunServer(config); err != nil {
log.Printf("init server error: %v\n", err)
}
log.Println("rotom server is up.")
server.Run()
}

func handleConnection(conn gnet.Conn) {
func handleConnection(conn net.Conn) {
resp := NewResp(conn)
for {
value, err := resp.Read()
Expand All @@ -52,7 +53,7 @@ func handleConnection(conn gnet.Conn) {
}
}

func processCommand(conn gnet.Conn, cmdStr []byte, args []Value) {
func processCommand(conn net.Conn, cmdStr []byte, args []Value) {
writer := NewWriter(conn)

cmd := lookupCommand(b2s(cmdStr))
Expand All @@ -63,7 +64,7 @@ func processCommand(conn gnet.Conn, cmdStr []byte, args []Value) {
}

// check command args
if len(args) < 2 {
if len(args) < cmd.arity {
result := Value{
typ: TypeError,
str: []byte(fmt.Sprintf("ERR wrong number of arguments for '%s' command", cmd.name)),
Expand All @@ -72,12 +73,12 @@ func processCommand(conn gnet.Conn, cmdStr []byte, args []Value) {
return
}

if b2s(cmdStr) == "set" || b2s(cmdStr) == "hset" {
// Manually constructing the array slice to include command and args.
if b2s(cmdStr) == "set" || b2s(cmdStr) == "hset" || b2s(cmdStr) == "hdel" {
// manually constructing the array slice to include command and args
values := make([]Value, len(args)+1)
values[0] = Value{typ: TypeBulk, bulk: cmdStr}
copy(values[1:], args)
server.db.aof.Write(Value{typ: TypeArray, array: values})
db.aof.Write(Value{typ: TypeArray, array: values})
}

result := cmd.handler(args)
Expand Down
Loading

0 comments on commit 9baf806

Please sign in to comment.