Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Nov 4, 2023
1 parent 4eff604 commit f093dde
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 88 deletions.
24 changes: 20 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"
"time"

"github.com/bytedance/sonic"
cache "github.com/xgzlucario/GigaCache"
"github.com/xgzlucario/rotom/base"
)
Expand Down Expand Up @@ -111,9 +110,7 @@ func (c *Client) HKeys(key string) ([]string, error) {
if err != nil {
return nil, err
}
var keys []string
err = sonic.Unmarshal(args, &keys)
return keys, err
return base.ParseStrSlice(args), err
}

// HRemove
Expand Down Expand Up @@ -242,6 +239,25 @@ func (c *Client) BitArray(key string) ([]uint32, error) {
return base.ParseU32Slice(res), nil
}

// ZAdd
func (c *Client) ZAdd(key, field string, score float64, val []byte) error {
return c.doNoRes(NewCodec(OpZAdd).Str(key).Str(field).Float(score).Bytes(val))
}

// ZIncr
func (c *Client) ZIncr(key, field string, score float64) (float64, error) {
args, err := c.do(NewCodec(OpZIncr).Str(key).Str(field).Float(score))
if err != nil {
return 0, err
}
return strconv.ParseFloat(*b2s(args), 64)
}

// ZRemove
func (c *Client) ZRemove(key, field string) error {
return c.doNoRes(NewCodec(OpZRemove).Str(key).Str(field))
}

// Close
func (c *Client) Close() error {
return c.c.Close()
Expand Down
2 changes: 1 addition & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Codec) StrSlice(v []string) *Codec {
return s.format(base.FormatStrSlice(v))
}

func (s *Codec) Type(v VType) *Codec {
func (s *Codec) Type(v Type) *Codec {
return s.format([]byte{byte(v)})
}

Expand Down
49 changes: 32 additions & 17 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ var cmdTable = []Cmd{
return nil
}

vType := VType(args[0][0])
Type := Type(args[0][0])

switch vType {
switch Type {
case TypeString:
e.m.SetTx(string(args[1]), args[3], ts)

Expand Down Expand Up @@ -122,8 +122,15 @@ var cmdTable = []Cmd{
}
e.m.Set(string(args[1]), m)

case TypeZSet:
m := structx.NewZSet[string, float64, []byte]()
if err := m.UnmarshalJSON(args[3]); err != nil {
return err
}
e.m.Set(string(args[1]), m)

default:
return fmt.Errorf("%v: %d", base.ErrUnSupportDataType, vType)
return fmt.Errorf("%w: %d", base.ErrUnSupportDataType, Type)
}

return nil
Expand Down Expand Up @@ -337,21 +344,24 @@ var cmdTable = []Cmd{
}},
// zset
{OpZAdd, 4, func(e *Engine, args [][]byte, _ base.Writer) error {
// key, score, val
// key, field, score, val
s, err := strconv.ParseFloat(string(args[2]), 64)
if err != nil {
return err
}
return e.ZAdd(string(args[0]), string(args[1]), s, args[3])
}},
{OpZIncr, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key, score, val
// key, field, score
s, err := strconv.ParseFloat(string(args[2]), 64)
if err != nil {
return err
}
_, err = e.ZIncr(string(args[0]), string(args[1]), s)
return err
res, err := e.ZIncr(string(args[0]), string(args[1]), s)
if err != nil {
return err
}
return w.Write([]byte(strconv.FormatFloat(res, 'f', -1, 64)))
}},
{OpZRemove, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, val
Expand All @@ -367,11 +377,11 @@ var cmdTable = []Cmd{
}},
}

// VType is value type for Set Operation.
type VType byte
// Type is the data type for Rotom.
type Type byte

const (
TypeString VType = iota + 1
TypeString Type = iota + 1
TypeMap
TypeSet
TypeList
Expand Down Expand Up @@ -406,15 +416,17 @@ var (
SyncPolicy: base.EveryInterval,
SyncInterval: time.Second,
ShrinkInterval: time.Minute,
MonitorIerval: time.Minute,
RunSkipLoadError: true,
Logger: slog.Default(),
}

// No persistent config
NoPersistentConfig = &Config{
ShardCount: 1024,
SyncPolicy: base.Never,
Logger: slog.Default(),
ShardCount: 1024,
SyncPolicy: base.Never,
MonitorIerval: time.Minute,
Logger: slog.Default(),
}
)

Expand All @@ -428,6 +440,7 @@ type Config struct {

SyncInterval time.Duration // Sync to disk interval.
ShrinkInterval time.Duration // Shrink db file interval.
MonitorIerval time.Duration // Monitor interval.

RunSkipLoadError bool // Starts when loading db file error.

Expand Down Expand Up @@ -476,7 +489,7 @@ func Open(conf *Config) (*Engine, error) {

// runtime monitor.
if e.Logger != nil {
e.tickers[0] = base.NewTicker(ctx, time.Minute, func() {
e.tickers[0] = base.NewTicker(ctx, e.MonitorIerval, func() {
e.printRuntimeStats()
})
}
Expand Down Expand Up @@ -1061,10 +1074,10 @@ func (e *Engine) shrink() {
return
}

var _type VType
var _type Type
// Marshal any
data, err := e.m.MarshalBytesFunc(func(key string, v any, i int64) {
switch v.(type) {
switch v := v.(type) {
case Map:
_type = TypeString
case BitMap:
Expand All @@ -1073,8 +1086,10 @@ func (e *Engine) shrink() {
_type = TypeList
case Set:
_type = TypeSet
case ZSet:
_type = TypeZSet
default:
panic(base.ErrUnSupportDataType)
panic(fmt.Errorf("%w: %d", base.ErrUnSupportDataType, v))
}

// SetTx
Expand Down
Loading

0 comments on commit f093dde

Please sign in to comment.