Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.5.0 #13

Merged
merged 8 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.16
go-version: 1.18
- name: Test
run: go test -v ./...
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

[4]: LICENSE

[5]: https://img.shields.io/badge/go-%3E%3D1.16-30dff3?style=flat-square&logo=go
[5]: https://img.shields.io/badge/go-%3E%3D1.18-30dff3?style=flat-square&logo=go

[6]: https://github.com/lxzan/gws

Expand All @@ -34,14 +34,15 @@
- [Upgrade from HTTP](#upgrade-from-http)
- [Unix Domain Socket](#unix-domain-socket)
- [Broadcast](#broadcast)
- [Write JSON](#write-json)
- [Autobahn Test](#autobahn-test)
- [Benchmark](#benchmark)
- [Communication](#communication)
- [Acknowledgments](#acknowledgments)

### Highlight

- No dependency
- Single dependency
- IO multiplexing support, concurrent message processing and asynchronous non-blocking message writing
- High IOPS and low latency, low CPU usage
- Support fast parsing WebSocket protocol directly from TCP, faster handshake, 30% lower memory usage
Expand Down Expand Up @@ -207,6 +208,11 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
}
```

#### Write JSON
```go
socket.WriteAny(gws.JsonCodec, gws.OpcodeText, data)
```

### Autobahn Test

```bash
Expand Down
84 changes: 79 additions & 5 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,55 @@ package gws

import (
"bytes"
"compress/flate"
"encoding/binary"
"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
"io"
"math"
"sync"
"sync/atomic"
)

const numCompressor = 32

type compressors struct {
sync.RWMutex
serial uint64
compressors [12][numCompressor]*compressor
}

func (c *compressors) Select(level int) *compressor {
var i = level + 2
var j = atomic.AddUint64(&c.serial, 1) & (numCompressor - 1)
c.RLock()
var cps = c.compressors[i][j]
c.RUnlock()

if cps == nil {
c.Lock()
cps = newCompressor(level)
c.compressors[i][j] = cps
c.Unlock()
}
return cps
}

func newCompressor(level int) *compressor {
fw, _ := flate.NewWriter(nil, level)
return &compressor{fw: fw}
return &compressor{fw: fw, mu: &sync.Mutex{}}
}

// 压缩器
type compressor struct {
mu *sync.Mutex
fw *flate.Writer
}

// Compress 压缩
func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error {
c.mu.Lock()
defer c.mu.Unlock()

c.fw.Reset(buf)
if err := internal.WriteN(c.fw, content, len(content)); err != nil {
return err
Expand All @@ -37,20 +67,64 @@ func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error {
return nil
}

func newDecompressor() *decompressor { return &decompressor{fr: flate.NewReader(nil)} }
func (c *compressor) CompressAny(codec Codec, v interface{}, buf *bytes.Buffer) error {
c.mu.Lock()
defer c.mu.Unlock()

c.fw.Reset(buf)
if err := codec.NewEncoder(c.fw).Encode(v); err != nil {
return err
}
if err := c.fw.Flush(); err != nil {
return err
}
if n := buf.Len(); n >= 4 {
compressedContent := buf.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
buf.Truncate(n - 4)
}
}
return nil
}

type decompressors struct {
serial uint64
decompressors [numCompressor]*decompressor
}

func (c *decompressors) init() *decompressors {
for i, _ := range c.decompressors {
c.decompressors[i] = newDecompressor()
}
return c
}

func (c *decompressors) Select() *decompressor {
var index = atomic.AddUint64(&c.serial, 1) & (numCompressor - 1)
return c.decompressors[index]
}

func newDecompressor() *decompressor {
return &decompressor{fr: flate.NewReader(nil), mu: &sync.Mutex{}}
}

type decompressor struct {
fr io.ReadCloser
mu *sync.Mutex
fr io.ReadCloser
buffer [internal.Lv3]byte
}

// Decompress 解压
func (c *decompressor) Decompress(payload *bytes.Buffer) (*bytes.Buffer, error) {
c.mu.Lock()
defer c.mu.Unlock()

_, _ = payload.Write(internal.FlateTail)
resetter := c.fr.(flate.Resetter)
_ = resetter.Reset(payload, nil) // must return a null pointer

var buf = _bpool.Get(3 * payload.Len())
_, err := io.Copy(buf, c.fr)
_, err := io.CopyBuffer(buf, c.fr, c.buffer[0:])
_bpool.Put(payload)
return buf, err
}
10 changes: 0 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ type Conn struct {
config *Config
// read buffer
rbuf *bufio.Reader
// flate decompressor
decompressor *decompressor
// continuation frame
continuationFrame continuationFrame
// frame header for read
fh frameHeader
// flate compressor
compressor *compressor
// WebSocket Event Handler
handler Event

Expand Down Expand Up @@ -62,12 +58,6 @@ func serveWebSocket(isServer bool, config *Config, session SessionStorage, netCo
readQueue: workerQueue{maxConcurrency: int32(config.ReadAsyncGoLimit), capacity: config.ReadAsyncCap},
writeQueue: workerQueue{maxConcurrency: 1, capacity: config.WriteAsyncCap},
}

if c.compressEnabled {
c.compressor = newCompressor(config.CompressLevel)
c.decompressor = newDecompressor()
}

return c
}

Expand Down
25 changes: 7 additions & 18 deletions examples/chatroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ func main() {
_, _ = writer.Write(html)
})

if err := http.ListenAndServe(":3000", nil); err != nil {
if err := http.ListenAndServe(":8000", nil); err != nil {
log.Fatalf("%+v", err)
}
}

func NewWebSocket() *WebSocket {
return &WebSocket{sessions: gws.NewConcurrentMap(16)}
return &WebSocket{sessions: gws.NewConcurrentMap[string, *gws.Conn](16)}
}

type WebSocket struct {
sessions *gws.ConcurrentMap // 使用内置的ConcurrentMap存储连接, 可以减少锁冲突
sessions *gws.ConcurrentMap[string, *gws.Conn] // 使用内置的ConcurrentMap存储连接, 可以减少锁冲突
}

func (c *WebSocket) getName(socket *gws.Conn) string {
Expand All @@ -69,21 +69,11 @@ func (c *WebSocket) getKey(socket *gws.Conn) string {
return name.(string)
}

// 根据用户名获取WebSocket连接
func (c *WebSocket) GetSocket(name string) (*gws.Conn, bool) {
if v0, ok0 := c.sessions.Load(name); ok0 {
if v1, ok1 := v0.(*gws.Conn); ok1 {
return v1, true
}
}
return nil, false
}

// RemoveSocket 移除WebSocket连接
func (c *WebSocket) RemoveSocket(socket *gws.Conn) {
name := c.getName(socket)
key := c.getKey(socket)
if mSocket, ok := c.GetSocket(name); ok {
if mSocket, ok := c.sessions.Load(name); ok {
if mKey := c.getKey(mSocket); mKey == key {
c.sessions.Delete(name)
}
Expand All @@ -92,8 +82,7 @@ func (c *WebSocket) RemoveSocket(socket *gws.Conn) {

func (c *WebSocket) OnOpen(socket *gws.Conn) {
name := c.getName(socket)
if v, ok := c.sessions.Load(name); ok {
var conn = v.(*gws.Conn)
if conn, ok := c.sessions.Load(name); ok {
conn.WriteClose(1000, []byte("connection replaced"))
}
socket.SetDeadline(time.Now().Add(3 * PingInterval))
Expand Down Expand Up @@ -134,7 +123,7 @@ func (c *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) {

var input = &Input{}
_ = json.Unmarshal(message.Data.Bytes(), input)
if v, ok := c.sessions.Load(input.To); ok {
v.(*gws.Conn).WriteMessage(gws.OpcodeText, message.Data.Bytes())
if conn, ok := c.sessions.Load(input.To); ok {
conn.WriteMessage(gws.OpcodeText, message.Data.Bytes())
}
}
13 changes: 11 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
module github.com/lxzan/gws

go 1.16
go 1.18

require github.com/stretchr/testify v1.8.1
require (
github.com/klauspost/compress v1.16.5
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
12 changes: 12 additions & 0 deletions init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package gws

import "github.com/lxzan/gws/internal"

var (
_bpool = internal.NewBufferPool()
_cps = new(compressors)
_dps = new(decompressors).init()
_padding = frameHeader{}

JsonCodec = new(jsonCodec)
)
4 changes: 1 addition & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/lxzan/gws/internal"
)

var _bpool = internal.NewBufferPool()

func (c *Conn) checkMask(enabled bool) error {
// RFC6455: All frames sent from client to server have this bit set to 1.
if (c.isServer && !enabled) || (!c.isServer && enabled) {
Expand Down Expand Up @@ -149,7 +147,7 @@ func (c *Conn) readMessage() error {

func (c *Conn) emitMessage(msg *Message, compressed bool) error {
if compressed {
data, err := c.decompressor.Decompress(msg.Data)
data, err := _dps.Select().Decompress(msg.Data)
if err != nil {
return internal.NewError(internal.CloseInternalServerErr, err)
}
Expand Down
Loading