Skip to content

Commit

Permalink
fix concurrent close db
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Oct 23, 2023
1 parent c27b9cf commit 0398e41
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 61 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ cpu: 13th Gen Intel(R) Core(TM) i5-13600KF

### Rotom

200 clients inserting a total of 1 million data, completed in 628ms, reaching a qps of 1.59 million, p99 latency is 1.1ms.
200 clients inserting a total of 1 million data, completed in 556ms, reaching a qps of 1.79 million, p99 latency is 1.1ms.

```bash
$ go run client/*.go
1000000 requests cost: 628.747032ms
[qps] 1590415.82 req/sec
[latency] avg: 115.852µs | min: 3.837µs | p50: 49.395µs | p95: 216.273µs | p99: 1.155185ms | max: 15.9881ms
1000000 requests cost: 556.955696ms
[qps] 1795418.85 req/sec
[latency] avg: 98.74µs | min: 3.632µs | p50: 40.903µs | p95: 175.456µs | p99: 1.09595ms | max: 13.305872ms
```

### Redis
Expand Down
8 changes: 4 additions & 4 deletions README_ZN.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ cpu: 13th Gen Intel(R) Core(TM) i5-13600KF

### Rotom

使用 200 个 clients 插入共 100 万数据,663ms 完成,qps 达到 150 万,p99 延迟 1.2ms
使用 200 个 clients 插入共 100 万数据,556ms 完成,qps 达到 179 万,p99 延迟 1.1ms

```bash
$ go run client/*.go
1000000 requests cost: 663.97797ms
[qps] 1506028.48 req/sec
[latency] avg: 119.645µs | min: 4.052µs | p50: 49.464µs | p95: 425.006µs | p99: 1.195428ms | max: 17.713702ms
1000000 requests cost: 556.955696ms
[qps] 1795418.85 req/sec
[latency] avg: 98.74µs | min: 3.632µs | p50: 40.903µs | p95: 175.456µs | p99: 1.09595ms | max: 13.305872ms
```

### Redis
Expand Down
49 changes: 28 additions & 21 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rotom

import (
"bytes"
"context"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -180,17 +181,24 @@ type Config struct {
type Engine struct {
sync.Mutex
*Config
closed bool
buf *bytes.Buffer
rwbuf *bytes.Buffer
m *cache.GigaCache

// context
ctx context.Context
cancel context.CancelFunc

buf *bytes.Buffer
rwbuf *bytes.Buffer
m *cache.GigaCache
}

// Open opens a database specified by config.
// The file will be created automatically if not exist.
func Open(conf *Config) (*Engine, error) {
ctx, cancel := context.WithCancel(context.Background())
e := &Engine{
Config: conf,
ctx: ctx,
cancel: cancel,
buf: bytes.NewBuffer(nil),
rwbuf: bytes.NewBuffer(nil),
m: cache.New(conf.ShardCount),
Expand All @@ -204,9 +212,9 @@ func Open(conf *Config) (*Engine, error) {
}

// runtime monitor.
e.backend(time.Minute, func() {
e.printRuntimeStats()
})
if e.Logger != nil {
e.backend(time.Minute, func() { e.printRuntimeStats() })
}

if e.SyncPolicy == base.EveryInterval {
// sync buffer to disk.
Expand Down Expand Up @@ -242,15 +250,16 @@ func (e *Engine) Listen(addr string) error {

// Close closes the db engine.
func (e *Engine) Close() error {
e.Lock()
defer e.Unlock()
if e.closed {
select {
case <-e.ctx.Done():
return base.ErrDatabaseClosed
default:
e.Lock()
_, err := e.writeTo(e.buf, e.Path)
e.Unlock()
e.cancel()
return err
}
_, err := e.writeTo(e.buf, e.Path)
e.closed = true

return err
}

// encode
Expand Down Expand Up @@ -1140,21 +1149,18 @@ func (e *Engine) backend(t time.Duration, f func()) {
}
go func() {
for {
time.Sleep(t)
if e.closed {
select {
case <-time.After(t):
f()
case <-e.ctx.Done():
return
}
f()
}
}()
}

// printRuntimeStats
func (e *Engine) printRuntimeStats() {
if e.Logger == nil {
return
}

var stats debug.GCStats
var memStats runtime.MemStats

Expand All @@ -1165,6 +1171,7 @@ func (e *Engine) printRuntimeStats() {
With("alloc", formatSize(memStats.Alloc)).
With("sys", formatSize(memStats.Sys)).
With("gctime", stats.NumGC).
With("heapObj", memStats.HeapObjects).
With("gcpause", stats.PauseTotal/time.Duration(stats.NumGC)).
Info("[Runtime]")
}
Expand Down
27 changes: 0 additions & 27 deletions rotom_bench_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion rotom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestDB(t *testing.T) {

// close
assert.Nil(db.Close())
assert.NotNil(db.Close())
assert.Equal(db.Close(), base.ErrDatabaseClosed)

// load error
os.WriteFile(cfg.Path, []byte("fake data"), 0644)
Expand Down
5 changes: 1 addition & 4 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package rotom

import (
"io"

"github.com/panjf2000/gnet/v2"
"github.com/xgzlucario/rotom/base"
)
Expand All @@ -14,7 +12,6 @@ const (
RES_SUCCESS RespCode = iota + 1
RES_ERROR
RES_TIMEOUT
RES_LIMITED
)

type RotomEngine struct {
Expand All @@ -24,7 +21,7 @@ type RotomEngine struct {

// OnTraffic
func (e *RotomEngine) OnTraffic(conn gnet.Conn) gnet.Action {
buf, err := io.ReadAll(conn)
buf, err := conn.Next(-1)
if err != nil {
return gnet.Close
}
Expand Down

0 comments on commit 0398e41

Please sign in to comment.