Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.4.8 release #9

Merged
merged 7 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Event interface {
#### Examples

- [chat room](examples/chatroom/main.go)
- [echo](examples/wss-server/server.go)
- [echo](examples/echo/main.go)

#### Quick Start

Expand All @@ -92,12 +92,12 @@ func main() {
})

http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Accept(writer, request)
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
log.Printf("Accept: " + err.Error())
return
}
go socket.Listen()
socket.ReadLoop()
})

if err := http.ListenAndServe(":3000", nil); err != nil {
Expand Down Expand Up @@ -135,7 +135,7 @@ func main() {
log.Printf(err.Error())
return
}
socket.Listen()
socket.ReadLoop()
}

type WebSocket struct {
Expand Down Expand Up @@ -175,11 +175,11 @@ func main() {
app := gin.New()
upgrader := gws.NewUpgrader(new(WebSocket), nil)
app.GET("/connect", func(ctx *gin.Context) {
socket, err := upgrader.Accept(ctx.Writer, ctx.Request)
socket, err := upgrader.Upgrade(ctx.Writer, ctx.Request)
if err != nil {
return
}
go upgrader.Listen(socket)
go upgrader.ReadLoop(socket)
})
if err := app.Run(":8080"); err != nil {
panic(err)
Expand All @@ -190,7 +190,7 @@ func main() {
- HeartBeat

```go
const PingInterval = 5 * time.Second
const PingInterval = 10 * time.Second

type Websocket struct {
gws.BuiltinEventHandler
Expand Down
28 changes: 10 additions & 18 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,25 @@ func newCompressor(level int) *compressor {

// 压缩器
type compressor struct {
buffer *bytes.Buffer
fw *flate.Writer
}

func (c *compressor) Close() {
_bpool.Put(c.buffer)
c.buffer = nil
fw *flate.Writer
}

// Compress 压缩
func (c *compressor) Compress(content *bytes.Buffer) (*bytes.Buffer, error) {
c.buffer = _bpool.Get(content.Len() / 3)
c.fw.Reset(c.buffer)
if err := internal.WriteN(c.fw, content.Bytes(), content.Len()); err != nil {
return nil, err
func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error {
c.fw.Reset(buf)
if err := internal.WriteN(c.fw, content, len(content)); err != nil {
return err
}
if err := c.fw.Flush(); err != nil {
return nil, err
return err
}

if n := c.buffer.Len(); n >= 4 {
compressedContent := c.buffer.Bytes()
if n := buf.Len(); n >= 4 {
compressedContent := buf.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
c.buffer.Truncate(n - 4)
buf.Truncate(n - 4)
}
}
return c.buffer, nil
return nil
}

func newDecompressor() *decompressor { return &decompressor{fr: flate.NewReader(nil)} }
Expand Down
22 changes: 9 additions & 13 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ func TestFlate(t *testing.T) {
var dps = newDecompressor()
var n = internal.AlphabetNumeric.Intn(1024)
var rawText = internal.AlphabetNumeric.Generate(n)
var buf = bytes.NewBufferString("")
buf.Write(rawText)
compressedText, err := cps.Compress(buf)
if err != nil {
var compressedBuf = bytes.NewBufferString("")
if err := cps.Compress(rawText, compressedBuf); err != nil {
as.NoError(err)
return
}

buf.Reset()
buf.Write(compressedText.Bytes())
var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
plainText, err := dps.Decompress(buf)
if err != nil {
as.NoError(err)
Expand All @@ -41,18 +39,16 @@ func TestFlate(t *testing.T) {
var dps = newDecompressor()
var n = internal.AlphabetNumeric.Intn(1024)
var rawText = internal.AlphabetNumeric.Generate(n)
var buf = bytes.NewBufferString("")
buf.Write(rawText)
compressedText, err := cps.Compress(buf)
if err != nil {
var compressedBuf = bytes.NewBufferString("")
if err := cps.Compress(rawText, compressedBuf); err != nil {
as.NoError(err)
return
}

buf.Reset()
buf.Write(compressedText.Bytes())
var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
buf.WriteString("1234")
_, err = dps.Decompress(buf)
_, err := dps.Decompress(buf)
as.Error(err)
})
}
11 changes: 9 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,19 @@ func serveWebSocket(isServer bool, config *Config, session SessionStorage, netCo
return c
}

// Listen listening to websocket messages through a dead loop
// 监听websocket消息
// Listen 监听websocket消息
// Deprecated: Listen will be deprecated in future versions, please use ReadLoop instead.
func (c *Conn) Listen() {
c.ReadLoop()
}

// ReadLoop start a read message loop
// 启动一个读消息的死循环
func (c *Conn) ReadLoop() {
defer c.conn.Close()

c.handler.OnOpen(c)

for {
if err := c.readMessage(); err != nil {
c.emitError(err)
Expand Down
2 changes: 1 addition & 1 deletion examples/autobahn/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func testCase(id int) {
log.Println(err.Error())
return
}
go socket.Listen()
go socket.ReadLoop()
<-handler.onexit
}

Expand Down
4 changes: 2 additions & 2 deletions examples/autobahn/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ func main() {
})

http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Accept(writer, request)
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
return
}
go socket.Listen()
socket.ReadLoop()
})

_ = http.ListenAndServe(":3000", nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
log.Printf(err.Error())
return
}
go socket.Listen()
go socket.ReadLoop()

for {
var text = ""
Expand Down
27 changes: 27 additions & 0 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"github.com/lxzan/gws"
"log"
)

func main() {
var app = gws.NewServer(new(Handler), &gws.ServerOption{
CompressEnabled: true,
CheckUtf8Enabled: true,
})
log.Fatalf("%v", app.Run(":3000"))
}

type Handler struct {
gws.BuiltinEventHandler
}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
socket.WritePong(payload)
}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
_ = socket.WriteMessage(message.Opcode, message.Bytes())
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 3 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
)

const frameHeaderSize = 14

type Opcode uint8

const (
Expand Down Expand Up @@ -62,7 +64,7 @@ func (b BuiltinEventHandler) OnPong(socket *Conn, payload []byte) {}

func (b BuiltinEventHandler) OnMessage(socket *Conn, message *Message) {}

type frameHeader [internal.FrameHeaderSize]byte
type frameHeader [frameHeaderSize]byte

func (c *frameHeader) GetFIN() bool {
return ((*c)[0] >> 7) == 1
Expand Down
25 changes: 21 additions & 4 deletions updrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ func (c *Upgrader) connectHandshake(r *http.Request, responseHeader http.Header,
}

// Accept http upgrade to websocket protocol
// Deprecated: Accept will be deprecated in future versions, please use Upgrade instead.
func (c *Upgrader) Accept(w http.ResponseWriter, r *http.Request) (*Conn, error) {
return c.Upgrade(w, r)
}

// Upgrade http upgrade to websocket protocol
func (c *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request) (*Conn, error) {
netConn, br, err := c.hijack(w)
if err != nil {
return nil, err
}

socket, err := c.doAccept(r, netConn, br)
socket, err := c.doUpgrade(r, netConn, br)
if err != nil {
_ = netConn.Close()
return nil, err
Expand All @@ -93,7 +99,7 @@ func (c *Upgrader) hijack(w http.ResponseWriter) (net.Conn, *bufio.Reader, error
return netConn, brw.Reader, nil
}

func (c *Upgrader) doAccept(r *http.Request, netConn net.Conn, br *bufio.Reader) (*Conn, error) {
func (c *Upgrader) doUpgrade(r *http.Request, netConn net.Conn, br *bufio.Reader) (*Conn, error) {
var session = new(sliceMap)
var header = c.option.ResponseHeader.Clone()
if !c.option.CheckOrigin(r, session) {
Expand Down Expand Up @@ -138,6 +144,10 @@ func (c *Upgrader) doAccept(r *http.Request, netConn net.Conn, br *bufio.Reader)
type Server struct {
upgrader *Upgrader

// OnConnect 建立连接事件, 用于处理限流, 熔断和安全问题; 返回错误将会断开连接.
// Creates connection events for current limit, fuse and security issues; returning an error will disconnect.
OnConnect func(conn net.Conn) error

// OnError 接收握手过程中产生的错误回调
// Receive error callbacks generated during the handshake
OnError func(conn net.Conn, err error)
Expand All @@ -147,6 +157,7 @@ type Server struct {
// create a websocket server
func NewServer(eventHandler Event, option *ServerOption) *Server {
var c = &Server{upgrader: NewUpgrader(eventHandler, option)}
c.OnConnect = func(conn net.Conn) error { return nil }
c.OnError = func(conn net.Conn, err error) {}
return c
}
Expand Down Expand Up @@ -236,6 +247,12 @@ func (c *Server) serve(listener net.Listener) error {
}

go func() {
if err := c.OnConnect(conn); err != nil {
_ = conn.Close()
c.OnError(conn, err)
return
}

br := bufio.NewReaderSize(conn, c.upgrader.option.ReadBufferSize)
r, err := c.parseRequest(conn, br)
if err != nil {
Expand All @@ -244,13 +261,13 @@ func (c *Server) serve(listener net.Listener) error {
return
}

socket, err := c.upgrader.doAccept(r, conn, br)
socket, err := c.upgrader.doUpgrade(r, conn, br)
if err != nil {
_ = conn.Close()
c.OnError(conn, err)
return
}
socket.Listen()
socket.ReadLoop()
}()
}
}
Loading