forked from nsqio/libnsq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnsqd_connection.c
119 lines (91 loc) · 3.54 KB
/
nsqd_connection.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include "nsq.h"
/*
 * Debug tracing helper.
 *
 * When DEBUG is defined at compile time, _DEBUG(...) forwards its arguments
 * to fprintf(stdout, ...). Otherwise it expands to an empty statement, so its
 * arguments are not evaluated at all.
 *
 * NOTE(review): identifiers starting with an underscore followed by an
 * uppercase letter are reserved for the implementation in C; consider a
 * project-prefixed name if this macro is ever renamed.
 */
#ifdef DEBUG
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
#else
#define _DEBUG(...) do {;} while (0)
#endif
/*
 * Forward declarations for the two read callbacks. They are needed because
 * the callbacks re-arm each other (size header -> frame data -> size header)
 * and the connect callback references nsqd_connection_read_size() before its
 * definition.
 */
static void nsqd_connection_read_size(struct BufferedSocket *buffsock, void *arg);
static void nsqd_connection_read_data(struct BufferedSocket *buffsock, void *arg);
/*
 * Connect callback handed to the buffered socket by new_nsqd_connection().
 *
 * arg is the owning struct NSQDConnection. The user's connect callback is
 * invoked first (when one was supplied), then a read of the 4-byte size
 * header is requested, to be delivered to nsqd_connection_read_size().
 */
static void nsqd_connection_connect_cb(struct BufferedSocket *buffsock, void *arg)
{
    struct NSQDConnection *c = arg;

    _DEBUG("%s: %p\n", __FUNCTION__, arg);

    // notify the user before any data is requested from the socket
    if (c->connect_callback) {
        c->connect_callback(c, c->arg);
    }

    buffered_socket_read_bytes(buffsock, 4, nsqd_connection_read_size, c);
}
/*
 * Read callback for the 4-byte size header that precedes every frame.
 *
 * Decodes the big-endian length found at the front of the read buffer into
 * conn->current_msg_size, consumes those 4 bytes, and then requests that many
 * bytes from the buffered socket, to be delivered to
 * nsqd_connection_read_data().
 *
 * arg is the owning struct NSQDConnection.
 */
static void nsqd_connection_read_size(struct BufferedSocket *buffsock, void *arg)
{
    struct NSQDConnection *conn = (struct NSQDConnection *)arg;
    const unsigned char *hdr = (const unsigned char *)buffsock->read_buf->data;

    _DEBUG("%s: %p\n", __FUNCTION__, arg);

    // convert message length header from big-endian
    //
    // Decoded byte-by-byte so that no alignment of the buffer data is assumed,
    // and decoded *before* buffer_drain() so that no pointer into the buffer
    // is dereferenced after those bytes have been consumed.
    conn->current_msg_size = ((uint32_t)hdr[0] << 24) |
                             ((uint32_t)hdr[1] << 16) |
                             ((uint32_t)hdr[2] << 8) |
                             (uint32_t)hdr[3];

    buffer_drain(buffsock->read_buf, 4);

    _DEBUG("%s: msg_size = %d bytes \n", __FUNCTION__, conn->current_msg_size);

    buffered_socket_read_bytes(buffsock, conn->current_msg_size, nsqd_connection_read_data, conn);
}
/*
 * Read callback for the body of a frame (frame type + payload).
 *
 * On entry conn->current_msg_size holds the size announced by the preceding
 * size header. The first 4 bytes are the big-endian frame type; the remainder
 * is the payload. While the user's data callback runs:
 *   - conn->current_frame_type holds the decoded frame type,
 *   - conn->current_msg_size holds the payload length (frame type excluded),
 *   - conn->current_data points at the payload inside the read buffer.
 * The payload is drained from the buffer right after the callback returns, so
 * current_data must not be kept beyond the callback.
 *
 * Finally a read of the next 4-byte size header is requested.
 *
 * arg is the owning struct NSQDConnection.
 */
static void nsqd_connection_read_data(struct BufferedSocket *buffsock, void *arg)
{
    struct NSQDConnection *conn = (struct NSQDConnection *)arg;
    const unsigned char *hdr;

    // A frame must at least carry its 4-byte frame type. Without this guard a
    // shorter announced size would make the "-= 4" below wrap around and the
    // frame type would be read from bytes that were never received.
    if (conn->current_msg_size < 4) {
        _DEBUG("%s: dropping short frame (%d bytes)\n", __FUNCTION__,
               (int)conn->current_msg_size);
        buffer_drain(buffsock->read_buf, conn->current_msg_size);
        buffered_socket_read_bytes(buffsock, 4, nsqd_connection_read_size, conn);
        return;
    }

    // Decode the big-endian frame type byte-by-byte: the buffer position is
    // arbitrary after earlier drains, so no alignment is assumed.
    hdr = (const unsigned char *)buffsock->read_buf->data;
    conn->current_frame_type = ((uint32_t)hdr[0] << 24) |
                               ((uint32_t)hdr[1] << 16) |
                               ((uint32_t)hdr[2] << 8) |
                               (uint32_t)hdr[3];

    buffer_drain(buffsock->read_buf, 4);
    conn->current_msg_size -= 4;

    _DEBUG("%s: frame type %d, data: %.*s\n", __FUNCTION__, conn->current_frame_type,
           (int)conn->current_msg_size, buffsock->read_buf->data);

    conn->current_data = buffsock->read_buf->data;

    // NOTE(review): conn and buffsock are both used again after the callback,
    // so the callback must not free either of them.
    if (conn->data_callback) {
        conn->data_callback(conn, conn->arg);
    }

    // consume the payload and wait for the next frame's size header
    buffer_drain(buffsock->read_buf, conn->current_msg_size);
    buffered_socket_read_bytes(buffsock, 4, nsqd_connection_read_size, conn);
}
/*
 * Close callback handed to the buffered socket by new_nsqd_connection().
 *
 * arg is the owning struct NSQDConnection. Forwards the event to the user's
 * close callback when one was supplied; otherwise does nothing beyond the
 * debug trace.
 */
static void nsqd_connection_close_cb(struct BufferedSocket *buffsock, void *arg)
{
    struct NSQDConnection *c = arg;

    _DEBUG("%s: %p\n", __FUNCTION__, arg);

    if (!c->close_callback) {
        return;
    }

    c->close_callback(c, c->arg);
}
/*
 * Error callback handed to the buffered socket by new_nsqd_connection().
 *
 * Currently only emits a debug trace; the error is not reported to the user
 * of the connection.
 */
static void nsqd_connection_error_cb(struct BufferedSocket *buffsock, void *arg)
{
    // Both parameters are required by the callback signature. buffsock is
    // never used, and arg is only referenced by _DEBUG, which expands to
    // nothing unless DEBUG is defined.
    (void)buffsock;
    (void)arg;

    _DEBUG("%s: %p\n", __FUNCTION__, arg);
}
/*
 * Allocate and initialise a connection object for one nsqd endpoint.
 *
 * address, port      forwarded unchanged to new_buffered_socket().
 * connect_callback   invoked from the socket's connect callback; may be NULL.
 * close_callback     invoked from the socket's close callback; may be NULL.
 * data_callback      invoked once per received frame; may be NULL.
 * arg                opaque pointer passed back as the second argument of
 *                    every callback.
 *
 * Returns the new connection, or NULL if an allocation fails. The result is
 * released with free_nsqd_connection(); nsqd_connection_connect() calls
 * buffered_socket_connect() on the socket created here.
 */
struct NSQDConnection *new_nsqd_connection(const char *address, int port,
    NSQDConnectionCallback connect_callback,
    NSQDConnectionCallback close_callback,
    NSQDConnectionCallback data_callback,
    void *arg)
{
    struct NSQDConnection *conn;

    // calloc so that fields not assigned below (e.g. current_frame_type,
    // current_data) start out zeroed instead of indeterminate
    conn = calloc(1, sizeof *conn);
    if (!conn) {
        return NULL;
    }

    // NOTE(review): assumes new_buffer() reports failure by returning NULL —
    // confirm against its implementation.
    conn->command_buf = new_buffer(4096, 4096);
    if (!conn->command_buf) {
        free(conn);
        return NULL;
    }

    conn->current_msg_size = 0;
    conn->connect_callback = connect_callback;
    conn->close_callback = close_callback;
    conn->data_callback = data_callback;
    conn->arg = arg;

    // conn itself is the callback argument, so the static callbacks in this
    // file can recover the connection from their void *arg
    conn->bs = new_buffered_socket(address, port,
        nsqd_connection_connect_cb, nsqd_connection_close_cb,
        NULL, NULL, nsqd_connection_error_cb,
        conn);
    // NOTE(review): assumes new_buffered_socket() returns NULL on failure —
    // confirm against its implementation.
    if (!conn->bs) {
        free_buffer(conn->command_buf);
        free(conn);
        return NULL;
    }

    return conn;
}
/*
 * Release a connection created by new_nsqd_connection().
 *
 * Frees the command buffer, the buffered socket and the connection object
 * itself, in that order. Passing NULL is allowed and does nothing.
 */
void free_nsqd_connection(struct NSQDConnection *conn)
{
    if (!conn) {
        return;
    }

    free_buffer(conn->command_buf);
    free_buffered_socket(conn->bs);
    free(conn);
}
/*
 * Connect the connection's buffered socket.
 *
 * Returns whatever buffered_socket_connect() returns for conn->bs.
 * conn must not be NULL.
 */
int nsqd_connection_connect(struct NSQDConnection *conn)
{
    int rc = buffered_socket_connect(conn->bs);

    return rc;
}