-
Notifications
You must be signed in to change notification settings - Fork 2
/
tru_net.go
280 lines (240 loc) · 7.04 KB
/
tru_net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
// Copyright 2022 Kirill Scherba <[email protected]>. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// TRU process standard golang net.Listener and net.Conn interfaces
package tru
import (
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
)
// Trunet is receiver to create Tru net.Listener interface
type Trunet struct {
*Tru
accept acceptChannel
}
type acceptChannel chan *Channel
// Common tru object
var truCommon struct {
*Tru
sync.Mutex
}
// Listener is a tru generic network listener for stream-oriented protocols.
func Listen(network, address string) (listener net.Listener, err error) {
listener, err = newTrunet(address, true)
return
}
// Dial connects to the address on the tru network
func Dial(network, address string) (conn net.Conn, err error) {
trunet, err := newTrunet(":0", true)
if err != nil {
return
}
ch, err := trunet.Connect(address)
if err != nil {
return
}
c := trunet.newConn(ch)
ch.setReader(c.reader)
conn = c
return
}
// newTrunet creates new Trunet object
func newTrunet(address string, useTruCommon bool) (trunet *Trunet, err error) {
trunet = new(Trunet)
// Parse the address and get port value
addr := strings.Split(address, ":")
if len(addr) != 2 {
err = fmt.Errorf("wrong address: %s", address)
return
}
port, err := strconv.Atoi(addr[1])
if err != nil {
err = fmt.Errorf("wrong port %s in address: %s", addr[1], address)
return
}
// Make accept golan channel
trunet.accept = make(acceptChannel)
// Create new tru object or use existing
truCommon.Lock()
defer truCommon.Unlock()
var truObj = truCommon.Tru
if !useTruCommon || truObj == nil {
truObj, err = New(port, trunet.connected, logLevel, Stat(showStats))
if err != nil {
err = fmt.Errorf("can't create new tru object, error: %s", err)
return
}
if useTruCommon {
truCommon.Tru = truObj
}
}
trunet.Tru = truObj
return
}
// reader reads packets from connected peers
func (conn *Conn) reader(ch *Channel, pac *Packet, err error) (processed bool) {
if conn.ch == nil {
return
}
// Check channel destroyed and close Conn (connection)
if err != nil {
if err == ErrChannelDestroyed {
err = conn.Close()
if err != nil {
// if connection already closed
return
}
// tru channel destroyed, connection closed
return
}
// Some other errors (I think it never happens)
log.Error.Println("got error in reader:", err)
return
}
defer func() {
if err := recover(); err != nil {
// This error happens if users application don't read data messages
// sent to it, when connection closed.
log.Debugvv.Println("send on closed channel panic occurred:", err)
}
}()
conn.read.ch <- pac.Data()
return
}
// connected to tru callback function
func (t Trunet) connected(ch *Channel, err error) {
if err != nil {
return
}
t.accept <- ch
}
// Create new Conn object
func (t Trunet) newConn(ch *Channel) *Conn {
return &Conn{ch, t.LocalAddr(), ch.Addr(), connRead{ch: make(connReadChan)}}
}
// Accept waits for and returns the next connection to the listener.
func (t Trunet) Accept() (conn net.Conn, err error) {
ch := <-t.accept
c := t.newConn(ch)
ch.setReader(c.reader)
conn = c
return
}
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (t Trunet) Close() (err error) {
t.Tru.Close()
return
}
// Addr returns the listener's network address.
func (t Trunet) Addr() (addr net.Addr) { return }
// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn struct {
ch *Channel
localAddr net.Addr
remoteAddr net.Addr
read connRead
}
type connRead struct {
ch connReadChan // Read channel receive data form tru channel reader
buf []byte // Read data which was not send in previuse call
cont bool // Need send os.EOF
}
type connReadChan chan connReadChanData
type connReadChanData []byte
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (c *Conn) Read(b []byte) (n int, err error) {
// Return io.EOF
if len(c.read.buf) == 0 && c.read.cont {
c.read.cont = false
err = io.EOF
return
}
// Get data from reader
if len(c.read.buf) == 0 {
chanData, ok := <-c.read.ch
if !ok {
// Read channel closed
err = ErrChannelDestroyed
return
}
c.read.buf = chanData
c.read.cont = true
}
// Copy data to input slice
n = copy(b, c.read.buf)
c.read.buf = c.read.buf[n:]
return
}
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (c Conn) Write(b []byte) (n int, err error) {
if c.ch == nil {
err = ErrChannelDestroyed
return
}
_, err = c.ch.WriteTo(b)
if err == nil {
n = len(b)
}
return
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *Conn) Close() (err error) {
if c.ch == nil {
err = ErrChannelAlreadyDestroyed
return
}
ch := c.ch
c.ch = nil
ch.Close()
close(c.read.ch)
return
}
// LocalAddr returns the local network address, if known.
func (c Conn) LocalAddr() net.Addr { return c.localAddr }
// RemoteAddr returns the remote network address, if known.
func (c Conn) RemoteAddr() net.Addr { return c.remoteAddr }
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail instead of blocking. The deadline applies to all future
// and pending I/O, not just the immediately following call to
// Read or Write. After a deadline has been exceeded, the
// connection can be refreshed by setting a deadline in the future.
//
// If the deadline is exceeded a call to Read or Write or to other
// I/O methods will return an error that wraps os.ErrDeadlineExceeded.
// This can be tested using errors.Is(err, os.ErrDeadlineExceeded).
// The error's Timeout method will return true, but note that there
// are other possible errors for which the Timeout method will
// return true even if the deadline has not been exceeded.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (c Conn) SetDeadline(t time.Time) (err error) { return }
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c Conn) SetReadDeadline(t time.Time) (err error) { return }
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (c Conn) SetWriteDeadline(t time.Time) (err error) { return }