forked from rancher/remotedialer
-
Notifications
You must be signed in to change notification settings - Fork 3
/
connection.go
121 lines (100 loc) · 2.37 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package remotedialer
import (
"io"
"net"
"time"
"github.com/rancher/remotedialer/metrics"
)
type connection struct {
err error
writeDeadline time.Time
buffer *readBuffer
addr addr
session *Session
connID int64
}
func newConnection(connID int64, session *Session, proto, address string) *connection {
c := &connection{
buffer: newReadBuffer(),
addr: addr{
proto: proto,
address: address,
},
connID: connID,
session: session,
}
metrics.IncSMTotalAddConnectionsForWS(session.clientKey, proto, address)
return c
}
func (c *connection) tunnelClose(err error) {
metrics.IncSMTotalRemoveConnectionsForWS(c.session.clientKey, c.addr.Network(), c.addr.String())
c.writeErr(err)
c.doTunnelClose(err)
}
func (c *connection) doTunnelClose(err error) {
if c.err != nil {
return
}
c.err = err
if c.err == nil {
c.err = io.ErrClosedPipe
}
c.buffer.Close(c.err)
}
func (c *connection) OnData(m *message) error {
return c.buffer.Offer(m.body)
}
func (c *connection) Close() error {
c.session.closeConnection(c.connID, io.EOF)
return nil
}
func (c *connection) Read(b []byte) (int, error) {
n, err := c.buffer.Read(b)
metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n))
return n, err
}
func (c *connection) Write(b []byte) (int, error) {
if c.err != nil {
return 0, io.ErrClosedPipe
}
msg := newMessage(c.connID, b)
metrics.AddSMTotalTransmitBytesOnWS(c.session.clientKey, float64(len(msg.Bytes())))
return c.session.writeMessage(c.writeDeadline, msg)
}
func (c *connection) writeErr(err error) {
if err != nil {
msg := newErrorMessage(c.connID, err)
metrics.AddSMTotalTransmitErrorBytesOnWS(c.session.clientKey, float64(len(msg.Bytes())))
c.session.writeMessage(c.writeDeadline, msg)
}
}
func (c *connection) LocalAddr() net.Addr {
return c.addr
}
func (c *connection) RemoteAddr() net.Addr {
return c.addr
}
func (c *connection) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
return c.SetWriteDeadline(t)
}
func (c *connection) SetReadDeadline(t time.Time) error {
c.buffer.deadline = t
return nil
}
func (c *connection) SetWriteDeadline(t time.Time) error {
c.writeDeadline = t
return nil
}
type addr struct {
proto string
address string
}
func (a addr) Network() string {
return a.proto
}
func (a addr) String() string {
return a.address
}