BSD: add kqueue support
[tinc] / src / event.c
1 /*
2     event.c -- I/O, timeout and signal event handling
3     Copyright (C) 2012-2022 Guus Sliepen <guus@tinc-vpn.org>
4
5     This program is free software; you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation; either version 2 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License along
16     with this program; if not, write to the Free Software Foundation, Inc.,
17     51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "system.h"
21
22 #ifdef HAVE_WINDOWS
23 #  include <assert.h>
24 #else
25 #  if defined(HAVE_SYS_EPOLL_H)
26 #    include <sys/epoll.h>
27 #    define HAVE_EPOLL 1
28 #  elif defined(HAVE_SYS_EVENT_H)
29 #    include <sys/event.h>
30 #    define HAVE_KQUEUE 1
31 #  else
32 #    define HAVE_SELECT 1
33 #  endif
34 #endif
35
36 #include "event.h"
37 #include "utils.h"
38 #include "net.h"
39
40 struct timeval now;
41 static bool running;
42
43 #if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
44 static int event_fd = 0;
45 #elif defined(HAVE_SELECT)
46 static fd_set readfds;
47 static fd_set writefds;
48 #elif defined(HAVE_WINDOWS)
49 static const long READ_EVENTS = FD_READ | FD_ACCEPT | FD_CLOSE;
50 static const long WRITE_EVENTS = FD_WRITE | FD_CONNECT;
51 static DWORD event_count = 0;
52 #endif
53
54 static inline void event_init(void) {
55 #if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
56
57         if(!event_fd) {
58 #if defined(HAVE_EPOLL)
59                 /* NOTE: 1024 limit is only used on ancient (pre 2.6.27) kernels.
60                    Decent kernels will ignore this value making it unlimited.
61                    epoll_create1 might be better, but these kernels would not be supported
62                    in that case. */
63                 event_fd = epoll_create(1024);
64 #else
65                 event_fd = kqueue();
66 #endif
67
68                 if(event_fd == -1) {
69                         logger(DEBUG_ALWAYS, LOG_EMERG, "Could not initialize events: %s", strerror(errno));
70                         abort();
71                 }
72         }
73
74 #endif
75 }
76
77 static void event_deinit(void) {
78 #if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
79
80         if(event_fd > 0) {
81                 close(event_fd);
82                 event_fd = 0;
83         }
84
85 #endif
86 }
87
88 static int io_compare(const io_t *a, const io_t *b) {
89 #ifndef HAVE_WINDOWS
90         return a->fd - b->fd;
91 #else
92
93         if(a->event < b->event) {
94                 return -1;
95         }
96
97         if(a->event > b->event) {
98                 return 1;
99         }
100
101         return 0;
102 #endif
103 }
104
105 static int timeout_compare(const timeout_t *a, const timeout_t *b) {
106         struct timeval diff;
107         timersub(&a->tv, &b->tv, &diff);
108
109         if(diff.tv_sec < 0) {
110                 return -1;
111         }
112
113         if(diff.tv_sec > 0) {
114                 return 1;
115         }
116
117         if(diff.tv_usec < 0) {
118                 return -1;
119         }
120
121         if(diff.tv_usec > 0) {
122                 return 1;
123         }
124
125         if(a < b) {
126                 return -1;
127         }
128
129         if(a > b) {
130                 return 1;
131         }
132
133         return 0;
134 }
135
136 static splay_tree_t io_tree = {.compare = (splay_compare_t)io_compare};
137 static splay_tree_t timeout_tree = {.compare = (splay_compare_t)timeout_compare};
138
139 void io_add(io_t *io, io_cb_t cb, void *data, int fd, int flags) {
140         if(io->cb) {
141                 return;
142         }
143
144         io->fd = fd;
145 #ifdef HAVE_WINDOWS
146
147         if(io->fd != -1) {
148                 io->event = WSACreateEvent();
149
150                 if(io->event == WSA_INVALID_EVENT) {
151                         abort();
152                 }
153         }
154
155         event_count++;
156 #endif
157         io->cb = cb;
158         io->data = data;
159         io->node.data = io;
160
161         io_set(io, flags);
162
163 #ifdef HAVE_SELECT
164
165         if(!splay_insert_node(&io_tree, &io->node)) {
166                 abort();
167         }
168
169 #endif
170 }
171
172 #ifdef HAVE_WINDOWS
173 void io_add_event(io_t *io, io_cb_t cb, void *data, WSAEVENT event) {
174         io->event = event;
175         io_add(io, cb, data, -1, 0);
176 }
177 #endif
178
179 void io_set(io_t *io, int flags) {
180         event_init();
181
182         if(flags == io->flags) {
183                 return;
184         }
185
186         io->flags = flags;
187
188         if(io->fd == -1) {
189                 return;
190         }
191
192 #ifndef HAVE_WINDOWS
193 #ifdef HAVE_EPOLL
194         epoll_ctl(event_fd, EPOLL_CTL_DEL, io->fd, NULL);
195
196         struct epoll_event ev = {
197                 .events = 0,
198                 .data.ptr = io,
199         };
200
201         if(flags & IO_READ) {
202                 ev.events |= EPOLLIN;
203         }
204
205         if(flags & IO_WRITE) {
206                 ev.events |= EPOLLOUT;
207         } else if(ev.events == 0) {
208                 io_tree.generation++;
209                 return;
210         }
211
212         if(epoll_ctl(event_fd, EPOLL_CTL_ADD, io->fd, &ev) < 0) {
213                 perror("epoll_ctl_add");
214         }
215
216 #endif
217
218 #ifdef HAVE_KQUEUE
219         const struct kevent change[] = {
220                 {
221                         .ident = io->fd,
222                         .filter = EVFILT_READ,
223                         .flags = EV_RECEIPT | (flags & IO_READ ? EV_ADD : EV_DELETE),
224                         .udata = io,
225                 },
226                 {
227                         .ident = io->fd,
228                         .filter = EVFILT_WRITE,
229                         .flags = EV_RECEIPT | (flags & IO_WRITE ? EV_ADD : EV_DELETE),
230                         .udata = io,
231                 },
232         };
233         struct kevent result[2];
234
235         if(kevent(event_fd, change, 2, result, 2, NULL) < 0) {
236                 logger(DEBUG_ALWAYS, LOG_EMERG, "kevent failed: %s", strerror(errno));
237                 abort();
238         }
239
240         int rerr = (int)result[0].data;
241         int werr = (int)result[1].data;
242
243         if((rerr && rerr != ENOENT) || (werr && werr != ENOENT)) {
244                 logger(DEBUG_ALWAYS, LOG_EMERG, "kevent errors: %s, %s", strerror(rerr), strerror(werr));
245                 abort();
246         }
247
248         if(!flags) {
249                 io_tree.generation++;
250         }
251
252 #endif
253
254 #ifdef HAVE_SELECT
255
256         if(flags & IO_READ) {
257                 FD_SET(io->fd, &readfds);
258         } else {
259                 FD_CLR(io->fd, &readfds);
260         }
261
262         if(flags & IO_WRITE) {
263                 FD_SET(io->fd, &writefds);
264         } else {
265                 FD_CLR(io->fd, &writefds);
266         }
267
268 #endif
269
270 #else
271         long events = 0;
272
273         if(flags & IO_WRITE) {
274                 events |= WRITE_EVENTS;
275         }
276
277         if(flags & IO_READ) {
278                 events |= READ_EVENTS;
279         }
280
281         if(WSAEventSelect(io->fd, io->event, events) != 0) {
282                 abort();
283         }
284
285 #endif
286 }
287
288 void io_del(io_t *io) {
289         if(!io->cb) {
290                 return;
291         }
292
293         io_set(io, 0);
294 #ifdef HAVE_WINDOWS
295
296         if(io->fd != -1 && WSACloseEvent(io->event) == FALSE) {
297                 abort();
298         }
299
300         event_count--;
301 #endif
302
303 #if HAVE_SELECT
304         splay_unlink_node(&io_tree, &io->node);
305 #endif
306         io->cb = NULL;
307 }
308
309 void timeout_add(timeout_t *timeout, timeout_cb_t cb, void *data, const struct timeval *tv) {
310         timeout->cb = cb;
311         timeout->data = data;
312         timeout->node.data = timeout;
313
314         timeout_set(timeout, tv);
315 }
316
317 void timeout_set(timeout_t *timeout, const struct timeval *tv) {
318         if(timerisset(&timeout->tv)) {
319                 splay_unlink_node(&timeout_tree, &timeout->node);
320         }
321
322         if(!now.tv_sec) {
323                 gettimeofday(&now, NULL);
324         }
325
326         timeradd(&now, tv, &timeout->tv);
327
328         if(!splay_insert_node(&timeout_tree, &timeout->node)) {
329                 abort();
330         }
331 }
332
333 void timeout_del(timeout_t *timeout) {
334         if(!timeout->cb) {
335                 return;
336         }
337
338         splay_unlink_node(&timeout_tree, &timeout->node);
339         timeout->cb = 0;
340         timeout->tv = (struct timeval) {
341                 0, 0
342         };
343 }
344
345 #ifndef HAVE_WINDOWS
346
347 // From Matz's Ruby
348 #ifndef NSIG
349 # define NSIG (_SIGMAX + 1)      /* For QNX */
350 #endif
351
352
353 static io_t signalio;
354 static int pipefd[2] = {-1, -1};
355 static signal_t *signal_handle[NSIG + 1] = {NULL};
356
357 static void signal_handler(int signum) {
358         unsigned char num = signum;
359
360         if(write(pipefd[1], &num, 1) != 1) {
361                 // Pipe full or broken, nothing we can do about it.
362         }
363 }
364
365 static void signalio_handler(void *data, int flags) {
366         (void)data;
367         (void)flags;
368         unsigned char signum;
369
370         if(read(pipefd[0], &signum, 1) != 1) {
371                 return;
372         }
373
374         signal_t *sig = signal_handle[signum];
375
376         if(sig) {
377                 sig->cb(sig->data);
378         }
379 }
380
381 static void pipe_init(void) {
382         if(!pipe(pipefd)) {
383                 io_add(&signalio, signalio_handler, NULL, pipefd[0], IO_READ);
384         }
385 }
386
387 void signal_add(signal_t *sig, signal_cb_t cb, void *data, int signum) {
388         if(sig->cb) {
389                 return;
390         }
391
392         sig->signum = signum;
393         sig->cb = cb;
394         sig->data = data;
395
396         if(pipefd[0] == -1) {
397                 pipe_init();
398         }
399
400         signal(signum, signal_handler);
401
402         signal_handle[signum] = sig;
403 }
404
405 void signal_del(signal_t *sig) {
406         if(!sig->cb) {
407                 return;
408         }
409
410         signal(sig->signum, SIG_DFL);
411
412         signal_handle[sig->signum] = NULL;
413         sig->cb = NULL;
414 }
415 #endif
416
417 static struct timeval *timeout_execute(struct timeval *diff) {
418         gettimeofday(&now, NULL);
419         struct timeval *tv = NULL;
420
421         while(timeout_tree.head) {
422                 timeout_t *timeout = timeout_tree.head->data;
423                 timersub(&timeout->tv, &now, diff);
424
425                 if(diff->tv_sec < 0) {
426                         timeout->cb(timeout->data);
427
428                         if(timercmp(&timeout->tv, &now, <)) {
429                                 timeout_del(timeout);
430                         }
431                 } else {
432                         tv = diff;
433                         break;
434                 }
435         }
436
437         return tv;
438 }
439
440 bool event_loop(void) {
441         event_init();
442         running = true;
443
444 #ifndef HAVE_WINDOWS
445
446 #ifdef HAVE_SELECT
447         fd_set readable;
448         fd_set writable;
449 #endif
450
451         while(running) {
452                 struct timeval diff;
453                 struct timeval *tv = timeout_execute(&diff);
454
455 #ifdef HAVE_SELECT
456                 memcpy(&readable, &readfds, sizeof(readable));
457                 memcpy(&writable, &writefds, sizeof(writable));
458 #endif
459
460 #ifdef HAVE_EPOLL
461                 struct epoll_event events[MAX_EVENTS_PER_LOOP];
462                 long timeout = (tv->tv_sec * 1000) + (tv->tv_usec / 1000);
463
464                 if(timeout > INT_MAX) {
465                         timeout = INT_MAX;
466                 }
467
468                 int n = epoll_wait(event_fd, events, MAX_EVENTS_PER_LOOP, (int)timeout);
469 #endif
470
471 #ifdef HAVE_KQUEUE
472                 struct kevent events[MAX_EVENTS_PER_LOOP];
473
474                 const struct timespec ts = {
475                         .tv_sec = tv->tv_sec,
476                         .tv_nsec = tv->tv_usec * 1000,
477                 };
478
479                 int n = kevent(event_fd, NULL, 0, events, MAX_EVENTS_PER_LOOP, &ts);
480 #endif
481
482 #ifdef HAVE_SELECT
483                 int maxfds =  0;
484
485                 if(io_tree.tail) {
486                         io_t *last = io_tree.tail->data;
487                         maxfds = last->fd + 1;
488                 }
489
490                 int n = select(maxfds, &readable, &writable, NULL, tv);
491 #endif
492
493                 if(n < 0) {
494                         if(sockwouldblock(sockerrno)) {
495                                 continue;
496                         } else {
497                                 return false;
498                         }
499                 }
500
501                 if(!n) {
502                         continue;
503                 }
504
505                 unsigned int curgen = io_tree.generation;
506
507
508 #ifdef HAVE_EPOLL
509
510                 for(int i = 0; i < n; i++) {
511                         io_t *io = events[i].data.ptr;
512
513                         if(events[i].events & EPOLLOUT && io->flags & IO_WRITE) {
514                                 io->cb(io->data, IO_WRITE);
515                         }
516
517                         if(curgen != io_tree.generation) {
518                                 break;
519                         }
520
521                         if(events[i].events & EPOLLIN && io->flags & IO_READ) {
522                                 io->cb(io->data, IO_READ);
523                         }
524
525                         if(curgen != io_tree.generation) {
526                                 break;
527                         }
528                 }
529
530 #endif
531
532 #ifdef HAVE_KQUEUE
533
534                 for(int i = 0; i < n; i++) {
535                         const struct kevent *evt = &events[i];
536                         const io_t *io = evt->udata;
537
538                         if(evt->filter == EVFILT_WRITE) {
539                                 io->cb(io->data, IO_WRITE);
540                         } else if(evt->filter == EVFILT_READ) {
541                                 io->cb(io->data, IO_READ);
542                         } else {
543                                 continue;
544                         }
545
546                         if(curgen != io_tree.generation) {
547                                 break;
548                         }
549                 }
550
551 #endif
552
553 #ifdef HAVE_SELECT
554
555                 for splay_each(io_t, io, &io_tree) {
556                         if(FD_ISSET(io->fd, &writable)) {
557                                 io->cb(io->data, IO_WRITE);
558                         } else if(FD_ISSET(io->fd, &readable)) {
559                                 io->cb(io->data, IO_READ);
560                         } else {
561                                 continue;
562                         }
563
564                         /*
565                                 There are scenarios in which the callback will remove another io_t from the tree
566                                 (e.g. closing a double connection). Since splay_each does not support that, we
567                                 need to exit the loop if that happens. That's okay, since any remaining events will
568                                 get picked up by the next select() call.
569                         */
570                         if(curgen != io_tree.generation) {
571                                 break;
572                         }
573                 }
574
575 #endif
576         }
577
578 #else
579         assert(WSA_WAIT_EVENT_0 == 0);
580
581         while(running) {
582                 struct timeval diff;
583                 struct timeval *tv = timeout_execute(&diff);
584                 DWORD timeout_ms = tv ? (DWORD)(tv->tv_sec * 1000 + tv->tv_usec / 1000 + 1) : WSA_INFINITE;
585
586                 if(!event_count) {
587                         Sleep(timeout_ms);
588                         continue;
589                 }
590
591                 /*
592                    For some reason, Microsoft decided to make the FD_WRITE event edge-triggered instead of level-triggered,
593                    which is the opposite of what select() does. In practice, that means that if a FD_WRITE event triggers,
594                    it will never trigger again until a send() returns EWOULDBLOCK. Since the semantics of this event loop
595                    is that write events are level-triggered (i.e. they continue firing until the socket is full), we need
596                    to emulate these semantics by making sure we fire each IO_WRITE that is still writeable.
597
598                    Note that technically FD_CLOSE has the same problem, but it's okay because user code does not rely on
599                    this event being fired again if ignored.
600                 */
601                 unsigned int curgen = io_tree.generation;
602
603                 for splay_each(io_t, io, &io_tree) {
604                         if(io->flags & IO_WRITE && send(io->fd, NULL, 0, 0) == 0) {
605                                 io->cb(io->data, IO_WRITE);
606
607                                 if(curgen != io_tree.generation) {
608                                         break;
609                                 }
610                         }
611                 }
612
613                 if(event_count > WSA_MAXIMUM_WAIT_EVENTS) {
614                         WSASetLastError(WSA_INVALID_PARAMETER);
615                         return(false);
616                 }
617
618                 WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
619                 io_t *io_map[WSA_MAXIMUM_WAIT_EVENTS];
620                 DWORD event_index = 0;
621
622                 for splay_each(io_t, io, &io_tree) {
623                         events[event_index] = io->event;
624                         io_map[event_index] = io;
625                         event_index++;
626                 }
627
628                 /*
629                  * If the generation number changes due to event addition
630                  * or removal by a callback we restart the loop.
631                  */
632                 curgen = io_tree.generation;
633
634                 for(DWORD event_offset = 0; event_offset < event_count;) {
635                         DWORD result = WSAWaitForMultipleEvents(event_count - event_offset, &events[event_offset], FALSE, timeout_ms, FALSE);
636
637                         if(result == WSA_WAIT_TIMEOUT) {
638                                 break;
639                         }
640
641                         if(result >= event_count - event_offset) {
642                                 return false;
643                         }
644
645                         /* Look up io in the map by index. */
646                         event_index = result + event_offset;
647                         io_t *io = io_map[event_index];
648
649                         if(io->fd == -1) {
650                                 io->cb(io->data, 0);
651
652                                 if(curgen != io_tree.generation) {
653                                         break;
654                                 }
655                         } else {
656                                 WSANETWORKEVENTS network_events;
657
658                                 if(WSAEnumNetworkEvents(io->fd, io->event, &network_events) != 0) {
659                                         return(false);
660                                 }
661
662                                 if(network_events.lNetworkEvents & READ_EVENTS) {
663                                         io->cb(io->data, IO_READ);
664
665                                         if(curgen != io_tree.generation) {
666                                                 break;
667                                         }
668                                 }
669
670                                 /*
671                                     The fd might be available for write too. However, if we already fired the read callback, that
672                                     callback might have deleted the io (e.g. through terminate_connection()), so we can't fire the
673                                     write callback here. Instead, we loop back and let the writable io loop above handle it.
674                                  */
675                         }
676
677                         /* Continue checking the rest of the events. */
678                         event_offset = event_index + 1;
679
680                         /* Just poll the next time through. */
681                         timeout_ms = 0;
682                 }
683         }
684
685 #endif
686
687         event_deinit();
688         return true;
689 }
690
691 void event_exit(void) {
692         running = false;
693 }