Skip to content

Commit

Permalink
feat: db save command
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Nov 2, 2024
1 parent e990c51 commit 424844a
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 212 deletions.
78 changes: 58 additions & 20 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package main

import (
"fmt"
"github.com/bytedance/sonic"
"os"
"strconv"
"strings"
"time"

"github.com/xgzlucario/rotom/internal/dict"
"github.com/xgzlucario/rotom/internal/hash"
"github.com/xgzlucario/rotom/internal/list"
"github.com/xgzlucario/rotom/internal/zset"
Expand Down Expand Up @@ -127,7 +127,7 @@ func setCommand(writer *RESPWriter, args []RESP) {

// NX
} else if equalFold(arg, NX) {
if _, ttl := db.dict.Get(key); ttl != dict.KEY_NOT_EXIST {
if _, ttl := db.dict.Get(key); ttl != KEY_NOT_EXIST {
writer.WriteNull()
return
}
Expand All @@ -147,7 +147,7 @@ func incrCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToStringUnsafe()

object, ttl := db.dict.Get(key)
if ttl == dict.KEY_NOT_EXIST {
if ttl == KEY_NOT_EXIST {
db.dict.Set(strings.Clone(key), 1)
writer.WriteInteger(1)
return
Expand Down Expand Up @@ -178,7 +178,7 @@ func incrCommand(writer *RESPWriter, args []RESP) {
func getCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToStringUnsafe()
object, ttl := db.dict.Get(key)
if ttl == dict.KEY_NOT_EXIST {
if ttl == KEY_NOT_EXIST {
writer.WriteNull()
return
}
Expand Down Expand Up @@ -515,48 +515,67 @@ func zpopminCommand(writer *RESPWriter, args []RESP) {
}
}

zset, err := fetchZSet(key)
zs, err := fetchZSet(key)
if err != nil {
writer.WriteError(err)
return
}

size := min(zset.Len(), count)
size := min(zs.Len(), count)
writer.WriteArrayHead(size * 2)
for range size {
key, score := zset.PopMin()
key, score := zs.PopMin()
writer.WriteBulkString(key)
writer.WriteFloat(score)
}
}

func flushdbCommand(writer *RESPWriter, _ []RESP) {
db.dict = dict.New()
db.dict = New()
writer.WriteString("OK")
}

type KVEntry struct {
Type ObjectType `json:"o"`
Key string `json:"k"`
Ttl int64 `json:"t,omitempty"`
Data any `json:"v"`
}

func saveCommand(writer *RESPWriter, _ []RESP) {
dbWriter := NewWriter(1024)
err := db.dict.EncodeTo(dbWriter)
if err != nil {
writer.WriteError(err)
return
}
fs, err := os.Create("rdb.test")
if err != nil {
writer.WriteError(err)
return
}
_, err = dbWriter.FlushTo(fs)
if err != nil {
writer.WriteError(err)
return

batchSize := 100
dbWriter.WriteArrayHead(len(db.dict.data)/batchSize + 1)

var entries []KVEntry
for k, v := range db.dict.data {
entries = append(entries, KVEntry{
Type: getObjectType(v),
Key: k,
Ttl: db.dict.expire[k],
Data: v,
})
if len(entries) == batchSize {
bytes, _ := sonic.Marshal(entries)
dbWriter.WriteBulk(bytes)
entries = entries[:0]
}
}
err = fs.Close()
bytes, _ := sonic.Marshal(entries)
dbWriter.WriteBulk(bytes)

_, err = dbWriter.FlushTo(fs)
if err != nil {
writer.WriteError(err)
return
}
_ = fs.Close()
writer.WriteBulkString("OK")
}

Expand Down Expand Up @@ -613,8 +632,7 @@ func fetchZSet(key []byte, setnx ...bool) (ZSet, error) {

func fetch[T any](key []byte, new func() T, setnx ...bool) (T, error) {
object, ttl := db.dict.Get(b2s(key))

if ttl != dict.KEY_NOT_EXIST {
if ttl != KEY_NOT_EXIST {
v, ok := object.(T)
if !ok {
return v, errWrongType
Expand Down Expand Up @@ -646,3 +664,23 @@ func fetch[T any](key []byte, new func() T, setnx ...bool) (T, error) {

return v, nil
}

func getObjectType(object any) ObjectType {
switch object.(type) {
case string, []byte:
return TypeString
case int:
return TypeInteger
case *hash.Map:
return TypeMap
case *hash.ZipMap:
return TypeZipMap
case *hash.Set:
return TypeSet
case *hash.ZipSet:
return TypeZipSet
case *list.QuickList:
return TypeList
}
return TypeUnknown
}
26 changes: 26 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

type Object interface {
GetType() ObjectType
Marshal() ([]byte, error)
Unmarshal([]byte) error
}

type ObjectType byte

const (
TypeUnknown ObjectType = iota
TypeString
TypeInteger
TypeMap
TypeZipMap
TypeSet
TypeZipSet
TypeList
TypeZSet
)

const (
TTL_FOREVER = -1
KEY_NOT_EXIST = -2
)
81 changes: 1 addition & 80 deletions internal/dict/dict.go → dict.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,9 @@
package dict
package main

import (
"github.com/bytedance/sonic"
"strconv"
"time"
)

type Object interface {
GetType() ObjectType
Encode() ([]byte, error)
Decode([]byte) error
}

type ObjectType byte

const (
TypeString ObjectType = iota + 1
TypeInteger
TypeMap
TypeZipMap
TypeSet
TypeZipSet
TypeList
TypeZSet
)

const (
TTL_FOREVER = -1
KEY_NOT_EXIST = -2
)

// Dict is the hashmap for rotom.
type Dict struct {
data map[string]any
Expand Down Expand Up @@ -124,56 +98,3 @@ func (dict *Dict) EvictExpired() {
}
}
}

type KVEntry struct {
ObjectType ObjectType `json:"p"`
Key string `json:"k"`
Ttl int64 `json:"t"`
Data []byte `json:"v"`
}

type RESPWriter interface {
WriteArrayHead(int)
WriteBulk([]byte)
}

type RESPReader interface {
ReadArrayHead(int)
ReadBulk([]byte)
}

func (dict *Dict) EncodeTo(writer RESPWriter) error {
writer.WriteArrayHead(len(dict.data))
var entry KVEntry

for k, v := range dict.data {
ttl, ok := dict.expire[k]
if !ok {
ttl = TTL_FOREVER
}
entry.Key = k
entry.Ttl = ttl

switch vtype := v.(type) {
case string:
entry.ObjectType = TypeString
entry.Data = []byte(vtype)
case []byte:
entry.ObjectType = TypeString
entry.Data = vtype
case int:
entry.ObjectType = TypeInteger
entry.Data = []byte(strconv.Itoa(vtype))
case Object:
entry.ObjectType = vtype.GetType()
data, err := vtype.Encode()
if err != nil {
return err
}
entry.Data = data
}
entryBytes, _ := sonic.Marshal(entry)
writer.WriteBulk(entryBytes)
}
return nil
}
2 changes: 1 addition & 1 deletion internal/dict/dict_test.go → dict_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dict
package main

import (
"testing"
Expand Down
79 changes: 0 additions & 79 deletions internal/dict/benchmark/main.go

This file was deleted.

8 changes: 4 additions & 4 deletions internal/hash/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ type MapI interface {
Remove(key string) bool
Len() int
Scan(fn func(key string, val []byte))
Encode() ([]byte, error)
Decode([]byte) error
Marshal() ([]byte, error)
Unmarshal([]byte) error
}

var _ MapI = (*Map)(nil)
Expand Down Expand Up @@ -51,10 +51,10 @@ func (m *Map) Scan(fn func(key string, val []byte)) {
}
}

func (m *Map) Encode() ([]byte, error) {
func (m *Map) Marshal() ([]byte, error) {
return sonic.Marshal(m.data)
}

func (m *Map) Decode(src []byte) error {
func (m *Map) Unmarshal(src []byte) error {
return sonic.Unmarshal(src, &m.data)
}
Loading

0 comments on commit 424844a

Please sign in to comment.