From 4329e4a7909986a78b93297926bc51d70406adcf Mon Sep 17 00:00:00 2001 From: Oleh Stolyar Date: Thu, 18 May 2017 10:44:37 +0300 Subject: [PATCH 1/5] Stop goroutines in xgb.Conn.Close() - don't panic when calling Close() twice --- xgb.go | 117 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 69 insertions(+), 48 deletions(-) diff --git a/xgb.go b/xgb.go index 3d2c61f..ebd012e 100644 --- a/xgb.go +++ b/xgb.go @@ -60,7 +60,8 @@ type Conn struct { xidChan chan xid seqChan chan uint16 reqChan chan *request - closing chan chan struct{} + done chan chan struct{} + wg sync.WaitGroup // ExtLock is a lock used whenever new extensions are initialized. // It should not be used. It is exported for use in the extension @@ -125,8 +126,9 @@ func postNewConn(conn *Conn) (*Conn, error) { conn.seqChan = make(chan uint16, seqBuffer) conn.reqChan = make(chan *request, reqBuffer) conn.eventChan = make(chan eventOrError, eventBuffer) - conn.closing = make(chan chan struct{}, 1) + conn.done = make(chan chan struct{}, 1) + conn.wg.Add(4) go conn.generateXIds() go conn.generateSeqIds() go conn.sendRequests() @@ -137,7 +139,18 @@ func postNewConn(conn *Conn) (*Conn, error) { // Close gracefully closes the connection to the X server. func (c *Conn) Close() { - close(c.reqChan) + c.close() + c.wg.Wait() + c.conn.Close() +} + +func (c *Conn) close() { + select { + case <-c.done: + return + default: + close(c.done) + } } // Event is an interface that can contain any of the events returned by the @@ -217,8 +230,9 @@ type xid struct { // This needs to be updated to use the XC Misc extension once we run out of // new ids. // Thanks to libxcb/src/xcb_xid.c. This code is greatly inspired by it. -func (conn *Conn) generateXIds() { - defer close(conn.xidChan) +func (c *Conn) generateXIds() { + defer c.wg.Done() + defer close(c.xidChan) // This requires some explanation. From the horse's mouth: // "The resource-id-mask contains a single contiguous set of bits (at least @@ -236,23 +250,22 @@ func (conn *Conn) generateXIds() { // 00111000 & 11001000 = 00001000. // And we use that value to increment the last resource id to get a new one. // (And then, of course, we OR it with resource-id-base.) - inc := conn.setupResourceIdMask & -conn.setupResourceIdMask - max := conn.setupResourceIdMask + inc := c.setupResourceIdMask & -c.setupResourceIdMask + max := c.setupResourceIdMask last := uint32(0) for { - // TODO: Use the XC Misc extension to look for released ids. + var id xid if last > 0 && last >= max-inc+1 { - conn.xidChan <- xid{ - id: 0, - err: errors.New("There are no more available resource" + - "identifiers."), - } + // TODO: Use the XC Misc extension to look for released ids. + id.err = errors.New("there are no more available resource identifiers") + } else { + last += inc + id.id = last | c.setupResourceIdBase } - - last += inc - conn.xidChan <- xid{ - id: last | conn.setupResourceIdBase, - err: nil, + select { + case <-c.done: + return + case c.xidChan <- id: } } } @@ -271,15 +284,20 @@ func (c *Conn) newSequenceId() uint16 { // N.B. As long as the cookie buffer is less than 2^16, there are no limitations // on the number (or kind) of requests made in sequence. func (c *Conn) generateSeqIds() { + defer c.wg.Done() defer close(c.seqChan) seqid := uint16(1) for { - c.seqChan <- seqid - if seqid == uint16((1<<16)-1) { - seqid = 0 - } else { - seqid++ + select { + case <-c.done: + return + case c.seqChan <- seqid: + if seqid == uint16((1<<16)-1) { + seqid = 0 + } else { + seqid++ + } } } } @@ -324,28 +342,33 @@ func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { // It is meant to be run as its own goroutine. func (c *Conn) sendRequests() { defer close(c.cookieChan) + defer func() { + c.wg.Done() + c.noop() // Flush the response reading goroutine, ignore error. + c.Close() + }() - for req := range c.reqChan { - // ho there! if the cookie channel is nearly full, force a round - // trip to clear out the cookie buffer. - // Note that we circumvent the request channel, because we're *in* - // the request channel. - if len(c.cookieChan) == cookieBuffer-1 { - if err := c.noop(); err != nil { - // Shut everything down. - break + for { + select { + case req := <-c.reqChan: + // ho there! if the cookie channel is nearly full, force a round + // trip to clear out the cookie buffer. + // Note that we circumvent the request channel, because we're *in* + // the request channel. + if len(c.cookieChan) == cookieBuffer-1 { + if err := c.noop(); err != nil { + // Shut everything down. + return + } } + req.cookie.Sequence = c.newSequenceId() + c.cookieChan <- req.cookie + c.writeBuffer(req.buf) + close(req.seq) + case <-c.done: + return } - req.cookie.Sequence = c.newSequenceId() - c.cookieChan <- req.cookie - c.writeBuffer(req.buf) - close(req.seq) } - response := make(chan struct{}) - c.closing <- response - c.noop() // Flush the response reading goroutine, ignore error. - <-response - c.conn.Close() } // noop circumvents the usual request sending goroutines and forces a round @@ -366,9 +389,8 @@ func (c *Conn) writeBuffer(buf []byte) error { if _, err := c.conn.Write(buf); err != nil { Logger.Printf("A write error is unrecoverable: %s", err) return err - } else { - return nil } + return nil } // readResponses is a goroutine that reads events, errors and @@ -381,6 +403,7 @@ func (c *Conn) writeBuffer(buf []byte) error { // channel. (It is an error if no such cookie exists in this case.) // Finally, cookies that came "before" this reply are always cleaned up. func (c *Conn) readResponses() { + defer c.wg.Done() defer close(c.eventChan) var ( @@ -391,18 +414,16 @@ func (c *Conn) readResponses() { for { select { - case respond := <-c.closing: - respond <- struct{}{} + case <-c.done: return default: } - buf := make([]byte, 32) err, seq = nil, 0 if _, err := io.ReadFull(c.conn, buf); err != nil { Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.Close() + c.close() continue } switch buf[0] { @@ -432,7 +453,7 @@ func (c *Conn) readResponses() { if _, err := io.ReadFull(c.conn, biggerBuf[32:]); err != nil { Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.Close() + c.close() continue } replyBytes = biggerBuf From 4bded00e61d73a5949846125674a030239425fe6 Mon Sep 17 00:00:00 2001 From: Oleh Stolyar Date: Thu, 18 May 2017 16:27:50 +0300 Subject: [PATCH 2/5] Minor fixes, abort on conn.done in cookie.go --- cookie.go | 6 ++++++ xgb.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/cookie.go b/cookie.go index d5cdb29..1a9c667 100644 --- a/cookie.go +++ b/cookie.go @@ -98,6 +98,8 @@ func (c Cookie) replyChecked() ([]byte, error) { return reply, nil case err := <-c.errorChan: return nil, err + case <-c.conn.done: + return nil, errors.New("Connection closed while waiting in 'replyChecked'") } } @@ -121,6 +123,8 @@ func (c Cookie) replyUnchecked() ([]byte, error) { return reply, nil case <-c.pingChan: return nil, nil + case <-c.conn.done: + return nil, errors.New("Connection closed while waiting in 'replyChecked'") } } @@ -161,5 +165,7 @@ func (c Cookie) Check() error { return err case <-c.pingChan: return nil + case <-c.conn.done: + return errors.New("Connection closed while waiting in 'Check'") } } diff --git a/xgb.go b/xgb.go index ebd012e..dd5d550 100644 --- a/xgb.go +++ b/xgb.go @@ -102,7 +102,7 @@ func NewConnDisplay(display string) (*Conn, error) { return postNewConn(conn) } -// NewConnDisplay is just like NewConn, but allows a specific net.Conn +// NewConnNet is just like NewConn, but allows a specific net.Conn // to be used. func NewConnNet(netConn net.Conn) (*Conn, error) { conn := &Conn{} @@ -126,7 +126,7 @@ func postNewConn(conn *Conn) (*Conn, error) { conn.seqChan = make(chan uint16, seqBuffer) conn.reqChan = make(chan *request, reqBuffer) conn.eventChan = make(chan eventOrError, eventBuffer) - conn.done = make(chan chan struct{}, 1) + conn.done = make(chan chan struct{}) conn.wg.Add(4) go conn.generateXIds() @@ -139,12 +139,13 @@ func postNewConn(conn *Conn) (*Conn, error) { // Close gracefully closes the connection to the X server. func (c *Conn) Close() { - c.close() + c.broadcastDone() c.wg.Wait() c.conn.Close() + c.conn = nil } -func (c *Conn) close() { +func (c *Conn) broadcastDone() { select { case <-c.done: return @@ -343,9 +344,8 @@ func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { func (c *Conn) sendRequests() { defer close(c.cookieChan) defer func() { - c.wg.Done() c.noop() // Flush the response reading goroutine, ignore error. - c.Close() + c.wg.Done() }() for { @@ -423,7 +423,7 @@ func (c *Conn) readResponses() { if _, err := io.ReadFull(c.conn, buf); err != nil { Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.close() + c.broadcastDone() continue } switch buf[0] { @@ -453,7 +453,7 @@ func (c *Conn) readResponses() { if _, err := io.ReadFull(c.conn, biggerBuf[32:]); err != nil { Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.close() + c.broadcastDone() continue } replyBytes = biggerBuf From 971f49620609625c2e5b57bfc079797b67dea2cf Mon Sep 17 00:00:00 2001 From: Oleh Stolyar Date: Thu, 17 Aug 2017 11:45:02 +0300 Subject: [PATCH 3/5] Don't block forever in Conn.NewRequest --- cookie.go | 6 +++--- xgb.go | 9 +++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cookie.go b/cookie.go index 1a9c667..9caa0a9 100644 --- a/cookie.go +++ b/cookie.go @@ -99,7 +99,7 @@ func (c Cookie) replyChecked() ([]byte, error) { case err := <-c.errorChan: return nil, err case <-c.conn.done: - return nil, errors.New("Connection closed while waiting in 'replyChecked'") + return nil, errors.New("X connection was closed") } } @@ -124,7 +124,7 @@ func (c Cookie) replyUnchecked() ([]byte, error) { case <-c.pingChan: return nil, nil case <-c.conn.done: - return nil, errors.New("Connection closed while waiting in 'replyChecked'") + return nil, errors.New("X connection was closed") } } @@ -166,6 +166,6 @@ func (c Cookie) Check() error { case <-c.pingChan: return nil case <-c.conn.done: - return errors.New("Connection closed while waiting in 'Check'") + return errors.New("X connection was closed") } } diff --git a/xgb.go b/xgb.go index dd5d550..4d4ee79 100644 --- a/xgb.go +++ b/xgb.go @@ -333,6 +333,15 @@ type request struct { // In all likelihood, you should be able to copy and paste with some minor // edits the generated code for the request you want to issue. func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { + select { + case <-c.done: + // If connection was broken, the goroutine that processes c.reqChan will be closed + // This will cause NewRequest to block forever in <-seq + // We can't close c.reqChan since NewRequest will panic, potentially crashing the app + return + default: + } + seq := make(chan struct{}) c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq} <-seq From 5735ebe94183457a0f96a4b394989b5fe0eacaf4 Mon Sep 17 00:00:00 2001 From: Oleh Stolyar Date: Thu, 17 Aug 2017 16:42:15 +0300 Subject: [PATCH 4/5] Eliminate race condition in NewRequest --- xgb.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/xgb.go b/xgb.go index 4d4ee79..683cb00 100644 --- a/xgb.go +++ b/xgb.go @@ -333,18 +333,17 @@ type request struct { // In all likelihood, you should be able to copy and paste with some minor // edits the generated code for the request you want to issue. func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { + seq := make(chan struct{}) select { + case c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}: + case <-seq: + return case <-c.done: - // If connection was broken, the goroutine that processes c.reqChan will be closed - // This will cause NewRequest to block forever in <-seq - // We can't close c.reqChan since NewRequest will panic, potentially crashing the app + // If connection was broken, all goroutines, including `sendRequests`, will be closed. + // This prevents NewRequest from blocking forever in <-seq or in c.reqChan <- &request if reqChan is full. + // We can't close c.reqChan since NewRequest will panic when sending to it. return - default: } - - seq := make(chan struct{}) - c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq} - <-seq } // sendRequests is run as a single goroutine that takes requests and writes From c25c2ccf26de506147610b76f63533d8a7608d55 Mon Sep 17 00:00:00 2001 From: Oleh Stolyar Date: Wed, 20 Sep 2017 14:48:51 +0300 Subject: [PATCH 5/5] Fix select in NewRequest --- xgb.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/xgb.go b/xgb.go index 683cb00..068d160 100644 --- a/xgb.go +++ b/xgb.go @@ -60,7 +60,7 @@ type Conn struct { xidChan chan xid seqChan chan uint16 reqChan chan *request - done chan chan struct{} + done chan struct{} wg sync.WaitGroup // ExtLock is a lock used whenever new extensions are initialized. @@ -126,7 +126,7 @@ func postNewConn(conn *Conn) (*Conn, error) { conn.seqChan = make(chan uint16, seqBuffer) conn.reqChan = make(chan *request, reqBuffer) conn.eventChan = make(chan eventOrError, eventBuffer) - conn.done = make(chan chan struct{}) + conn.done = make(chan struct{}) conn.wg.Add(4) go conn.generateXIds() @@ -334,16 +334,20 @@ type request struct { // edits the generated code for the request you want to issue. func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { seq := make(chan struct{}) + select { case c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}: - case <-seq: - return case <-c.done: // If connection was broken, all goroutines, including `sendRequests`, will be closed. - // This prevents NewRequest from blocking forever in <-seq or in c.reqChan <- &request if reqChan is full. + // This prevents NewRequest from blocking forever in c.reqChan <- &request if reqChan is full. // We can't close c.reqChan since NewRequest will panic when sending to it. return } + + select { + case <-seq: + case <-c.done: + } } // sendRequests is run as a single goroutine that takes requests and writes