-
Notifications
You must be signed in to change notification settings - Fork 7
/
ut_bridge.c
260 lines (224 loc) · 6.61 KB
/
ut_bridge.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <unistd.h>
#include <time.h>
#include <signal.h>
#include <fcntl.h>
#include <syslog.h>
#include <errno.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include "library.h"
struct bridge_conn_ctx {
int tcpfd;
struct sockaddr_storage server_addr;
struct bridge_conn_ctx *mate_ctx; /* the other connection of the pair */
time_t last_tcp_recv;
time_t last_tcp_send;
size_t tcp_rx_dlen;
char tcp_rx_buf[UT_TCP_RX_BUFFER_SIZE];
};
static inline void init_bridge_conn_ctx_pair(struct bridge_conn_ctx ctx[2])
{
memset(ctx, 0x0, sizeof(*ctx) * 2);
ctx[0].tcpfd = -1;
ctx[0].mate_ctx = &ctx[1];
ctx[1].tcpfd = -1;
ctx[1].mate_ctx = &ctx[0];
}
static inline void bridge_conn_established(struct bridge_conn_ctx *ctx)
{
int b_sockopt = 1;
setsockopt(ctx->tcpfd, IPPROTO_TCP, TCP_NODELAY, &b_sockopt,
sizeof(b_sockopt));
set_nonblock(ctx->tcpfd);
ctx->tcp_rx_dlen = 0;
ctx->last_tcp_recv = ctx->last_tcp_send = time(NULL);
}
static inline void destroy_bridge_connection(struct bridge_conn_ctx *ctx)
{
close(ctx->tcpfd);
ctx->tcpfd = -1;
/* Rewind the receive pointer */
ctx->tcp_rx_dlen = 0;
ctx->last_tcp_recv = ctx->last_tcp_send = 0;
}
static int process_bridge_conn_receive(struct bridge_conn_ctx *ctx)
{
size_t rpos = 0, remain;
time_t current_ts = time(NULL);
int rc;
rc = recv(ctx->tcpfd, ctx->tcp_rx_buf + ctx->tcp_rx_dlen,
UT_TCP_RX_BUFFER_SIZE - ctx->tcp_rx_dlen, 0);
if (rc <= 0) {
syslog(LOG_INFO, "Bridge connection broken.\n");
destroy_bridge_connection(ctx);
return -1;
}
ctx->tcp_rx_dlen += rc;
/* >>>> Handle the received data - begin <<<< */
while ((remain = ctx->tcp_rx_dlen - rpos) >= UT_TCP_HDR_LEN) {
struct ut_tcp_hdr *hdr = (void *)(ctx->tcp_rx_buf + rpos);
/* char *pkt_data = ctx->tcp_rx_buf + rpos + UT_TCP_HDR_LEN; */
size_t pkt_len = ntohs(hdr->data_len);
if (pkt_len == 0) {
/* Keep-alive frame */
ctx->last_tcp_recv = current_ts;
printf("Heartbeat received.\n");
} else if (remain - UT_TCP_HDR_LEN >= pkt_len) {
/* A complete UDP packet seen */
ctx->last_tcp_recv = current_ts;
/* Send to the other side of the bridge */
if (ctx->mate_ctx->tcpfd >= 0) {
int rc = send_all(ctx->mate_ctx->tcpfd, hdr,
UT_TCP_HDR_LEN + pkt_len, 0);
if (rc > 0) {
ctx->mate_ctx->last_tcp_send = current_ts;
} else {
syslog(LOG_INFO, "Bridge connection broken.\n");
destroy_bridge_connection(ctx->mate_ctx);
}
}
} else if (pkt_len > UT_TCP_RX_BUFFER_SIZE - UT_TCP_HDR_LEN) {
/* Illegal length */
syslog(LOG_INFO, "Bogus packet length '%u', dropping the connection.\n",
(unsigned)pkt_len);
destroy_bridge_connection(ctx);
return -1;
} else {
break;
}
/* Prepare buffer pointer for the next frame */
rpos += UT_TCP_HDR_LEN + pkt_len;
}
/* Keep the incomplete packet data in buffer */
if (rpos > 0) {
memmove(ctx->tcp_rx_buf, ctx->tcp_rx_buf + rpos, ctx->tcp_rx_dlen - rpos);
ctx->tcp_rx_dlen -= rpos;
}
/* >>>> Handle the received data - end <<<< */
return 0;
}
static void send_bridge_keepalive(struct bridge_conn_ctx *ctx)
{
struct ut_tcp_hdr hdr;
if (ctx->tcpfd < 0)
return;
hdr.data_len = htons(0);
send_all(ctx->tcpfd, &hdr, UT_TCP_HDR_LEN, 0);
printf("Heartbeat sent. TCP buffer: %lu\n", (unsigned long)ctx->tcp_rx_dlen);;
}
static void print_help(int argc, char *argv[])
{
printf("Usage:\n");
printf(" %s server1:port1 server2:port2 [-d]\n", argv[0]);
}
int main(int argc, char *argv[])
{
struct bridge_conn_ctx conn_pair[2];
int opt;
bool is_daemon = false;
while ((opt = getopt(argc, argv, "dh")) > 0) {
switch (opt) {
case 'd': is_daemon = true; break;
default: print_help(argc, argv); exit(1);
}
}
if (argc - optind < 2) {
print_help(argc, argv);
exit(1);
}
openlog_x("ut-bridge", LOG_PID|LOG_CONS|LOG_PERROR|LOG_NDELAY, LOG_USER);
init_bridge_conn_ctx_pair(conn_pair);
get_sockaddr_inx_pair(argv[optind++], &conn_pair[0].server_addr);
get_sockaddr_inx_pair(argv[optind++], &conn_pair[1].server_addr);
if (is_daemon)
do_daemonize();
signal(SIGPIPE, SIG_IGN);
for (;;) {
fd_set rset;
int maxfd, i;
/* 1. Check both connections and reconnect if neccessary */
for (i = 0; i < 2; i++) {
struct bridge_conn_ctx *ctx = &conn_pair[i];
/* Check and close it if an existing connection is dead */
if (ctx->tcpfd >= 0 &&
time(NULL) - ctx->last_tcp_recv >= TCP_DEAD_TIMEOUT) {
syslog_x(LOG_WARNING, "Close TCP connection due to keepalive failure.\n");
destroy_bridge_connection(ctx);
}
/* Try to reconnect */
if (ctx->tcpfd < 0 && time(NULL) - ctx->last_tcp_send >= 5) {
char s_addr[64] = ""; int port = 0;
sockaddr_to_print(&ctx->server_addr, s_addr, &port);
ctx->tcpfd = socket(AF_INET, SOCK_STREAM, 0);
assert(ctx->tcpfd >= 0);
if (connect(ctx->tcpfd, (struct sockaddr *)&ctx->server_addr,
sizeof_sockaddr(&ctx->server_addr)) == 0) {
bridge_conn_established(ctx);
syslog_x(LOG_INFO, "Connected to server '%s:%d'.\n", s_addr, port);
} else {
destroy_bridge_connection(ctx);
syslog_x(LOG_WARNING, "Failed to connect '%s:%d': %s. Retrying later.\n",
s_addr, port, strerror(errno));
/* Mark the failure time to avoid retrying too fast */
ctx->last_tcp_send = time(NULL);
}
}
}
/* 2. Process receiving of each socket */
FD_ZERO(&rset);
maxfd = -1;
for (i = 0; i < 2; i++) {
struct bridge_conn_ctx *ctx = &conn_pair[i];
if (ctx->tcpfd >= 0) {
FD_SET(ctx->tcpfd, &rset);
SET_IF_LARGER(maxfd, ctx->tcpfd);
}
}
if (maxfd >= 0) {
struct timeval timeo = { 0, 300 * 1000 };
int nfds;
nfds = select(maxfd + 1, &rset, NULL, NULL, &timeo);
if (nfds == 0) {
/* No receive event, just do keep-alive */
goto heartbeat;
} else if (nfds < 0) {
if (errno == EINTR || errno == ERESTART) {
continue;
} else {
syslog_x(LOG_ERR, "*** select() error: %s.\n", strerror(errno));
exit(1);
}
}
} else {
/* Delay for reconnecting */
usleep(300 * 1000);
continue;
}
for (i = 0; i < 2; i++) {
struct bridge_conn_ctx *ctx = &conn_pair[i];
if (ctx->tcpfd >= 0 && FD_ISSET(ctx->tcpfd, &rset))
process_bridge_conn_receive(ctx);
}
heartbeat:
/* 3. Send keep-alive packet */
for (i = 0; i < 2; i++) {
struct bridge_conn_ctx *ctx = &conn_pair[i];
if (ctx->tcpfd >= 0 &&
time(NULL) - ctx->last_tcp_send >= KEEPALIVE_INTERVAL) {
ctx->last_tcp_send = time(NULL);
send_bridge_keepalive(ctx);
}
}
}
return 0;
}