Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop goroutines when closing a connection #39

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (c Cookie) replyChecked() ([]byte, error) {
return reply, nil
case err := <-c.errorChan:
return nil, err
case <-c.conn.done:
return nil, errors.New("X connection was closed")
}
}

Expand All @@ -121,6 +123,8 @@ func (c Cookie) replyUnchecked() ([]byte, error) {
return reply, nil
case <-c.pingChan:
return nil, nil
case <-c.conn.done:
return nil, errors.New("X connection was closed")
}
}

Expand Down Expand Up @@ -161,5 +165,7 @@ func (c Cookie) Check() error {
return err
case <-c.pingChan:
return nil
case <-c.conn.done:
return errors.New("X connection was closed")
}
}
135 changes: 84 additions & 51 deletions xgb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type Conn struct {
xidChan chan xid
seqChan chan uint16
reqChan chan *request
closing chan chan struct{}
done chan struct{}
wg sync.WaitGroup

// ExtLock is a lock used whenever new extensions are initialized.
// It should not be used. It is exported for use in the extension
Expand Down Expand Up @@ -101,7 +102,7 @@ func NewConnDisplay(display string) (*Conn, error) {
return postNewConn(conn)
}

// NewConnDisplay is just like NewConn, but allows a specific net.Conn
// NewConnNet is just like NewConn, but allows a specific net.Conn
// to be used.
func NewConnNet(netConn net.Conn) (*Conn, error) {
conn := &Conn{}
Expand All @@ -125,8 +126,9 @@ func postNewConn(conn *Conn) (*Conn, error) {
conn.seqChan = make(chan uint16, seqBuffer)
conn.reqChan = make(chan *request, reqBuffer)
conn.eventChan = make(chan eventOrError, eventBuffer)
conn.closing = make(chan chan struct{}, 1)
conn.done = make(chan struct{})

conn.wg.Add(4)
go conn.generateXIds()
go conn.generateSeqIds()
go conn.sendRequests()
Expand All @@ -137,7 +139,19 @@ func postNewConn(conn *Conn) (*Conn, error) {

// Close gracefully closes the connection to the X server.
func (c *Conn) Close() {
close(c.reqChan)
c.broadcastDone()
c.wg.Wait()
c.conn.Close()
c.conn = nil
}

func (c *Conn) broadcastDone() {
select {
case <-c.done:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What sends on this channel?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When readResponses hits an unrecoverable read error on line 435 and 463, it closes this channel by calling broadcastDone, thus signaling to all goroutines that they should shutdown. Since receive on a closed channel does not block, having:

select {
case <-c.done:
   cleanup()
   return
case ...:
   ...
}

in a goroutine will shut it down after broadcastDone.

I am doing a nonblocking select here to prevent c.done from being closed twice, since that will trigger a panic.

return
default:
close(c.done)
}
}

// Event is an interface that can contain any of the events returned by the
Expand Down Expand Up @@ -217,8 +231,9 @@ type xid struct {
// This needs to be updated to use the XC Misc extension once we run out of
// new ids.
// Thanks to libxcb/src/xcb_xid.c. This code is greatly inspired by it.
func (conn *Conn) generateXIds() {
defer close(conn.xidChan)
func (c *Conn) generateXIds() {
defer c.wg.Done()
defer close(c.xidChan)

// This requires some explanation. From the horse's mouth:
// "The resource-id-mask contains a single contiguous set of bits (at least
Expand All @@ -236,23 +251,22 @@ func (conn *Conn) generateXIds() {
// 00111000 & 11001000 = 00001000.
// And we use that value to increment the last resource id to get a new one.
// (And then, of course, we OR it with resource-id-base.)
inc := conn.setupResourceIdMask & -conn.setupResourceIdMask
max := conn.setupResourceIdMask
inc := c.setupResourceIdMask & -c.setupResourceIdMask
max := c.setupResourceIdMask
last := uint32(0)
for {
// TODO: Use the XC Misc extension to look for released ids.
var id xid
if last > 0 && last >= max-inc+1 {
conn.xidChan <- xid{
id: 0,
err: errors.New("There are no more available resource" +
"identifiers."),
}
// TODO: Use the XC Misc extension to look for released ids.
id.err = errors.New("there are no more available resource identifiers")
} else {
last += inc
id.id = last | c.setupResourceIdBase
}

last += inc
conn.xidChan <- xid{
id: last | conn.setupResourceIdBase,
err: nil,
select {
case <-c.done:
return
case c.xidChan <- id:
}
}
}
Expand All @@ -271,15 +285,20 @@ func (c *Conn) newSequenceId() uint16 {
// N.B. As long as the cookie buffer is less than 2^16, there are no limitations
// on the number (or kind) of requests made in sequence.
func (c *Conn) generateSeqIds() {
defer c.wg.Done()
defer close(c.seqChan)

seqid := uint16(1)
for {
c.seqChan <- seqid
if seqid == uint16((1<<16)-1) {
seqid = 0
} else {
seqid++
select {
case <-c.done:
return
case c.seqChan <- seqid:
if seqid == uint16((1<<16)-1) {
seqid = 0
} else {
seqid++
}
}
}
}
Expand Down Expand Up @@ -315,37 +334,53 @@ type request struct {
// edits the generated code for the request you want to issue.
func (c *Conn) NewRequest(buf []byte, cookie *Cookie) {
seq := make(chan struct{})
c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}
<-seq

select {
case c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}:
case <-c.done:
// If connection was broken, all goroutines, including `sendRequests`, will be closed.
// This prevents NewRequest from blocking forever in c.reqChan <- &request if reqChan is full.
// We can't close c.reqChan since NewRequest will panic when sending to it.
return
}

select {
case <-seq:
case <-c.done:
}
}

// sendRequests is run as a single goroutine that takes requests and writes
// the bytes to the wire and adds the cookie to the cookie queue.
// It is meant to be run as its own goroutine.
func (c *Conn) sendRequests() {
defer close(c.cookieChan)
defer func() {
c.noop() // Flush the response reading goroutine, ignore error.
c.wg.Done()
}()

for req := range c.reqChan {
// ho there! if the cookie channel is nearly full, force a round
// trip to clear out the cookie buffer.
// Note that we circumvent the request channel, because we're *in*
// the request channel.
if len(c.cookieChan) == cookieBuffer-1 {
if err := c.noop(); err != nil {
// Shut everything down.
break
for {
select {
case req := <-c.reqChan:
// ho there! if the cookie channel is nearly full, force a round
// trip to clear out the cookie buffer.
// Note that we circumvent the request channel, because we're *in*
// the request channel.
if len(c.cookieChan) == cookieBuffer-1 {
if err := c.noop(); err != nil {
// Shut everything down.
return
}
}
req.cookie.Sequence = c.newSequenceId()
c.cookieChan <- req.cookie
c.writeBuffer(req.buf)
close(req.seq)
case <-c.done:
return
}
req.cookie.Sequence = c.newSequenceId()
c.cookieChan <- req.cookie
c.writeBuffer(req.buf)
close(req.seq)
}
response := make(chan struct{})
c.closing <- response
c.noop() // Flush the response reading goroutine, ignore error.
<-response
c.conn.Close()
}

// noop circumvents the usual request sending goroutines and forces a round
Expand All @@ -366,9 +401,8 @@ func (c *Conn) writeBuffer(buf []byte) error {
if _, err := c.conn.Write(buf); err != nil {
Logger.Printf("A write error is unrecoverable: %s", err)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if broadcastDone should be called here, too. writeBuffer returns an error which is ignored. Sending another request will likely fail.

return err
} else {
return nil
}
return nil
}

// readResponses is a goroutine that reads events, errors and
Expand All @@ -381,6 +415,7 @@ func (c *Conn) writeBuffer(buf []byte) error {
// channel. (It is an error if no such cookie exists in this case.)
// Finally, cookies that came "before" this reply are always cleaned up.
func (c *Conn) readResponses() {
defer c.wg.Done()
defer close(c.eventChan)

var (
Expand All @@ -391,18 +426,16 @@ func (c *Conn) readResponses() {

for {
select {
case respond := <-c.closing:
respond <- struct{}{}
case <-c.done:
return
default:
}

buf := make([]byte, 32)
err, seq = nil, 0
if _, err := io.ReadFull(c.conn, buf); err != nil {
Logger.Printf("A read error is unrecoverable: %s", err)
c.eventChan <- err
c.Close()
c.broadcastDone()
continue
}
switch buf[0] {
Expand Down Expand Up @@ -432,7 +465,7 @@ func (c *Conn) readResponses() {
if _, err := io.ReadFull(c.conn, biggerBuf[32:]); err != nil {
Logger.Printf("A read error is unrecoverable: %s", err)
c.eventChan <- err
c.Close()
c.broadcastDone()
continue
}
replyBytes = biggerBuf
Expand Down