Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 :
4 : WINS Replication server
5 :
6 : Copyright (C) Stefan Metzmacher 2005
7 :
8 : This program is free software; you can redistribute it and/or modify
9 : it under the terms of the GNU General Public License as published by
10 : the Free Software Foundation; either version 3 of the License, or
11 : (at your option) any later version.
12 :
13 : This program is distributed in the hope that it will be useful,
14 : but WITHOUT ANY WARRANTY; without even the implied warranty of
15 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 : GNU General Public License for more details.
17 :
18 : You should have received a copy of the GNU General Public License
19 : along with this program. If not, see <http://www.gnu.org/licenses/>.
20 : */
21 :
22 : #include "includes.h"
23 : #include "lib/events/events.h"
24 : #include "lib/socket/socket.h"
25 : #include "samba/service_task.h"
26 : #include "samba/service_stream.h"
27 : #include "librpc/gen_ndr/winsrepl.h"
28 : #include "wrepl_server/wrepl_server.h"
29 : #include "nbt_server/wins/winsdb.h"
30 : #include "libcli/composite/composite.h"
31 : #include "libcli/wrepl/winsrepl.h"
32 : #include "libcli/resolve/resolve.h"
33 : #include "param/param.h"
34 :
35 : enum wreplsrv_out_connect_stage {
36 : WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
37 : WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
38 : WREPLSRV_OUT_CONNECT_STAGE_DONE
39 : };
40 :
41 : struct wreplsrv_out_connect_state {
42 : enum wreplsrv_out_connect_stage stage;
43 : struct composite_context *c;
44 : struct wrepl_associate assoc_io;
45 : enum winsrepl_partner_type type;
46 : struct wreplsrv_out_connection *wreplconn;
47 : struct tevent_req *subreq;
48 : };
49 :
50 : static void wreplsrv_out_connect_handler_treq(struct tevent_req *subreq);
51 :
52 0 : static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
53 : {
54 : NTSTATUS status;
55 :
56 0 : status = wrepl_connect_recv(state->subreq);
57 0 : TALLOC_FREE(state->subreq);
58 0 : NT_STATUS_NOT_OK_RETURN(status);
59 :
60 0 : state->subreq = wrepl_associate_send(state,
61 0 : state->wreplconn->service->task->event_ctx,
62 0 : state->wreplconn->sock, &state->assoc_io);
63 0 : NT_STATUS_HAVE_NO_MEMORY(state->subreq);
64 :
65 0 : tevent_req_set_callback(state->subreq,
66 : wreplsrv_out_connect_handler_treq,
67 : state);
68 :
69 0 : state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
70 :
71 0 : return NT_STATUS_OK;
72 : }
73 :
74 0 : static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
75 : {
76 : NTSTATUS status;
77 :
78 0 : status = wrepl_associate_recv(state->subreq, &state->assoc_io);
79 0 : TALLOC_FREE(state->subreq);
80 0 : NT_STATUS_NOT_OK_RETURN(status);
81 :
82 0 : state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
83 0 : state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
84 :
85 0 : if (state->type == WINSREPL_PARTNER_PUSH) {
86 0 : if (state->wreplconn->assoc_ctx.peer_major >= 5) {
87 0 : state->wreplconn->partner->push.wreplconn = state->wreplconn;
88 0 : talloc_steal(state->wreplconn->partner, state->wreplconn);
89 : } else {
90 0 : state->type = WINSREPL_PARTNER_NONE;
91 : }
92 0 : } else if (state->type == WINSREPL_PARTNER_PULL) {
93 0 : state->wreplconn->partner->pull.wreplconn = state->wreplconn;
94 0 : talloc_steal(state->wreplconn->partner, state->wreplconn);
95 : }
96 :
97 0 : state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
98 :
99 0 : return NT_STATUS_OK;
100 : }
101 :
102 0 : static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
103 : {
104 0 : struct composite_context *c = state->c;
105 :
106 0 : switch (state->stage) {
107 0 : case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
108 0 : c->status = wreplsrv_out_connect_wait_socket(state);
109 0 : break;
110 0 : case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
111 0 : c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
112 0 : c->state = COMPOSITE_STATE_DONE;
113 0 : break;
114 0 : case WREPLSRV_OUT_CONNECT_STAGE_DONE:
115 0 : c->status = NT_STATUS_INTERNAL_ERROR;
116 : }
117 :
118 0 : if (!NT_STATUS_IS_OK(c->status)) {
119 0 : c->state = COMPOSITE_STATE_ERROR;
120 : }
121 :
122 0 : if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
123 0 : c->async.fn(c);
124 : }
125 0 : }
126 :
127 0 : static void wreplsrv_out_connect_handler_treq(struct tevent_req *subreq)
128 : {
129 0 : struct wreplsrv_out_connect_state *state = tevent_req_callback_data(subreq,
130 : struct wreplsrv_out_connect_state);
131 0 : wreplsrv_out_connect_handler(state);
132 0 : return;
133 : }
134 :
135 675 : static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
136 : enum winsrepl_partner_type type,
137 : struct wreplsrv_out_connection *wreplconn)
138 : {
139 675 : struct composite_context *c = NULL;
140 675 : struct wreplsrv_service *service = partner->service;
141 675 : struct wreplsrv_out_connect_state *state = NULL;
142 675 : struct wreplsrv_out_connection **wreplconnp = &wreplconn;
143 675 : bool cached_connection = false;
144 :
145 675 : c = talloc_zero(partner, struct composite_context);
146 675 : if (!c) goto failed;
147 :
148 675 : state = talloc_zero(c, struct wreplsrv_out_connect_state);
149 675 : if (!state) goto failed;
150 675 : state->c = c;
151 675 : state->type = type;
152 :
153 675 : c->state = COMPOSITE_STATE_IN_PROGRESS;
154 675 : c->event_ctx = service->task->event_ctx;
155 675 : c->private_data = state;
156 :
157 675 : if (type == WINSREPL_PARTNER_PUSH) {
158 0 : cached_connection = true;
159 0 : wreplconn = partner->push.wreplconn;
160 0 : wreplconnp = &partner->push.wreplconn;
161 675 : } else if (type == WINSREPL_PARTNER_PULL) {
162 0 : cached_connection = true;
163 0 : wreplconn = partner->pull.wreplconn;
164 0 : wreplconnp = &partner->pull.wreplconn;
165 : }
166 :
167 : /* we have a connection already, so use it */
168 675 : if (wreplconn) {
169 675 : if (wrepl_socket_is_connected(wreplconn->sock)) {
170 675 : state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
171 675 : state->wreplconn= wreplconn;
172 675 : composite_done(c);
173 675 : return c;
174 0 : } else if (!cached_connection) {
175 0 : state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
176 0 : state->wreplconn= NULL;
177 0 : composite_done(c);
178 0 : return c;
179 : } else {
180 0 : talloc_free(wreplconn);
181 0 : *wreplconnp = NULL;
182 : }
183 : }
184 :
185 0 : wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
186 0 : if (!wreplconn) goto failed;
187 :
188 0 : wreplconn->service = service;
189 0 : wreplconn->partner = partner;
190 0 : wreplconn->sock = wrepl_socket_init(wreplconn, service->task->event_ctx);
191 0 : if (!wreplconn->sock) goto failed;
192 :
193 0 : state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
194 0 : state->wreplconn= wreplconn;
195 0 : state->subreq = wrepl_connect_send(state,
196 0 : service->task->event_ctx,
197 0 : wreplconn->sock,
198 0 : partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
199 : partner->address);
200 0 : if (!state->subreq) goto failed;
201 :
202 0 : tevent_req_set_callback(state->subreq,
203 : wreplsrv_out_connect_handler_treq,
204 : state);
205 :
206 0 : return c;
207 0 : failed:
208 0 : talloc_free(c);
209 0 : return NULL;
210 : }
211 :
212 675 : static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
213 : struct wreplsrv_out_connection **wreplconn)
214 : {
215 : NTSTATUS status;
216 :
217 675 : status = composite_wait(c);
218 :
219 675 : if (NT_STATUS_IS_OK(status)) {
220 675 : struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
221 : struct wreplsrv_out_connect_state);
222 675 : if (state->wreplconn) {
223 675 : *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
224 675 : if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
225 : } else {
226 0 : status = NT_STATUS_CONNECTION_DISCONNECTED;
227 : }
228 : }
229 :
230 675 : talloc_free(c);
231 675 : return status;
232 :
233 : }
234 :
235 : struct wreplsrv_pull_table_io {
236 : struct {
237 : struct wreplsrv_partner *partner;
238 : uint32_t num_owners;
239 : struct wrepl_wins_owner *owners;
240 : } in;
241 : struct {
242 : uint32_t num_owners;
243 : struct wrepl_wins_owner *owners;
244 : } out;
245 : };
246 :
247 : enum wreplsrv_pull_table_stage {
248 : WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
249 : WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
250 : WREPLSRV_PULL_TABLE_STAGE_DONE
251 : };
252 :
253 : struct wreplsrv_pull_table_state {
254 : enum wreplsrv_pull_table_stage stage;
255 : struct composite_context *c;
256 : struct wrepl_pull_table table_io;
257 : struct wreplsrv_pull_table_io *io;
258 : struct composite_context *creq;
259 : struct wreplsrv_out_connection *wreplconn;
260 : struct tevent_req *subreq;
261 : };
262 :
263 : static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq);
264 :
265 0 : static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
266 : {
267 : NTSTATUS status;
268 :
269 0 : status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
270 0 : NT_STATUS_NOT_OK_RETURN(status);
271 :
272 0 : state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
273 0 : state->subreq = wrepl_pull_table_send(state,
274 0 : state->wreplconn->service->task->event_ctx,
275 0 : state->wreplconn->sock, &state->table_io);
276 0 : NT_STATUS_HAVE_NO_MEMORY(state->subreq);
277 :
278 0 : tevent_req_set_callback(state->subreq,
279 : wreplsrv_pull_table_handler_treq,
280 : state);
281 :
282 0 : state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
283 :
284 0 : return NT_STATUS_OK;
285 : }
286 :
287 0 : static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
288 : {
289 : NTSTATUS status;
290 :
291 0 : status = wrepl_pull_table_recv(state->subreq, state, &state->table_io);
292 0 : TALLOC_FREE(state->subreq);
293 0 : NT_STATUS_NOT_OK_RETURN(status);
294 :
295 0 : state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
296 :
297 0 : return NT_STATUS_OK;
298 : }
299 :
300 0 : static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
301 : {
302 0 : struct composite_context *c = state->c;
303 :
304 0 : switch (state->stage) {
305 0 : case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
306 0 : c->status = wreplsrv_pull_table_wait_connection(state);
307 0 : break;
308 0 : case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
309 0 : c->status = wreplsrv_pull_table_wait_table_reply(state);
310 0 : c->state = COMPOSITE_STATE_DONE;
311 0 : break;
312 0 : case WREPLSRV_PULL_TABLE_STAGE_DONE:
313 0 : c->status = NT_STATUS_INTERNAL_ERROR;
314 : }
315 :
316 0 : if (!NT_STATUS_IS_OK(c->status)) {
317 0 : c->state = COMPOSITE_STATE_ERROR;
318 : }
319 :
320 0 : if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
321 0 : c->async.fn(c);
322 : }
323 0 : }
324 :
325 0 : static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
326 : {
327 0 : struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
328 : struct wreplsrv_pull_table_state);
329 0 : wreplsrv_pull_table_handler(state);
330 0 : return;
331 : }
332 :
333 0 : static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq)
334 : {
335 0 : struct wreplsrv_pull_table_state *state = tevent_req_callback_data(subreq,
336 : struct wreplsrv_pull_table_state);
337 0 : wreplsrv_pull_table_handler(state);
338 0 : return;
339 : }
340 :
341 675 : static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
342 : {
343 675 : struct composite_context *c = NULL;
344 675 : struct wreplsrv_service *service = io->in.partner->service;
345 675 : struct wreplsrv_pull_table_state *state = NULL;
346 :
347 675 : c = talloc_zero(mem_ctx, struct composite_context);
348 675 : if (!c) goto failed;
349 :
350 675 : state = talloc_zero(c, struct wreplsrv_pull_table_state);
351 675 : if (!state) goto failed;
352 675 : state->c = c;
353 675 : state->io = io;
354 :
355 675 : c->state = COMPOSITE_STATE_IN_PROGRESS;
356 675 : c->event_ctx = service->task->event_ctx;
357 675 : c->private_data = state;
358 :
359 675 : if (io->in.num_owners) {
360 : struct wrepl_wins_owner *partners;
361 : uint32_t i;
362 :
363 675 : partners = talloc_array(state,
364 : struct wrepl_wins_owner,
365 : io->in.num_owners);
366 675 : if (composite_nomem(partners, c)) goto failed;
367 :
368 1350 : for (i=0; i < io->in.num_owners; i++) {
369 675 : partners[i] = io->in.owners[i];
370 675 : partners[i].address = talloc_strdup(partners,
371 675 : io->in.owners[i].address);
372 675 : if (composite_nomem(partners[i].address, c)) goto failed;
373 : }
374 :
375 675 : state->table_io.out.num_partners = io->in.num_owners;
376 675 : state->table_io.out.partners = partners;
377 675 : state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
378 675 : composite_done(c);
379 675 : return c;
380 : }
381 :
382 0 : state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
383 0 : state->creq = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
384 0 : if (!state->creq) goto failed;
385 :
386 0 : state->creq->async.fn = wreplsrv_pull_table_handler_creq;
387 0 : state->creq->async.private_data = state;
388 :
389 0 : return c;
390 0 : failed:
391 0 : talloc_free(c);
392 0 : return NULL;
393 : }
394 :
395 675 : static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
396 : struct wreplsrv_pull_table_io *io)
397 : {
398 : NTSTATUS status;
399 :
400 675 : status = composite_wait(c);
401 :
402 675 : if (NT_STATUS_IS_OK(status)) {
403 675 : struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
404 : struct wreplsrv_pull_table_state);
405 675 : io->out.num_owners = state->table_io.out.num_partners;
406 675 : io->out.owners = talloc_move(mem_ctx, &state->table_io.out.partners);
407 : }
408 :
409 675 : talloc_free(c);
410 675 : return status;
411 : }
412 :
413 : struct wreplsrv_pull_names_io {
414 : struct {
415 : struct wreplsrv_partner *partner;
416 : struct wreplsrv_out_connection *wreplconn;
417 : struct wrepl_wins_owner owner;
418 : } in;
419 : struct {
420 : uint32_t num_names;
421 : struct wrepl_name *names;
422 : } out;
423 : };
424 :
425 : enum wreplsrv_pull_names_stage {
426 : WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
427 : WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
428 : WREPLSRV_PULL_NAMES_STAGE_DONE
429 : };
430 :
431 : struct wreplsrv_pull_names_state {
432 : enum wreplsrv_pull_names_stage stage;
433 : struct composite_context *c;
434 : struct wrepl_pull_names pull_io;
435 : struct wreplsrv_pull_names_io *io;
436 : struct composite_context *creq;
437 : struct wreplsrv_out_connection *wreplconn;
438 : struct tevent_req *subreq;
439 : };
440 :
441 : static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq);
442 :
443 675 : static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
444 : {
445 : NTSTATUS status;
446 :
447 675 : status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
448 675 : NT_STATUS_NOT_OK_RETURN(status);
449 :
450 675 : state->pull_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
451 675 : state->pull_io.in.partner = state->io->in.owner;
452 1350 : state->subreq = wrepl_pull_names_send(state,
453 675 : state->wreplconn->service->task->event_ctx,
454 675 : state->wreplconn->sock,
455 675 : &state->pull_io);
456 675 : NT_STATUS_HAVE_NO_MEMORY(state->subreq);
457 :
458 675 : tevent_req_set_callback(state->subreq,
459 : wreplsrv_pull_names_handler_treq,
460 : state);
461 :
462 675 : state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
463 :
464 675 : return NT_STATUS_OK;
465 : }
466 :
467 675 : static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
468 : {
469 : NTSTATUS status;
470 :
471 675 : status = wrepl_pull_names_recv(state->subreq, state, &state->pull_io);
472 675 : TALLOC_FREE(state->subreq);
473 675 : NT_STATUS_NOT_OK_RETURN(status);
474 :
475 675 : state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
476 :
477 675 : return NT_STATUS_OK;
478 : }
479 :
480 1350 : static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
481 : {
482 1350 : struct composite_context *c = state->c;
483 :
484 1350 : switch (state->stage) {
485 675 : case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
486 675 : c->status = wreplsrv_pull_names_wait_connection(state);
487 675 : break;
488 675 : case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
489 675 : c->status = wreplsrv_pull_names_wait_send_reply(state);
490 675 : c->state = COMPOSITE_STATE_DONE;
491 675 : break;
492 0 : case WREPLSRV_PULL_NAMES_STAGE_DONE:
493 0 : c->status = NT_STATUS_INTERNAL_ERROR;
494 : }
495 :
496 1350 : if (!NT_STATUS_IS_OK(c->status)) {
497 0 : c->state = COMPOSITE_STATE_ERROR;
498 : }
499 :
500 1350 : if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
501 675 : c->async.fn(c);
502 : }
503 1350 : }
504 :
505 675 : static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
506 : {
507 675 : struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
508 : struct wreplsrv_pull_names_state);
509 675 : wreplsrv_pull_names_handler(state);
510 675 : return;
511 : }
512 :
513 675 : static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq)
514 : {
515 675 : struct wreplsrv_pull_names_state *state = tevent_req_callback_data(subreq,
516 : struct wreplsrv_pull_names_state);
517 675 : wreplsrv_pull_names_handler(state);
518 675 : return;
519 : }
520 :
521 675 : static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
522 : {
523 675 : struct composite_context *c = NULL;
524 675 : struct wreplsrv_service *service = io->in.partner->service;
525 675 : struct wreplsrv_pull_names_state *state = NULL;
526 675 : enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
527 :
528 675 : if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
529 :
530 675 : c = talloc_zero(mem_ctx, struct composite_context);
531 675 : if (!c) goto failed;
532 :
533 675 : state = talloc_zero(c, struct wreplsrv_pull_names_state);
534 675 : if (!state) goto failed;
535 675 : state->c = c;
536 675 : state->io = io;
537 :
538 675 : c->state = COMPOSITE_STATE_IN_PROGRESS;
539 675 : c->event_ctx = service->task->event_ctx;
540 675 : c->private_data = state;
541 :
542 675 : state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
543 675 : state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
544 675 : if (!state->creq) goto failed;
545 :
546 675 : state->creq->async.fn = wreplsrv_pull_names_handler_creq;
547 675 : state->creq->async.private_data = state;
548 :
549 675 : return c;
550 0 : failed:
551 0 : talloc_free(c);
552 0 : return NULL;
553 : }
554 :
555 675 : static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
556 : struct wreplsrv_pull_names_io *io)
557 : {
558 : NTSTATUS status;
559 :
560 675 : status = composite_wait(c);
561 :
562 675 : if (NT_STATUS_IS_OK(status)) {
563 675 : struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
564 : struct wreplsrv_pull_names_state);
565 675 : io->out.num_names = state->pull_io.out.num_names;
566 675 : io->out.names = talloc_move(mem_ctx, &state->pull_io.out.names);
567 : }
568 :
569 675 : talloc_free(c);
570 675 : return status;
571 :
572 : }
573 :
574 : enum wreplsrv_pull_cycle_stage {
575 : WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
576 : WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
577 : WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
578 : WREPLSRV_PULL_CYCLE_STAGE_DONE
579 : };
580 :
581 : struct wreplsrv_pull_cycle_state {
582 : enum wreplsrv_pull_cycle_stage stage;
583 : struct composite_context *c;
584 : struct wreplsrv_pull_cycle_io *io;
585 : struct wreplsrv_pull_table_io table_io;
586 : uint32_t current;
587 : struct wreplsrv_pull_names_io names_io;
588 : struct composite_context *creq;
589 : struct wrepl_associate_stop assoc_stop_io;
590 : struct tevent_req *subreq;
591 : };
592 :
593 : static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
594 : static void wreplsrv_pull_cycle_handler_treq(struct tevent_req *subreq);
595 :
596 1350 : static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
597 : {
598 1350 : struct wreplsrv_owner *current_owner=NULL;
599 : struct wreplsrv_owner *local_owner;
600 : uint32_t i;
601 1350 : uint64_t old_max_version = 0;
602 1350 : bool do_pull = false;
603 :
604 2025 : for (i=state->current; i < state->table_io.out.num_owners; i++) {
605 2700 : current_owner = wreplsrv_find_owner(state->io->in.partner->service,
606 1350 : state->io->in.partner->pull.table,
607 1350 : state->table_io.out.owners[i].address);
608 :
609 2700 : local_owner = wreplsrv_find_owner(state->io->in.partner->service,
610 1350 : state->io->in.partner->service->table,
611 1350 : state->table_io.out.owners[i].address);
612 : /*
613 : * this means we are ourself the current owner,
614 : * and we don't want replicate ourself
615 : */
616 1350 : if (!current_owner) continue;
617 :
618 : /*
619 : * this means we don't have any records of this owner
620 : * so fetch them
621 : */
622 1350 : if (!local_owner) {
623 3 : do_pull = true;
624 :
625 3 : break;
626 : }
627 :
628 : /*
629 : * this means the remote partner has some new records of this owner
630 : * fetch them
631 : */
632 1347 : if (current_owner->owner.max_version > local_owner->owner.max_version) {
633 672 : do_pull = true;
634 672 : old_max_version = local_owner->owner.max_version;
635 672 : break;
636 : }
637 : }
638 1350 : state->current = i;
639 :
640 1350 : if (do_pull) {
641 675 : state->names_io.in.partner = state->io->in.partner;
642 675 : state->names_io.in.wreplconn = state->io->in.wreplconn;
643 675 : state->names_io.in.owner = current_owner->owner;
644 675 : state->names_io.in.owner.min_version = old_max_version + 1;
645 675 : state->creq = wreplsrv_pull_names_send(state, &state->names_io);
646 675 : NT_STATUS_HAVE_NO_MEMORY(state->creq);
647 :
648 675 : state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
649 675 : state->creq->async.private_data = state;
650 :
651 675 : return STATUS_MORE_ENTRIES;
652 : }
653 :
654 675 : return NT_STATUS_OK;
655 : }
656 :
657 1350 : static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
658 : {
659 : NTSTATUS status;
660 :
661 1350 : status = wreplsrv_pull_cycle_next_owner_do_work(state);
662 1350 : if (NT_STATUS_IS_OK(status)) {
663 675 : state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
664 675 : } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
665 675 : state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
666 675 : status = NT_STATUS_OK;
667 : }
668 :
669 1350 : if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
670 675 : state->assoc_stop_io.in.assoc_ctx = state->io->in.wreplconn->assoc_ctx.peer_ctx;
671 675 : state->assoc_stop_io.in.reason = 0;
672 1350 : state->subreq = wrepl_associate_stop_send(state,
673 675 : state->io->in.wreplconn->service->task->event_ctx,
674 675 : state->io->in.wreplconn->sock,
675 675 : &state->assoc_stop_io);
676 675 : NT_STATUS_HAVE_NO_MEMORY(state->subreq);
677 :
678 675 : tevent_req_set_callback(state->subreq,
679 : wreplsrv_pull_cycle_handler_treq,
680 : state);
681 :
682 675 : state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
683 : }
684 :
685 1350 : return status;
686 : }
687 :
688 675 : static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
689 : {
690 : NTSTATUS status;
691 : uint32_t i;
692 :
693 675 : status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
694 675 : NT_STATUS_NOT_OK_RETURN(status);
695 :
696 : /* update partner table */
697 1350 : for (i=0; i < state->table_io.out.num_owners; i++) {
698 2700 : status = wreplsrv_add_table(state->io->in.partner->service,
699 675 : state->io->in.partner,
700 675 : &state->io->in.partner->pull.table,
701 675 : state->table_io.out.owners[i].address,
702 675 : state->table_io.out.owners[i].max_version);
703 675 : NT_STATUS_NOT_OK_RETURN(status);
704 : }
705 :
706 675 : status = wreplsrv_pull_cycle_next_owner_wrapper(state);
707 675 : NT_STATUS_NOT_OK_RETURN(status);
708 :
709 675 : return status;
710 : }
711 :
712 675 : static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
713 : {
714 : NTSTATUS status;
715 :
716 675 : status = wreplsrv_apply_records(state->io->in.partner,
717 : &state->names_io.in.owner,
718 : state->names_io.out.num_names,
719 : state->names_io.out.names);
720 675 : NT_STATUS_NOT_OK_RETURN(status);
721 :
722 675 : talloc_free(state->names_io.out.names);
723 675 : ZERO_STRUCT(state->names_io);
724 :
725 675 : return NT_STATUS_OK;
726 : }
727 :
728 675 : static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
729 : {
730 : NTSTATUS status;
731 :
732 675 : status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
733 675 : NT_STATUS_NOT_OK_RETURN(status);
734 :
735 : /*
736 : * TODO: this should maybe an async call,
737 : * because we may need some network access
738 : * for conflict resolving
739 : */
740 675 : status = wreplsrv_pull_cycle_apply_records(state);
741 675 : NT_STATUS_NOT_OK_RETURN(status);
742 :
743 675 : status = wreplsrv_pull_cycle_next_owner_wrapper(state);
744 675 : NT_STATUS_NOT_OK_RETURN(status);
745 :
746 675 : return status;
747 : }
748 :
749 675 : static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
750 : {
751 : NTSTATUS status;
752 :
753 675 : status = wrepl_associate_stop_recv(state->subreq, &state->assoc_stop_io);
754 675 : TALLOC_FREE(state->subreq);
755 675 : NT_STATUS_NOT_OK_RETURN(status);
756 :
757 675 : state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
758 :
759 675 : return status;
760 : }
761 :
762 2025 : static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
763 : {
764 2025 : struct composite_context *c = state->c;
765 :
766 2025 : switch (state->stage) {
767 675 : case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
768 675 : c->status = wreplsrv_pull_cycle_wait_table_reply(state);
769 675 : break;
770 675 : case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
771 675 : c->status = wreplsrv_pull_cycle_wait_send_replies(state);
772 675 : break;
773 675 : case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
774 675 : c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
775 675 : break;
776 0 : case WREPLSRV_PULL_CYCLE_STAGE_DONE:
777 0 : c->status = NT_STATUS_INTERNAL_ERROR;
778 : }
779 :
780 2025 : if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
781 675 : c->state = COMPOSITE_STATE_DONE;
782 : }
783 :
784 2025 : if (!NT_STATUS_IS_OK(c->status)) {
785 0 : c->state = COMPOSITE_STATE_ERROR;
786 : }
787 :
788 2025 : if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
789 675 : c->async.fn(c);
790 : }
791 2025 : }
792 :
793 1350 : static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
794 : {
795 1350 : struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
796 : struct wreplsrv_pull_cycle_state);
797 1350 : wreplsrv_pull_cycle_handler(state);
798 1350 : return;
799 : }
800 :
801 675 : static void wreplsrv_pull_cycle_handler_treq(struct tevent_req *subreq)
802 : {
803 675 : struct wreplsrv_pull_cycle_state *state = tevent_req_callback_data(subreq,
804 : struct wreplsrv_pull_cycle_state);
805 675 : wreplsrv_pull_cycle_handler(state);
806 675 : return;
807 : }
808 :
809 675 : struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
810 : {
811 675 : struct composite_context *c = NULL;
812 675 : struct wreplsrv_service *service = io->in.partner->service;
813 675 : struct wreplsrv_pull_cycle_state *state = NULL;
814 :
815 675 : c = talloc_zero(mem_ctx, struct composite_context);
816 675 : if (!c) goto failed;
817 :
818 675 : state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
819 675 : if (!state) goto failed;
820 675 : state->c = c;
821 675 : state->io = io;
822 :
823 675 : c->state = COMPOSITE_STATE_IN_PROGRESS;
824 675 : c->event_ctx = service->task->event_ctx;
825 675 : c->private_data = state;
826 :
827 675 : state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
828 675 : state->table_io.in.partner = io->in.partner;
829 675 : state->table_io.in.num_owners = io->in.num_owners;
830 675 : state->table_io.in.owners = io->in.owners;
831 675 : state->creq = wreplsrv_pull_table_send(state, &state->table_io);
832 675 : if (!state->creq) goto failed;
833 :
834 675 : state->creq->async.fn = wreplsrv_pull_cycle_handler_creq;
835 675 : state->creq->async.private_data = state;
836 :
837 675 : return c;
838 0 : failed:
839 0 : talloc_free(c);
840 0 : return NULL;
841 : }
842 :
843 675 : NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
844 : {
845 : NTSTATUS status;
846 :
847 675 : status = composite_wait(c);
848 :
849 675 : talloc_free(c);
850 675 : return status;
851 : }
852 :
853 : enum wreplsrv_push_notify_stage {
854 : WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
855 : WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
856 : WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
857 : WREPLSRV_PUSH_NOTIFY_STAGE_DONE
858 : };
859 :
860 : struct wreplsrv_push_notify_state {
861 : enum wreplsrv_push_notify_stage stage;
862 : struct composite_context *c;
863 : struct wreplsrv_push_notify_io *io;
864 : enum wrepl_replication_cmd command;
865 : bool full_table;
866 : struct wrepl_send_ctrl ctrl;
867 : struct wrepl_packet req_packet;
868 : struct wrepl_packet *rep_packet;
869 : struct composite_context *creq;
870 : struct wreplsrv_out_connection *wreplconn;
871 : struct tevent_req *subreq;
872 : };
873 :
874 : static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
875 : static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq);
876 :
877 0 : static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
878 : {
879 0 : struct wreplsrv_service *service = state->io->in.partner->service;
880 0 : struct wrepl_packet *req = &state->req_packet;
881 0 : struct wrepl_replication *repl_out = &state->req_packet.message.replication;
882 0 : struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
883 : NTSTATUS status;
884 :
885 : /* prepare the outgoing request */
886 0 : req->opcode = WREPL_OPCODE_BITS;
887 0 : req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
888 0 : req->mess_type = WREPL_REPLICATION;
889 :
890 0 : repl_out->command = state->command;
891 :
892 0 : status = wreplsrv_fill_wrepl_table(service, state, table_out,
893 0 : service->wins_db->local_owner, state->full_table);
894 0 : NT_STATUS_NOT_OK_RETURN(status);
895 :
896 : /* queue the request */
897 0 : state->subreq = wrepl_request_send(state,
898 0 : state->wreplconn->service->task->event_ctx,
899 0 : state->wreplconn->sock, req, NULL);
900 0 : NT_STATUS_HAVE_NO_MEMORY(state->subreq);
901 :
902 0 : tevent_req_set_callback(state->subreq,
903 : wreplsrv_push_notify_handler_treq,
904 : state);
905 :
906 0 : state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE;
907 :
908 0 : return NT_STATUS_OK;
909 : }
910 :
911 0 : static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
912 : {
913 0 : struct wreplsrv_service *service = state->io->in.partner->service;
914 0 : struct wrepl_packet *req = &state->req_packet;
915 0 : struct wrepl_replication *repl_out = &state->req_packet.message.replication;
916 0 : struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
917 : NTSTATUS status;
918 :
919 0 : req->opcode = WREPL_OPCODE_BITS;
920 0 : req->assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
921 0 : req->mess_type = WREPL_REPLICATION;
922 :
923 0 : repl_out->command = state->command;
924 :
925 0 : status = wreplsrv_fill_wrepl_table(service, state, table_out,
926 0 : service->wins_db->local_owner, state->full_table);
927 0 : NT_STATUS_NOT_OK_RETURN(status);
928 :
929 : /* we won't get a reply to a inform message */
930 0 : state->ctrl.send_only = true;
931 :
932 0 : state->subreq = wrepl_request_send(state,
933 0 : state->wreplconn->service->task->event_ctx,
934 0 : state->wreplconn->sock, req, &state->ctrl);
935 0 : NT_STATUS_HAVE_NO_MEMORY(state->subreq);
936 :
937 0 : tevent_req_set_callback(state->subreq,
938 : wreplsrv_push_notify_handler_treq,
939 : state);
940 :
941 0 : state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
942 :
943 0 : return NT_STATUS_OK;
944 : }
945 :
946 0 : static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
947 : {
948 : NTSTATUS status;
949 :
950 0 : status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
951 0 : NT_STATUS_NOT_OK_RETURN(status);
952 :
953 : /* is the peer doesn't support inform fallback to update */
954 0 : switch (state->command) {
955 0 : case WREPL_REPL_INFORM:
956 0 : if (state->wreplconn->assoc_ctx.peer_major < 5) {
957 0 : state->command = WREPL_REPL_UPDATE;
958 : }
959 0 : break;
960 0 : case WREPL_REPL_INFORM2:
961 0 : if (state->wreplconn->assoc_ctx.peer_major < 5) {
962 0 : state->command = WREPL_REPL_UPDATE2;
963 : }
964 0 : break;
965 0 : default:
966 0 : break;
967 : }
968 :
969 0 : switch (state->command) {
970 0 : case WREPL_REPL_UPDATE:
971 0 : state->full_table = true;
972 0 : return wreplsrv_push_notify_update(state);
973 0 : case WREPL_REPL_UPDATE2:
974 0 : state->full_table = false;
975 0 : return wreplsrv_push_notify_update(state);
976 0 : case WREPL_REPL_INFORM:
977 0 : state->full_table = true;
978 0 : return wreplsrv_push_notify_inform(state);
979 0 : case WREPL_REPL_INFORM2:
980 0 : state->full_table = false;
981 0 : return wreplsrv_push_notify_inform(state);
982 0 : default:
983 0 : return NT_STATUS_INTERNAL_ERROR;
984 : }
985 : }
986 :
987 0 : static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state)
988 : {
989 : struct wreplsrv_in_connection *wrepl_in;
990 : struct tstream_context *stream;
991 0 : void *process_context = NULL;
992 : NTSTATUS status;
993 :
994 0 : status = wrepl_request_recv(state->subreq, state, NULL);
995 0 : TALLOC_FREE(state->subreq);
996 0 : NT_STATUS_NOT_OK_RETURN(status);
997 :
998 : /*
999 : * now we need to convert the wrepl_socket (client connection)
1000 : * into a wreplsrv_in_connection (server connection), because
1001 : * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
1002 : * message is received by the peer.
1003 : */
1004 :
1005 0 : status = wrepl_socket_split_stream(state->wreplconn->sock, state, &stream);
1006 0 : NT_STATUS_NOT_OK_RETURN(status);
1007 :
1008 : /*
1009 : * now create a wreplsrv_in_connection,
1010 : * on which we act as server
1011 : *
1012 : * NOTE: stream will be stolen by
1013 : * wreplsrv_in_connection_merge()
1014 : */
1015 0 : process_context = state->io->in.partner->service->task->process_context;
1016 0 : status = wreplsrv_in_connection_merge(state->io->in.partner,
1017 0 : state->wreplconn->assoc_ctx.peer_ctx,
1018 : &stream,
1019 : &wrepl_in, process_context);
1020 0 : NT_STATUS_NOT_OK_RETURN(status);
1021 :
1022 : /* now we can free the wreplsrv_out_connection */
1023 0 : TALLOC_FREE(state->wreplconn);
1024 :
1025 0 : state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1026 0 : return NT_STATUS_OK;
1027 : }
1028 :
1029 0 : static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
1030 : {
1031 : NTSTATUS status;
1032 :
1033 0 : status = wrepl_request_recv(state->subreq, state, NULL);
1034 0 : TALLOC_FREE(state->subreq);
1035 0 : NT_STATUS_NOT_OK_RETURN(status);
1036 :
1037 0 : state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
1038 0 : return status;
1039 : }
1040 :
1041 0 : static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
1042 : {
1043 0 : struct composite_context *c = state->c;
1044 :
1045 0 : switch (state->stage) {
1046 0 : case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
1047 0 : c->status = wreplsrv_push_notify_wait_connect(state);
1048 0 : break;
1049 0 : case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE:
1050 0 : c->status = wreplsrv_push_notify_wait_update(state);
1051 0 : break;
1052 0 : case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
1053 0 : c->status = wreplsrv_push_notify_wait_inform(state);
1054 0 : break;
1055 0 : case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
1056 0 : c->status = NT_STATUS_INTERNAL_ERROR;
1057 : }
1058 :
1059 0 : if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
1060 0 : c->state = COMPOSITE_STATE_DONE;
1061 : }
1062 :
1063 0 : if (!NT_STATUS_IS_OK(c->status)) {
1064 0 : c->state = COMPOSITE_STATE_ERROR;
1065 : }
1066 :
1067 0 : if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
1068 0 : c->async.fn(c);
1069 : }
1070 0 : }
1071 :
1072 0 : static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1073 : {
1074 0 : struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1075 : struct wreplsrv_push_notify_state);
1076 0 : wreplsrv_push_notify_handler(state);
1077 0 : return;
1078 : }
1079 :
1080 0 : static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq)
1081 : {
1082 0 : struct wreplsrv_push_notify_state *state = tevent_req_callback_data(subreq,
1083 : struct wreplsrv_push_notify_state);
1084 0 : wreplsrv_push_notify_handler(state);
1085 0 : return;
1086 : }
1087 :
1088 0 : struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
1089 : {
1090 0 : struct composite_context *c = NULL;
1091 0 : struct wreplsrv_service *service = io->in.partner->service;
1092 0 : struct wreplsrv_push_notify_state *state = NULL;
1093 : enum winsrepl_partner_type partner_type;
1094 :
1095 0 : c = talloc_zero(mem_ctx, struct composite_context);
1096 0 : if (!c) goto failed;
1097 :
1098 0 : state = talloc_zero(c, struct wreplsrv_push_notify_state);
1099 0 : if (!state) goto failed;
1100 0 : state->c = c;
1101 0 : state->io = io;
1102 :
1103 0 : if (io->in.inform) {
1104 : /* we can cache the connection in partner->push->wreplconn */
1105 0 : partner_type = WINSREPL_PARTNER_PUSH;
1106 0 : if (io->in.propagate) {
1107 0 : state->command = WREPL_REPL_INFORM2;
1108 : } else {
1109 0 : state->command = WREPL_REPL_INFORM;
1110 : }
1111 : } else {
1112 : /* we can NOT cache the connection */
1113 0 : partner_type = WINSREPL_PARTNER_NONE;
1114 0 : if (io->in.propagate) {
1115 0 : state->command = WREPL_REPL_UPDATE2;
1116 : } else {
1117 0 : state->command = WREPL_REPL_UPDATE;
1118 : }
1119 : }
1120 :
1121 0 : c->state = COMPOSITE_STATE_IN_PROGRESS;
1122 0 : c->event_ctx = service->task->event_ctx;
1123 0 : c->private_data = state;
1124 :
1125 0 : state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
1126 0 : state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
1127 0 : if (!state->creq) goto failed;
1128 :
1129 0 : state->creq->async.fn = wreplsrv_push_notify_handler_creq;
1130 0 : state->creq->async.private_data = state;
1131 :
1132 0 : return c;
1133 0 : failed:
1134 0 : talloc_free(c);
1135 0 : return NULL;
1136 : }
1137 :
1138 0 : NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1139 : {
1140 : NTSTATUS status;
1141 :
1142 0 : status = composite_wait(c);
1143 :
1144 0 : talloc_free(c);
1145 0 : return status;
1146 : }
|