Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Infrastructure for async requests
4 : Copyright (C) Volker Lendecke 2008
5 : Copyright (C) Stefan Metzmacher 2009
6 :
7 : ** NOTE! The following LGPL license applies to the tevent
8 : ** library. This does NOT imply that all of Samba is released
9 : ** under the LGPL
10 :
11 : This library is free software; you can redistribute it and/or
12 : modify it under the terms of the GNU Lesser General Public
13 : License as published by the Free Software Foundation; either
14 : version 3 of the License, or (at your option) any later version.
15 :
16 : This library is distributed in the hope that it will be useful,
17 : but WITHOUT ANY WARRANTY; without even the implied warranty of
18 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 : Lesser General Public License for more details.
20 :
21 : You should have received a copy of the GNU Lesser General Public
22 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 : */
24 :
25 : #include "replace.h"
26 : #include "tevent.h"
27 : #include "tevent_internal.h"
28 : #include "tevent_util.h"
29 :
30 : struct tevent_queue_entry {
31 : struct tevent_queue_entry *prev, *next;
32 : struct tevent_queue *queue;
33 :
34 : bool triggered;
35 :
36 : struct tevent_req *req;
37 : struct tevent_context *ev;
38 :
39 : tevent_queue_trigger_fn_t trigger;
40 : void *private_data;
41 : uint64_t tag;
42 : };
43 :
44 : struct tevent_queue {
45 : const char *name;
46 : const char *location;
47 :
48 : bool running;
49 : struct tevent_immediate *immediate;
50 :
51 : size_t length;
52 : struct tevent_queue_entry *list;
53 : };
54 :
55 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
56 : struct tevent_immediate *im,
57 : void *private_data);
58 :
59 6713072 : static int tevent_queue_entry_destructor(struct tevent_queue_entry *e)
60 : {
61 6713072 : struct tevent_queue *q = e->queue;
62 :
63 6713072 : if (!q) {
64 0 : return 0;
65 : }
66 :
67 6713072 : tevent_trace_queue_callback(q->list->ev, e, TEVENT_EVENT_TRACE_DETACH);
68 6713072 : DLIST_REMOVE(q->list, e);
69 6713072 : q->length--;
70 :
71 6713072 : if (!q->running) {
72 2552 : return 0;
73 : }
74 :
75 6710520 : if (!q->list) {
76 5561843 : return 0;
77 : }
78 :
79 1148677 : if (q->list->triggered) {
80 203 : return 0;
81 : }
82 :
83 1148474 : tevent_schedule_immediate(q->immediate,
84 : q->list->ev,
85 : tevent_queue_immediate_trigger,
86 : q);
87 :
88 1148474 : return 0;
89 : }
90 :
91 474333 : static int tevent_queue_destructor(struct tevent_queue *q)
92 : {
93 474333 : q->running = false;
94 :
95 899568 : while (q->list) {
96 2 : struct tevent_queue_entry *e = q->list;
97 2 : talloc_free(e);
98 : }
99 :
100 474333 : return 0;
101 : }
102 :
103 475149 : struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx,
104 : const char *name,
105 : const char *location)
106 : {
107 : struct tevent_queue *queue;
108 :
109 475149 : queue = talloc_zero(mem_ctx, struct tevent_queue);
110 475149 : if (!queue) {
111 0 : return NULL;
112 : }
113 :
114 475149 : queue->name = talloc_strdup(queue, name);
115 475149 : if (!queue->name) {
116 0 : talloc_free(queue);
117 0 : return NULL;
118 : }
119 475149 : queue->immediate = tevent_create_immediate(queue);
120 475149 : if (!queue->immediate) {
121 0 : talloc_free(queue);
122 0 : return NULL;
123 : }
124 :
125 475149 : queue->location = location;
126 :
127 : /* queue is running by default */
128 475149 : queue->running = true;
129 :
130 475149 : talloc_set_destructor(queue, tevent_queue_destructor);
131 475149 : return queue;
132 : }
133 :
134 1762667 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
135 : struct tevent_immediate *im,
136 : void *private_data)
137 : {
138 1549161 : struct tevent_queue *q =
139 213506 : talloc_get_type_abort(private_data,
140 : struct tevent_queue);
141 :
142 1762667 : if (!q->running) {
143 0 : return;
144 : }
145 :
146 1762667 : if (!q->list) {
147 0 : return;
148 : }
149 :
150 1762667 : tevent_trace_queue_callback(ev, q->list,
151 : TEVENT_EVENT_TRACE_BEFORE_HANDLER);
152 1762667 : q->list->triggered = true;
153 1762667 : q->list->trigger(q->list->req, q->list->private_data);
154 : }
155 :
156 1 : static void tevent_queue_noop_trigger(struct tevent_req *req,
157 : void *_private_data)
158 : {
159 : /* this is doing nothing but blocking the queue */
160 1 : }
161 :
162 6713072 : static struct tevent_queue_entry *tevent_queue_add_internal(
163 : struct tevent_queue *queue,
164 : struct tevent_context *ev,
165 : struct tevent_req *req,
166 : tevent_queue_trigger_fn_t trigger,
167 : void *private_data,
168 : bool allow_direct)
169 : {
170 : struct tevent_queue_entry *e;
171 :
172 6713072 : e = talloc_zero(req, struct tevent_queue_entry);
173 6713072 : if (e == NULL) {
174 0 : return NULL;
175 : }
176 :
177 : /*
178 : * if there is no trigger, it is just a blocker
179 : */
180 6713072 : if (trigger == NULL) {
181 1 : trigger = tevent_queue_noop_trigger;
182 : }
183 :
184 6713072 : e->queue = queue;
185 6713072 : e->req = req;
186 6713072 : e->ev = ev;
187 6713072 : e->trigger = trigger;
188 6713072 : e->private_data = private_data;
189 :
190 6713072 : if (queue->length > 0) {
191 : /*
192 : * if there are already entries in the
193 : * queue do not optimize.
194 : */
195 1148684 : allow_direct = false;
196 : }
197 :
198 6713072 : if (req->async.fn != NULL) {
199 : /*
200 : * If the caller wants to optimize for the
201 : * empty queue case, call the trigger only
202 : * if there is no callback defined for the
203 : * request yet.
204 : */
205 0 : allow_direct = false;
206 : }
207 :
208 6713072 : DLIST_ADD_END(queue->list, e);
209 6713072 : queue->length++;
210 6713072 : talloc_set_destructor(e, tevent_queue_entry_destructor);
211 6713072 : tevent_trace_queue_callback(ev, e, TEVENT_EVENT_TRACE_ATTACH);
212 :
213 6713072 : if (!queue->running) {
214 12 : return e;
215 : }
216 :
217 6713060 : if (queue->list->triggered) {
218 1029551 : return e;
219 : }
220 :
221 : /*
222 : * If allowed we directly call the trigger
223 : * avoiding possible delays caused by
224 : * an immediate event.
225 : */
226 5683509 : if (allow_direct) {
227 4950189 : tevent_trace_queue_callback(ev,
228 : queue->list,
229 : TEVENT_EVENT_TRACE_BEFORE_HANDLER);
230 4950189 : queue->list->triggered = true;
231 9379566 : queue->list->trigger(queue->list->req,
232 4950189 : queue->list->private_data);
233 4950189 : return e;
234 : }
235 :
236 733320 : tevent_schedule_immediate(queue->immediate,
237 : queue->list->ev,
238 : tevent_queue_immediate_trigger,
239 : queue);
240 :
241 733320 : return e;
242 : }
243 :
244 613930 : bool tevent_queue_add(struct tevent_queue *queue,
245 : struct tevent_context *ev,
246 : struct tevent_req *req,
247 : tevent_queue_trigger_fn_t trigger,
248 : void *private_data)
249 : {
250 : struct tevent_queue_entry *e;
251 :
252 613930 : e = tevent_queue_add_internal(queue, ev, req,
253 : trigger, private_data, false);
254 613930 : if (e == NULL) {
255 0 : return false;
256 : }
257 :
258 613930 : return true;
259 : }
260 :
261 4013 : struct tevent_queue_entry *tevent_queue_add_entry(
262 : struct tevent_queue *queue,
263 : struct tevent_context *ev,
264 : struct tevent_req *req,
265 : tevent_queue_trigger_fn_t trigger,
266 : void *private_data)
267 : {
268 4013 : return tevent_queue_add_internal(queue, ev, req,
269 : trigger, private_data, false);
270 : }
271 :
272 6095129 : struct tevent_queue_entry *tevent_queue_add_optimize_empty(
273 : struct tevent_queue *queue,
274 : struct tevent_context *ev,
275 : struct tevent_req *req,
276 : tevent_queue_trigger_fn_t trigger,
277 : void *private_data)
278 : {
279 6095129 : return tevent_queue_add_internal(queue, ev, req,
280 : trigger, private_data, true);
281 : }
282 :
283 3 : void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry)
284 : {
285 3 : if (entry->queue->running) {
286 0 : abort();
287 : }
288 :
289 3 : if (entry->queue->list != entry) {
290 0 : abort();
291 : }
292 :
293 3 : entry->triggered = false;
294 3 : }
295 :
296 9329 : void tevent_queue_start(struct tevent_queue *queue)
297 : {
298 9329 : if (queue->running) {
299 : /* already started */
300 9326 : return;
301 : }
302 :
303 3 : queue->running = true;
304 :
305 3 : if (!queue->list) {
306 0 : return;
307 : }
308 :
309 3 : if (queue->list->triggered) {
310 0 : return;
311 : }
312 :
313 3 : tevent_schedule_immediate(queue->immediate,
314 : queue->list->ev,
315 : tevent_queue_immediate_trigger,
316 : queue);
317 : }
318 :
319 57475 : void tevent_queue_stop(struct tevent_queue *queue)
320 : {
321 57475 : queue->running = false;
322 57475 : }
323 :
324 3185167 : size_t tevent_queue_length(struct tevent_queue *queue)
325 : {
326 3185167 : return queue->length;
327 : }
328 :
329 0 : bool tevent_queue_running(struct tevent_queue *queue)
330 : {
331 0 : return queue->running;
332 : }
333 :
334 : struct tevent_queue_wait_state {
335 : uint8_t dummy;
336 : };
337 :
338 : static void tevent_queue_wait_trigger(struct tevent_req *req,
339 : void *private_data);
340 :
341 135134 : struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx,
342 : struct tevent_context *ev,
343 : struct tevent_queue *queue)
344 : {
345 : struct tevent_req *req;
346 : struct tevent_queue_wait_state *state;
347 : bool ok;
348 :
349 135134 : req = tevent_req_create(mem_ctx, &state,
350 : struct tevent_queue_wait_state);
351 135134 : if (req == NULL) {
352 0 : return NULL;
353 : }
354 :
355 135134 : ok = tevent_queue_add(queue, ev, req,
356 : tevent_queue_wait_trigger,
357 : NULL);
358 135134 : if (!ok) {
359 0 : tevent_req_oom(req);
360 0 : return tevent_req_post(req, ev);
361 : }
362 :
363 135134 : return req;
364 : }
365 :
366 135126 : static void tevent_queue_wait_trigger(struct tevent_req *req,
367 : void *private_data)
368 : {
369 135126 : tevent_req_done(req);
370 135126 : }
371 :
372 135084 : bool tevent_queue_wait_recv(struct tevent_req *req)
373 : {
374 : enum tevent_req_state state;
375 : uint64_t err;
376 :
377 135084 : if (tevent_req_is_error(req, &state, &err)) {
378 0 : tevent_req_received(req);
379 0 : return false;
380 : }
381 :
382 135084 : tevent_req_received(req);
383 135084 : return true;
384 : }
385 :
386 10 : void tevent_queue_entry_set_tag(struct tevent_queue_entry *qe, uint64_t tag)
387 : {
388 10 : if (qe == NULL) {
389 0 : return;
390 : }
391 :
392 10 : qe->tag = tag;
393 : }
394 :
395 11 : uint64_t tevent_queue_entry_get_tag(const struct tevent_queue_entry *qe)
396 : {
397 11 : if (qe == NULL) {
398 1 : return 0;
399 : }
400 :
401 10 : return qe->tag;
402 : }
|