forked from bertha/berthad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathberthad-vfs.c
1876 lines (1561 loc) · 57.9 KB
/
berthad-vfs.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include "config.h"
#define _GNU_SOURCE
#include <arpa/inet.h>
#ifdef __FreeBSD__
# include <netinet/in.h>
#endif
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <inttypes.h>
#include <stdlib.h>
#include <signal.h>
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
#include <dirent.h>
#include <stdio.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h>
#include <glib.h>
#ifdef HAVE_FALLOCATE
# include <linux/falloc.h>
#endif
/*
* Connection states.
*
* Each command walks through the following sequence of states:
*
* cmd states
* LIST INITIAL => LIST
* PUT INITIAL => PUT => SENDN
* SPUT INITIAL => RECVN => PUT => SENDN
* GET INITIAL => RECVN => GET
* SGET INITIAL => RECVN => SENDN => GET
* QUIT INITIAL
* SIZE INITIAL => RECVN => SENDN
* STATS INITIAL => SENDN
*/
/* The connection has been accepted. We wait for the command byte. */
#define BCONN_STATE_INITIAL 0
/* We are waiting to write keys to the socket. */
#define BCONN_STATE_LIST 1
/* We are waiting for data on the socket, or for the VFS to become
* writable, so that the data can be written to a temporary file. */
#define BCONN_STATE_PUT 2
/* We are waiting to transfer the file to the socket. */
#define BCONN_STATE_GET 3
/* For a BERTHA_SPUT, we are waiting to receive the length;
* for a BERTHA_GET, BERTHA_SGET and BERTHA_SIZE, we are waiting to receive
* the key. */
#define BCONN_STATE_RECVN 4
/* For a BERTHA_SGET, we are waiting to write the length;
* for a BERTHA_SPUT and a BERTHA_PUT, we are waiting to write the key. */
#define BCONN_STATE_SENDN 5
/* Intermediate stage: the per-state data has been freed and no new state
* has been entered yet. */
#define BCONN_STATE_NONE 255
/*
* Command bytes of the bertha protocol.
* BERTHA_NONE is the value a connection carries before a command byte has
* been stored for it.
*/
#define BERTHA_LIST ((guint8)0)
#define BERTHA_PUT ((guint8)1)
#define BERTHA_GET ((guint8)2)
#define BERTHA_QUIT ((guint8)3)
#define BERTHA_SPUT ((guint8)4)
#define BERTHA_SGET ((guint8)5)
#define BERTHA_SIZE ((guint8)6)
#define BERTHA_STATS ((guint8)7)
#define BERTHA_NONE ((guint8)255)
/* Global state of the daemon. */
typedef struct {
/* listening socket; new connections are accept()ed from it */
int lsock;
/* list of connections (each element's data is a BConn*) */
GList *conns;
/* Are we running? */
gboolean running;
/* path to the data directory (root under which blobs are stored) */
char* dataPath;
/* path to the temporary directory (incoming PUT files are created here) */
char* tmpPath;
/* fd_sets for select (2) */
fd_set r_fds;
fd_set w_fds;
int highest_fd;
/* statistics */
gsize n_conns_accepted; /* number of connections accepted */
gsize n_conns_active; /* number of currently active connections */
gsize n_GET_sent; /* number of bytes sent for GETs */
gsize n_PUT_received; /* number of bytes received for PUTs */
gsize n_cycle; /* number of non-trivial main loop cycles */
#ifdef USE_THREADS
/* threadpool for async. fadvise and fallocate */
GThreadPool* threadpool;
#endif
} BProgram;
/* A single client connection. */
typedef struct {
/* The state of this connection (one of the BCONN_STATE_* values) */
int state;
/* the socket */
int sock;
/* remote address, as filled in by accept().
* NOTE(review): struct sockaddr is smaller than struct sockaddr_in6, so
* an IPv6 peer address does not fit here completely -- confirm whether
* IPv6 listeners are supported. */
struct sockaddr addr;
socklen_t addrlen;
/* pointer to additional data associated with the state
* of the connection (BConnList, BConnPut, BConnGet, BConnRecvN or
* BConnSendN, matching the current state); NULL if there is none */
gpointer state_data;
/* Number of this connection; printed as prefix of every log line */
gsize n;
/* the command sent by the client (a BERTHA_* value) */
guint8 cmd;
#ifdef USE_THREADS
/* Number of threads working on this object */
gint n_threads;
/* Condition/mutex pair used to join threads, if there are any.
* Both stay NULL until the first job is started for the connection. */
GCond* threads_cond;
GMutex* threads_mutex;
#endif /* USE_THREADS */
} BConn;
/* A directory that is being, or still has to be, enumerated for a LIST. */
typedef struct {
/* Partial hexadecimal key */
GString* key;
/* Path to the directory */
GString* path;
/* Depth of this directory */
guint8 depth;
} BConnListEntry;
/* State data of a connection in BCONN_STATE_LIST. */
typedef struct {
/* handle of the directory currently being enumerated; NULL if none */
DIR* cdir_handle;
/* entry of the directory currently being enumerated; NULL if none */
BConnListEntry* cdir;
/* Stack of directories (BConnListEntry*) still to search */
GList* dirs;
/* The send buffer */
GByteArray* buf;
} BConnList;
/* State data of a connection in BCONN_STATE_PUT. */
typedef struct {
/* checksum state (SHA-256 over the received data) */
GChecksum* checksum;
/* temporary filename; the file is unlinked when this state is freed */
GString* tmp_fn;
/* file descriptor of target file */
int fd;
/* Is there data waiting on the socket? */
gboolean socket_ready;
/* Have we received an EOS on the socket? */
gboolean socket_eos;
/* Can we write data to the file? */
gboolean file_ready;
/* File write buffer */
GByteArray* buf;
/* The size advised by the client in case of a SPUT */
guint64 advised_size;
} BConnPut;
/* State data of a connection in BCONN_STATE_GET. */
typedef struct {
/* The file descriptor of the file to send */
int fd;
#ifdef USE_SPLICE
/* The pipe used to splice from the file to the socket: file data is
* spliced into pipe[1] and spliced out of pipe[0] into the socket */
int pipe[2];
#endif
#ifdef USE_SENDFILE
/* The number of bytes read and transferred from the file */
size_t n_sent;
#endif
/* Can we send data over the socket? */
gboolean socket_ready;
/* Can we read data from the file? */
gboolean file_ready;
#ifdef USE_SPLICE
/* Is the file depleted? */
gboolean file_eos;
/* Can we splice data to the pipe? */
gboolean pipe_ready;
/* The number of bytes in the pipe */
size_t in_buffer;
#endif /* USE_SPLICE */
} BConnGet;
/* State data of a connection in BCONN_STATE_RECVN: receive a fixed number
* of bytes. */
typedef struct {
/* The buffer (a slice of `size' bytes) */
guint8* buf;
/* bytes left to receive */
size_t left;
/* bytes to receive in total */
size_t size;
} BConnRecvN;
/* State data of a connection in BCONN_STATE_SENDN: send a fixed number
* of bytes. */
typedef struct {
/* The buffer (a slice of `size' bytes) */
guint8* buf;
/* bytes left to send */
size_t left;
/* bytes to send in total */
size_t size;
/* For BERTHA_SGET used to store the BConnGet structure that becomes the
* state data once the length has been sent; NULL otherwise */
gpointer next;
} BConnSendN;
#ifdef USE_THREADS
/* Job types handed to the thread pool. */
/* Call fadvise for file read for GET async, since it may block */
#define BJOB_FADVISE 0
/* Call fallocate for file written by PUT async, since it will block */
#define BJOB_FALLOCATE 1
/* Common header of every job. */
typedef struct {
/* type of the job (a BJOB_* value) */
guint8 type;
/* pointer to the program */
BProgram* prog;
} BJob;
/* A job that operates on a single connection. `parent' is the first
* member. */
typedef struct {
BJob parent;
/* pointer to the connection */
BConn* conn;
} BJobConn;
#endif /* USE_THREADS */
/*
 * Maps one hexadecimal digit character onto its numeric value.
 *
 * '0'..'9' yield 0..9.  Every other character is interpreted relative to
 * 'a' ('a' -> 10 ... 'f' -> 15); the input is not validated, so callers
 * must pass lowercase hexadecimal digits only.
 */
static guint8 hex_to_uint4 (char hex)
{
	if (hex >= '0' && hex <= '9')
		return hex - '0';
	return hex - 'a' + 10;
}
/*
 * Maps a value in the range [0, 15] onto its lowercase hexadecimal digit.
 * Values outside that range are not checked.
 */
static char uint4_to_hex (guint8 byte)
{
	if (byte >= 10)
		return byte - 10 + 'a';
	return byte + '0';
}
/*
 * Tells whether <path> names a directory.
 * Returns FALSE when stat(2) fails, e.g. because the path does not exist.
 */
static gboolean path_is_dir (char* path)
{
	struct stat info;

	if (stat(path, &info) != 0)
		return FALSE;
	return S_ISDIR(info.st_mode);
}
/*
 * Ensures the specified directory exists, creating missing components
 * with mode 0700.
 *
 * <path> may end in a single '/'.  At least one leading component of the
 * path must already exist as a directory; otherwise the function aborts
 * through g_assert.  It also aborts when a component cannot be created.
 */
static void mkdirs (char* path)
{
	char* path2 = g_strdup(path);
	char* tmp;
	gsize len;
	int ret;

	len = strlen(path2);
	/* strip trailing /; the len check keeps us from reading path2[-1]
	 * when an empty string is passed */
	if (len > 0 && path2[len-1] == '/') {
		path2[len-1] = '\0';
		len--;
	}
	/* search down until we find a directory that does exist, cutting
	 * off one component at a time */
	while (!path_is_dir(path2)) {
		tmp = strrchr(path2, '/');
		g_assert(tmp != NULL);
		tmp[0] = '\0';
	}
	/* work up to create them: putting back one '/' re-exposes the next
	 * component of the original path */
	while (strlen(path2) < len) {
		path2[strlen(path2)] = '/';
		ret = mkdir(path2, 0700);
		/* A component may already be there (doubled '/' in the path,
		 * or somebody else created it in the meantime); that is fine
		 * as long as it is a directory. */
		g_assert(ret == 0
			|| (errno == EEXIST && path_is_dir(path2)));
	}
	g_free(path2);
}
/*
 * Decodes a string of hexadecimal digits into a freshly allocated byte
 * buffer.  The number of bytes is stored in *size.
 *
 * The string must have even length (asserted); its characters are not
 * validated.  Release the result with g_slice_free1(*size, buf).
 */
static guint8* hex_to_buf (char* hex, gsize* size)
{
	gsize n_digits = strlen(hex);
	gsize n_bytes;
	gsize pos;
	guint8* out;

	g_assert(size != NULL);
	g_assert(strlen(hex) % 2 == 0);

	n_bytes = n_digits / 2;
	*size = n_bytes;
	out = g_slice_alloc(n_bytes);

	/* two digits per byte, most significant nibble first */
	for (pos = 0; pos < n_bytes; pos++) {
		guint8 high = hex_to_uint4(hex[2*pos]);
		guint8 low = hex_to_uint4(hex[2*pos+1]);
		out[pos] = high * 16 + low;
	}
	return out;
}
/*
 * Tells whether a string consists solely of the characters '0'..'9' and
 * 'a'..'f'.  Uppercase digits are rejected; the empty string is accepted.
 */
static gboolean is_hex (gchar* str)
{
	gsize remaining = strlen(str);
	gchar* p = str;

	while (remaining-- > 0) {
		gboolean is_digit = ('0' <= *p && *p <= '9');
		gboolean is_lower = ('a' <= *p && *p <= 'f');

		if (!is_digit && !is_lower)
			return FALSE;
		p++;
	}
	return TRUE;
}
/*
 * Converts a buffer of <size> bytes to a NUL-terminated string of
 * lowercase hexadecimal digits (two per byte, high nibble first).
 *
 * Free with g_slice_free1(size * 2 + 1, str).
 */
static gchar* buf_to_hex (guint8* buf, gsize size)
{
	static const char digits[] = "0123456789abcdef";
	/* unsigned index: it is compared against the unsigned size */
	gsize i;
	char* str = g_slice_alloc(size * 2 + 1);

	for (i = 0; i < size; i++) {
		/* both indices are always in [0, 15] */
		str[2*i] = digits[buf[i] / 16];
		str[2*i+1] = digits[buf[i] % 16];
	}
	str[size * 2] = 0;
	return str;
}
/*
 * Returns the path to the file for the blob with key <key>.
 *
 * The first CFG_DATADIR_DEPTH groups of CFG_DATADIR_WIDTH key characters
 * become nested directories below prog->dataPath; the remainder of the
 * key is the file name.  The directories are created when missing (see
 * mkdirs).  The caller owns the returned GString.
 */
static GString* key_to_path (BProgram* prog, char* key)
{
	GString* fn = g_string_sized_new(128);
	/* Characters of the key not consumed yet.  Tracking this keeps
	 * <key> from being advanced past its terminator when the key is
	 * shorter than CFG_DATADIR_DEPTH * CFG_DATADIR_WIDTH. */
	gsize left = strlen(key);
	int i;

	g_string_printf(fn, "%s/", prog->dataPath);
	for (i = 0; i < CFG_DATADIR_DEPTH; i++) {
		gsize take = MIN((gsize)CFG_DATADIR_WIDTH, left);

		/* append the component straight from the key; no temporary
		 * copy is needed */
		g_string_append_len(fn, key, take);
		g_string_append_c(fn, '/');
		key += take;
		left -= take;
	}
	mkdirs(fn->str);
	g_string_append(fn, key);
	return fn;
}
/*
 * Renders the address stored in a struct sockaddr as text.
 *
 * Only AF_INET and AF_INET6 are supported; any other family hits
 * g_assert_not_reached().  The caller owns the returned GString.
 */
static GString* sockaddr_to_gstring (struct sockaddr* sa)
{
	/* large enough for the textual form of either family */
	char text[MAX(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)];
	gpointer raw_addr;
	const char* printed;

	switch (sa->sa_family) {
	case AF_INET:
		raw_addr = &(((struct sockaddr_in *)sa)->sin_addr);
		break;
	case AF_INET6:
		raw_addr = &(((struct sockaddr_in6 *)sa)->sin6_addr);
		break;
	default:
		g_assert_not_reached();
	}

	printed = inet_ntop(sa->sa_family, raw_addr, text, sizeof(text));
	g_assert(printed == text);
	return g_string_new(text);
}
/*
 * Sets a fd in non-blocking mode.
 * Aborts through g_assert when the descriptor flags cannot be read or
 * written.
 */
static void fd_set_nonblocking(int s)
{
	int flags;
	int ret;

	flags = fcntl(s, F_GETFL, 0);
	/* On failure fcntl returns -1; OR-ing O_NONBLOCK into that would
	 * hand an all-bits-set flag word to F_SETFL. */
	g_assert(flags != -1);
	ret = fcntl(s, F_SETFL, flags | O_NONBLOCK);
	g_assert(ret == 0);
}
/*
 * Logs a message for a connection.
 *
 * Writes one line to stdout of the form
 *   <connection number> <microseconds since the epoch> <message>
 * where <message> is built printf-style from <format> and the remaining
 * arguments.
 */
static void conn_log(BConn* conn, const char* format, ...)
{
	va_list arglist;
	GString* msg = g_string_sized_new(128);
	struct timeval tv;
	guint64 microts;

	/* Get microseconds timestamp */
	gettimeofday(&tv, NULL);
	microts = (guint64)tv.tv_sec * 1000000 + tv.tv_usec;

	/* Get their message */
	va_start(arglist, format);
	g_string_vprintf(msg, format, arglist);
	va_end(arglist);

	/* Print our message.  conn->n is an unsigned gsize and microts a
	 * guint64, so use the conversion specifiers GLib defines for
	 * exactly those types. */
	printf("%" G_GSIZE_FORMAT " %" G_GUINT64_FORMAT " %s\n",
			conn->n, microts, msg->str);
	g_string_free(msg, TRUE);
}
/*
 * Releases the BConnSendN state data of a connection and leaves the
 * connection in BCONN_STATE_NONE.
 *
 * NOTE(review): the `next' pointer is not released here; presumably the
 * caller has taken it over before calling -- confirm for the case where a
 * SGET connection is closed while still in BCONN_STATE_SENDN.
 */
static void conn_sendn_free(BProgram* prog, GList* lhconn)
{
	BConn* c = lhconn->data;
	BConnSendN* sendn = c->state_data;

	/* detach the state from the connection ... */
	c->state_data = NULL;
	c->state = BCONN_STATE_NONE;

	/* ... and release it */
	if (sendn->buf != NULL)
		g_slice_free1(sendn->size, sendn->buf);
	g_slice_free(BConnSendN, sendn);
}
/*
 * Releases the BConnRecvN state data of a connection, including its
 * receive buffer, and leaves the connection in BCONN_STATE_NONE.
 */
static void conn_recvn_free(BProgram* prog, GList* lhconn)
{
	BConn* c = lhconn->data;
	BConnRecvN* recvn = c->state_data;

	/* detach the state from the connection ... */
	c->state_data = NULL;
	c->state = BCONN_STATE_NONE;

	/* ... and release it */
	if (recvn->buf != NULL)
		g_slice_free1(recvn->size, recvn->buf);
	g_slice_free(BConnRecvN, recvn);
}
/*
 * Releases the BConnList state data of a connection: the open directory
 * handle, the pending directory entries, the current entry and the send
 * buffer.  Leaves the connection in BCONN_STATE_NONE.
 */
static void conn_list_free(BProgram* prog, GList* lhconn)
{
	BConn* c = lhconn->data;
	BConnList* list = c->state_data;
	GList* node;

	if (list->cdir_handle != NULL)
		closedir(list->cdir_handle);

	/* entries that were still waiting to be enumerated */
	node = list->dirs;
	while (node != NULL) {
		BConnListEntry* pending = node->data;

		g_string_free(pending->key, TRUE);
		g_string_free(pending->path, TRUE);
		g_slice_free(BConnListEntry, pending);
		node = node->next;
	}
	if (list->dirs != NULL)
		g_list_free(list->dirs);

	/* the entry that was being enumerated */
	if (list->cdir != NULL) {
		BConnListEntry* current = list->cdir;

		g_string_free(current->key, TRUE);
		g_string_free(current->path, TRUE);
		g_slice_free(BConnListEntry, current);
	}

	if (list->buf != NULL)
		g_byte_array_unref(list->buf);

	g_slice_free(BConnList, list);
	c->state_data = NULL;
	c->state = BCONN_STATE_NONE;
}
/*
 * Releases the BConnPut state data of a connection: checksum state,
 * target file descriptor, write buffer and the temporary file, which is
 * unlinked.  Leaves the connection in BCONN_STATE_NONE.
 */
static void conn_put_free(BProgram* prog, GList* lhconn)
{
	BConn* c = lhconn->data;
	BConnPut* put = c->state_data;

	if (put->checksum != NULL)
		g_checksum_free(put->checksum);

	if (put->fd != -1)
		close(put->fd);

	/* remove the temporary file under the name we created it with */
	if (put->tmp_fn != NULL) {
		unlink(put->tmp_fn->str);
		g_string_free(put->tmp_fn, TRUE);
	}

	if (put->buf != NULL)
		g_byte_array_unref(put->buf);

	g_slice_free(BConnPut, put);
	c->state_data = NULL;
	c->state = BCONN_STATE_NONE;
}
/*
 * Releases the BConnGet state data of a connection: the file descriptor
 * and, when splice is used, both ends of the pipe.  Leaves the connection
 * in BCONN_STATE_NONE.
 */
static void conn_get_free(BProgram* prog, GList* lhconn)
{
	BConn* c = lhconn->data;
	BConnGet* get = c->state_data;
#ifdef USE_SPLICE
	int end;
#endif /* USE_SPLICE */

	if (get->fd != -1)
		close(get->fd);
#ifdef USE_SPLICE
	/* a pipe end of 0 is taken to mean "never opened" */
	for (end = 0; end < 2; end++) {
		if (get->pipe[end])
			close(get->pipe[end]);
	}
#endif /* USE_SPLICE */

	g_slice_free(BConnGet, get);
	c->state_data = NULL;
	c->state = BCONN_STATE_NONE;
}
/*
 * Closes and frees a connection.
 *
 * Waits for worker threads still operating on the connection (if any),
 * releases the data of the state the connection is in, closes the socket,
 * unlinks the connection from prog->conns and frees it.  <lhconn> and the
 * BConn it points to are invalid afterwards.
 */
static void conn_close(BProgram* prog, GList* lhconn)
{
	BConn* conn = lhconn->data;
#ifdef USE_THREADS
	/* Check if there are still threads working on this connection.
	 * And if so, wait on them */
	if (conn->threads_mutex != NULL) {
		g_assert(conn->threads_cond != NULL);
		g_mutex_lock(conn->threads_mutex);
		/* Re-check the predicate after every wakeup: g_cond_wait may
		 * return spuriously, and with several workers one signal
		 * does not mean all of them are done. */
		while (conn->n_threads > 0)
			g_cond_wait(conn->threads_cond, conn->threads_mutex);
		g_mutex_unlock(conn->threads_mutex);
		g_assert(conn->n_threads == 0);
	}
#endif /* USE_THREADS */
	conn_log(conn, "close");

	/* Release the data belonging to the current state */
	switch (conn->state) {
	case BCONN_STATE_LIST:
		conn_list_free(prog, lhconn);
		break;
	case BCONN_STATE_PUT:
		conn_put_free(prog, lhconn);
		break;
	case BCONN_STATE_GET:
		conn_get_free(prog, lhconn);
		break;
	case BCONN_STATE_RECVN:
		conn_recvn_free(prog, lhconn);
		break;
	case BCONN_STATE_SENDN:
		conn_sendn_free(prog, lhconn);
		break;
	case BCONN_STATE_INITIAL:
	case BCONN_STATE_NONE:
		/* no state data to release */
		break;
	default:
		g_assert_not_reached();
	}
#ifdef USE_THREADS
	if (conn->threads_mutex)
		g_mutex_free(conn->threads_mutex);
	if (conn->threads_cond)
		g_cond_free(conn->threads_cond);
#endif /* USE_THREADS */
	/* Close the socket */
	if (conn->sock)
		close(conn->sock);
	/* Free the connection */
	prog->conns = g_list_delete_link(prog->conns, lhconn);
	prog->n_conns_active--;
	g_slice_free(BConn, conn);
}
#ifdef USE_THREADS
/*
 * Starts a BJOB for a connection
 * Used for async. fallocate and fadvise
 *
 * Allocates a BJobConn of the given type, registers one more worker on
 * the connection (conn->n_threads) and pushes the job onto the thread
 * pool of <prog>.  The mutex/condition pair of the connection is created
 * on first use.  Failure to queue the job is fatal (g_error).
 */
static void conn_start_job (BProgram* prog, BConn* conn, guint8 type)
{
	GError* err = NULL;
	BJobConn* job = g_slice_new0(BJobConn);

	job->parent.type = type;
	/* fill in the program pointer of the job header as well; it used
	 * to be left NULL */
	job->parent.prog = prog;
	job->conn = conn;
	/* initialize mutex and condition if we have not already */
	if (conn->threads_mutex == NULL) {
		g_assert(conn->threads_cond == NULL);
		g_assert(conn->n_threads == 0);
		conn->threads_mutex = g_mutex_new();
		conn->threads_cond = g_cond_new();
	}
	/* account for the worker before it can possibly finish */
	g_mutex_lock(conn->threads_mutex);
	conn->n_threads++;
	g_mutex_unlock(conn->threads_mutex);
	/* run it */
	g_thread_pool_push(prog->threadpool, job, &err);
	if (err)
		g_error("g_thread_pool_push: %s", err->message);
}
#endif /* USE_THREADS */
/*
 * Puts a connection into BCONN_STATE_SENDN to send <size> bytes.
 *
 * The bytes at <buf> are copied, so the caller keeps ownership of <buf>.
 * For BERTHA_SGET the current state data (which must be set) is parked in
 * the `next' field of the new state; for BERTHA_PUT, BERTHA_SPUT,
 * BERTHA_SIZE and BERTHA_STATS the connection must not carry state data.
 * Any other command is a programming error.
 */
static void conn_sendn_init(BProgram* prog, GList* lhconn, gpointer buf,
		gsize size)
{
	BConn* c = lhconn->data;
	BConnSendN* sendn = g_slice_new0(BConnSendN);

	/* private copy of the bytes to send */
	sendn->size = size;
	sendn->left = size;
	sendn->buf = g_slice_alloc(size);
	memcpy(sendn->buf, buf, size);

	switch (c->cmd) {
	case BERTHA_PUT:
	case BERTHA_SPUT:
	case BERTHA_SIZE:
	case BERTHA_STATS:
		g_assert(!c->state_data);
		break;
	case BERTHA_SGET:
		g_assert(c->state_data);
		sendn->next = c->state_data;
		break;
	default:
		g_assert_not_reached();
	}

	/* set new state */
	c->state_data = sendn;
	c->state = BCONN_STATE_SENDN;
}
/*
 * Starts serving a GET or SGET once the key has been received.
 *
 * Expects the connection to carry the BConnRecvN holding the 32-byte key.
 * Opens the blob for that key and sets up the BConnGet state.  For GET
 * the connection enters BCONN_STATE_GET directly; for SGET the file size
 * is queued first (little-endian guint64) through BCONN_STATE_SENDN with
 * the BConnGet parked as its `next'.
 *
 * If the file cannot be opened the connection is closed; <lhconn> must
 * not be used by the caller afterwards in that case.
 */
static void conn_get_init(BProgram* prog, GList* lhconn)
{
	BConn* conn = lhconn->data;
	BConnGet* data = g_slice_new0(BConnGet);
	BConnRecvN* pdata = conn->state_data;
	int ret;
	GString* fn;
	char* hex_key;

	/* Copy the key */
	g_assert(pdata);
	hex_key = buf_to_hex(pdata->buf, 32);
	conn_recvn_free(prog, lhconn);

	/* Set new state */
	data->socket_ready = FALSE;
	data->file_ready = FALSE;
#if defined(USE_SPLICE)
	data->pipe_ready = FALSE;
	data->file_eos = FALSE;
#elif defined(USE_SENDFILE)
	data->n_sent = 0;
#endif
	conn->state_data = data;

	/* Open the file */
	fn = key_to_path(prog, hex_key);
	data->fd = open(fn->str, O_RDONLY, 0);
	conn_log(conn, "%s %s", conn->cmd == BERTHA_SGET
			? "SGET" : "GET", hex_key);
	g_string_free(fn, TRUE);
	/* 32 key bytes -> 64 hex digits + terminator */
	g_slice_free1(65, hex_key);

	/* The file couldn't be opened - break */
	if(data->fd < 0) {
		g_warning("GET Couldn't open file\n");
		/* The connection is still in BCONN_STATE_NONE here, so
		 * conn_close will not release the state data for us. */
		conn->state_data = NULL;
		g_slice_free(BConnGet, data);
		conn_close(prog, lhconn);
		return;
	}
#ifdef HAVE_POSIX_FADVISE
	/* Advise the kernel on the access pattern */
	ret = posix_fadvise(data->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
	g_assert(ret == 0);
#endif
	/* Set file in non-blocking mode */
	fd_set_nonblocking(data->fd);
#ifdef USE_SPLICE
	/* Set up a pipeline. We will splice from fd into pipe[1] (the
	 * write end) and then from pipe[0] (the read end) to sock. */
	ret = pipe2(data->pipe, O_NONBLOCK);
	g_assert(ret == 0);
#endif
	/* If the original command is SGET, we will first send the
	 * size of the file */
	if (conn->cmd == BERTHA_SGET) {
		struct stat st;
		guint64 size;

		/* Get the size of the file */
		ret = fstat(data->fd, &st);
		if(ret != 0) {
			perror("fstat");
			g_error("fstat failed\n");
		}
		size = GUINT64_TO_LE(st.st_size);
		conn_sendn_init(prog, lhconn, &size, sizeof(size));
	} else {
		conn->state = BCONN_STATE_GET;
	}
}
/*
 * Puts a connection into BCONN_STATE_PUT.
 *
 * For BERTHA_SPUT the connection must carry the BConnRecvN holding the
 * advised size (little-endian guint64), which is consumed; for BERTHA_PUT
 * it must not carry state data.  Creates the temporary file below
 * prog->tmpPath, the SHA-256 checksum state and the write buffer.
 * Failure to create the temporary file is fatal (g_assert).
 */
static void conn_put_init(BProgram* prog, GList* lhconn)
{
	BConn* conn = lhconn->data;
	BConnPut* data = g_slice_new0(BConnPut);

	if (conn->cmd == BERTHA_SPUT) {
		BConnRecvN* pdata = conn->state_data;
		guint64 raw_size;

		g_assert(pdata);
		/* copy the bytes out rather than reading the byte buffer
		 * through a guint64 pointer */
		memcpy(&raw_size, pdata->buf, sizeof(raw_size));
		data->advised_size = GUINT64_FROM_LE(raw_size);
		conn_recvn_free(prog, lhconn);
	} else if (conn->cmd == BERTHA_PUT)
		g_assert(!conn->state_data);
	else
		g_assert_not_reached();

	/* Set new connection state */
	conn->state = BCONN_STATE_PUT;
	conn->state_data = data;
	data->file_ready = FALSE;
	data->socket_ready = FALSE;
	data->socket_eos = FALSE;

	/* Create a temporary file */
	data->tmp_fn = g_string_sized_new(128);
	g_string_printf(data->tmp_fn, "%s/berthadtmp.XXXXXX",
			prog->tmpPath);
	data->fd = mkstemp(data->tmp_fn->str);
	g_assert(data->fd != -1);
	fd_set_nonblocking(data->fd);

	/* Set up the checksum */
	data->checksum = g_checksum_new(G_CHECKSUM_SHA256);

	/* Initialize the write buffer */
	data->buf = g_byte_array_sized_new(CFG_PUT_BUFFER);

	if (conn->cmd == BERTHA_PUT)
		conn_log(conn, "PUT %s", data->tmp_fn->str);
	else if (conn->cmd == BERTHA_SPUT) {
		/* advised_size is a guint64: use the matching specifier */
		conn_log(conn, "SPUT %s %" G_GUINT64_FORMAT,
				data->tmp_fn->str, data->advised_size);
#if defined(HAVE_FALLOCATE) && defined(USE_THREADS)
		/* Pre-allocate the file in a separate thread.  The job
		 * machinery only exists when threads are enabled. */
		conn_start_job(prog, conn, BJOB_FALLOCATE);
#endif
	} else
		g_assert_not_reached();
}
/*
 * Accepts a new connection
 *
 * Takes one pending connection from prog->lsock, logs the peer address,
 * switches the socket to non-blocking mode and prepends a new BConn in
 * BCONN_STATE_INITIAL to prog->conns.
 *
 * When accept(2) fails because no connection is pending any more, the
 * call was interrupted or the peer already went away, a warning is
 * emitted and nothing else happens.  Any other failure is fatal.
 */
static void conn_accept(BProgram* prog)
{
	BConn* conn;
	GString* human_addr;
	/* Large enough for every address family; conn->addr itself is a
	 * plain struct sockaddr, which cannot hold an IPv6 address. */
	struct sockaddr_storage peer;
	socklen_t peerlen = sizeof(peer);
	int sock;
#ifdef SO_NOSIGPIPE
	int opt, ret;
#endif

	memset(&peer, 0, sizeof(peer));

	/* accept the connection */
	sock = accept(prog->lsock, (struct sockaddr*)&peer, &peerlen);
	if (sock < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK
				|| errno == EINTR
				|| errno == ECONNABORTED) {
			/* transient: nothing to accept right now */
			g_warning("accept: %s", g_strerror(errno));
			return;
		}
		g_error("accept: %s", g_strerror(errno));
	}

	conn = g_slice_new0(BConn);
	conn->n = prog->n_conns_accepted++;
	conn->cmd = BERTHA_NONE;
	conn->sock = sock;
	/* Keep what fits of the address in the connection; addrlen holds
	 * the length reported by accept(). */
	conn->addrlen = peerlen;
	memcpy(&conn->addr, &peer,
			MIN(sizeof(conn->addr), (gsize)peerlen));

	/* format from the full-size copy so that IPv6 addresses are not
	 * read past the end of conn->addr */
	human_addr = sockaddr_to_gstring((struct sockaddr*)&peer);
	conn_log(conn, "accepted %s", human_addr->str);
	g_string_free(human_addr, TRUE);

	/* set the socket to non-blocking */
	fd_set_nonblocking(conn->sock);
#ifdef SO_NOSIGPIPE
	/* if possible, prevent SIGPIPE signals being raised by closed
	 * sockets. */
	opt = 1;
	ret = setsockopt(conn->sock, SOL_SOCKET, SO_NOSIGPIPE,
			&opt, sizeof(opt));
	g_assert(ret == 0);
#endif /* SO_NOSIGPIPE */
	/* store it */
	conn->state = BCONN_STATE_INITIAL;
	prog->conns = g_list_prepend(prog->conns, conn);
	prog->n_conns_active++;
}
#ifdef USE_SPLICE
/*
 * Splice data from the file into a pipe and from that pipe into the socket
 *
 * Driven by the readiness flags in the BConnGet of the connection:
 *  - file_ready && pipe_ready: splice up to 64 KiB from the file into
 *    pipe[1]; a result of 0 marks the end of the file (file_eos);
 *  - socket_ready && in_buffer > 0: splice the buffered bytes from
 *    pipe[0] into the socket.
 * Each flag is cleared after the corresponding splice.  The two steps
 * jump into each other as long as the flags allow.
 *
 * Once the file is depleted and the pipe is empty the socket is shut down
 * and the connection closed.  The connection is also closed when the
 * socket reports EPIPE, ETIMEDOUT or ECONNRESET.  In both cases <lhconn>
 * is invalid when the function returns.  Other splice errors are fatal.
 */
static inline void conn_get_handle__splice(BProgram* prog, GList* lhconn)
{
	BConn* conn = lhconn->data;
	BConnGet* data = conn->state_data;
	ssize_t spliced;

	/* Is there data to splice from the file to the pipe? And is there
	 * room in the pipe? Then splice some data! */
	if (data->file_ready && data->pipe_ready) {
splice_some_to_pipe:
		/* Splice from file to pipe */
		spliced = splice(data->fd, NULL, data->pipe[1], NULL,
				65536, SPLICE_F_MOVE |
				SPLICE_F_NONBLOCK |
				SPLICE_F_MORE);
		if (spliced == -1) {
			if (errno == EAGAIN) {
				g_warning("splice returned EAGAIN\n");
				goto bail_splice_to_pipe;
			}
			perror("splice");
			g_error("Splice failed?!\n");
		}
		data->file_ready = FALSE;
		data->pipe_ready = FALSE;
		/* Check for end of file */
		if (spliced == 0) {
			data->file_eos = TRUE;
			/* If we've got some data to write to the socket and
			 * the socket is ready, we will give it a try. */
			if (data->socket_ready && data->in_buffer > 0)
				goto splice_some_from_pipe;
			/* Otherwise check whether we are done */
			goto check_if_done;
		} else
			data->in_buffer += spliced;
	}
bail_splice_to_pipe:
	/* Is there data to splice from the pipe to the socket? And
	 * is the socket ready? */
	if (data->socket_ready && data->in_buffer > 0) {
splice_some_from_pipe:
		/* Splice from pipe to socket */
		spliced = splice(data->pipe[0], NULL, conn->sock, NULL,
				data->in_buffer, SPLICE_F_MOVE |
				SPLICE_F_NONBLOCK |
				SPLICE_F_MORE);
		if (spliced == -1) {
			if (errno == EAGAIN) {
				g_warning("splice returned EAGAIN\n");
				return;
			}
			/* socket has been closed or is in some kind of
			 * error. */
			if (errno == EPIPE || errno == ETIMEDOUT || errno == ECONNRESET) {
				/* never hand a message from elsewhere to
				 * conn_log as the format string */
				conn_log(conn, "%s", strerror(errno));
				conn_close(prog, lhconn);
				return;
			}
			perror("splice");
			g_error("Splice failed?!\n");
		}
		g_assert(spliced > 0);
		prog->n_GET_sent += spliced;
		data->socket_ready = FALSE;
		data->in_buffer -= spliced;
		/* Check if we have enough room to read from the file*/
		if (data->file_ready && data->pipe_ready)
			goto splice_some_to_pipe;
		goto check_if_done;
	}
	return;
check_if_done:
	if (data->file_eos && data->in_buffer == 0) {
		/* We're done! */
		shutdown(conn->sock, SHUT_RDWR);
		conn_close(prog, lhconn);
	}
}
#endif /* USE_SPLICE */
#ifdef USE_SENDFILE
/*
* Use sendfile() to transfer data from the file to the socket.
* This function is used on FreeBSD. See conn_get_handle__splice for
* the version used on Linux.
*/
static inline void conn_get_handle__sendfile(BProgram* prog, GList* lhconn)
{
BConn* conn = lhconn->data;
BConnGet* data = conn->state_data;
off_t sent = 0;
int res;
gboolean done = FALSE;
if(!data->file_ready || !data->socket_ready)
return;
res = sendfile(data->fd, conn->sock, data->n_sent, 0, NULL,
&sent, SF_NODISKIO | SF_MNOWAIT);
if (res == 0)
/* Everything is written. */
done = TRUE;
else if (res == -1) {
if (errno == EPIPE) {
/* Peer closed the socket */
conn_log(conn, "EPIPE");
conn_close(prog, lhconn);
return;
}
if(errno == EAGAIN) {
/* Socket buffers are full */
data->socket_ready = FALSE;
} else if(errno == EBUSY) {
/* File buffers are depleted */
data->file_ready = FALSE;
#ifdef __FreeBSD__
/* Workaround for #5. Trigger the kernel to read