Skip to content

Commit

Permalink
add ticker for forceStart
Browse files Browse the repository at this point in the history
  • Loading branch information
satoshi-099 committed Oct 31, 2023
1 parent 45ecc1a commit 6460348
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 54 deletions.
3 changes: 2 additions & 1 deletion base/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ var (
ErrInvalidArgs = errors.New("invalid args")
ErrInvalidResponse = errors.New("invalid response")

ErrDatabaseClosed = errors.New("database closed")
ErrDatabaseClosed = errors.New("database closed")
ErrUnSupportOperation = errors.New("unsupport operation")

ErrEmptyList = errors.New("list is empty")
)
56 changes: 56 additions & 0 deletions base/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package base

import (
"context"
"time"
)

// Ticker
type Ticker struct {
ticker *time.Timer
ctx context.Context
f func()
reset chan struct{}
}

// NewTicker return a ticker.
func NewTicker(ctx context.Context, interval time.Duration, f func()) *Ticker {
if interval <= 0 {
panic("invalid interval")
}

t := &Ticker{
ticker: time.NewTimer(interval),
ctx: ctx,
f: f,
reset: make(chan struct{}),
}

go func() {
for {
select {
case <-t.ticker.C:
f()

case <-t.reset:
t.ticker.Reset(interval)

case <-ctx.Done():
return
}
}
}()

return t
}

func (t *Ticker) ForceFunc() error {
select {
case <-t.ctx.Done():
return ErrDatabaseClosed

default:
t.f()
return nil
}
}
58 changes: 27 additions & 31 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,28 @@ var cmdTable = []Cmd{
e.m.SetTx(*b2s(args[1]), args[3], ts)

case TypeList:
var ls List
ls := structx.NewList[string]()
if err := ls.UnmarshalJSON(args[3]); err != nil {
return err
}
e.m.Set(*b2s(args[1]), ls)

case TypeSet:
var s Set
s := structx.NewSet[string]()
if err := s.UnmarshalJSON(args[3]); err != nil {
return err
}
e.m.Set(*b2s(args[1]), s)

case TypeMap:
var m Map
m := structx.NewSyncMap[string, []byte]()
if err := m.UnmarshalJSON(args[3]); err != nil {
return err
}
e.m.Set(*b2s(args[1]), m)

case TypeBitmap:
var m BitMap
m := structx.NewBitmap()
if err := m.UnmarshalBinary(args[3]); err != nil {
return err
}
Expand Down Expand Up @@ -359,10 +359,10 @@ type Config struct {

SyncPolicy base.SyncPolicy // Data sync policy.

SyncInterval time.Duration // Job for db sync to disk.
ShrinkInterval time.Duration // Job for shrink db file size.
SyncInterval time.Duration // Sync to disk interval.
ShrinkInterval time.Duration // Shrink db file interval.

RunSkipLoadError bool // Starts when loading database file error.
RunSkipLoadError bool // Starts when loading db file error.

Logger *slog.Logger // Logger for db, set <nil> if you don't want to use it.
}
Expand All @@ -372,11 +372,13 @@ type Engine struct {
sync.Mutex
*Config

// context
ctx context.Context
cancel context.CancelFunc
// context.
ctx context.Context
cancel context.CancelFunc
tickers [3]*base.Ticker

loading bool // if db loading encode() not allowed.
// if db loading encode() not allowed.
loading bool

buf *bytes.Buffer
rwbuf *bytes.Buffer
Expand All @@ -395,6 +397,7 @@ func Open(conf *Config) (*Engine, error) {
buf: bytes.NewBuffer(nil),
rwbuf: bytes.NewBuffer(nil),
m: cache.New(conf.ShardCount),
tickers: [3]*base.Ticker{},
}

// load db from disk.
Expand All @@ -406,12 +409,14 @@ func Open(conf *Config) (*Engine, error) {

// runtime monitor.
if e.Logger != nil {
e.backend(time.Minute, func() { e.printRuntimeStats() })
e.tickers[0] = base.NewTicker(ctx, time.Minute, func() {
e.printRuntimeStats()
})
}

if e.SyncPolicy == base.EveryInterval {
// sync buffer to disk.
e.backend(e.SyncInterval, func() {
e.tickers[1] = base.NewTicker(ctx, e.SyncInterval, func() {
e.Lock()
n, err := e.writeTo(e.buf, e.Path)
e.Unlock()
Expand All @@ -423,7 +428,7 @@ func Open(conf *Config) (*Engine, error) {
})

// shrink db.
e.backend(e.ShrinkInterval, func() {
e.tickers[2] = base.NewTicker(ctx, e.ShrinkInterval, func() {
e.Lock()
e.shrink()
e.Unlock()
Expand Down Expand Up @@ -1038,6 +1043,14 @@ func (e *Engine) shrink() {
e.logInfo("rotom rewrite done")
}

// ForceShrink force to shrink db file.
func (e *Engine) ForceShrink() error {
if e.tickers[2] == nil {
return base.ErrUnSupportOperation
}
return e.tickers[2].ForceFunc()
}

// fetchMap
func (e *Engine) fetchMap(key string, setnx ...bool) (m Map, err error) {
return fetch(e, key, func() Map {
Expand Down Expand Up @@ -1106,23 +1119,6 @@ func formatSize[T base.Integer](size T) string {
}
}

// backend
func (e *Engine) backend(t time.Duration, f func()) {
if t <= 0 {
panic("invalid interval")
}
go func() {
for {
select {
case <-time.After(t):
f()
case <-e.ctx.Done():
return
}
}
}()
}

// printRuntimeStats
func (e *Engine) printRuntimeStats() {
var stats debug.GCStats
Expand Down
2 changes: 2 additions & 0 deletions rotom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ func TestSetAndBitmap(t *testing.T) {
}

// load
assert.Nil(db.ForceShrink())
db.Close()
assert.Equal(db.ForceShrink(), base.ErrDatabaseClosed)

db, err = Open(cfg)
assert.Nil(err)
Expand Down
23 changes: 3 additions & 20 deletions structx/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,11 @@ func NewSet[T comparable](args ...T) *Set[T] {
return s
}

func (s *Set[T]) add(v T) bool {
if s.m.Has(v) {
return false
}
s.m.Put(v, struct{}{})

return true
}

// Add
func (s *Set[T]) Add(v T) bool {
func (s *Set[T]) Add(v T) {
s.Lock()
ok := s.add(v)
s.m.Put(v, struct{}{})
s.Unlock()
return ok
}

// Remove
Expand Down Expand Up @@ -157,18 +147,11 @@ func (s *Set[T]) MarshalJSON() ([]byte, error) {

// UnmarshalJSON
func (s *Set[T]) UnmarshalJSON(src []byte) error {
s.Lock()
defer s.Unlock()

var tmp []T
if err := json.Unmarshal(src, &tmp); err != nil {
return err
}

s.m = NewMap[T, struct{}](len(tmp))
for _, v := range tmp {
s.m.Put(v, struct{}{})
}
*s = *NewSet(tmp...)

return nil
}
3 changes: 1 addition & 2 deletions structx/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ func TestSet(t *testing.T) {
s := NewSet[int]()

for i := 0; i < 10; i++ {
assert.True(s.Add(i))
assert.False(s.Add(i))
s.Add(i)
}
assert.Equal(s.Len(), 10)

Expand Down

0 comments on commit 6460348

Please sign in to comment.