Skip to content

Commit

Permalink
Merge pull request #13 from lxzan/dev
Browse files Browse the repository at this point in the history
v1.5.0
  • Loading branch information
lxzan committed May 5, 2023
2 parents 2ef392d + 25e89fd commit f177ad3
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.16
go-version: 1.18
- name: Test
run: go test -v ./...
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

[4]: LICENSE

[5]: https://img.shields.io/badge/go-%3E%3D1.16-30dff3?style=flat-square&logo=go
[5]: https://img.shields.io/badge/go-%3E%3D1.18-30dff3?style=flat-square&logo=go

[6]: https://github.com/lxzan/gws

Expand All @@ -34,14 +34,15 @@
- [Upgrade from HTTP](#upgrade-from-http)
- [Unix Domain Socket](#unix-domain-socket)
- [Broadcast](#broadcast)
- [Write JSON](#write-json)
- [Autobahn Test](#autobahn-test)
- [Benchmark](#benchmark)
- [Communication](#communication)
- [Acknowledgments](#acknowledgments)

### Highlight

- No dependency
- Single dependency
- IO multiplexing support, concurrent message processing and asynchronous non-blocking message writing
- High IOPS and low latency, low CPU usage
- Support fast parsing WebSocket protocol directly from TCP, faster handshake, 30% lower memory usage
Expand Down Expand Up @@ -207,6 +208,11 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
}
```

#### Write JSON
```go
socket.WriteAny(gws.JsonCodec, gws.OpcodeText, data)
```

### Autobahn Test

```bash
Expand Down
84 changes: 79 additions & 5 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,55 @@ package gws

import (
"bytes"
"compress/flate"
"encoding/binary"
"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
"io"
"math"
"sync"
"sync/atomic"
)

const numCompressor = 32

type compressors struct {
sync.RWMutex
serial uint64
compressors [12][numCompressor]*compressor
}

func (c *compressors) Select(level int) *compressor {
var i = level + 2
var j = atomic.AddUint64(&c.serial, 1) & (numCompressor - 1)
c.RLock()
var cps = c.compressors[i][j]
c.RUnlock()

if cps == nil {
c.Lock()
cps = newCompressor(level)
c.compressors[i][j] = cps
c.Unlock()
}
return cps
}

func newCompressor(level int) *compressor {
fw, _ := flate.NewWriter(nil, level)
return &compressor{fw: fw}
return &compressor{fw: fw, mu: &sync.Mutex{}}
}

// 压缩器
type compressor struct {
mu *sync.Mutex
fw *flate.Writer
}

// Compress 压缩
func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error {
c.mu.Lock()
defer c.mu.Unlock()

c.fw.Reset(buf)
if err := internal.WriteN(c.fw, content, len(content)); err != nil {
return err
Expand All @@ -37,20 +67,64 @@ func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error {
return nil
}

func newDecompressor() *decompressor { return &decompressor{fr: flate.NewReader(nil)} }
func (c *compressor) CompressAny(codec Codec, v interface{}, buf *bytes.Buffer) error {
c.mu.Lock()
defer c.mu.Unlock()

c.fw.Reset(buf)
if err := codec.NewEncoder(c.fw).Encode(v); err != nil {
return err
}
if err := c.fw.Flush(); err != nil {
return err
}
if n := buf.Len(); n >= 4 {
compressedContent := buf.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
buf.Truncate(n - 4)
}
}
return nil
}

type decompressors struct {
serial uint64
decompressors [numCompressor]*decompressor
}

func (c *decompressors) init() *decompressors {
for i, _ := range c.decompressors {
c.decompressors[i] = newDecompressor()
}
return c
}

func (c *decompressors) Select() *decompressor {
var index = atomic.AddUint64(&c.serial, 1) & (numCompressor - 1)
return c.decompressors[index]
}

func newDecompressor() *decompressor {
return &decompressor{fr: flate.NewReader(nil), mu: &sync.Mutex{}}
}

type decompressor struct {
fr io.ReadCloser
mu *sync.Mutex
fr io.ReadCloser
buffer [internal.Lv3]byte
}

// Decompress 解压
func (c *decompressor) Decompress(payload *bytes.Buffer) (*bytes.Buffer, error) {
c.mu.Lock()
defer c.mu.Unlock()

_, _ = payload.Write(internal.FlateTail)
resetter := c.fr.(flate.Resetter)
_ = resetter.Reset(payload, nil) // must return a null pointer

var buf = _bpool.Get(3 * payload.Len())
_, err := io.Copy(buf, c.fr)
_, err := io.CopyBuffer(buf, c.fr, c.buffer[0:])
_bpool.Put(payload)
return buf, err
}
10 changes: 0 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ type Conn struct {
config *Config
// read buffer
rbuf *bufio.Reader
// flate decompressor
decompressor *decompressor
// continuation frame
continuationFrame continuationFrame
// frame header for read
fh frameHeader
// flate compressor
compressor *compressor
// WebSocket Event Handler
handler Event

Expand Down Expand Up @@ -62,12 +58,6 @@ func serveWebSocket(isServer bool, config *Config, session SessionStorage, netCo
readQueue: workerQueue{maxConcurrency: int32(config.ReadAsyncGoLimit), capacity: config.ReadAsyncCap},
writeQueue: workerQueue{maxConcurrency: 1, capacity: config.WriteAsyncCap},
}

if c.compressEnabled {
c.compressor = newCompressor(config.CompressLevel)
c.decompressor = newDecompressor()
}

return c
}

Expand Down
25 changes: 7 additions & 18 deletions examples/chatroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ func main() {
_, _ = writer.Write(html)
})

if err := http.ListenAndServe(":3000", nil); err != nil {
if err := http.ListenAndServe(":8000", nil); err != nil {
log.Fatalf("%+v", err)
}
}

func NewWebSocket() *WebSocket {
return &WebSocket{sessions: gws.NewConcurrentMap(16)}
return &WebSocket{sessions: gws.NewConcurrentMap[string, *gws.Conn](16)}
}

type WebSocket struct {
sessions *gws.ConcurrentMap // 使用内置的ConcurrentMap存储连接, 可以减少锁冲突
sessions *gws.ConcurrentMap[string, *gws.Conn] // 使用内置的ConcurrentMap存储连接, 可以减少锁冲突
}

func (c *WebSocket) getName(socket *gws.Conn) string {
Expand All @@ -69,21 +69,11 @@ func (c *WebSocket) getKey(socket *gws.Conn) string {
return name.(string)
}

// 根据用户名获取WebSocket连接
func (c *WebSocket) GetSocket(name string) (*gws.Conn, bool) {
if v0, ok0 := c.sessions.Load(name); ok0 {
if v1, ok1 := v0.(*gws.Conn); ok1 {
return v1, true
}
}
return nil, false
}

// RemoveSocket 移除WebSocket连接
func (c *WebSocket) RemoveSocket(socket *gws.Conn) {
name := c.getName(socket)
key := c.getKey(socket)
if mSocket, ok := c.GetSocket(name); ok {
if mSocket, ok := c.sessions.Load(name); ok {
if mKey := c.getKey(mSocket); mKey == key {
c.sessions.Delete(name)
}
Expand All @@ -92,8 +82,7 @@ func (c *WebSocket) RemoveSocket(socket *gws.Conn) {

func (c *WebSocket) OnOpen(socket *gws.Conn) {
name := c.getName(socket)
if v, ok := c.sessions.Load(name); ok {
var conn = v.(*gws.Conn)
if conn, ok := c.sessions.Load(name); ok {
conn.WriteClose(1000, []byte("connection replaced"))
}
socket.SetDeadline(time.Now().Add(3 * PingInterval))
Expand Down Expand Up @@ -134,7 +123,7 @@ func (c *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) {

var input = &Input{}
_ = json.Unmarshal(message.Data.Bytes(), input)
if v, ok := c.sessions.Load(input.To); ok {
v.(*gws.Conn).WriteMessage(gws.OpcodeText, message.Data.Bytes())
if conn, ok := c.sessions.Load(input.To); ok {
conn.WriteMessage(gws.OpcodeText, message.Data.Bytes())
}
}
13 changes: 11 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
module github.com/lxzan/gws

go 1.16
go 1.18

require github.com/stretchr/testify v1.8.1
require (
github.com/klauspost/compress v1.16.5
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
12 changes: 12 additions & 0 deletions init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package gws

import "github.com/lxzan/gws/internal"

var (
_bpool = internal.NewBufferPool()
_cps = new(compressors)
_dps = new(decompressors).init()
_padding = frameHeader{}

JsonCodec = new(jsonCodec)
)
4 changes: 1 addition & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/lxzan/gws/internal"
)

var _bpool = internal.NewBufferPool()

func (c *Conn) checkMask(enabled bool) error {
// RFC6455: All frames sent from client to server have this bit set to 1.
if (c.isServer && !enabled) || (!c.isServer && enabled) {
Expand Down Expand Up @@ -149,7 +147,7 @@ func (c *Conn) readMessage() error {

func (c *Conn) emitMessage(msg *Message, compressed bool) error {
if compressed {
data, err := c.decompressor.Decompress(msg.Data)
data, err := _dps.Select().Decompress(msg.Data)
if err != nil {
return internal.NewError(internal.CloseInternalServerErr, err)
}
Expand Down
Loading

0 comments on commit f177ad3

Please sign in to comment.