Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 :
4 : testing of the events subsystem
5 :
6 : Copyright (C) Stefan Metzmacher 2006-2009
7 : Copyright (C) Jeremy Allison 2013
8 :
9 : ** NOTE! The following LGPL license applies to the tevent
10 : ** library. This does NOT imply that all of Samba is released
11 : ** under the LGPL
12 :
13 : This library is free software; you can redistribute it and/or
14 : modify it under the terms of the GNU Lesser General Public
15 : License as published by the Free Software Foundation; either
16 : version 3 of the License, or (at your option) any later version.
17 :
18 : This library is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 : Lesser General Public License for more details.
22 :
23 : You should have received a copy of the GNU Lesser General Public
24 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 : */
26 :
27 : #include "includes.h"
28 : #define TEVENT_DEPRECATED 1
29 : #include "tevent.h"
30 : #include "system/filesys.h"
31 : #include "system/select.h"
32 : #include "system/network.h"
33 : #include "torture/torture.h"
34 : #include "torture/local/proto.h"
35 : #ifdef HAVE_PTHREAD
36 : #include "system/threads.h"
37 : #include <assert.h>
38 : #endif
39 :
40 : static int fde_count;
41 :
42 0 : static void do_read(int fd, void *buf, size_t count)
43 : {
44 : ssize_t ret;
45 :
46 : do {
47 0 : ret = read(fd, buf, count);
48 0 : } while (ret == -1 && errno == EINTR);
49 0 : }
50 :
51 0 : static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
52 : uint16_t flags, void *private_data)
53 : {
54 0 : int *fd = (int *)private_data;
55 : char c;
56 : #ifdef SA_SIGINFO
57 0 : kill(getpid(), SIGUSR1);
58 : #endif
59 0 : kill(getpid(), SIGALRM);
60 :
61 0 : do_read(fd[0], &c, 1);
62 0 : fde_count++;
63 0 : }
64 :
65 0 : static void do_write(int fd, void *buf, size_t count)
66 : {
67 : ssize_t ret;
68 :
69 : do {
70 0 : ret = write(fd, buf, count);
71 0 : } while (ret == -1 && errno == EINTR);
72 0 : }
73 :
74 0 : static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
75 : uint16_t flags, void *private_data)
76 : {
77 0 : int *fd = (int *)private_data;
78 0 : char c = 0;
79 :
80 0 : do_write(fd[1], &c, 1);
81 0 : }
82 :
83 :
84 : /* This will only fire if the fd's returned from pipe() are bi-directional. */
85 0 : static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
86 : uint16_t flags, void *private_data)
87 : {
88 0 : int *fd = (int *)private_data;
89 : char c;
90 : #ifdef SA_SIGINFO
91 0 : kill(getpid(), SIGUSR1);
92 : #endif
93 0 : kill(getpid(), SIGALRM);
94 :
95 0 : do_read(fd[1], &c, 1);
96 0 : fde_count++;
97 0 : }
98 :
99 : /* This will only fire if the fd's returned from pipe() are bi-directional. */
100 0 : static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
101 : uint16_t flags, void *private_data)
102 : {
103 0 : int *fd = (int *)private_data;
104 0 : char c = 0;
105 0 : do_write(fd[0], &c, 1);
106 0 : }
107 :
108 0 : static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
109 : struct timeval tval, void *private_data)
110 : {
111 0 : int *finished = (int *)private_data;
112 0 : (*finished) = 1;
113 0 : }
114 :
115 0 : static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
116 : int signum, int count, void *info, void *private_data)
117 : {
118 0 : int *countp = (int *)private_data;
119 0 : (*countp) += count;
120 0 : }
121 :
122 0 : static bool test_event_context(struct torture_context *test,
123 : const void *test_data)
124 : {
125 : struct tevent_context *ev_ctx;
126 0 : int fd[2] = { -1, -1 };
127 0 : const char *backend = (const char *)test_data;
128 0 : int alarm_count=0, info_count=0;
129 : struct tevent_fd *fde_read;
130 : struct tevent_fd *fde_read_1;
131 : struct tevent_fd *fde_write;
132 : struct tevent_fd *fde_write_1;
133 : #ifdef SA_RESTART
134 0 : struct tevent_signal *se1 = NULL;
135 : #endif
136 : #ifdef SA_RESETHAND
137 0 : struct tevent_signal *se2 = NULL;
138 : #endif
139 : #ifdef SA_SIGINFO
140 0 : struct tevent_signal *se3 = NULL;
141 : #endif
142 0 : int finished=0;
143 : struct timeval t;
144 : int ret;
145 :
146 0 : ev_ctx = tevent_context_init_byname(test, backend);
147 0 : if (ev_ctx == NULL) {
148 0 : torture_comment(test, "event backend '%s' not supported\n", backend);
149 0 : return true;
150 : }
151 :
152 0 : torture_comment(test, "backend '%s' - %s\n",
153 : backend, __FUNCTION__);
154 :
155 : /* reset globals */
156 0 : fde_count = 0;
157 :
158 : /* create a pipe */
159 0 : ret = pipe(fd);
160 0 : torture_assert_int_equal(test, ret, 0, "pipe failed");
161 :
162 0 : fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
163 : fde_handler_read, fd);
164 0 : fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
165 : fde_handler_write_1, fd);
166 :
167 0 : fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
168 : fde_handler_write, fd);
169 0 : fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
170 : fde_handler_read_1, fd);
171 :
172 0 : tevent_fd_set_auto_close(fde_read);
173 0 : tevent_fd_set_auto_close(fde_write);
174 :
175 0 : tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
176 : finished_handler, &finished);
177 :
178 : #ifdef SA_RESTART
179 0 : se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
180 0 : torture_assert(test, se1 != NULL, "failed to setup se1");
181 : #endif
182 : #ifdef SA_RESETHAND
183 0 : se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
184 0 : torture_assert(test, se2 != NULL, "failed to setup se2");
185 : #endif
186 : #ifdef SA_SIGINFO
187 0 : se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
188 0 : torture_assert(test, se3 != NULL, "failed to setup se3");
189 : #endif
190 :
191 0 : t = timeval_current();
192 0 : while (!finished) {
193 0 : errno = 0;
194 0 : if (tevent_loop_once(ev_ctx) == -1) {
195 0 : TALLOC_FREE(ev_ctx);
196 0 : torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
197 : return false;
198 : }
199 : }
200 :
201 0 : talloc_free(fde_read_1);
202 0 : talloc_free(fde_write_1);
203 0 : talloc_free(fde_read);
204 0 : talloc_free(fde_write);
205 :
206 0 : while (alarm_count < fde_count+1) {
207 0 : if (tevent_loop_once(ev_ctx) == -1) {
208 0 : break;
209 : }
210 : }
211 :
212 0 : torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
213 :
214 : #ifdef SA_RESTART
215 0 : talloc_free(se1);
216 : #endif
217 :
218 0 : torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
219 :
220 : #ifdef SA_RESETHAND
221 : /*
222 : * we do not call talloc_free(se2)
223 : * because it is already gone,
224 : * after triggering the event handler.
225 : */
226 : #endif
227 :
228 : #ifdef SA_SIGINFO
229 0 : talloc_free(se3);
230 0 : torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
231 : #endif
232 :
233 0 : talloc_free(ev_ctx);
234 :
235 0 : return true;
236 : }
237 :
238 : struct test_event_fd1_state {
239 : struct torture_context *tctx;
240 : const char *backend;
241 : struct tevent_context *ev;
242 : int sock[2];
243 : struct tevent_timer *te;
244 : struct tevent_fd *fde0;
245 : struct tevent_fd *fde1;
246 : bool got_write;
247 : bool got_read;
248 : bool drain;
249 : bool drain_done;
250 : unsigned loop_count;
251 : bool finished;
252 : const char *error;
253 : };
254 :
255 0 : static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
256 : struct tevent_fd *fde,
257 : uint16_t flags,
258 : void *private_data)
259 : {
260 0 : struct test_event_fd1_state *state =
261 : (struct test_event_fd1_state *)private_data;
262 :
263 0 : if (state->drain_done) {
264 0 : state->finished = true;
265 0 : state->error = __location__;
266 0 : return;
267 : }
268 :
269 0 : if (state->drain) {
270 : ssize_t ret;
271 0 : uint8_t c = 0;
272 :
273 0 : if (!(flags & TEVENT_FD_READ)) {
274 0 : state->finished = true;
275 0 : state->error = __location__;
276 0 : return;
277 : }
278 :
279 0 : ret = read(state->sock[0], &c, 1);
280 0 : if (ret == 1) {
281 0 : return;
282 : }
283 :
284 : /*
285 : * end of test...
286 : */
287 0 : tevent_fd_set_flags(fde, 0);
288 0 : state->drain_done = true;
289 0 : return;
290 : }
291 :
292 0 : if (!state->got_write) {
293 0 : uint8_t c = 0;
294 :
295 0 : if (flags != TEVENT_FD_WRITE) {
296 0 : state->finished = true;
297 0 : state->error = __location__;
298 0 : return;
299 : }
300 0 : state->got_write = true;
301 :
302 : /*
303 : * we write to the other socket...
304 : */
305 0 : do_write(state->sock[1], &c, 1);
306 0 : TEVENT_FD_NOT_WRITEABLE(fde);
307 0 : TEVENT_FD_READABLE(fde);
308 0 : return;
309 : }
310 :
311 0 : if (!state->got_read) {
312 0 : if (flags != TEVENT_FD_READ) {
313 0 : state->finished = true;
314 0 : state->error = __location__;
315 0 : return;
316 : }
317 0 : state->got_read = true;
318 :
319 0 : TEVENT_FD_NOT_READABLE(fde);
320 0 : return;
321 : }
322 :
323 0 : state->finished = true;
324 0 : state->error = __location__;
325 0 : return;
326 : }
327 :
328 0 : static void test_event_fd1_finished(struct tevent_context *ev_ctx,
329 : struct tevent_timer *te,
330 : struct timeval tval,
331 : void *private_data)
332 : {
333 0 : struct test_event_fd1_state *state =
334 : (struct test_event_fd1_state *)private_data;
335 :
336 0 : if (state->drain_done) {
337 0 : state->finished = true;
338 0 : return;
339 : }
340 :
341 0 : if (!state->got_write) {
342 0 : state->finished = true;
343 0 : state->error = __location__;
344 0 : return;
345 : }
346 :
347 0 : if (!state->got_read) {
348 0 : state->finished = true;
349 0 : state->error = __location__;
350 0 : return;
351 : }
352 :
353 0 : state->loop_count++;
354 0 : if (state->loop_count > 3) {
355 0 : state->finished = true;
356 0 : state->error = __location__;
357 0 : return;
358 : }
359 :
360 0 : state->got_write = false;
361 0 : state->got_read = false;
362 :
363 0 : tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
364 :
365 0 : if (state->loop_count > 2) {
366 0 : state->drain = true;
367 0 : TALLOC_FREE(state->fde1);
368 0 : TEVENT_FD_READABLE(state->fde0);
369 : }
370 :
371 0 : state->te = tevent_add_timer(state->ev, state->ev,
372 : timeval_current_ofs(0,2000),
373 : test_event_fd1_finished, state);
374 : }
375 :
376 0 : static bool test_event_fd1(struct torture_context *tctx,
377 : const void *test_data)
378 : {
379 : struct test_event_fd1_state state;
380 : int ret;
381 :
382 0 : ZERO_STRUCT(state);
383 0 : state.tctx = tctx;
384 0 : state.backend = (const char *)test_data;
385 :
386 0 : state.ev = tevent_context_init_byname(tctx, state.backend);
387 0 : if (state.ev == NULL) {
388 0 : torture_skip(tctx, talloc_asprintf(tctx,
389 : "event backend '%s' not supported\n",
390 : state.backend));
391 : return true;
392 : }
393 :
394 0 : tevent_set_debug_stderr(state.ev);
395 0 : torture_comment(tctx, "backend '%s' - %s\n",
396 : state.backend, __FUNCTION__);
397 :
398 : /*
399 : * This tests the following:
400 : *
401 : * It monitors the state of state.sock[0]
402 : * with tevent_fd, but we never read/write on state.sock[0]
403 : * while state.sock[1] * is only used to write a few bytes.
404 : *
405 : * We have a loop:
406 : * - we wait only for TEVENT_FD_WRITE on state.sock[0]
407 : * - we write 1 byte to state.sock[1]
408 : * - we wait only for TEVENT_FD_READ on state.sock[0]
409 : * - we disable events on state.sock[0]
410 : * - the timer event restarts the loop
411 : * Then we close state.sock[1]
412 : * We have a loop:
413 : * - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
414 : * - we try to read 1 byte
415 : * - if the read gets an error of returns 0
416 : * we disable the event handler
417 : * - the timer finishes the test
418 : */
419 0 : state.sock[0] = -1;
420 0 : state.sock[1] = -1;
421 :
422 0 : ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
423 0 : torture_assert(tctx, ret == 0, "socketpair() failed");
424 :
425 0 : state.te = tevent_add_timer(state.ev, state.ev,
426 : timeval_current_ofs(0,10000),
427 : test_event_fd1_finished, &state);
428 0 : state.fde0 = tevent_add_fd(state.ev, state.ev,
429 : state.sock[0], TEVENT_FD_WRITE,
430 : test_event_fd1_fde_handler, &state);
431 : /* state.fde1 is only used to auto close */
432 0 : state.fde1 = tevent_add_fd(state.ev, state.ev,
433 : state.sock[1], 0,
434 : test_event_fd1_fde_handler, &state);
435 :
436 0 : tevent_fd_set_auto_close(state.fde0);
437 0 : tevent_fd_set_auto_close(state.fde1);
438 :
439 0 : while (!state.finished) {
440 0 : errno = 0;
441 0 : if (tevent_loop_once(state.ev) == -1) {
442 0 : talloc_free(state.ev);
443 0 : torture_fail(tctx, talloc_asprintf(tctx,
444 : "Failed event loop %s\n",
445 : strerror(errno)));
446 : }
447 : }
448 :
449 0 : talloc_free(state.ev);
450 :
451 0 : torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
452 : "%s", state.error));
453 :
454 0 : return true;
455 : }
456 :
457 : struct test_event_fd2_state {
458 : struct torture_context *tctx;
459 : const char *backend;
460 : struct tevent_context *ev;
461 : struct tevent_timer *te;
462 : struct test_event_fd2_sock {
463 : struct test_event_fd2_state *state;
464 : int fd;
465 : struct tevent_fd *fde;
466 : size_t num_written;
467 : size_t num_read;
468 : bool got_full;
469 : } sock0, sock1;
470 : bool finished;
471 : const char *error;
472 : };
473 :
474 0 : static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
475 : struct tevent_fd *fde,
476 : uint16_t flags,
477 : void *private_data)
478 : {
479 0 : struct test_event_fd2_sock *cur_sock =
480 : (struct test_event_fd2_sock *)private_data;
481 0 : struct test_event_fd2_state *state = cur_sock->state;
482 0 : struct test_event_fd2_sock *oth_sock = NULL;
483 0 : uint8_t v = 0, c;
484 : ssize_t ret;
485 :
486 0 : if (cur_sock == &state->sock0) {
487 0 : oth_sock = &state->sock1;
488 : } else {
489 0 : oth_sock = &state->sock0;
490 : }
491 :
492 0 : if (oth_sock->num_written == 1) {
493 0 : if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
494 0 : state->finished = true;
495 0 : state->error = __location__;
496 0 : return;
497 : }
498 : }
499 :
500 0 : if (cur_sock->num_read == oth_sock->num_written) {
501 0 : state->finished = true;
502 0 : state->error = __location__;
503 0 : return;
504 : }
505 :
506 0 : if (!(flags & TEVENT_FD_READ)) {
507 0 : state->finished = true;
508 0 : state->error = __location__;
509 0 : return;
510 : }
511 :
512 0 : if (oth_sock->num_read >= PIPE_BUF) {
513 : /*
514 : * On Linux we become writable once we've read
515 : * one byte. On Solaris we only become writable
516 : * again once we've read 4096 bytes. PIPE_BUF
517 : * is probably a safe bet to test against.
518 : *
519 : * There should be room to write a byte again
520 : */
521 0 : if (!(flags & TEVENT_FD_WRITE)) {
522 0 : state->finished = true;
523 0 : state->error = __location__;
524 0 : return;
525 : }
526 : }
527 :
528 0 : if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
529 0 : v = (uint8_t)cur_sock->num_written;
530 0 : ret = write(cur_sock->fd, &v, 1);
531 0 : if (ret != 1) {
532 0 : state->finished = true;
533 0 : state->error = __location__;
534 0 : return;
535 : }
536 0 : cur_sock->num_written++;
537 0 : if (cur_sock->num_written > 0x80000000) {
538 0 : state->finished = true;
539 0 : state->error = __location__;
540 0 : return;
541 : }
542 0 : return;
543 : }
544 :
545 0 : if (!cur_sock->got_full) {
546 0 : cur_sock->got_full = true;
547 :
548 0 : if (!oth_sock->got_full) {
549 : /*
550 : * cur_sock is full,
551 : * lets wait for oth_sock
552 : * to be filled
553 : */
554 0 : tevent_fd_set_flags(cur_sock->fde, 0);
555 0 : return;
556 : }
557 :
558 : /*
559 : * oth_sock waited for cur_sock,
560 : * lets restart it
561 : */
562 0 : tevent_fd_set_flags(oth_sock->fde,
563 : TEVENT_FD_READ|TEVENT_FD_WRITE);
564 : }
565 :
566 0 : ret = read(cur_sock->fd, &v, 1);
567 0 : if (ret != 1) {
568 0 : state->finished = true;
569 0 : state->error = __location__;
570 0 : return;
571 : }
572 0 : c = (uint8_t)cur_sock->num_read;
573 0 : if (c != v) {
574 0 : state->finished = true;
575 0 : state->error = __location__;
576 0 : return;
577 : }
578 0 : cur_sock->num_read++;
579 :
580 0 : if (cur_sock->num_read < oth_sock->num_written) {
581 : /* there is more to read */
582 0 : return;
583 : }
584 : /*
585 : * we read everything, we need to remove TEVENT_FD_WRITE
586 : * to avoid spinning
587 : */
588 0 : TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
589 :
590 0 : if (oth_sock->num_read == cur_sock->num_written) {
591 : /*
592 : * both directions are finished
593 : */
594 0 : state->finished = true;
595 : }
596 :
597 0 : return;
598 : }
599 :
600 0 : static void test_event_fd2_finished(struct tevent_context *ev_ctx,
601 : struct tevent_timer *te,
602 : struct timeval tval,
603 : void *private_data)
604 : {
605 0 : struct test_event_fd2_state *state =
606 : (struct test_event_fd2_state *)private_data;
607 :
608 : /*
609 : * this should never be triggered
610 : */
611 0 : state->finished = true;
612 0 : state->error = __location__;
613 0 : }
614 :
615 0 : static bool test_event_fd2(struct torture_context *tctx,
616 : const void *test_data)
617 : {
618 : struct test_event_fd2_state state;
619 : int sock[2];
620 0 : uint8_t c = 0;
621 :
622 0 : ZERO_STRUCT(state);
623 0 : state.tctx = tctx;
624 0 : state.backend = (const char *)test_data;
625 :
626 0 : state.ev = tevent_context_init_byname(tctx, state.backend);
627 0 : if (state.ev == NULL) {
628 0 : torture_skip(tctx, talloc_asprintf(tctx,
629 : "event backend '%s' not supported\n",
630 : state.backend));
631 : return true;
632 : }
633 :
634 0 : tevent_set_debug_stderr(state.ev);
635 0 : torture_comment(tctx, "backend '%s' - %s\n",
636 : state.backend, __FUNCTION__);
637 :
638 : /*
639 : * This tests the following
640 : *
641 : * - We write 1 byte to each socket
642 : * - We wait for TEVENT_FD_READ/WRITE on both sockets
643 : * - When we get TEVENT_FD_WRITE we write 1 byte
644 : * until both socket buffers are full, which
645 : * means both sockets only get TEVENT_FD_READ.
646 : * - Then we read 1 byte until we have consumed
647 : * all bytes the other end has written.
648 : */
649 0 : sock[0] = -1;
650 0 : sock[1] = -1;
651 0 : socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
652 :
653 : /*
654 : * the timer should never expire
655 : */
656 0 : state.te = tevent_add_timer(state.ev, state.ev,
657 : timeval_current_ofs(600, 0),
658 : test_event_fd2_finished, &state);
659 0 : state.sock0.state = &state;
660 0 : state.sock0.fd = sock[0];
661 0 : state.sock0.fde = tevent_add_fd(state.ev, state.ev,
662 : state.sock0.fd,
663 : TEVENT_FD_READ | TEVENT_FD_WRITE,
664 : test_event_fd2_sock_handler,
665 : &state.sock0);
666 0 : state.sock1.state = &state;
667 0 : state.sock1.fd = sock[1];
668 0 : state.sock1.fde = tevent_add_fd(state.ev, state.ev,
669 : state.sock1.fd,
670 : TEVENT_FD_READ | TEVENT_FD_WRITE,
671 : test_event_fd2_sock_handler,
672 : &state.sock1);
673 :
674 0 : tevent_fd_set_auto_close(state.sock0.fde);
675 0 : tevent_fd_set_auto_close(state.sock1.fde);
676 :
677 0 : do_write(state.sock0.fd, &c, 1);
678 0 : state.sock0.num_written++;
679 0 : do_write(state.sock1.fd, &c, 1);
680 0 : state.sock1.num_written++;
681 :
682 0 : while (!state.finished) {
683 0 : errno = 0;
684 0 : if (tevent_loop_once(state.ev) == -1) {
685 0 : talloc_free(state.ev);
686 0 : torture_fail(tctx, talloc_asprintf(tctx,
687 : "Failed event loop %s\n",
688 : strerror(errno)));
689 : }
690 : }
691 :
692 0 : talloc_free(state.ev);
693 :
694 0 : torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
695 : "%s", state.error));
696 :
697 0 : return true;
698 : }
699 :
700 : struct test_wrapper_state {
701 : struct torture_context *tctx;
702 : int num_events;
703 : int num_wrap_handlers;
704 : };
705 :
706 0 : static bool test_wrapper_before_use(struct tevent_context *wrap_ev,
707 : void *private_data,
708 : struct tevent_context *main_ev,
709 : const char *location)
710 : {
711 0 : struct test_wrapper_state *state =
712 0 : talloc_get_type_abort(private_data,
713 : struct test_wrapper_state);
714 :
715 0 : torture_comment(state->tctx, "%s\n", __func__);
716 0 : state->num_wrap_handlers++;
717 0 : return true;
718 : }
719 :
720 0 : static void test_wrapper_after_use(struct tevent_context *wrap_ev,
721 : void *private_data,
722 : struct tevent_context *main_ev,
723 : const char *location)
724 : {
725 0 : struct test_wrapper_state *state =
726 0 : talloc_get_type_abort(private_data,
727 : struct test_wrapper_state);
728 :
729 0 : torture_comment(state->tctx, "%s\n", __func__);
730 0 : state->num_wrap_handlers++;
731 0 : }
732 :
733 0 : static void test_wrapper_before_fd_handler(struct tevent_context *wrap_ev,
734 : void *private_data,
735 : struct tevent_context *main_ev,
736 : struct tevent_fd *fde,
737 : uint16_t flags,
738 : const char *handler_name,
739 : const char *location)
740 : {
741 0 : struct test_wrapper_state *state =
742 0 : talloc_get_type_abort(private_data,
743 : struct test_wrapper_state);
744 :
745 0 : torture_comment(state->tctx, "%s\n", __func__);
746 0 : state->num_wrap_handlers++;
747 0 : }
748 :
749 0 : static void test_wrapper_after_fd_handler(struct tevent_context *wrap_ev,
750 : void *private_data,
751 : struct tevent_context *main_ev,
752 : struct tevent_fd *fde,
753 : uint16_t flags,
754 : const char *handler_name,
755 : const char *location)
756 : {
757 0 : struct test_wrapper_state *state =
758 0 : talloc_get_type_abort(private_data,
759 : struct test_wrapper_state);
760 :
761 0 : torture_comment(state->tctx, "%s\n", __func__);
762 0 : state->num_wrap_handlers++;
763 0 : }
764 :
765 0 : static void test_wrapper_before_timer_handler(struct tevent_context *wrap_ev,
766 : void *private_data,
767 : struct tevent_context *main_ev,
768 : struct tevent_timer *te,
769 : struct timeval requested_time,
770 : struct timeval trigger_time,
771 : const char *handler_name,
772 : const char *location)
773 : {
774 0 : struct test_wrapper_state *state =
775 0 : talloc_get_type_abort(private_data,
776 : struct test_wrapper_state);
777 :
778 0 : torture_comment(state->tctx, "%s\n", __func__);
779 0 : state->num_wrap_handlers++;
780 0 : }
781 :
782 0 : static void test_wrapper_after_timer_handler(struct tevent_context *wrap_ev,
783 : void *private_data,
784 : struct tevent_context *main_ev,
785 : struct tevent_timer *te,
786 : struct timeval requested_time,
787 : struct timeval trigger_time,
788 : const char *handler_name,
789 : const char *location)
790 : {
791 0 : struct test_wrapper_state *state =
792 0 : talloc_get_type_abort(private_data,
793 : struct test_wrapper_state);
794 :
795 0 : torture_comment(state->tctx, "%s\n", __func__);
796 0 : state->num_wrap_handlers++;
797 0 : }
798 :
799 0 : static void test_wrapper_before_immediate_handler(struct tevent_context *wrap_ev,
800 : void *private_data,
801 : struct tevent_context *main_ev,
802 : struct tevent_immediate *im,
803 : const char *handler_name,
804 : const char *location)
805 : {
806 0 : struct test_wrapper_state *state =
807 0 : talloc_get_type_abort(private_data,
808 : struct test_wrapper_state);
809 :
810 0 : torture_comment(state->tctx, "%s\n", __func__);
811 0 : state->num_wrap_handlers++;
812 0 : }
813 :
814 0 : static void test_wrapper_after_immediate_handler(struct tevent_context *wrap_ev,
815 : void *private_data,
816 : struct tevent_context *main_ev,
817 : struct tevent_immediate *im,
818 : const char *handler_name,
819 : const char *location)
820 : {
821 0 : struct test_wrapper_state *state =
822 0 : talloc_get_type_abort(private_data,
823 : struct test_wrapper_state);
824 :
825 0 : torture_comment(state->tctx, "%s\n", __func__);
826 0 : state->num_wrap_handlers++;
827 0 : }
828 :
829 0 : static void test_wrapper_before_signal_handler(struct tevent_context *wrap_ev,
830 : void *private_data,
831 : struct tevent_context *main_ev,
832 : struct tevent_signal *se,
833 : int signum,
834 : int count,
835 : void *siginfo,
836 : const char *handler_name,
837 : const char *location)
838 : {
839 0 : struct test_wrapper_state *state =
840 0 : talloc_get_type_abort(private_data,
841 : struct test_wrapper_state);
842 :
843 0 : torture_comment(state->tctx, "%s\n", __func__);
844 0 : state->num_wrap_handlers++;
845 0 : }
846 :
847 0 : static void test_wrapper_after_signal_handler(struct tevent_context *wrap_ev,
848 : void *private_data,
849 : struct tevent_context *main_ev,
850 : struct tevent_signal *se,
851 : int signum,
852 : int count,
853 : void *siginfo,
854 : const char *handler_name,
855 : const char *location)
856 : {
857 0 : struct test_wrapper_state *state =
858 0 : talloc_get_type_abort(private_data,
859 : struct test_wrapper_state);
860 :
861 0 : torture_comment(state->tctx, "%s\n", __func__);
862 0 : state->num_wrap_handlers++;
863 0 : }
864 :
865 : static const struct tevent_wrapper_ops test_wrapper_ops = {
866 : .name = "test_wrapper",
867 : .before_use = test_wrapper_before_use,
868 : .after_use = test_wrapper_after_use,
869 : .before_fd_handler = test_wrapper_before_fd_handler,
870 : .after_fd_handler = test_wrapper_after_fd_handler,
871 : .before_timer_handler = test_wrapper_before_timer_handler,
872 : .after_timer_handler = test_wrapper_after_timer_handler,
873 : .before_immediate_handler = test_wrapper_before_immediate_handler,
874 : .after_immediate_handler = test_wrapper_after_immediate_handler,
875 : .before_signal_handler = test_wrapper_before_signal_handler,
876 : .after_signal_handler = test_wrapper_after_signal_handler,
877 : };
878 :
879 0 : static void test_wrapper_timer_handler(struct tevent_context *ev,
880 : struct tevent_timer *te,
881 : struct timeval tv,
882 : void *private_data)
883 : {
884 0 : struct test_wrapper_state *state =
885 : (struct test_wrapper_state *)private_data;
886 :
887 :
888 0 : torture_comment(state->tctx, "timer handler\n");
889 :
890 0 : state->num_events++;
891 0 : talloc_free(te);
892 0 : return;
893 : }
894 :
895 0 : static void test_wrapper_fd_handler(struct tevent_context *ev,
896 : struct tevent_fd *fde,
897 : unsigned short fd_flags,
898 : void *private_data)
899 : {
900 0 : struct test_wrapper_state *state =
901 : (struct test_wrapper_state *)private_data;
902 :
903 0 : torture_comment(state->tctx, "fd handler\n");
904 :
905 0 : state->num_events++;
906 0 : talloc_free(fde);
907 0 : return;
908 : }
909 :
910 0 : static void test_wrapper_immediate_handler(struct tevent_context *ev,
911 : struct tevent_immediate *im,
912 : void *private_data)
913 : {
914 0 : struct test_wrapper_state *state =
915 : (struct test_wrapper_state *)private_data;
916 :
917 0 : state->num_events++;
918 0 : talloc_free(im);
919 :
920 0 : torture_comment(state->tctx, "immediate handler\n");
921 0 : return;
922 : }
923 :
924 0 : static void test_wrapper_signal_handler(struct tevent_context *ev,
925 : struct tevent_signal *se,
926 : int signum,
927 : int count,
928 : void *siginfo,
929 : void *private_data)
930 : {
931 0 : struct test_wrapper_state *state =
932 : (struct test_wrapper_state *)private_data;
933 :
934 0 : torture_comment(state->tctx, "signal handler\n");
935 :
936 0 : state->num_events++;
937 0 : talloc_free(se);
938 0 : return;
939 : }
940 :
941 0 : static bool test_wrapper(struct torture_context *tctx,
942 : const void *test_data)
943 : {
944 0 : struct test_wrapper_state *state = NULL;
945 0 : int sock[2] = { -1, -1};
946 0 : uint8_t c = 0;
947 0 : const int num_events = 4;
948 0 : const char *backend = (const char *)test_data;
949 0 : struct tevent_context *ev = NULL;
950 0 : struct tevent_context *wrap_ev = NULL;
951 0 : struct tevent_fd *fde = NULL;
952 0 : struct tevent_timer *te = NULL;
953 0 : struct tevent_signal *se = NULL;
954 0 : struct tevent_immediate *im = NULL;
955 : int ret;
956 0 : bool ok = false;
957 : bool ret2;
958 :
959 0 : ev = tevent_context_init_byname(tctx, backend);
960 0 : if (ev == NULL) {
961 0 : torture_skip(tctx, talloc_asprintf(tctx,
962 : "event backend '%s' not supported\n",
963 : backend));
964 : return true;
965 : }
966 :
967 0 : tevent_set_debug_stderr(ev);
968 0 : torture_comment(tctx, "tevent backend '%s'\n", backend);
969 :
970 0 : wrap_ev = tevent_context_wrapper_create(
971 : ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
972 0 : torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
973 : "tevent_context_wrapper_create failed\n");
974 0 : *state = (struct test_wrapper_state) {
975 : .tctx = tctx,
976 : };
977 :
978 0 : ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
979 0 : torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
980 :
981 0 : te = tevent_add_timer(wrap_ev, wrap_ev,
982 : timeval_current_ofs(0, 0),
983 : test_wrapper_timer_handler, state);
984 0 : torture_assert_not_null_goto(tctx, te, ok, done,
985 : "tevent_add_timer failed\n");
986 :
987 0 : fde = tevent_add_fd(wrap_ev, wrap_ev,
988 : sock[1],
989 : TEVENT_FD_READ,
990 : test_wrapper_fd_handler,
991 : state);
992 0 : torture_assert_not_null_goto(tctx, fde, ok, done,
993 : "tevent_add_fd failed\n");
994 :
995 0 : im = tevent_create_immediate(wrap_ev);
996 0 : torture_assert_not_null_goto(tctx, im, ok, done,
997 : "tevent_create_immediate failed\n");
998 :
999 0 : se = tevent_add_signal(wrap_ev, wrap_ev,
1000 : SIGUSR1,
1001 : 0,
1002 : test_wrapper_signal_handler,
1003 : state);
1004 0 : torture_assert_not_null_goto(tctx, se, ok, done,
1005 : "tevent_add_signal failed\n");
1006 :
1007 0 : do_write(sock[0], &c, 1);
1008 0 : kill(getpid(), SIGUSR1);
1009 0 : tevent_schedule_immediate(im,
1010 : wrap_ev,
1011 : test_wrapper_immediate_handler,
1012 : state);
1013 :
1014 0 : ret2 = tevent_context_push_use(wrap_ev);
1015 0 : torture_assert_goto(tctx, ret2, ok, done, "tevent_context_push_use(wrap_ev) failed\n");
1016 0 : ret2 = tevent_context_push_use(ev);
1017 0 : torture_assert_goto(tctx, ret2, ok, pop_use, "tevent_context_push_use(ev) failed\n");
1018 0 : tevent_context_pop_use(ev);
1019 0 : tevent_context_pop_use(wrap_ev);
1020 :
1021 0 : ret = tevent_loop_wait(ev);
1022 0 : torture_assert_int_equal_goto(tctx, ret, 0, ok, done, "tevent_loop_wait failed\n");
1023 :
1024 0 : torture_comment(tctx, "Num events: %d\n", state->num_events);
1025 0 : torture_comment(tctx, "Num wrap handlers: %d\n",
1026 0 : state->num_wrap_handlers);
1027 :
1028 0 : torture_assert_int_equal_goto(tctx, state->num_events, num_events, ok, done,
1029 : "Wrong event count\n");
1030 0 : torture_assert_int_equal_goto(tctx, state->num_wrap_handlers,
1031 : num_events*2+2,
1032 : ok, done, "Wrong wrapper count\n");
1033 :
1034 0 : ok = true;
1035 :
1036 0 : done:
1037 0 : TALLOC_FREE(wrap_ev);
1038 0 : TALLOC_FREE(ev);
1039 :
1040 0 : if (sock[0] != -1) {
1041 0 : close(sock[0]);
1042 : }
1043 0 : if (sock[1] != -1) {
1044 0 : close(sock[1]);
1045 : }
1046 0 : return ok;
1047 0 : pop_use:
1048 0 : tevent_context_pop_use(wrap_ev);
1049 0 : goto done;
1050 : }
1051 :
1052 0 : static void test_free_wrapper_signal_handler(struct tevent_context *ev,
1053 : struct tevent_signal *se,
1054 : int signum,
1055 : int count,
1056 : void *siginfo,
1057 : void *private_data)
1058 : {
1059 0 : struct torture_context *tctx =
1060 0 : talloc_get_type_abort(private_data,
1061 : struct torture_context);
1062 :
1063 0 : torture_comment(tctx, "signal handler\n");
1064 :
1065 0 : talloc_free(se);
1066 :
1067 : /*
1068 : * signal handlers have highest priority in tevent, so this signal
1069 : * handler will always be started before the other handlers
1070 : * below. Freeing the (wrapper) event context here tests that the
1071 : * wrapper implementation correclty handles the wrapper ev going away
1072 : * with pending events.
1073 : */
1074 0 : talloc_free(ev);
1075 0 : return;
1076 : }
1077 :
1078 0 : static void test_free_wrapper_fd_handler(struct tevent_context *ev,
1079 : struct tevent_fd *fde,
1080 : unsigned short fd_flags,
1081 : void *private_data)
1082 : {
1083 : /*
1084 : * This should never be called as
1085 : * test_free_wrapper_signal_handler()
1086 : * already destroyed the wrapper tevent_context.
1087 : */
1088 0 : abort();
1089 : }
1090 :
1091 0 : static void test_free_wrapper_immediate_handler(struct tevent_context *ev,
1092 : struct tevent_immediate *im,
1093 : void *private_data)
1094 : {
1095 : /*
1096 : * This should never be called as
1097 : * test_free_wrapper_signal_handler()
1098 : * already destroyed the wrapper tevent_context.
1099 : */
1100 0 : abort();
1101 : }
1102 :
1103 0 : static void test_free_wrapper_timer_handler(struct tevent_context *ev,
1104 : struct tevent_timer *te,
1105 : struct timeval tv,
1106 : void *private_data)
1107 : {
1108 : /*
1109 : * This should never be called as
1110 : * test_free_wrapper_signal_handler()
1111 : * already destroyed the wrapper tevent_context.
1112 : */
1113 0 : abort();
1114 : }
1115 :
1116 0 : static bool test_free_wrapper(struct torture_context *tctx,
1117 : const void *test_data)
1118 : {
1119 0 : struct test_wrapper_state *state = NULL;
1120 0 : int sock[2] = { -1, -1};
1121 0 : uint8_t c = 0;
1122 0 : const char *backend = (const char *)test_data;
1123 0 : TALLOC_CTX *frame = talloc_stackframe();
1124 0 : struct tevent_context *ev = NULL;
1125 0 : struct tevent_context *wrap_ev = NULL;
1126 0 : struct tevent_fd *fde = NULL;
1127 0 : struct tevent_timer *te = NULL;
1128 0 : struct tevent_signal *se = NULL;
1129 0 : struct tevent_immediate *im = NULL;
1130 : int ret;
1131 0 : bool ok = false;
1132 :
1133 0 : ev = tevent_context_init_byname(frame, backend);
1134 0 : if (ev == NULL) {
1135 0 : torture_skip(tctx, talloc_asprintf(tctx,
1136 : "event backend '%s' not supported\n",
1137 : backend));
1138 : return true;
1139 : }
1140 :
1141 0 : tevent_set_debug_stderr(ev);
1142 0 : torture_comment(tctx, "tevent backend '%s'\n", backend);
1143 :
1144 0 : wrap_ev = tevent_context_wrapper_create(
1145 : ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
1146 0 : torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
1147 : "tevent_context_wrapper_create failed\n");
1148 0 : *state = (struct test_wrapper_state) {
1149 : .tctx = tctx,
1150 : };
1151 :
1152 0 : ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1153 0 : torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1154 :
1155 0 : fde = tevent_add_fd(wrap_ev, frame,
1156 : sock[1],
1157 : TEVENT_FD_READ,
1158 : test_free_wrapper_fd_handler,
1159 : NULL);
1160 0 : torture_assert_not_null_goto(tctx, fde, ok, done,
1161 : "tevent_add_fd failed\n");
1162 :
1163 0 : te = tevent_add_timer(wrap_ev, frame,
1164 : timeval_current_ofs(0, 0),
1165 : test_free_wrapper_timer_handler, NULL);
1166 0 : torture_assert_not_null_goto(tctx, te, ok, done,
1167 : "tevent_add_timer failed\n");
1168 :
1169 0 : im = tevent_create_immediate(frame);
1170 0 : torture_assert_not_null_goto(tctx, im, ok, done,
1171 : "tevent_create_immediate failed\n");
1172 :
1173 0 : se = tevent_add_signal(wrap_ev, frame,
1174 : SIGUSR1,
1175 : 0,
1176 : test_free_wrapper_signal_handler,
1177 : tctx);
1178 0 : torture_assert_not_null_goto(tctx, se, ok, done,
1179 : "tevent_add_signal failed\n");
1180 :
1181 0 : do_write(sock[0], &c, 1);
1182 0 : kill(getpid(), SIGUSR1);
1183 0 : tevent_schedule_immediate(im,
1184 : wrap_ev,
1185 : test_free_wrapper_immediate_handler,
1186 : NULL);
1187 :
1188 0 : ret = tevent_loop_wait(ev);
1189 0 : torture_assert_goto(tctx, ret == 0, ok, done, "tevent_loop_wait failed\n");
1190 :
1191 0 : ok = true;
1192 :
1193 0 : done:
1194 0 : TALLOC_FREE(frame);
1195 :
1196 0 : if (sock[0] != -1) {
1197 0 : close(sock[0]);
1198 : }
1199 0 : if (sock[1] != -1) {
1200 0 : close(sock[1]);
1201 : }
1202 0 : return ok;
1203 : }
1204 :
1205 : #ifdef HAVE_PTHREAD
1206 :
1207 : static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
1208 : static bool do_shutdown = false;
1209 :
1210 0 : static void test_event_threaded_lock(void)
1211 : {
1212 : int ret;
1213 0 : ret = pthread_mutex_lock(&threaded_mutex);
1214 0 : assert(ret == 0);
1215 0 : }
1216 :
1217 0 : static void test_event_threaded_unlock(void)
1218 : {
1219 : int ret;
1220 0 : ret = pthread_mutex_unlock(&threaded_mutex);
1221 0 : assert(ret == 0);
1222 0 : }
1223 :
1224 0 : static void test_event_threaded_trace(enum tevent_trace_point point,
1225 : void *private_data)
1226 : {
1227 0 : switch (point) {
1228 0 : case TEVENT_TRACE_BEFORE_WAIT:
1229 0 : test_event_threaded_unlock();
1230 0 : break;
1231 0 : case TEVENT_TRACE_AFTER_WAIT:
1232 0 : test_event_threaded_lock();
1233 0 : break;
1234 0 : case TEVENT_TRACE_BEFORE_LOOP_ONCE:
1235 : case TEVENT_TRACE_AFTER_LOOP_ONCE:
1236 0 : break;
1237 : }
1238 0 : }
1239 :
1240 0 : static void test_event_threaded_timer(struct tevent_context *ev,
1241 : struct tevent_timer *te,
1242 : struct timeval current_time,
1243 : void *private_data)
1244 : {
1245 0 : return;
1246 : }
1247 :
1248 0 : static void *test_event_poll_thread(void *private_data)
1249 : {
1250 0 : struct tevent_context *ev = (struct tevent_context *)private_data;
1251 :
1252 0 : test_event_threaded_lock();
1253 :
1254 0 : while (true) {
1255 : int ret;
1256 0 : ret = tevent_loop_once(ev);
1257 0 : assert(ret == 0);
1258 0 : if (do_shutdown) {
1259 0 : test_event_threaded_unlock();
1260 0 : return NULL;
1261 : }
1262 : }
1263 :
1264 : }
1265 :
1266 0 : static void test_event_threaded_read_handler(struct tevent_context *ev,
1267 : struct tevent_fd *fde,
1268 : uint16_t flags,
1269 : void *private_data)
1270 : {
1271 0 : int *pfd = (int *)private_data;
1272 : char c;
1273 : ssize_t nread;
1274 :
1275 0 : if ((flags & TEVENT_FD_READ) == 0) {
1276 0 : return;
1277 : }
1278 :
1279 : do {
1280 0 : nread = read(*pfd, &c, 1);
1281 0 : } while ((nread == -1) && (errno == EINTR));
1282 :
1283 0 : assert(nread == 1);
1284 : }
1285 :
1286 0 : static bool test_event_context_threaded(struct torture_context *test,
1287 : const void *test_data)
1288 : {
1289 : struct tevent_context *ev;
1290 : struct tevent_timer *te;
1291 : struct tevent_fd *fde;
1292 : pthread_t poll_thread;
1293 : int fds[2];
1294 : int ret;
1295 0 : char c = 0;
1296 :
1297 0 : ev = tevent_context_init_byname(test, "poll_mt");
1298 0 : torture_assert(test, ev != NULL, "poll_mt not supported");
1299 :
1300 0 : tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
1301 :
1302 0 : te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
1303 : test_event_threaded_timer, NULL);
1304 0 : torture_assert(test, te != NULL, "Could not add timer");
1305 :
1306 0 : ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
1307 0 : torture_assert(test, ret == 0, "Could not create poll thread");
1308 :
1309 0 : ret = pipe(fds);
1310 0 : torture_assert(test, ret == 0, "Could not create pipe");
1311 :
1312 0 : poll(NULL, 0, 100);
1313 :
1314 0 : test_event_threaded_lock();
1315 :
1316 0 : fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
1317 : test_event_threaded_read_handler, &fds[0]);
1318 0 : torture_assert(test, fde != NULL, "Could not add fd event");
1319 :
1320 0 : test_event_threaded_unlock();
1321 :
1322 0 : poll(NULL, 0, 100);
1323 :
1324 0 : do_write(fds[1], &c, 1);
1325 :
1326 0 : poll(NULL, 0, 100);
1327 :
1328 0 : test_event_threaded_lock();
1329 0 : do_shutdown = true;
1330 0 : test_event_threaded_unlock();
1331 :
1332 0 : do_write(fds[1], &c, 1);
1333 :
1334 0 : ret = pthread_join(poll_thread, NULL);
1335 0 : torture_assert(test, ret == 0, "pthread_join failed");
1336 :
1337 0 : return true;
1338 : }
1339 :
1340 : #define NUM_TEVENT_THREADS 100
1341 :
1342 : /* Ugly, but needed for torture_comment... */
1343 : static struct torture_context *thread_test_ctx;
1344 : static pthread_t thread_map[NUM_TEVENT_THREADS];
1345 : static unsigned thread_counter;
1346 :
1347 : /* Called in master thread context */
1348 0 : static void callback_nowait(struct tevent_context *ev,
1349 : struct tevent_immediate *im,
1350 : void *private_ptr)
1351 : {
1352 0 : pthread_t *thread_id_ptr =
1353 0 : talloc_get_type_abort(private_ptr, pthread_t);
1354 : unsigned i;
1355 :
1356 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1357 0 : if (pthread_equal(*thread_id_ptr,
1358 : thread_map[i])) {
1359 0 : break;
1360 : }
1361 : }
1362 0 : torture_comment(thread_test_ctx,
1363 : "Callback %u from thread %u\n",
1364 : thread_counter,
1365 : i);
1366 0 : thread_counter++;
1367 0 : }
1368 :
1369 : /* Blast the master tevent_context with a callback, no waiting. */
1370 0 : static void *thread_fn_nowait(void *private_ptr)
1371 : {
1372 0 : struct tevent_thread_proxy *master_tp =
1373 0 : talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1374 : struct tevent_immediate *im;
1375 : pthread_t *thread_id_ptr;
1376 :
1377 0 : im = tevent_create_immediate(NULL);
1378 0 : if (im == NULL) {
1379 0 : return NULL;
1380 : }
1381 0 : thread_id_ptr = talloc(NULL, pthread_t);
1382 0 : if (thread_id_ptr == NULL) {
1383 0 : return NULL;
1384 : }
1385 0 : *thread_id_ptr = pthread_self();
1386 :
1387 0 : tevent_thread_proxy_schedule(master_tp,
1388 : &im,
1389 : callback_nowait,
1390 : &thread_id_ptr);
1391 0 : return NULL;
1392 : }
1393 :
1394 0 : static void timeout_fn(struct tevent_context *ev,
1395 : struct tevent_timer *te,
1396 : struct timeval tv, void *p)
1397 : {
1398 0 : thread_counter = NUM_TEVENT_THREADS * 10;
1399 0 : }
1400 :
1401 0 : static bool test_multi_tevent_threaded(struct torture_context *test,
1402 : const void *test_data)
1403 : {
1404 : unsigned i;
1405 : struct tevent_context *master_ev;
1406 : struct tevent_thread_proxy *tp;
1407 :
1408 0 : talloc_disable_null_tracking();
1409 :
1410 : /* Ugly global stuff. */
1411 0 : thread_test_ctx = test;
1412 0 : thread_counter = 0;
1413 :
1414 0 : master_ev = tevent_context_init(NULL);
1415 0 : if (master_ev == NULL) {
1416 0 : return false;
1417 : }
1418 0 : tevent_set_debug_stderr(master_ev);
1419 :
1420 0 : tp = tevent_thread_proxy_create(master_ev);
1421 0 : if (tp == NULL) {
1422 0 : torture_fail(test,
1423 : talloc_asprintf(test,
1424 : "tevent_thread_proxy_create failed\n"));
1425 : talloc_free(master_ev);
1426 : return false;
1427 : }
1428 :
1429 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1430 0 : int ret = pthread_create(&thread_map[i],
1431 : NULL,
1432 : thread_fn_nowait,
1433 : tp);
1434 0 : if (ret != 0) {
1435 0 : torture_fail(test,
1436 : talloc_asprintf(test,
1437 : "Failed to create thread %i, %d\n",
1438 : i, ret));
1439 : return false;
1440 : }
1441 : }
1442 :
1443 : /* Ensure we don't wait more than 10 seconds. */
1444 0 : tevent_add_timer(master_ev,
1445 : master_ev,
1446 : timeval_current_ofs(10,0),
1447 : timeout_fn,
1448 : NULL);
1449 :
1450 0 : while (thread_counter < NUM_TEVENT_THREADS) {
1451 0 : int ret = tevent_loop_once(master_ev);
1452 0 : torture_assert(test, ret == 0, "tevent_loop_once failed");
1453 : }
1454 :
1455 0 : torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1456 : "thread_counter fail\n");
1457 :
1458 0 : talloc_free(master_ev);
1459 0 : return true;
1460 : }
1461 :
1462 : struct reply_state {
1463 : struct tevent_thread_proxy *reply_tp;
1464 : pthread_t thread_id;
1465 : int *p_finished;
1466 : };
1467 :
1468 0 : static void thread_timeout_fn(struct tevent_context *ev,
1469 : struct tevent_timer *te,
1470 : struct timeval tv, void *p)
1471 : {
1472 0 : int *p_finished = (int *)p;
1473 :
1474 0 : *p_finished = 2;
1475 0 : }
1476 :
1477 : /* Called in child-thread context */
1478 0 : static void thread_callback(struct tevent_context *ev,
1479 : struct tevent_immediate *im,
1480 : void *private_ptr)
1481 : {
1482 0 : struct reply_state *rsp =
1483 0 : talloc_get_type_abort(private_ptr, struct reply_state);
1484 :
1485 0 : talloc_steal(ev, rsp);
1486 0 : *rsp->p_finished = 1;
1487 0 : }
1488 :
1489 : /* Called in master thread context */
1490 0 : static void master_callback(struct tevent_context *ev,
1491 : struct tevent_immediate *im,
1492 : void *private_ptr)
1493 : {
1494 0 : struct reply_state *rsp =
1495 0 : talloc_get_type_abort(private_ptr, struct reply_state);
1496 : unsigned i;
1497 :
1498 0 : talloc_steal(ev, rsp);
1499 :
1500 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1501 0 : if (pthread_equal(rsp->thread_id,
1502 : thread_map[i])) {
1503 0 : break;
1504 : }
1505 : }
1506 0 : torture_comment(thread_test_ctx,
1507 : "Callback %u from thread %u\n",
1508 : thread_counter,
1509 : i);
1510 : /* Now reply to the thread ! */
1511 0 : tevent_thread_proxy_schedule(rsp->reply_tp,
1512 : &im,
1513 : thread_callback,
1514 : &rsp);
1515 :
1516 0 : thread_counter++;
1517 0 : }
1518 :
1519 0 : static void *thread_fn_1(void *private_ptr)
1520 : {
1521 0 : struct tevent_thread_proxy *master_tp =
1522 0 : talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1523 : struct tevent_thread_proxy *tp;
1524 : struct tevent_immediate *im;
1525 : struct tevent_context *ev;
1526 : struct reply_state *rsp;
1527 0 : int finished = 0;
1528 : int ret;
1529 :
1530 0 : ev = tevent_context_init(NULL);
1531 0 : if (ev == NULL) {
1532 0 : return NULL;
1533 : }
1534 :
1535 0 : tp = tevent_thread_proxy_create(ev);
1536 0 : if (tp == NULL) {
1537 0 : talloc_free(ev);
1538 0 : return NULL;
1539 : }
1540 :
1541 0 : im = tevent_create_immediate(ev);
1542 0 : if (im == NULL) {
1543 0 : talloc_free(ev);
1544 0 : return NULL;
1545 : }
1546 :
1547 0 : rsp = talloc(ev, struct reply_state);
1548 0 : if (rsp == NULL) {
1549 0 : talloc_free(ev);
1550 0 : return NULL;
1551 : }
1552 :
1553 0 : rsp->thread_id = pthread_self();
1554 0 : rsp->reply_tp = tp;
1555 0 : rsp->p_finished = &finished;
1556 :
1557 : /* Introduce a little randomness into the mix.. */
1558 0 : usleep(random() % 7000);
1559 :
1560 0 : tevent_thread_proxy_schedule(master_tp,
1561 : &im,
1562 : master_callback,
1563 : &rsp);
1564 :
1565 : /* Ensure we don't wait more than 10 seconds. */
1566 0 : tevent_add_timer(ev,
1567 : ev,
1568 : timeval_current_ofs(10,0),
1569 : thread_timeout_fn,
1570 : &finished);
1571 :
1572 0 : while (finished == 0) {
1573 0 : ret = tevent_loop_once(ev);
1574 0 : assert(ret == 0);
1575 : }
1576 :
1577 0 : if (finished > 1) {
1578 : /* Timeout ! */
1579 0 : abort();
1580 : }
1581 :
1582 : /*
1583 : * NB. We should talloc_free(ev) here, but if we do
1584 : * we currently get hit by helgrind Fix #323432
1585 : * "When calling pthread_cond_destroy or pthread_mutex_destroy
1586 : * with initializers as argument Helgrind (incorrectly) reports errors."
1587 : *
1588 : * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1589 : * with-pthread-mutex-destroy-td47757.html
1590 : *
1591 : * Helgrind doesn't understand that the request/reply
1592 : * messages provide synchronization between the lock/unlock
1593 : * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1594 : * when the struct tevent_thread_proxy object is talloc_free'd.
1595 : *
1596 : * As a work-around for now return ev for the parent thread to free.
1597 : */
1598 0 : return ev;
1599 : }
1600 :
1601 0 : static bool test_multi_tevent_threaded_1(struct torture_context *test,
1602 : const void *test_data)
1603 : {
1604 : unsigned i;
1605 : struct tevent_context *master_ev;
1606 : struct tevent_thread_proxy *master_tp;
1607 : int ret;
1608 :
1609 0 : talloc_disable_null_tracking();
1610 :
1611 : /* Ugly global stuff. */
1612 0 : thread_test_ctx = test;
1613 0 : thread_counter = 0;
1614 :
1615 0 : master_ev = tevent_context_init(NULL);
1616 0 : if (master_ev == NULL) {
1617 0 : return false;
1618 : }
1619 0 : tevent_set_debug_stderr(master_ev);
1620 :
1621 0 : master_tp = tevent_thread_proxy_create(master_ev);
1622 0 : if (master_tp == NULL) {
1623 0 : torture_fail(test,
1624 : talloc_asprintf(test,
1625 : "tevent_thread_proxy_create failed\n"));
1626 : talloc_free(master_ev);
1627 : return false;
1628 : }
1629 :
1630 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1631 0 : ret = pthread_create(&thread_map[i],
1632 : NULL,
1633 : thread_fn_1,
1634 : master_tp);
1635 0 : if (ret != 0) {
1636 0 : torture_fail(test,
1637 : talloc_asprintf(test,
1638 : "Failed to create thread %i, %d\n",
1639 : i, ret));
1640 : return false;
1641 : }
1642 : }
1643 :
1644 0 : while (thread_counter < NUM_TEVENT_THREADS) {
1645 0 : ret = tevent_loop_once(master_ev);
1646 0 : torture_assert(test, ret == 0, "tevent_loop_once failed");
1647 : }
1648 :
1649 : /* Wait for all the threads to finish - join 'em. */
1650 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1651 : void *retval;
1652 0 : ret = pthread_join(thread_map[i], &retval);
1653 0 : torture_assert(test, ret == 0, "pthread_join failed");
1654 : /* Free the child thread event context. */
1655 0 : talloc_free(retval);
1656 : }
1657 :
1658 0 : talloc_free(master_ev);
1659 0 : return true;
1660 : }
1661 :
1662 : struct threaded_test_2 {
1663 : struct tevent_threaded_context *tctx;
1664 : struct tevent_immediate *im;
1665 : pthread_t thread_id;
1666 : };
1667 :
1668 : static void master_callback_2(struct tevent_context *ev,
1669 : struct tevent_immediate *im,
1670 : void *private_data);
1671 :
1672 0 : static void *thread_fn_2(void *private_data)
1673 : {
1674 0 : struct threaded_test_2 *state = private_data;
1675 :
1676 0 : state->thread_id = pthread_self();
1677 :
1678 0 : usleep(random() % 7000);
1679 :
1680 0 : tevent_threaded_schedule_immediate(
1681 : state->tctx, state->im, master_callback_2, state);
1682 :
1683 0 : return NULL;
1684 : }
1685 :
1686 0 : static void master_callback_2(struct tevent_context *ev,
1687 : struct tevent_immediate *im,
1688 : void *private_data)
1689 : {
1690 0 : struct threaded_test_2 *state = private_data;
1691 : int i;
1692 :
1693 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1694 0 : if (pthread_equal(state->thread_id, thread_map[i])) {
1695 0 : break;
1696 : }
1697 : }
1698 0 : torture_comment(thread_test_ctx,
1699 : "Callback_2 %u from thread %u\n",
1700 : thread_counter,
1701 : i);
1702 0 : thread_counter++;
1703 0 : }
1704 :
1705 0 : static bool test_multi_tevent_threaded_2(struct torture_context *test,
1706 : const void *test_data)
1707 : {
1708 : unsigned i;
1709 :
1710 : struct tevent_context *ev;
1711 : struct tevent_threaded_context *tctx;
1712 : int ret;
1713 :
1714 0 : thread_test_ctx = test;
1715 0 : thread_counter = 0;
1716 :
1717 0 : ev = tevent_context_init(test);
1718 0 : torture_assert(test, ev != NULL, "tevent_context_init failed");
1719 :
1720 : /*
1721 : * tevent_re_initialise used to have a bug where it did not
1722 : * re-initialise the thread support after taking it
1723 : * down. Exercise that code path.
1724 : */
1725 0 : ret = tevent_re_initialise(ev);
1726 0 : torture_assert(test, ret == 0, "tevent_re_initialise failed");
1727 :
1728 0 : tctx = tevent_threaded_context_create(ev, ev);
1729 0 : torture_assert(test, tctx != NULL,
1730 : "tevent_threaded_context_create failed");
1731 :
1732 0 : for (i=0; i<NUM_TEVENT_THREADS; i++) {
1733 : struct threaded_test_2 *state;
1734 :
1735 0 : state = talloc(ev, struct threaded_test_2);
1736 0 : torture_assert(test, state != NULL, "talloc failed");
1737 :
1738 0 : state->tctx = tctx;
1739 0 : state->im = tevent_create_immediate(state);
1740 0 : torture_assert(test, state->im != NULL,
1741 : "tevent_create_immediate failed");
1742 :
1743 0 : ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1744 0 : torture_assert(test, ret == 0, "pthread_create failed");
1745 : }
1746 :
1747 0 : while (thread_counter < NUM_TEVENT_THREADS) {
1748 0 : ret = tevent_loop_once(ev);
1749 0 : torture_assert(test, ret == 0, "tevent_loop_once failed");
1750 : }
1751 :
1752 : /* Wait for all the threads to finish - join 'em. */
1753 0 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1754 : void *retval;
1755 0 : ret = pthread_join(thread_map[i], &retval);
1756 0 : torture_assert(test, ret == 0, "pthread_join failed");
1757 : /* Free the child thread event context. */
1758 : }
1759 :
1760 0 : talloc_free(tctx);
1761 0 : talloc_free(ev);
1762 0 : return true;
1763 : }
1764 :
1765 : struct test_cached_pid_thread_state {
1766 : pid_t thread_cached_pid;
1767 : pid_t thread_pid;
1768 : };
1769 :
1770 0 : static void *test_cached_pid_thread(void *private_data)
1771 : {
1772 0 : struct test_cached_pid_thread_state *state =
1773 : (struct test_cached_pid_thread_state *)private_data;
1774 :
1775 0 : state->thread_cached_pid = tevent_cached_getpid();
1776 0 : state->thread_pid = getpid();
1777 :
1778 0 : return NULL;
1779 : }
1780 : #endif
1781 :
1782 0 : static bool test_cached_pid(struct torture_context *test,
1783 : const void *test_data)
1784 : {
1785 0 : pid_t parent_pid = getpid();
1786 : pid_t child_pid;
1787 : pid_t finished_pid;
1788 : int child_status;
1789 :
1790 0 : torture_assert(test, tevent_cached_getpid() == parent_pid, "tevent_cached_getpid()");
1791 :
1792 : #ifdef HAVE_PTHREAD
1793 : {
1794 0 : struct test_cached_pid_thread_state state = { .thread_cached_pid = -1, };
1795 : pthread_t thread;
1796 0 : void *retval = NULL;
1797 : int ret;
1798 :
1799 0 : ret = pthread_create(&thread, NULL, test_cached_pid_thread, &state);
1800 0 : torture_assert(test, ret == 0, "pthread_create failed");
1801 :
1802 0 : ret = pthread_join(thread, &retval);
1803 0 : torture_assert(test, ret == 0, "pthread_join failed");
1804 :
1805 0 : torture_assert(test, state.thread_pid == parent_pid, "getpid() in thread");
1806 0 : torture_assert(test, state.thread_cached_pid == parent_pid, "tevent_cached_getpid() in thread");
1807 : }
1808 : #endif /* HAVE_PTHREAD */
1809 :
1810 0 : child_pid = fork();
1811 0 : if (child_pid == 0) {
1812 : /* child */
1813 0 : pid_t pid = getpid();
1814 0 : pid_t cached_pid = tevent_cached_getpid();
1815 :
1816 0 : if (parent_pid == pid) {
1817 0 : exit(1);
1818 : }
1819 0 : if (pid != cached_pid) {
1820 0 : exit(2);
1821 : }
1822 0 : exit(0);
1823 : }
1824 0 : torture_assert(test, child_pid > 0, "fork failed");
1825 :
1826 0 : finished_pid = waitpid(child_pid, &child_status, 0);
1827 0 : torture_assert(test, finished_pid == child_pid, "wrong child");
1828 0 : torture_assert(test, child_status == 0, "child_status");
1829 :
1830 0 : return true;
1831 : }
1832 :
1833 964 : struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1834 : {
1835 964 : struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1836 964 : const char **list = tevent_backend_list(suite);
1837 : int i;
1838 :
1839 4820 : for (i=0;list && list[i];i++) {
1840 : struct torture_suite *backend_suite;
1841 :
1842 3856 : backend_suite = torture_suite_create(mem_ctx, list[i]);
1843 :
1844 3856 : torture_suite_add_simple_tcase_const(backend_suite,
1845 : "context",
1846 : test_event_context,
1847 3856 : (const void *)list[i]);
1848 3856 : torture_suite_add_simple_tcase_const(backend_suite,
1849 : "fd1",
1850 : test_event_fd1,
1851 3856 : (const void *)list[i]);
1852 3856 : torture_suite_add_simple_tcase_const(backend_suite,
1853 : "fd2",
1854 : test_event_fd2,
1855 3856 : (const void *)list[i]);
1856 3856 : torture_suite_add_simple_tcase_const(backend_suite,
1857 : "wrapper",
1858 : test_wrapper,
1859 3856 : (const void *)list[i]);
1860 3856 : torture_suite_add_simple_tcase_const(backend_suite,
1861 : "free_wrapper",
1862 : test_free_wrapper,
1863 3856 : (const void *)list[i]);
1864 :
1865 3856 : torture_suite_add_suite(suite, backend_suite);
1866 : }
1867 :
1868 : #ifdef HAVE_PTHREAD
1869 964 : torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1870 : test_event_context_threaded,
1871 : NULL);
1872 :
1873 964 : torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1874 : test_multi_tevent_threaded,
1875 : NULL);
1876 :
1877 964 : torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1878 : test_multi_tevent_threaded_1,
1879 : NULL);
1880 :
1881 964 : torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1882 : test_multi_tevent_threaded_2,
1883 : NULL);
1884 :
1885 : #endif
1886 :
1887 964 : torture_suite_add_simple_tcase_const(suite, "tevent_cached_getpid",
1888 : test_cached_pid,
1889 : NULL);
1890 :
1891 964 : return suite;
1892 : }
|