forked from nsqio/libnsq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnsq.c
112 lines (89 loc) · 3.05 KB
/
nsq.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "nsq.h"
#ifdef DEBUG
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
#else
#define _DEBUG(...) do {;} while (0)
#endif
static void nsq_reader_connect_cb(struct NSQDConnection *conn, void *arg)
{
struct NSQReader *rdr = (struct NSQReader *)arg;
_DEBUG("%s: %p\n", __FUNCTION__, rdr);
if (rdr->connect_callback) {
rdr->connect_callback(rdr, conn);
}
// send magic
buffered_socket_write(conn->bs, " V2", 4);
// subscribe
buffer_reset(conn->command_buf);
nsq_subscribe(conn->command_buf, rdr->topic, rdr->channel);
buffered_socket_write_buffer(conn->bs, conn->command_buf);
// send initial RDY
buffer_reset(conn->command_buf);
nsq_ready(conn->command_buf, rdr->max_in_flight);
buffered_socket_write_buffer(conn->bs, conn->command_buf);
}
static void nsq_reader_data_cb(struct NSQDConnection *conn, void *arg)
{
struct NSQReader *rdr = (struct NSQReader *)arg;
_DEBUG("%s: %p\n", __FUNCTION__, rdr);
switch (conn->current_frame_type) {
case NSQ_FRAME_TYPE_RESPONSE:
if (strncmp(conn->current_data, "_heartbeat_", 11) == 0) {
buffer_reset(conn->command_buf);
nsq_nop(conn->command_buf);
buffered_socket_write_buffer(conn->bs, conn->command_buf);
return;
}
break;
}
if (rdr->data_callback) {
rdr->data_callback(rdr, conn,
conn->current_frame_type, conn->current_msg_size, conn->current_data);
}
}
static void nsq_reader_close_cb(struct NSQDConnection *conn, void *arg)
{
struct NSQReader *rdr = (struct NSQReader *)arg;
_DEBUG("%s: %p\n", __FUNCTION__, rdr);
if (rdr->close_callback) {
rdr->close_callback(rdr, conn);
}
LL_DELETE(rdr->conns, conn);
}
struct NSQReader *new_nsq_reader(const char *topic, const char *channel,
void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
void (*data_callback)(struct NSQReader *rdr, struct NSQDConnection *conn,
uint32_t frame_type, uint32_t msg_size, char *data))
{
struct NSQReader *rdr;
rdr = malloc(sizeof(struct NSQReader));
rdr->topic = strdup(topic);
rdr->channel = strdup(channel);
rdr->max_in_flight = 1;
rdr->connect_callback = connect_callback;
rdr->close_callback = close_callback;
rdr->data_callback = data_callback;
rdr->conns = NULL;
return rdr;
}
void free_nsq_reader(struct NSQReader *rdr)
{
if (rdr) {
free(rdr->topic);
free(rdr->channel);
free(rdr);
}
}
int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port)
{
struct NSQDConnection *conn;
conn = new_nsqd_connection(address, port,
nsq_reader_connect_cb, nsq_reader_close_cb, nsq_reader_data_cb, rdr);
LL_APPEND(rdr->conns, conn);
return nsqd_connection_connect(conn);
}
void nsq_run(struct ev_loop *loop)
{
ev_loop(loop, 0);
}