Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Main metadata server / Spotlight routines / ES backend
4 :
5 : Copyright (C) Ralph Boehme 2019
6 :
7 : This program is free software; you can redistribute it and/or modify
8 : it under the terms of the GNU General Public License as published by
9 : the Free Software Foundation; either version 3 of the License, or
10 : (at your option) any later version.
11 :
12 : This program is distributed in the hope that it will be useful,
13 : but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : GNU General Public License for more details.
16 :
17 : You should have received a copy of the GNU General Public License
18 : along with this program. If not, see <http://www.gnu.org/licenses/>.
19 : */
20 :
21 : #include "includes.h"
22 : #include "system/filesys.h"
23 : #include "lib/util/time_basic.h"
24 : #include "lib/tls/tls.h"
25 : #include "lib/util/tevent_ntstatus.h"
26 : #include "libcli/http/http.h"
27 : #include "lib/util/tevent_unix.h"
28 : #include "credentials.h"
29 : #include "mdssvc.h"
30 : #include "mdssvc_es.h"
31 : #include "rpc_server/mdssvc/es_parser.tab.h"
32 :
33 : #include <jansson.h>
34 :
35 : #undef DBGC_CLASS
36 : #define DBGC_CLASS DBGC_RPC_SRV
37 :
38 : #define MDSSVC_ELASTIC_QUERY_TEMPLATE \
39 : "{" \
40 : " \"from\": %zu," \
41 : " \"size\": %zu," \
42 : " \"_source\": [%s]," \
43 : " \"query\": {" \
44 : " \"query_string\": {" \
45 : " \"query\": \"%s\"" \
46 : " }" \
47 : " }" \
48 : "}"
49 :
50 : #define MDSSVC_ELASTIC_SOURCES \
51 : "\"path.real\""
52 :
53 0 : static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
54 : {
55 0 : struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
56 : json_error_t json_error;
57 0 : char *default_path = NULL;
58 0 : const char *path = NULL;
59 :
60 0 : mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
61 0 : if (mdssvc_es_ctx == NULL) {
62 0 : return false;
63 : }
64 0 : mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
65 :
66 0 : mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
67 0 : if (mdssvc_es_ctx->creds == NULL) {
68 0 : TALLOC_FREE(mdssvc_es_ctx);
69 0 : return false;
70 : }
71 :
72 0 : default_path = talloc_asprintf(
73 : mdssvc_es_ctx,
74 : "%s/mdssvc/elasticsearch_mappings.json",
75 : get_dyn_SAMBA_DATADIR());
76 0 : if (default_path == NULL) {
77 0 : TALLOC_FREE(mdssvc_es_ctx);
78 0 : return false;
79 : }
80 :
81 0 : path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
82 : "elasticsearch",
83 : "mappings",
84 : default_path);
85 0 : if (path == NULL) {
86 0 : TALLOC_FREE(mdssvc_es_ctx);
87 0 : return false;
88 : }
89 :
90 0 : mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
91 0 : if (mdssvc_es_ctx->mappings == NULL) {
92 0 : DBG_ERR("Opening mapping file [%s] failed: %s\n",
93 : path, json_error.text);
94 0 : TALLOC_FREE(mdssvc_es_ctx);
95 0 : return false;
96 : }
97 0 : TALLOC_FREE(default_path);
98 :
99 0 : mdssvc_ctx->backend_private = mdssvc_es_ctx;
100 0 : return true;
101 : }
102 :
103 0 : static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
104 : {
105 0 : return true;
106 : }
107 :
108 : static struct tevent_req *mds_es_connect_send(
109 : TALLOC_CTX *mem_ctx,
110 : struct tevent_context *ev,
111 : struct mds_es_ctx *mds_es_ctx);
112 : static int mds_es_connect_recv(struct tevent_req *req);
113 : static void mds_es_connected(struct tevent_req *subreq);
114 : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
115 : static void mds_es_search_set_pending(struct sl_es_search *s);
116 : static void mds_es_search_unset_pending(struct sl_es_search *s);
117 :
118 0 : static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx)
119 : {
120 0 : struct sl_es_search *s = mds_es_ctx->searches;
121 :
122 : /*
123 : * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
124 : * to go away and has already freed all waiting searches. If there's a
125 : * search remaining that's when the search is already active. Reset the
126 : * mds_es_ctx pointer, so we can detect this when the search completes.
127 : */
128 :
129 0 : if (s == NULL) {
130 0 : return 0;
131 : }
132 :
133 0 : s->mds_es_ctx = NULL;
134 :
135 0 : return 0;
136 : }
137 :
138 0 : static bool mds_es_connect(struct mds_ctx *mds_ctx)
139 : {
140 0 : struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
141 : mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
142 0 : struct mds_es_ctx *mds_es_ctx = NULL;
143 0 : struct tevent_req *subreq = NULL;
144 :
145 0 : mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
146 0 : if (mds_es_ctx == NULL) {
147 0 : return false;
148 : }
149 0 : *mds_es_ctx = (struct mds_es_ctx) {
150 : .mdssvc_es_ctx = mdssvc_es_ctx,
151 : .mds_ctx = mds_ctx,
152 : };
153 :
154 0 : mds_ctx->backend_private = mds_es_ctx;
155 0 : talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor);
156 :
157 0 : subreq = mds_es_connect_send(
158 : mds_es_ctx,
159 0 : mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
160 : mds_es_ctx);
161 0 : if (subreq == NULL) {
162 0 : TALLOC_FREE(mds_es_ctx);
163 0 : return false;
164 : }
165 0 : tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
166 0 : return true;
167 : }
168 :
169 0 : static void mds_es_connected(struct tevent_req *subreq)
170 : {
171 0 : struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
172 : subreq, struct mds_es_ctx);
173 : int ret;
174 : bool ok;
175 :
176 0 : ret = mds_es_connect_recv(subreq);
177 0 : TALLOC_FREE(subreq);
178 0 : if (ret != 0) {
179 0 : DBG_ERR("HTTP connect failed\n");
180 0 : return;
181 : }
182 :
183 0 : ok = mds_es_next_search_trigger(mds_es_ctx);
184 0 : if (!ok) {
185 0 : DBG_ERR("mds_es_next_search_trigger failed\n");
186 : }
187 0 : return;
188 : }
189 :
190 : struct mds_es_connect_state {
191 : struct tevent_context *ev;
192 : struct mds_es_ctx *mds_es_ctx;
193 : struct tevent_queue_entry *qe;
194 : const char *server_addr;
195 : uint16_t server_port;
196 : struct tstream_tls_params *tls_params;
197 : };
198 :
199 : static void mds_es_http_connect_done(struct tevent_req *subreq);
200 : static void mds_es_http_waited(struct tevent_req *subreq);
201 :
202 0 : static struct tevent_req *mds_es_connect_send(
203 : TALLOC_CTX *mem_ctx,
204 : struct tevent_context *ev,
205 : struct mds_es_ctx *mds_es_ctx)
206 : {
207 0 : struct tevent_req *req = NULL;
208 0 : struct tevent_req *subreq = NULL;
209 0 : struct mds_es_connect_state *state = NULL;
210 0 : const char *server_addr = NULL;
211 : bool use_tls;
212 : NTSTATUS status;
213 :
214 0 : req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
215 0 : if (req == NULL) {
216 0 : return NULL;
217 : }
218 0 : *state = (struct mds_es_connect_state) {
219 : .ev = ev,
220 : .mds_es_ctx = mds_es_ctx,
221 : };
222 :
223 0 : server_addr = lp_parm_const_string(
224 0 : mds_es_ctx->mds_ctx->snum,
225 : "elasticsearch",
226 : "address",
227 : "localhost");
228 0 : state->server_addr = talloc_strdup(state, server_addr);
229 0 : if (tevent_req_nomem(state->server_addr, req)) {
230 0 : return tevent_req_post(req, ev);
231 : }
232 :
233 0 : state->server_port = lp_parm_int(
234 0 : mds_es_ctx->mds_ctx->snum,
235 : "elasticsearch",
236 : "port",
237 : 9200);
238 :
239 0 : use_tls = lp_parm_bool(
240 0 : mds_es_ctx->mds_ctx->snum,
241 : "elasticsearch",
242 : "use tls",
243 : false);
244 :
245 0 : DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
246 : use_tls ? "S" : "", state->server_addr, state->server_port);
247 :
248 0 : if (use_tls) {
249 0 : const char *ca_file = lp__tls_cafile();
250 0 : const char *crl_file = lp__tls_crlfile();
251 0 : const char *tls_priority = lp_tls_priority();
252 0 : enum tls_verify_peer_state verify_peer = lp_tls_verify_peer();
253 :
254 0 : status = tstream_tls_params_client(state,
255 : ca_file,
256 : crl_file,
257 : tls_priority,
258 : verify_peer,
259 0 : state->server_addr,
260 0 : &state->tls_params);
261 0 : if (!NT_STATUS_IS_OK(status)) {
262 0 : DBG_ERR("Failed tstream_tls_params_client - %s\n",
263 : nt_errstr(status));
264 0 : tevent_req_nterror(req, status);
265 0 : return tevent_req_post(req, ev);
266 : }
267 : }
268 :
269 0 : subreq = http_connect_send(state,
270 0 : state->ev,
271 0 : state->server_addr,
272 0 : state->server_port,
273 0 : mds_es_ctx->mdssvc_es_ctx->creds,
274 0 : state->tls_params);
275 0 : if (tevent_req_nomem(subreq, req)) {
276 0 : return tevent_req_post(req, ev);
277 : }
278 0 : tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
279 0 : return req;
280 : }
281 :
282 0 : static void mds_es_http_connect_done(struct tevent_req *subreq)
283 : {
284 0 : struct tevent_req *req = tevent_req_callback_data(
285 : subreq, struct tevent_req);
286 0 : struct mds_es_connect_state *state = tevent_req_data(
287 : req, struct mds_es_connect_state);
288 : int error;
289 :
290 0 : error = http_connect_recv(subreq,
291 0 : state->mds_es_ctx,
292 0 : &state->mds_es_ctx->http_conn);
293 0 : TALLOC_FREE(subreq);
294 0 : if (error != 0) {
295 0 : DBG_ERR("HTTP connect failed, retrying...\n");
296 :
297 0 : subreq = tevent_wakeup_send(
298 0 : state->mds_es_ctx,
299 0 : state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
300 : tevent_timeval_current_ofs(10, 0));
301 0 : if (tevent_req_nomem(subreq, req)) {
302 0 : return;
303 : }
304 0 : tevent_req_set_callback(subreq,
305 : mds_es_http_waited,
306 : req);
307 0 : return;
308 : }
309 :
310 0 : DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
311 : state->tls_params ? "S" : "",
312 : state->server_addr, state->server_port);
313 :
314 0 : tevent_req_done(req);
315 0 : return;
316 : }
317 :
318 0 : static void mds_es_http_waited(struct tevent_req *subreq)
319 : {
320 0 : struct tevent_req *req = tevent_req_callback_data(
321 : subreq, struct tevent_req);
322 0 : struct mds_es_connect_state *state = tevent_req_data(
323 : req, struct mds_es_connect_state);
324 : bool ok;
325 :
326 0 : ok = tevent_wakeup_recv(subreq);
327 0 : TALLOC_FREE(subreq);
328 0 : if (!ok) {
329 0 : tevent_req_error(req, ETIMEDOUT);
330 0 : return;
331 : }
332 :
333 0 : subreq = mds_es_connect_send(
334 0 : state->mds_es_ctx,
335 0 : state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
336 : state->mds_es_ctx);
337 0 : if (tevent_req_nomem(subreq, req)) {
338 0 : return;
339 : }
340 0 : tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
341 : }
342 :
343 0 : static int mds_es_connect_recv(struct tevent_req *req)
344 : {
345 0 : return tevent_req_simple_recv_unix(req);
346 : }
347 :
348 0 : static void mds_es_reconnect_on_error(struct sl_es_search *s)
349 : {
350 0 : struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
351 0 : struct tevent_req *subreq = NULL;
352 :
353 0 : if (s->slq != NULL) {
354 0 : s->slq->state = SLQ_STATE_ERROR;
355 : }
356 :
357 0 : DBG_WARNING("Reconnecting HTTP...\n");
358 0 : TALLOC_FREE(mds_es_ctx->http_conn);
359 :
360 0 : subreq = mds_es_connect_send(
361 : mds_es_ctx,
362 0 : mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
363 : mds_es_ctx);
364 0 : if (subreq == NULL) {
365 0 : DBG_ERR("mds_es_connect_send failed\n");
366 0 : return;
367 : }
368 0 : tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
369 : }
370 :
371 0 : static int search_destructor(struct sl_es_search *s)
372 : {
373 0 : if (s->mds_es_ctx == NULL) {
374 0 : return 0;
375 : }
376 0 : DLIST_REMOVE(s->mds_es_ctx->searches, s);
377 0 : return 0;
378 : }
379 :
380 : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
381 : struct tevent_context *ev,
382 : struct sl_es_search *s);
383 : static int mds_es_search_recv(struct tevent_req *req);
384 : static void mds_es_search_done(struct tevent_req *subreq);
385 :
386 0 : static bool mds_es_search(struct sl_query *slq)
387 : {
388 0 : struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
389 : slq->mds_ctx->backend_private, struct mds_es_ctx);
390 0 : struct sl_es_search *s = NULL;
391 : bool ok;
392 :
393 0 : s = talloc_zero(slq, struct sl_es_search);
394 0 : if (s == NULL) {
395 0 : return false;
396 : }
397 0 : *s = (struct sl_es_search) {
398 0 : .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
399 : .mds_es_ctx = mds_es_ctx,
400 : .slq = slq,
401 : .size = SL_PAGESIZE,
402 : };
403 :
404 : /* 0 would mean no limit */
405 0 : s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
406 : "elasticsearch",
407 : "max results",
408 : MAX_SL_RESULTS);
409 :
410 0 : DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
411 :
412 0 : ok = map_spotlight_to_es_query(
413 : s,
414 0 : mds_es_ctx->mdssvc_es_ctx->mappings,
415 : slq->path_scope,
416 0 : slq->query_string,
417 : &s->es_query);
418 0 : if (!ok) {
419 0 : TALLOC_FREE(s);
420 0 : return false;
421 : }
422 0 : DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
423 :
424 0 : slq->backend_private = s;
425 0 : slq->state = SLQ_STATE_RUNNING;
426 0 : DLIST_ADD_END(mds_es_ctx->searches, s);
427 0 : talloc_set_destructor(s, search_destructor);
428 :
429 0 : return mds_es_next_search_trigger(mds_es_ctx);
430 : }
431 :
432 0 : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
433 : {
434 0 : struct tevent_req *subreq = NULL;
435 0 : struct sl_es_search *s = mds_es_ctx->searches;
436 :
437 0 : if (mds_es_ctx->http_conn == NULL) {
438 0 : DBG_DEBUG("Waiting for HTTP connection...\n");
439 0 : return true;
440 : }
441 0 : if (s == NULL) {
442 0 : DBG_DEBUG("No pending searches, idling...\n");
443 0 : return true;
444 : }
445 0 : if (s->pending) {
446 0 : DBG_DEBUG("Search pending [%p]\n", s);
447 0 : return true;
448 : }
449 :
450 0 : subreq = mds_es_search_send(s, s->ev, s);
451 0 : if (subreq == NULL) {
452 0 : return false;
453 : }
454 0 : tevent_req_set_callback(subreq, mds_es_search_done, s);
455 0 : mds_es_search_set_pending(s);
456 0 : return true;
457 : }
458 :
459 0 : static void mds_es_search_done(struct tevent_req *subreq)
460 : {
461 0 : struct sl_es_search *s = tevent_req_callback_data(
462 : subreq, struct sl_es_search);
463 0 : struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
464 0 : struct sl_query *slq = s->slq;
465 : int ret;
466 : bool ok;
467 :
468 0 : DBG_DEBUG("Search done for search [%p]\n", s);
469 :
470 0 : mds_es_search_unset_pending(s);
471 :
472 0 : if (mds_es_ctx == NULL) {
473 : /*
474 : * Search connection closed by the user while s was pending.
475 : */
476 0 : TALLOC_FREE(s);
477 0 : return;
478 : }
479 :
480 0 : DLIST_REMOVE(mds_es_ctx->searches, s);
481 :
482 0 : ret = mds_es_search_recv(subreq);
483 0 : TALLOC_FREE(subreq);
484 0 : if (ret != 0) {
485 0 : mds_es_reconnect_on_error(s);
486 0 : return;
487 : }
488 :
489 0 : if (slq == NULL) {
490 : /*
491 : * Closed by the user. Explicitly free "s" here because the
492 : * talloc parent slq is already gone.
493 : */
494 0 : TALLOC_FREE(s);
495 0 : goto trigger;
496 : }
497 :
498 0 : SLQ_DEBUG(10, slq, "search done");
499 :
500 0 : if (s->total == 0 || s->from >= s->max) {
501 0 : slq->state = SLQ_STATE_DONE;
502 0 : goto trigger;
503 : }
504 :
505 0 : if (slq->query_results->num_results >= SL_PAGESIZE) {
506 0 : slq->state = SLQ_STATE_FULL;
507 0 : goto trigger;
508 : }
509 :
510 : /*
511 : * Reschedule this query as there are more results waiting in the
512 : * Elasticsearch server and the client result queue has room as
513 : * well. But put it at the end of the list of active queries as a simple
514 : * heuristic that should ensure all client queries are dispatched to the
515 : * server.
516 : */
517 0 : DLIST_ADD_END(mds_es_ctx->searches, s);
518 :
519 0 : trigger:
520 0 : ok = mds_es_next_search_trigger(mds_es_ctx);
521 0 : if (!ok) {
522 0 : DBG_ERR("mds_es_next_search_trigger failed\n");
523 : }
524 : }
525 :
526 : static void mds_es_search_http_send_done(struct tevent_req *subreq);
527 : static void mds_es_search_http_read_done(struct tevent_req *subreq);
528 :
529 : struct mds_es_search_state {
530 : struct tevent_context *ev;
531 : struct sl_es_search *s;
532 : struct tevent_queue_entry *qe;
533 : struct http_request http_request;
534 : struct http_request *http_response;
535 : };
536 :
537 0 : static int mds_es_search_pending_destructor(struct sl_es_search *s)
538 : {
539 : /*
540 : * s is a child of slq which may get freed when a user closes a
541 : * query. To maintain the HTTP request/response sequence on the HTTP
542 : * channel, we keep processing pending requests and free s when we
543 : * receive the HTTP response for pending requests.
544 : */
545 0 : DBG_DEBUG("Preserving pending search [%p]\n", s);
546 0 : s->slq = NULL;
547 0 : return -1;
548 : }
549 :
550 0 : static void mds_es_search_set_pending(struct sl_es_search *s)
551 : {
552 0 : DBG_DEBUG("Set pending [%p]\n", s);
553 0 : SLQ_DEBUG(10, s->slq, "pending");
554 :
555 0 : s->pending = true;
556 0 : talloc_set_destructor(s, mds_es_search_pending_destructor);
557 0 : }
558 :
559 0 : static void mds_es_search_unset_pending(struct sl_es_search *s)
560 : {
561 0 : DBG_DEBUG("Unset pending [%p]\n", s);
562 0 : if (s->slq != NULL) {
563 0 : SLQ_DEBUG(10, s->slq, "unset pending");
564 : }
565 :
566 0 : s->pending = false;
567 0 : talloc_set_destructor(s, search_destructor);
568 0 : }
569 :
570 0 : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
571 : struct tevent_context *ev,
572 : struct sl_es_search *s)
573 : {
574 0 : struct tevent_req *req = NULL;
575 0 : struct tevent_req *subreq = NULL;
576 0 : struct mds_es_search_state *state = NULL;
577 0 : const char *index = NULL;
578 0 : char *elastic_query = NULL;
579 0 : char *uri = NULL;
580 : size_t elastic_query_len;
581 0 : char *elastic_query_len_str = NULL;
582 0 : char *hostname = NULL;
583 0 : bool pretty = false;
584 :
585 0 : req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
586 0 : if (req == NULL) {
587 0 : return NULL;
588 : }
589 0 : *state = (struct mds_es_search_state) {
590 : .ev = ev,
591 : .s = s,
592 : };
593 :
594 0 : if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
595 0 : return tevent_req_post(req, s->ev);
596 : }
597 :
598 0 : index = lp_parm_const_string(s->slq->mds_ctx->snum,
599 : "elasticsearch",
600 : "index",
601 : "_all");
602 0 : if (tevent_req_nomem(index, req)) {
603 0 : return tevent_req_post(req, ev);
604 : }
605 :
606 0 : if (DEBUGLVL(10)) {
607 0 : pretty = true;
608 : }
609 :
610 0 : uri = talloc_asprintf(state,
611 : "/%s/_search%s",
612 : index,
613 : pretty ? "?pretty" : "");
614 0 : if (tevent_req_nomem(uri, req)) {
615 0 : return tevent_req_post(req, ev);
616 : }
617 :
618 0 : elastic_query = talloc_asprintf(state,
619 : MDSSVC_ELASTIC_QUERY_TEMPLATE,
620 : s->from,
621 : s->size,
622 : MDSSVC_ELASTIC_SOURCES,
623 : s->es_query);
624 0 : if (tevent_req_nomem(elastic_query, req)) {
625 0 : return tevent_req_post(req, ev);
626 : }
627 0 : DBG_DEBUG("Elastic query: '%s'\n", elastic_query);
628 :
629 0 : elastic_query_len = strlen(elastic_query);
630 :
631 0 : state->http_request = (struct http_request) {
632 : .type = HTTP_REQ_POST,
633 : .uri = uri,
634 0 : .body = data_blob_const(elastic_query, elastic_query_len),
635 : .major = '1',
636 : .minor = '1',
637 : };
638 :
639 0 : elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
640 0 : if (tevent_req_nomem(elastic_query_len_str, req)) {
641 0 : return tevent_req_post(req, ev);
642 : }
643 :
644 0 : hostname = get_myname(state);
645 0 : if (tevent_req_nomem(hostname, req)) {
646 0 : return tevent_req_post(req, ev);
647 : }
648 :
649 0 : http_add_header(state, &state->http_request.headers,
650 : "Content-Type", "application/json");
651 0 : http_add_header(state, &state->http_request.headers,
652 : "Accept", "application/json");
653 0 : http_add_header(state, &state->http_request.headers,
654 : "User-Agent", "Samba/mdssvc");
655 0 : http_add_header(state, &state->http_request.headers,
656 : "Host", hostname);
657 0 : http_add_header(state, &state->http_request.headers,
658 : "Content-Length", elastic_query_len_str);
659 :
660 0 : subreq = http_send_request_send(state,
661 : ev,
662 0 : s->mds_es_ctx->http_conn,
663 0 : &state->http_request);
664 0 : if (tevent_req_nomem(subreq, req)) {
665 0 : return tevent_req_post(req, ev);
666 : }
667 0 : tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
668 0 : return req;
669 : }
670 :
671 0 : static void mds_es_search_http_send_done(struct tevent_req *subreq)
672 : {
673 0 : struct tevent_req *req = tevent_req_callback_data(
674 : subreq, struct tevent_req);
675 0 : struct mds_es_search_state *state = tevent_req_data(
676 : req, struct mds_es_search_state);
677 : NTSTATUS status;
678 :
679 0 : DBG_DEBUG("Sent out search [%p]\n", state->s);
680 :
681 0 : status = http_send_request_recv(subreq);
682 0 : TALLOC_FREE(subreq);
683 0 : if (!NT_STATUS_IS_OK(status)) {
684 0 : tevent_req_error(req, map_errno_from_nt_status(status));
685 0 : return;
686 : }
687 :
688 0 : if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) {
689 0 : tevent_req_done(req);
690 0 : return;
691 : }
692 :
693 0 : subreq = http_read_response_send(state,
694 : state->ev,
695 0 : state->s->mds_es_ctx->http_conn,
696 : SL_PAGESIZE * 8192);
697 0 : if (tevent_req_nomem(subreq, req)) {
698 0 : return;
699 : }
700 0 : tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
701 : }
702 :
703 0 : static void mds_es_search_http_read_done(struct tevent_req *subreq)
704 : {
705 0 : struct tevent_req *req = tevent_req_callback_data(
706 : subreq, struct tevent_req);
707 0 : struct mds_es_search_state *state = tevent_req_data(
708 : req, struct mds_es_search_state);
709 0 : struct sl_es_search *s = state->s;
710 0 : struct sl_query *slq = s->slq;
711 0 : json_t *root = NULL;
712 0 : json_t *matches = NULL;
713 0 : json_t *match = NULL;
714 : size_t i;
715 : json_error_t error;
716 : size_t hits;
717 : NTSTATUS status;
718 : int ret;
719 : bool ok;
720 :
721 0 : DBG_DEBUG("Got response for search [%p]\n", s);
722 :
723 0 : status = http_read_response_recv(subreq, state, &state->http_response);
724 0 : TALLOC_FREE(subreq);
725 0 : if (!NT_STATUS_IS_OK(status)) {
726 0 : DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
727 0 : tevent_req_error(req, map_errno_from_nt_status(status));
728 0 : return;
729 : }
730 :
731 0 : if (slq == NULL || s->mds_es_ctx == NULL) {
732 0 : tevent_req_done(req);
733 0 : return;
734 : }
735 :
736 0 : switch (state->http_response->response_code) {
737 0 : case 200:
738 0 : break;
739 0 : default:
740 0 : DBG_ERR("HTTP server response: %u\n",
741 : state->http_response->response_code);
742 0 : goto fail;
743 : }
744 :
745 0 : DBG_DEBUG("JSON response:\n%s\n",
746 : talloc_strndup(talloc_tos(),
747 : (char *)state->http_response->body.data,
748 : state->http_response->body.length));
749 :
750 0 : root = json_loadb((char *)state->http_response->body.data,
751 0 : state->http_response->body.length,
752 : 0,
753 : &error);
754 0 : if (root == NULL) {
755 0 : DBG_ERR("json_loadb failed\n");
756 0 : goto fail;
757 : }
758 :
759 0 : if (s->total == 0) {
760 : /*
761 : * Get the total number of results the first time, format
762 : * used by Elasticsearch 7.0 or newer
763 : */
764 0 : ret = json_unpack(root, "{s: {s: {s: i}}}",
765 : "hits", "total", "value", &s->total);
766 0 : if (ret != 0) {
767 : /* Format used before 7.0 */
768 0 : ret = json_unpack(root, "{s: {s: i}}",
769 : "hits", "total", &s->total);
770 0 : if (ret != 0) {
771 0 : DBG_ERR("json_unpack failed\n");
772 0 : goto fail;
773 : }
774 : }
775 :
776 0 : DBG_DEBUG("Total: %zu\n", s->total);
777 :
778 0 : if (s->total == 0) {
779 0 : json_decref(root);
780 0 : tevent_req_done(req);
781 0 : return;
782 : }
783 : }
784 :
785 0 : if (s->max == 0 || s->max > s->total) {
786 0 : s->max = s->total;
787 : }
788 :
789 0 : ret = json_unpack(root, "{s: {s:o}}",
790 : "hits", "hits", &matches);
791 0 : if (ret != 0 || matches == NULL) {
792 0 : DBG_ERR("json_unpack hits failed\n");
793 0 : goto fail;
794 : }
795 :
796 0 : hits = json_array_size(matches);
797 0 : if (hits == 0) {
798 0 : DBG_ERR("Hu?! No results?\n");
799 0 : goto fail;
800 : }
801 0 : DBG_DEBUG("Hits: %zu\n", hits);
802 :
803 0 : for (i = 0; i < hits && s->from + i < s->max; i++) {
804 0 : const char *path = NULL;
805 :
806 0 : match = json_array_get(matches, i);
807 0 : if (match == NULL) {
808 0 : DBG_ERR("Hu?! No value for index %zu\n", i);
809 0 : goto fail;
810 : }
811 0 : ret = json_unpack(match,
812 : "{s: {s: {s: s}}}",
813 : "_source",
814 : "path",
815 : "real",
816 : &path);
817 0 : if (ret != 0) {
818 0 : DBG_ERR("Missing path.real in JSON result\n");
819 0 : goto fail;
820 : }
821 :
822 0 : ok = mds_add_result(slq, path);
823 0 : if (!ok) {
824 0 : DBG_ERR("error adding result for path: %s\n", path);
825 0 : goto fail;
826 : }
827 : }
828 0 : json_decref(root);
829 :
830 0 : s->from += hits;
831 0 : slq->state = SLQ_STATE_RESULTS;
832 0 : tevent_req_done(req);
833 0 : return;
834 :
835 0 : fail:
836 0 : if (root != NULL) {
837 0 : json_decref(root);
838 : }
839 0 : slq->state = SLQ_STATE_ERROR;
840 0 : tevent_req_error(req, EINVAL);
841 0 : return;
842 : }
843 :
844 0 : static int mds_es_search_recv(struct tevent_req *req)
845 : {
846 0 : return tevent_req_simple_recv_unix(req);
847 : }
848 :
849 0 : static bool mds_es_search_cont(struct sl_query *slq)
850 : {
851 0 : struct sl_es_search *s = talloc_get_type_abort(
852 : slq->backend_private, struct sl_es_search);
853 :
854 0 : SLQ_DEBUG(10, slq, "continue");
855 0 : DLIST_ADD_END(s->mds_es_ctx->searches, s);
856 0 : return mds_es_next_search_trigger(s->mds_es_ctx);
857 : }
858 :
859 : struct mdssvc_backend mdsscv_backend_es = {
860 : .init = mdssvc_es_init,
861 : .shutdown = mdssvc_es_shutdown,
862 : .connect = mds_es_connect,
863 : .search_start = mds_es_search,
864 : .search_cont = mds_es_search_cont,
865 : };
|