Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Nov 3, 2023
1 parent bc4a86b commit 4787df5
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 222 deletions.
11 changes: 11 additions & 0 deletions base/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ type Writer interface {
WriteByte(byte) error
}

// NullWriter
type NullWriter struct{}

func (NullWriter) Write([]byte) (int, error) {
return 0, nil
}

func (NullWriter) WriteByte(byte) error {
return nil
}

// SyncPolicy represents how often data is synced to disk.
type SyncPolicy byte

Expand Down
48 changes: 34 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ func NewClient(addr string) (c *Client, err error) {
}

// Set
func (c *Client) Set(key string, val []byte) ([]byte, error) {
func (c *Client) Set(key string, val []byte) error {
return c.SetTx(key, val, noTTL)
}

// SetEx
func (c *Client) SetEx(key string, val []byte, ttl time.Duration) ([]byte, error) {
func (c *Client) SetEx(key string, val []byte, ttl time.Duration) error {
return c.SetTx(key, val, cache.GetClock()+int64(ttl))
}

// SetTx
func (c *Client) SetTx(key string, val []byte, ts int64) ([]byte, error) {
return c.do(NewCodec(OpSetTx).Type(TypeString).Str(key).Int(ts / timeCarry).Bytes(val))
func (c *Client) SetTx(key string, val []byte, ts int64) error {
return c.doNoRes(NewCodec(OpSetTx).Type(TypeString).Str(key).Int(ts / timeCarry).Bytes(val))
}

// Remove
Expand Down Expand Up @@ -73,8 +73,7 @@ func (c *Client) Len() (uint64, error) {

// HSet
func (c *Client) HSet(key, field string, val []byte) error {
_, err := c.do(NewCodec(OpHSet).Str(key).Str(field).Bytes(val))
return err
return c.doNoRes(NewCodec(OpHSet).Str(key).Str(field).Bytes(val))
}

// HGet
Expand Down Expand Up @@ -122,8 +121,7 @@ func (c *Client) SAdd(key string, items ...string) (int, error) {

// SRemove
func (c *Client) SRemove(key, item string) error {
_, err := c.do(NewCodec(OpSRemove).Str(key).Str(item))
return err
return c.doNoRes(NewCodec(OpSRemove).Str(key).Str(item))
}

// SHas
Expand Down Expand Up @@ -155,27 +153,49 @@ func (c *Client) SMembers(key string) ([]string, error) {

// SUnion
func (c *Client) SUnion(dstKey string, srcKeys ...string) error {
_, err := c.do(NewCodec(OpSUnion).Str(dstKey).StrSlice(srcKeys))
return err
return c.doNoRes(NewCodec(OpSUnion).Str(dstKey).StrSlice(srcKeys))
}

// SInter
func (c *Client) SInter(dstKey string, srcKeys ...string) error {
_, err := c.do(NewCodec(OpSInter).Str(dstKey).StrSlice(srcKeys))
return err
return c.doNoRes(NewCodec(OpSInter).Str(dstKey).StrSlice(srcKeys))
}

// SDiff
func (c *Client) SDiff(dstKey string, srcKeys ...string) error {
_, err := c.do(NewCodec(OpSDiff).Str(dstKey).StrSlice(srcKeys))
return err
return c.doNoRes(NewCodec(OpSDiff).Str(dstKey).StrSlice(srcKeys))
}

// BitSet
func (c *Client) BitSet(key string, offset uint32, val bool) error {
return c.doNoRes(NewCodec(OpBitSet).Str(key).Uint(offset).Bool(val))
}

// BitTest
func (c *Client) BitTest(key string, offset uint32) (bool, error) {
args, err := c.do(NewCodec(OpBitTest).Str(key).Uint(offset))
if err != nil {
return false, err
}
return args[0] == _true, nil
}

// BitFlip
func (c *Client) BitFlip(key string, offset uint32) error {
return c.doNoRes(NewCodec(OpBitFlip).Str(key).Uint(offset))
}

// Close
func (c *Client) Close() error {
return c.c.Close()
}

// doNoRes do without res.
func (c *Client) doNoRes(cd *Codec) error {
_, err := c.do(cd)
return err
}

// do send request and return response.
func (c *Client) do(cd *Codec) ([]byte, error) {
_, err := c.c.Write(cd.B)
Expand Down
5 changes: 1 addition & 4 deletions examples/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,10 @@ func cmd() {
k := strconv.FormatInt(addnow, 36)

// send
res, err := cli.SetTx(k, []byte(k), addnow)
err := cli.SetTx(k, []byte(k), addnow)
if err != nil {
panic(err)
}
if len(res) > 0 {
panic("error res")
}

// stat
delays.Add(float64(time.Since(now)))
Expand Down
59 changes: 29 additions & 30 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"fmt"
"log/slog"
"os"
"reflect"
"runtime"
"runtime/debug"
"strconv"
"sync"
"time"

"github.com/bytedance/sonic"
"github.com/panjf2000/gnet/v2"
cache "github.com/xgzlucario/GigaCache"
"github.com/xgzlucario/rotom/base"
Expand Down Expand Up @@ -53,6 +53,7 @@ const (
OpLLen
// bitmap
OpBitSet
OpBitTest
OpBitFlip
OpBitOr
OpBitAnd
Expand Down Expand Up @@ -154,38 +155,29 @@ var cmdTable = []Cmd{
}},
{OpHGet, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, field
m, err := e.fetchMap(string(args[0]))
res, err := e.HGet(string(args[0]), string(args[1]))
if err != nil {
return err
}
val, ok := m.Get(string(args[1]))
if !ok {
return base.ErrFieldNotFound
}
_, err = w.Write(val)
_, err = w.Write(res)
return err
}},
{OpHLen, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
m, err := e.fetchMap(string(args[0]))
n, err := e.HLen(string(args[0]))
if err != nil {
return err
}
res := base.FormatInt[int](m.Len())
_, err = w.Write(res)
_, err = w.Write(base.FormatInt(n))
return err
}},
{OpHKeys, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
m, err := e.fetchMap(string(args[0]))
if err != nil {
return err
}
src, err := sonic.Marshal(m.Keys())
res, err := e.HKeys(string(args[0]))
if err != nil {
return err
}
_, err = w.Write(src)
_, err = w.Write(base.FormatStrSlice(res))
return err
}},
{OpHRemove, 2, func(e *Engine, args [][]byte, w base.Writer) error {
Expand Down Expand Up @@ -286,6 +278,14 @@ var cmdTable = []Cmd{
_, err := e.BitSet(string(args[0]), base.ParseInt[uint32](args[1]), args[2][0] == _true)
return err
}},
{OpBitTest, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, offset
ok, err := e.BitTest(string(args[0]), base.ParseInt[uint32](args[1]))
if err != nil {
return err
}
return w.WriteByte(bool2byte(ok))
}},
{OpBitFlip, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, offset
return e.BitFlip(string(args[0]), base.ParseInt[uint32](args[1]))
Expand Down Expand Up @@ -598,7 +598,6 @@ func (e *Engine) HGet(key, field string) ([]byte, error) {
if err != nil {
return nil, err
}

res, ok := m.Get(field)
if !ok {
return nil, base.ErrFieldNotFound
Expand Down Expand Up @@ -1009,7 +1008,7 @@ func (e *Engine) load() error {
if err != nil {
return err
}
if err := cmdTable[op].hook(e, args, nil); err != nil {
if err := cmdTable[op].hook(e, args, base.NullWriter{}); err != nil {
return err
}
}
Expand All @@ -1024,24 +1023,24 @@ func (e *Engine) shrink() {
return
}

var rec VType
var _type VType
// Marshal any
data, err := e.m.MarshalBytesFunc(func(key string, v any, i int64) {
switch v.(type) {
case Map:
rec = TypeString
_type = TypeString
case BitMap:
rec = TypeBitmap
_type = TypeBitmap
case List:
rec = TypeList
_type = TypeList
case Set:
rec = TypeSet
_type = TypeSet
default:
panic(base.ErrUnSupportDataType)
}

// SetTx
if cd, err := NewCodec(OpSetTx).Type(rec).Str(key).Int(i / timeCarry).Any(v); err == nil {
if cd, err := NewCodec(OpSetTx).Type(_type).Str(key).Int(i / timeCarry).Any(v); err == nil {
e.rwbuf.Write(cd.B)
cd.Recycle()
}
Expand Down Expand Up @@ -1109,22 +1108,22 @@ func (e *Engine) fetchZSet(key string, setnx ...bool) (z ZSet, err error) {
}

// fetch
func fetch[T any](e *Engine, key string, new func() T, setnx ...bool) (T, error) {
func fetch[T any](e *Engine, key string, new func() T, setnx ...bool) (v T, err error) {
m, _, ok := e.m.Get(key)
if ok {
m, ok := m.(T)
if ok {
return m, nil
}
var v T
return v, base.ErrWrongType
return v, fmt.Errorf("%w: %v->%v", base.ErrWrongType, reflect.TypeOf(m), reflect.TypeOf(v))
}
vptr := new()

v = new()
if len(setnx) > 0 && setnx[0] {
e.m.Set(key, vptr)
e.m.Set(key, v)
}

return vptr, nil
return v, nil
}

// formatSize
Expand Down
Loading

0 comments on commit 4787df5

Please sign in to comment.