Skip to content

Commit

Permalink
feat: add encode/decode form
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Oct 29, 2024
1 parent 8347fd7 commit b7fc868
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 40 deletions.
12 changes: 8 additions & 4 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var cmdTable = []*Command{
{"eval", evalCommand, 2, true},
{"ping", pingCommand, 0, false},
{"flushdb", flushdbCommand, 0, true},
{"save", saveCommand, 2, false},
}

func equalFold(a, b string) bool {
Expand Down Expand Up @@ -100,22 +101,22 @@ func setCommand(writer *RESPWriter, args []RESP) {

// EX
if equalFold(arg, EX) && len(extra) >= 2 {
n, err := extra[1].ToInt()
n, err := extra[1].ToDuration()
if err != nil {
writer.WriteError(errParseInteger)
return
}
ttl = dict.GetNanoTime() + int64(time.Second)*int64(n)
ttl = time.Now().Add(n * time.Second).UnixNano()
extra = extra[2:]

// PX
} else if equalFold(arg, PX) && len(extra) >= 2 {
n, err := extra[1].ToInt()
n, err := extra[1].ToDuration()
if err != nil {
writer.WriteError(errParseInteger)
return
}
ttl = dict.GetNanoTime() + int64(time.Millisecond)*int64(n)
ttl = time.Now().Add(n * time.Millisecond).UnixNano()
extra = extra[2:]

// KEEPTTL
Expand Down Expand Up @@ -529,6 +530,9 @@ func flushdbCommand(writer *RESPWriter, _ []RESP) {
writer.WriteString("OK")
}

func saveCommand(writer *RESPWriter, _ []RESP) {
}

func evalCommand(writer *RESPWriter, args []RESP) {
L := server.lua
script := args[0].ToString()
Expand Down
45 changes: 11 additions & 34 deletions internal/dict/dict.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dict

import (
"sync/atomic"
"time"
)

Expand All @@ -10,27 +9,7 @@ const (
KEY_NOT_EXIST = -2
)

var (
_sec atomic.Uint32
_nsec atomic.Int64
)

func init() {
// init backend ticker
tk := time.NewTicker(time.Millisecond / 10)
go func() {
for t := range tk.C {
_sec.Store(uint32(t.Unix()))
_nsec.Store(t.UnixNano())
}
}()
}

func GetNanoTime() int64 {
return _nsec.Load()
}

// Dict is the hashmap for Rotom.
// Dict is the hashmap for rotom.
type Dict struct {
data map[string]any
expire map[string]int64
Expand All @@ -50,19 +29,20 @@ func (dict *Dict) Get(key string) (any, int) {
return nil, KEY_NOT_EXIST
}

nsec, ok := dict.expire[key]
ts, ok := dict.expire[key]
if !ok {
return data, TTL_FOREVER
}

// key expired
if nsec < _nsec.Load() {
now := time.Now().UnixNano()
if ts < now {
delete(dict.data, key)
delete(dict.expire, key)
return nil, KEY_NOT_EXIST
}

return data, nsec2duration(nsec)
return data, int(ts-now) / int(time.Second)
}

func (dict *Dict) Set(key string, data any) {
Expand All @@ -88,7 +68,7 @@ func (dict *Dict) Delete(key string) bool {

// SetTTL set expire time for key.
// return `0` if key not exist or expired.
// return `1` if set successed.
// return `1` if set success.
func (dict *Dict) SetTTL(key string, ttl int64) int {
_, ok := dict.data[key]
if !ok {
Expand All @@ -97,8 +77,8 @@ func (dict *Dict) SetTTL(key string, ttl int64) int {
}

// check key if already expired
nsec, ok := dict.expire[key]
if ok && nsec < _nsec.Load() {
ts, ok := dict.expire[key]
if ok && ts < time.Now().UnixNano() {
delete(dict.data, key)
delete(dict.expire, key)
return 0
Expand All @@ -111,8 +91,9 @@ func (dict *Dict) SetTTL(key string, ttl int64) int {

func (dict *Dict) EvictExpired() {
var count int
for key, nsec := range dict.expire {
if _nsec.Load() > nsec {
now := time.Now().UnixNano()
for key, ts := range dict.expire {
if now > ts {
delete(dict.expire, key)
delete(dict.data, key)
}
Expand All @@ -122,7 +103,3 @@ func (dict *Dict) EvictExpired() {
}
}
}

func nsec2duration(nsec int64) (second int) {
return int(nsec-_nsec.Load()) / int(time.Second)
}
44 changes: 44 additions & 0 deletions internal/list/list.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package list

import (
"github.com/bytedance/sonic"
"io"
)

// +------------------------------ QuickList -----------------------------+
// | +-----------+ +-----------+ +-----------+ |
// head --- | listpack0 | <-> | listpack1 | <-> ... <-> | listpackN | --- tail
Expand Down Expand Up @@ -129,3 +134,42 @@ func (ls *QuickList) Range(start, stop int, fn func(data []byte)) {
fn(it.Next())
}
}

type ListPackData struct {
Data []byte
Size uint32
}

func (ls *QuickList) Encode(writer io.Writer) error {
var data []ListPackData
for p := ls.head; p != nil; p = p.next {
data = append(data, ListPackData{Data: p.data, Size: p.size})
}
return sonic.ConfigDefault.NewEncoder(writer).Encode(data)
}

func (ls *QuickList) Decode(src []byte) error {
var datas []ListPackData
if err := sonic.Unmarshal(src, &datas); err != nil {
return err
}

// head node
n := newNode()
n.size = datas[0].Size
n.data = datas[0].Data
ls.head = n
ls.tail = n
cur := n

for _, data := range datas[1:] {
n := newNode()
n.size = data.Size
n.data = data.Data
cur.next = n
n.prev = cur
ls.tail = n
cur = n
}
return nil
}
10 changes: 8 additions & 2 deletions resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"io"
"strconv"
"time"
"unsafe"
)

Expand Down Expand Up @@ -119,8 +120,8 @@ func (w *RESPWriter) WriteArrayHead(arrayLen int) {
}

// WriteBulk writes a RESP bulk string from a byte slice.
func (w *RESPWriter) WriteBulk(bluk []byte) {
w.WriteBulkString(b2s(bluk))
func (w *RESPWriter) WriteBulk(bulk []byte) {
w.WriteBulkString(b2s(bulk))
}

// WriteBulkString writes a RESP bulk string from a string.
Expand Down Expand Up @@ -175,6 +176,11 @@ func (r RESP) ToStringUnsafe() string { return b2s(r) }

func (r RESP) ToInt() (int, error) { return strconv.Atoi(b2s(r)) }

func (r RESP) ToDuration() (time.Duration, error) {
n, err := strconv.Atoi(b2s(r))
return time.Duration(n), err
}

func (r RESP) ToFloat() (float64, error) { return strconv.ParseFloat(b2s(r), 64) }

func (r RESP) Clone() []byte { return bytes.Clone(r) }
Expand Down

0 comments on commit b7fc868

Please sign in to comment.