Skip to content

Commit

Permalink
add async method
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed May 30, 2024
1 parent 834d703 commit 37b60f2
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 32 deletions.
34 changes: 18 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,31 +329,33 @@ or more topics via websocket, and when a message is posted to that topic, all su
package main

import (
"github.com/lxzan/event_emitter"
"github.com/lxzan/gws"
"github.com/lxzan/event_emitter"
"github.com/lxzan/gws"
)

type Socket struct{ *gws.Conn }
type Socket gws.Conn

func NewSocket(conn *gws.Conn) *Socket { return (*Socket)(conn) }

func (c *Socket) GetSubscriberID() int64 {
userId, _ := c.Session().Load("userId")
return userId.(int64)
userId, _ := c.GetMetadata().Load("userId")
return userId.(int64)
}

func (c *Socket) GetMetadata() event_emitter.Metadata {
return c.Conn.Session()
}
func (c *Socket) GetMetadata() event_emitter.Metadata { return c.Conn().Session() }

func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) {
em.Subscribe(socket, topic, func(subscriber *Socket, msg any) {
_ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn)
})
func (c *Socket) Conn() *gws.Conn { return (*gws.Conn)(c) }

func Sub(em *event_emitter.EventEmitter[int64, *Socket], socket *Socket, topic string) {
em.Subscribe(socket, topic, func(subscriber *Socket, msg any) {
_ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn())
})
}

func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) {
var broadcaster = gws.NewBroadcaster(op, msg)
defer broadcaster.Close()
em.Publish(topic, broadcaster)
func Pub(em *event_emitter.EventEmitter[int64, *Socket], topic string, op gws.Opcode, msg []byte) {
var broadcaster = gws.NewBroadcaster(op, msg)
defer broadcaster.Close()
em.Publish(topic, broadcaster)
}
```

Expand Down
34 changes: 18 additions & 16 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,31 +316,33 @@ func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {
package main

import (
"github.com/lxzan/event_emitter"
"github.com/lxzan/gws"
"github.com/lxzan/event_emitter"
"github.com/lxzan/gws"
)

type Socket struct{ *gws.Conn }
type Socket gws.Conn

func NewSocket(conn *gws.Conn) *Socket { return (*Socket)(conn) }

func (c *Socket) GetSubscriberID() int64 {
userId, _ := c.Session().Load("userId")
return userId.(int64)
userId, _ := c.GetMetadata().Load("userId")
return userId.(int64)
}

func (c *Socket) GetMetadata() event_emitter.Metadata {
return c.Conn.Session()
}
func (c *Socket) GetMetadata() event_emitter.Metadata { return c.Conn().Session() }

func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) {
em.Subscribe(socket, topic, func(subscriber *Socket, msg any) {
_ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn)
})
func (c *Socket) Conn() *gws.Conn { return (*gws.Conn)(c) }

func Sub(em *event_emitter.EventEmitter[int64, *Socket], socket *Socket, topic string) {
em.Subscribe(socket, topic, func(subscriber *Socket, msg any) {
_ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn())
})
}

func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) {
var broadcaster = gws.NewBroadcaster(op, msg)
defer broadcaster.Close()
em.Publish(topic, broadcaster)
func Pub(em *event_emitter.EventEmitter[int64, *Socket], topic string, op gws.Opcode, msg []byte) {
var broadcaster = gws.NewBroadcaster(op, msg)
defer broadcaster.Close()
em.Publish(topic, broadcaster)
}
```

Expand Down
9 changes: 9 additions & 0 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func (c *Conn) WritevAsync(opcode Opcode, payloads [][]byte, callback func(error
})
}

// Async 异步
// 将任务加入发送队列(并发度为1), 执行异步操作
// 注意: 不要加入长时间阻塞的任务
// Add the task to the send queue (concurrency 1), perform asynchronous operation.
// Note: Don't add tasks that are blocking for a long time.
func (c *Conn) Async(f func()) {
c.writeQueue.Push(f)
}

// 执行写入逻辑, 注意妥善维护压缩字典
func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error {
c.mu.Lock()
Expand Down
20 changes: 20 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,23 @@ func TestConn_Writev(t *testing.T) {
assert.Error(t, err)
})
}

func TestConn_Async(t *testing.T) {
var conn = &Conn{writeQueue: workerQueue{maxConcurrency: 1}}
var wg = sync.WaitGroup{}
wg.Add(100)
var arr1, arr2 []int64
var mu = &sync.Mutex{}
for i := 1; i <= 100; i++ {
var x = int64(i)
arr1 = append(arr1, x)
conn.Async(func() {
mu.Lock()
arr2 = append(arr2, x)
mu.Unlock()
wg.Done()
})
}
wg.Wait()
assert.True(t, internal.IsSameSlice(arr1, arr2))
}

0 comments on commit 37b60f2

Please sign in to comment.