Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Oct 27, 2023
1 parent 10f79d6 commit 04c654a
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 85 deletions.
6 changes: 6 additions & 0 deletions base/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type Gober interface {
GobDecode([]byte) error
}

// Writer
type Writer interface {
Write([]byte) (int, error)
WriteByte(byte) error
}

// SyncPolicy represents how often data is synced to disk.
type SyncPolicy byte

Expand Down
78 changes: 52 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
package rotom

import (
"errors"
"net"
"time"

cache "github.com/xgzlucario/GigaCache"
"github.com/xgzlucario/rotom/base"
"golang.org/x/exp/slices"
)

var (
// Since there is a limit to the number of concurrent clients,
// which is usually not very large,
// use bpool to reuse the buffer.
bpool = cache.NewBytePoolCap(1000, 512, 512)
)

// Client defines the client that connects to the server.
Expand All @@ -26,6 +19,7 @@ type Client struct {
func NewClient(addr string) (c *Client, err error) {
c = &Client{}
c.c, err = net.Dial("tcp", addr)
c.b = make([]byte, 64)
return
}

Expand All @@ -41,45 +35,65 @@ func (c *Client) SetEx(key string, val []byte, ttl time.Duration) ([]byte, error

// SetTx
func (c *Client) SetTx(key string, val []byte, ts int64) ([]byte, error) {
return c.do(NewCodec(OpSetTx).Type(TypeString).Str(key).Int(ts).Bytes(val))
args, err := c.do(NewCodec(OpSetTx).Type(TypeString).Str(key).Int(ts).Bytes(val))
if err != nil {
return nil, err
}
return args, nil
}

// Remove
func (c *Client) Remove(key string) ([]byte, error) {
return c.do(NewCodec(OpRemove).Str(key))
func (c *Client) Remove(key string) (bool, error) {
args, err := c.do(NewCodec(OpRemove).Str(key))
if err != nil {
return false, err
}
return args[0] == _true, nil
}

// Rename
func (c *Client) Rename(key, newKey string) ([]byte, error) {
return c.do(NewCodec(OpRename).Str(key).Str(newKey))
func (c *Client) Rename(key, newKey string) (bool, error) {
args, err := c.do(NewCodec(OpRename).Str(key).Str(newKey))
if err != nil {
return false, err
}
return args[0] == _true, nil
}

// Get
func (c *Client) Get(key string) ([]byte, error) {
return c.do(NewCodec(ReqGet).Str(key))
args, err := c.do(NewCodec(ReqGet).Str(key))
if err != nil {
return nil, err
}
return args, nil
}

// Len
func (c *Client) Len() (uint64, error) {
res, err := c.do(NewCodec(ReqLen))
if err != nil {
return 0, err
}
_, args, err := NewDecoder(res).ParseRecord()
args, err := c.do(NewCodec(ReqLen))
if err != nil {
return 0, err
}
return base.ParseInt[uint64](args[0]), nil
return base.ParseInt[uint64](args), nil
}

// HSet
func (c *Client) HSet(key, field string, val []byte) ([]byte, error) {
return c.do(NewCodec(OpHSet).Str(key).Str(field).Bytes(val))
args, err := c.do(NewCodec(OpHSet).Str(key).Str(field).Bytes(val))
if err != nil {
return nil, err
}
return args, nil
}

// HRemove
func (c *Client) HRemove(key, field string) ([]byte, error) {
return c.do(NewCodec(OpHRemove).Str(key).Str(field))
args, err := c.do(NewCodec(OpHRemove).Str(key).Str(field))
if err != nil {
return nil, err
}
return args, nil
}

// Close
Expand All @@ -91,17 +105,29 @@ func (c *Client) Close() error {
func (c *Client) do(cd *Codec) ([]byte, error) {
_, err := c.c.Write(cd.B)
cd.Recycle()

if err != nil {
return nil, err
}
c.b = bpool.Get()
defer bpool.Put(c.b)

// read response.
n, err := c.c.Read(c.b)
if err != nil {
return nil, err
}

return slices.Clone(c.b[:n]), nil
// parse data.
op, args, err := NewDecoder(c.b[:n]).ParseRecord()
if err != nil {
return nil, err
}
if op != Response {
return nil, base.ErrInvalidResponse
}

// the first args is response code.
if int64(args[0][0]) == RES_ERROR {
return nil, errors.New(*b2s(args[1]))
}

return args[1], nil
}
77 changes: 42 additions & 35 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"os"
"runtime"
Expand Down Expand Up @@ -66,12 +65,12 @@ const (
type Cmd struct {
op Operation
argsNum byte
hook func(*Engine, [][]byte, io.Writer) error
hook func(*Engine, [][]byte, base.Writer) error
}

// cmdTable defines the number of parameters required for the operation.
var cmdTable = []Cmd{
{OpSetTx, 4, func(e *Engine, args [][]byte, _ io.Writer) error {
{OpSetTx, 4, func(e *Engine, args [][]byte, _ base.Writer) error {
// type, key, ts, val
ts := base.ParseInt[int64](args[2]) * timeCarry
if ts < cache.GetClock() && ts != noTTL {
Expand Down Expand Up @@ -118,98 +117,106 @@ var cmdTable = []Cmd{

return nil
}},
{OpRemove, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{OpRemove, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
e.Remove(*b2s(args[0]))
if e.Remove(*b2s(args[0])) {
w.WriteByte(_true)
} else {
w.WriteByte(_false)
}
return nil
}},
{OpRename, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpRename, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// old, new
e.Rename(*b2s(args[0]), *b2s(args[1]))
if e.Rename(*b2s(args[0]), *b2s(args[1])) {
w.WriteByte(_true)
} else {
w.WriteByte(_false)
}
return nil
}},
// map
{OpHSet, 3, func(e *Engine, args [][]byte, _ io.Writer) error {
{OpHSet, 3, func(e *Engine, args [][]byte, _ base.Writer) error {
// key, field, val
return e.HSet(*b2s(args[0]), *b2s(args[1]), args[2])
}},
{OpHRemove, 2, func(e *Engine, args [][]byte, _ io.Writer) error {
{OpHRemove, 2, func(e *Engine, args [][]byte, _ base.Writer) error {
// key, field
return e.HRemove(*b2s(args[0]), *b2s(args[1]))
}},
// set
{OpSAdd, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpSAdd, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, item
return e.SAdd(*b2s(args[0]), *b2s(args[1]))
}},
{OpSRemove, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpSRemove, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, item
_, err := e.SRemove(*b2s(args[0]), *b2s(args[1]))
return err
}},
{OpSUnion, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpSUnion, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key1, key2, dest
return e.SUnion(*b2s(args[0]), *b2s(args[1]), *b2s(args[2]))
}},
{OpSInter, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpSInter, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key1, key2, dest
return e.SInter(*b2s(args[0]), *b2s(args[1]), *b2s(args[2]))
}},
{OpSDiff, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpSDiff, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key1, key2, dest
return e.SDiff(*b2s(args[0]), *b2s(args[1]), *b2s(args[2]))
}},
// list
{OpLPush, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpLPush, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, item
return e.LPush(*b2s(args[0]), *b2s(args[1]))
}},
{OpLPop, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{OpLPop, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
_, err := e.LPop(*b2s(args[0]))
return err
}},
{OpRPush, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpRPush, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, item
return e.RPush(*b2s(args[0]), *b2s(args[1]))
}},
{OpRPop, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{OpRPop, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
_, err := e.RPop(*b2s(args[0]))
return err
}},
// bitmap
{OpBitSet, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpBitSet, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key, offset, val
_, err := e.BitSet(*b2s(args[0]), base.ParseInt[uint32](args[1]), args[2][0] == _true)
return err
}},
{OpBitFlip, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpBitFlip, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, offset
return e.BitFlip(*b2s(args[0]), base.ParseInt[uint32](args[1]))
}},
{OpBitOr, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpBitOr, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key1, key2, dest
return e.BitOr(*b2s(args[0]), *b2s(args[1]), *b2s(args[2]))
}},
{OpBitAnd, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpBitAnd, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key1, key2, dest
return e.BitAnd(*b2s(args[0]), *b2s(args[1]), *b2s(args[2]))
}},
{OpBitXor, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpBitXor, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key1, key2, dest
return e.BitXor(*b2s(args[0]), *b2s(args[1]), *b2s(args[2]))
}},
// zset
{OpZAdd, 4, func(e *Engine, args [][]byte, w io.Writer) error {
{OpZAdd, 4, func(e *Engine, args [][]byte, w base.Writer) error {
// key, score, val
s, err := strconv.ParseFloat(*b2s(args[2]), 64)
if err != nil {
return err
}
return e.ZAdd(*b2s(args[0]), *b2s(args[1]), s, args[3])
}},
{OpZIncr, 3, func(e *Engine, args [][]byte, w io.Writer) error {
{OpZIncr, 3, func(e *Engine, args [][]byte, w base.Writer) error {
// key, score, val
s, err := strconv.ParseFloat(*b2s(args[2]), 64)
if err != nil {
Expand All @@ -218,24 +225,24 @@ var cmdTable = []Cmd{
_, err = e.ZIncr(*b2s(args[0]), *b2s(args[1]), s)
return err
}},
{OpZRemove, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{OpZRemove, 2, func(e *Engine, args [][]byte, w base.Writer) error {
// key, val
return e.ZRemove(*b2s(args[0]), *b2s(args[1]))
}},
// marshal
{OpMarshalBytes, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{OpMarshalBytes, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// val
return e.m.UnmarshalBytes(args[0])
}},
// request
{Response, 2, func(e *Engine, args [][]byte, w io.Writer) error {
{Response, 2, func(e *Engine, args [][]byte, w base.Writer) error {
return nil
}},
{ReqPing, 0, func(e *Engine, args [][]byte, w io.Writer) error {
{ReqPing, 0, func(e *Engine, args [][]byte, w base.Writer) error {
_, err := w.Write([]byte("pong"))
return err
}},
{ReqGet, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{ReqGet, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
val, _, err := e.GetBytes(*b2s(args[0]))
if err != nil {
Expand All @@ -244,12 +251,12 @@ var cmdTable = []Cmd{
_, err = w.Write(val)
return err
}},
{ReqLen, 0, func(e *Engine, args [][]byte, w io.Writer) error {
str := strconv.Itoa(int(e.Stat().Len))
_, err := w.Write(s2b(&str))
{ReqLen, 0, func(e *Engine, args [][]byte, w base.Writer) error {
res := base.FormatInt[uint64](e.Stat().Len)
_, err := w.Write(res)
return err
}},
{ReqHLen, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{ReqHLen, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
m, err := e.fetchMap(*b2s(args[0]))
if err != nil {
Expand All @@ -259,7 +266,7 @@ var cmdTable = []Cmd{
_, err = w.Write(s2b(&str))
return err
}},
{ReqLLen, 1, func(e *Engine, args [][]byte, w io.Writer) error {
{ReqLLen, 1, func(e *Engine, args [][]byte, w base.Writer) error {
// key
l, err := e.fetchList(*b2s(args[0]))
if err != nil {
Expand Down
Loading

0 comments on commit 04c654a

Please sign in to comment.