-
Notifications
You must be signed in to change notification settings - Fork 5
/
helpers.c
108 lines (89 loc) · 3.25 KB
/
helpers.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <zookeeper.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "helpers.h"
// Pending watch events are kept in a singly linked FIFO list that
// _watch_handler appends to and wait_for_watch removes from.
// watch_mutex is held by both functions while they touch the list.
static pthread_mutex_t watch_mutex = PTHREAD_MUTEX_INITIALIZER;
// Signalled by _watch_handler each time it has appended an event.
static pthread_cond_t watch_available = PTHREAD_COND_INITIALIZER;
// Head (oldest entry) of the pending-event list; NULL when the list is empty.
static watch_data *first_watch = NULL;
completion_data* create_completion_data() {
completion_data *data = malloc(sizeof(completion_data));
pthread_mutex_init(&data->mutex, NULL);
pthread_mutex_lock(&data->mutex);
return data;
}
// Releases a completion_data obtained from create_completion_data().
// Passing NULL is a no-op, mirroring free().
//
// The caller must make sure no completion callback can still run for this
// object, because the memory is freed here.
void destroy_completion_data(completion_data *data) {
    if (data == NULL) {
        return;
    }
    // The mutex is normally still locked when we get here:
    // create_completion_data() locks it and wait_for_completion() locks it
    // again. POSIX leaves destroying a locked mutex undefined, so bring it to
    // the unlocked state first. trylock either acquires it (it was unlocked)
    // or fails because it is already held; in both cases the following
    // unlock leaves it unlocked.
    (void)pthread_mutex_trylock(&data->mutex);
    (void)pthread_mutex_unlock(&data->mutex);
    pthread_mutex_destroy(&data->mutex);
    free(data);
}
// Blocks the calling thread until the completion's mutex can be acquired.
// create_completion_data() hands the mutex out already locked and
// _handle_void_completion() unlocks it, so this returns once that callback
// has run. The mutex is held again when this function returns.
void wait_for_completion(completion_data *data) {
    pthread_mutex_t *gate = &data->mutex;
    pthread_mutex_lock(gate);
}
void _handle_void_completion(int rc, const void *data_) {
completion_data *data = (completion_data*)data_;
data->data = (void*)(size_t)rc;
pthread_mutex_unlock(&data->mutex);
}
// Watcher callback: copies the event into a freshly allocated watch_data,
// appends it to the pending-event list and wakes a thread blocked in
// wait_for_watch().
//
//   zh               - handle the event belongs to (unused).
//   event_type       - stored as data->event_type.
//   connection_state - stored as data->connection_state.
//   event_path       - duplicated with strdup(); NULL is stored as "".
//   watch_context    - stored unchanged as data->watch_context.
//
// If memory for the event cannot be allocated the event is dropped rather
// than dereferencing a NULL pointer.
void _watch_handler(zhandle_t *zh, int event_type, int connection_state,
                    const char *event_path, void *watch_context)
{
    // Build the node before taking the lock so that the critical section
    // covers only the list manipulation.
    watch_data *data = malloc(sizeof *data);
    (void)zh;
    if (data == NULL) {
        return;
    }
    // strdup(NULL) is undefined, so substitute an empty path.
    data->event_path = strdup(event_path != NULL ? event_path : "");
    if (data->event_path == NULL) {
        free(data);
        return;
    }
    data->connection_state = connection_state;
    data->event_type = event_type;
    data->watch_context = watch_context;
    data->next = NULL;

    pthread_mutex_lock(&watch_mutex);
    if (first_watch == NULL) {
        first_watch = data;
    } else {
        // Append at the tail so events are consumed in arrival order.
        watch_data *last_watch = first_watch;
        while (last_watch->next != NULL) {
            last_watch = last_watch->next;
        }
        last_watch->next = data;
    }
    pthread_cond_signal(&watch_available);
    pthread_mutex_unlock(&watch_mutex);
}
// Blocks until at least one watch event is pending, then detaches and returns
// the oldest one. The returned node is no longer linked into the list (its
// `next` is NULL) and can be released with destroy_watch_data().
watch_data *wait_for_watch() {
    watch_data *head;

    pthread_mutex_lock(&watch_mutex);
    for (;;) {
        head = first_watch;
        if (head != NULL) {
            break;
        }
        // Re-check after every wakeup: the list may still be empty.
        pthread_cond_wait(&watch_available, &watch_mutex);
    }
    first_watch = head->next;
    head->next = NULL; // Just in case.
    pthread_mutex_unlock(&watch_mutex);

    return head;
}
// Frees a watch_data together with its duplicated event path.
// Passing NULL is a no-op, mirroring free().
void destroy_watch_data(watch_data *data) {
    if (data == NULL) {
        return;
    }
    free(data->event_path);
    free(data);
}
// Cgo doesn't like to use function addresses as variables.
// The two callbacks are therefore also exposed through global
// function-pointer variables that hold their addresses.
watcher_fn watch_handler = _watch_handler;
void_completion_t handle_void_completion = _handle_void_completion;
// Thin wrapper around zookeeper_init() that takes the watcher context as an
// unsigned long and forwards it converted to void*. All other arguments are
// passed through unchanged, and the handle from zookeeper_init() is returned.
// NOTE(review): presumably this lets the Go side pass an integer token
// instead of a pointer -- confirm against the caller.
zhandle_t *zookeeper_init_int(const char *host, watcher_fn fn,
        int recv_timeout, const clientid_t *clientid, unsigned long context, int flags) {
    void *opaque_context = (void *)context;
    zhandle_t *handle = zookeeper_init(host, fn, recv_timeout, clientid,
                                       opaque_context, flags);
    return handle;
}
// Thin wrapper around zoo_wget() that takes the watcher context as an
// unsigned long and forwards it converted to void*. All other arguments are
// passed through unchanged, and the result of zoo_wget() is returned.
int zoo_wget_int(zhandle_t *zh, const char *path,
        watcher_fn watcher, unsigned long watcherCtx,
        char *buffer, int* buffer_len, struct Stat *stat) {
    void *opaque_context = (void *)watcherCtx;
    int rc = zoo_wget(zh, path, watcher, opaque_context,
                      buffer, buffer_len, stat);
    return rc;
}
// Thin wrapper around zoo_wget_children2() that takes the watcher context as
// an unsigned long and forwards it converted to void*. All other arguments
// are passed through unchanged, and the result of zoo_wget_children2() is
// returned.
int zoo_wget_children2_int(zhandle_t *zh, const char *path,
        watcher_fn watcher, unsigned long watcherCtx,
        struct String_vector *strings, struct Stat *stat) {
    void *opaque_context = (void *)watcherCtx;
    int rc = zoo_wget_children2(zh, path, watcher, opaque_context,
                                strings, stat);
    return rc;
}
// Thin wrapper around zoo_wexists() that takes the watcher context as an
// unsigned long and forwards it converted to void*. All other arguments are
// passed through unchanged, and the result of zoo_wexists() is returned.
int zoo_wexists_int(zhandle_t *zh, const char *path,
        watcher_fn watcher, unsigned long watcherCtx, struct Stat *stat) {
    void *opaque_context = (void *)watcherCtx;
    int rc = zoo_wexists(zh, path, watcher, opaque_context, stat);
    return rc;
}
// vim:ts=4:sw=4:et