kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
bus.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <string.h>
23 #include <unistd.h>
24 #include <pthread.h>
25 #include <errno.h>
26 #include <assert.h>
27 #include <limits.h>
28 #include <sys/resource.h>
29 
30 #include "bus.h"
31 #include "bus_poll.h"
32 #include "send.h"
33 #include "listener.h"
34 #include "threadpool.h"
35 #include "bus_internal_types.h"
36 #include "bus_ssl.h"
37 #include "util.h"
38 #include "yacht.h"
39 #include "syscall.h"
40 #include "atomic.h"
41 
42 #include "kinetic_types_internal.h"
43 #include "listener_task.h"
44 
45 static int listener_id_of_socket(struct bus *b, int fd);
46 static void noop_log_cb(log_event_t event,
47  int log_level, const char *msg, void *udata);
48 static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata);
49 static bool attempt_to_increase_resource_limits(struct bus *b);
50 
51 static void set_defaults(bus_config *cfg) {
52  if (cfg->listener_count == 0) { cfg->listener_count = 1; }
53 }
54 
55 #ifdef TEST
56 boxed_msg *test_box = NULL;
57 void *value = NULL;
58 void *old_value = NULL;
59 connection_info *test_ci = NULL;
60 int completion_pipe = -1;
61 void *unused = NULL;
62 #endif
63 
64 bool Bus_Init(bus_config *config, struct bus_result *res) {
65  if (res == NULL) { return false; }
66  if (config == NULL) {
68  return false;
69  }
70  set_defaults(config);
71  if (config->sink_cb == NULL) {
73  return false;
74  }
75  if (config->unpack_cb == NULL) {
77  return false;
78  }
79  if (config->log_cb == NULL) {
80  config->log_cb = noop_log_cb;
81  config->log_level = INT_MIN;
82  }
83  if (config->error_cb == NULL) {
84  config->error_cb = noop_error_cb;
85  }
86 
88 
89  uint8_t locks_initialized = 0;
90  struct listener **ls = NULL; /* listeners */
91  struct threadpool *tp = NULL;
92  bool *joined = NULL;
93  pthread_t *threads = NULL;
94  struct yacht *fd_set = NULL;
95 
96  bus *b = calloc(1, sizeof(*b));
97  if (b == NULL) { goto cleanup; }
98 
99  if (!BusSSL_Init(b)) { goto cleanup; }
100 
101  b->sink_cb = config->sink_cb;
102  b->unpack_cb = config->unpack_cb;
104  b->error_cb = config->error_cb;
105  b->log_cb = config->log_cb;
106  b->log_level = config->log_level;
107  b->udata = config->bus_udata;
108  if (0 != pthread_mutex_init(&b->fd_set_lock, NULL)) {
110  goto cleanup;
111  }
112  locks_initialized++;
113 
115 
117  "Initialized bus at %p", (void*)b);
118 
119  ls = calloc(config->listener_count, sizeof(*ls));
120  if (ls == NULL) {
121  goto cleanup;
122  }
123 
124  for (int i = 0; i < config->listener_count; i++) {
125  ls[i] = Listener_Init(b, config);
126  if (ls[i] == NULL) {
128  goto cleanup;
129  } else {
131  "Initialized listener %d at %p", i, (void*)ls[i]);
132  }
133  }
134 
135  tp = Threadpool_Init(&config->threadpool_cfg);
136  if (tp == NULL) {
138  goto cleanup;
139  }
140 
141  int thread_count = config->listener_count;
142  joined = calloc(thread_count, sizeof(bool));
143  threads = calloc(thread_count, sizeof(pthread_t));
144  if (joined == NULL || threads == NULL) {
145  goto cleanup;
146  }
147 
148  fd_set = Yacht_Init(DEF_FD_SET_SIZE2);
149  if (fd_set == NULL) {
150  goto cleanup;
151  }
152 
153  b->listener_count = config->listener_count;
154  b->listeners = ls;
155  b->threadpool = tp;
156  b->joined = joined;
157  b->threads = threads;
158 
159  for (int i = 0; i < b->listener_count; i++) {
160  int pcres = pthread_create(&b->threads[i], NULL,
161  ListenerTask_MainLoop, (void *)b->listeners[i]);
162  if (pcres != 0) {
164  goto cleanup;
165  }
166  }
167 
168  b->fd_set = fd_set;
169  res->bus = b;
170  BUS_LOG(b, 2, LOG_INITIALIZATION, "initialized", config->bus_udata);
171  return true;
172 
173 cleanup:
174  if (ls) {
175  for (int i = 0; i < config->listener_count; i++) {
176  if (ls[i]) { Listener_Free(ls[i]); }
177  }
178  free(ls);
179  }
180  if (tp) { Threadpool_Free(tp); }
181  if (joined) { free(joined); }
182  if (b) {
183  if (locks_initialized > 1) {
184  pthread_mutex_destroy(&b->fd_set_lock);
185  }
186  free(b);
187  }
188 
189  if (threads) { free(threads); }
190  if (fd_set) { Yacht_Free(fd_set, NULL, NULL); }
191 
192  return false;
193 }
194 
195 static bool attempt_to_increase_resource_limits(struct bus *b) {
196  struct rlimit info;
197  if (-1 == getrlimit(RLIMIT_NOFILE, &info)) {
198  fprintf(stderr, "getrlimit: %s", strerror(errno));
199  errno = 0;
200  return false;
201  }
202 
203  const unsigned int nval = 1024;
204 
205  if (info.rlim_cur < nval && info.rlim_max > nval) {
206  info.rlim_cur = nval;
207  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
208  "Current FD resource limits, [%lu, %lu], changing to %u",
209  (unsigned long)info.rlim_cur, (unsigned long)info.rlim_max, nval);
210  if (-1 == setrlimit(RLIMIT_NOFILE, &info)) {
211  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
212  "Failed to increase FD resource limit to %u, %s",
213  nval, strerror(errno));
214  fprintf(stderr, "getrlimit: %s", strerror(errno));
215  errno = 0;
216  return false;
217  } else {
218  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
219  "Successfully increased FD resource limit to %u", nval);
220  }
221  } else {
222  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
223  "Current FD resource limits [%lu, %lu] are acceptable",
224  (unsigned long)info.rlim_cur, (unsigned long)info.rlim_max);
225  }
226  return true;
227 }
228 
229 /* Pack message to deliver on behalf of the user into an envelope
230  * that can track status / routing along the way.
231  *
232  * The box should only ever be accessible on a single thread at a time. */
233 static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
234  boxed_msg *box = NULL;
235  #ifdef TEST
236  box = test_box;
237  #else
238  box = calloc(1, sizeof(*box));
239  #endif
240  if (box == NULL) { return NULL; }
241 
242  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 64,
243  "Allocated boxed message -- %p", (void*)box);
244 
245  box->fd = msg->fd;
246  assert(msg->fd != 0);
247 
248  /* Lock hash table and check whether this FD uses SSL. */
249  if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); }
250 #ifndef TEST
251  void *value = NULL;
252 #endif
253  connection_info *ci = NULL;
254  if (Yacht_Get(b->fd_set, box->fd, &value)) {
255  ci = (connection_info *)value;
256  }
257  if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); }
258 
259  if (ci == NULL) {
260  /* socket isn't registered, fail out */
261  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 64,
262  "socket isn't registered, failing -- %p", (void*)box);
263  free(box);
264  return NULL;
265  } else {
266  box->ssl = ci->ssl;
267  }
268 
269  if ((msg->seq_id <= ci->largest_wr_seq_id_seen)
271  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
272  "rejecting request <fd:%d, seq_id:%lld> due to non-monotonic sequence ID, largest seen is %lld",
273  box->fd, (long long)msg->seq_id, (long long)ci->largest_wr_seq_id_seen);
274  free(box);
275  return NULL;
276  } else {
277  ci->largest_wr_seq_id_seen = msg->seq_id;
278  }
279 
280  box->timeout_sec = (time_t)msg->timeout_sec;
281  if (box->timeout_sec == 0) {
283  }
284 
285  box->out_seq_id = msg->seq_id;
286  box->out_msg_size = msg->msg_size;
287 
288  /* Store message by pointer, since the client code calling in is
289  * blocked until we are done sending. */
290  box->out_msg = msg->msg;
291 
292  box->cb = msg->cb;
293  box->udata = msg->udata;
294  return box;
295 }
296 
297 bool Bus_SendRequest(struct bus *b, bus_user_msg *msg)
298 {
299  if (b == NULL || msg == NULL || msg->fd == -1) {
300  return false;
301  }
302 
303  boxed_msg *box = box_msg(b, msg);
304  if (box == NULL) {
305  return false;
306  }
307 
309  "Sending request <fd:%d, seq_id:%lld>", msg->fd, (long long)msg->seq_id);
310  bool res = Send_DoBlockingSend(b, box);
312  "...request sent, result %d", res);
313 
314  /* The send was rejected -- free the box, but don't call the error
315  * handling callback. */
316  if (!res) {
318  "Freeing box since request was rejected: %p", (void *)box);
319  free(box);
320  }
321 
322  return res;
323 }
324 
325 static int listener_id_of_socket(struct bus *b, int fd) {
326  /* Just evenly divide sockets between listeners by file descriptor. */
327  return fd % b->listener_count;
328 }
329 
330 struct listener *Bus_GetListenerForSocket(struct bus *b, int fd) {
331  return b->listeners[listener_id_of_socket(b, fd)];
332 }
333 
334 /* Get the string key for a log event ID. */
335 const char *Bus_LogEventStr(log_event_t event) {
336  switch (event) {
337  case LOG_INITIALIZATION: return "INITIALIZATION";
338  case LOG_NEW_CLIENT: return "NEW_CLIENT";
339  case LOG_SOCKET_REGISTERED: return "SOCKET_REGISTERED";
340  case LOG_SENDING_REQUEST: return "SEND_REQUEST";
341  case LOG_SHUTDOWN: return "SHUTDOWN";
342  case LOG_SENDER: return "SENDER";
343  case LOG_LISTENER: return "LISTENER";
344  case LOG_MEMORY: return "MEMORY";
345  default:
346  return "UNKNOWN";
347  }
348 }
349 
350 bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata) {
351  /* Register a socket internally with the listener. */
352  int l_id = listener_id_of_socket(b, fd);
353 
355  "registering socket %d", fd);
356 
357  /* Spread sockets throughout the different listener threads. */
358  struct listener *l = b->listeners[l_id];
359 
360  /* Metadata about the connection. Note: This will be shared by the
361  * client thread and the listener thread, but each will only modify
362  * some of the fields. The client thread will free this. */
363  #ifdef TEST
364  connection_info *ci = test_ci;
365  #else
366  connection_info *ci = calloc(1, sizeof(*ci));
367  #endif
368  if (ci == NULL) { goto cleanup; }
369 
370  SSL *ssl = NULL;
371  if (type == BUS_SOCKET_SSL) {
372  ssl = BusSSL_Connect(b, fd);
373  if (ssl == NULL) { goto cleanup; }
374  } else {
375  ssl = BUS_NO_SSL;
376  }
377 
378  *(int *)&ci->fd = fd;
379  *(bus_socket_t *)&ci->type = type;
380  ci->ssl = ssl;
381  ci->udata = udata;
383 
384  #ifndef TEST
385  void *old_value = NULL;
386  #endif
387  /* Lock hash table and save whether this FD uses SSL. */
388  if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); }
389  bool set_ok = Yacht_Set(b->fd_set, fd, ci, &old_value);
390  if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); }
391 
392  if (set_ok) {
393  assert(old_value == NULL);
394  } else {
395  goto cleanup;
396  }
397 
398  bool res = false;
399  #ifndef TEST
400  int completion_pipe = -1;
401  #endif
402  res = Listener_AddSocket(l, ci, &completion_pipe);
403  if (!res) { goto cleanup; }
404 
405  BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata);
406  bool completed = BusPoll_OnCompletion(b, completion_pipe);
407  if (!completed) { goto cleanup; }
408 
409  BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "successfully added socket", b->udata);
410  return true;
411 cleanup:
412  if (ci) {
413  free(ci);
414  }
415  BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "failed to add socket", b->udata);
416  return false;
417 }
418 
419 /* Free metadata about a socket that has been disconnected. */
420 bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out) {
421  int l_id = listener_id_of_socket(b, fd);
422 
424  "forgetting socket %d", fd);
425 
426  struct listener *l = b->listeners[l_id];
427 
428  #ifndef TEST
429  int completion_pipe = -1;
430  #endif
431  if (!Listener_RemoveSocket(l, fd, &completion_pipe)) {
432  return false; /* couldn't send msg to listener */
433  }
434 
435  assert(completion_pipe != -1);
436  bool completed = BusPoll_OnCompletion(b, completion_pipe);
437  if (!completed) { /* listener hung up while waiting */
438  return false;
439  }
440 
441  /* Lock hash table and forget whether this FD uses SSL. */
442  #ifndef TEST
443  void *old_value = NULL;
444  #endif
445  if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); }
446  bool rm_ok = Yacht_Remove(b->fd_set, fd, &old_value);
447  if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); }
448  if (!rm_ok) {
449  return false;
450  }
451 
452  connection_info *ci = (connection_info *)old_value;
453  assert(ci != NULL);
454 
455  if (socket_udata_out) { *socket_udata_out = ci->udata; }
456 
457  bool res = false;
458 
459  if (ci->ssl == BUS_NO_SSL) {
460  res = true; /* nothing else to do */
461  } else {
462  res = BusSSL_Disconnect(b, ci->ssl);
463  }
464 
465  free(ci);
466  return res;
467 }
468 
469 #ifndef TEST
470 static
471 #endif
472 void free_connection_cb(void *value, void *udata) {
473  struct bus *b = (struct bus *)udata;
474  connection_info *ci = (connection_info *)value;
475 
476  int l_id = listener_id_of_socket(b, ci->fd);
477  struct listener *l = b->listeners[l_id];
478 
479  #ifndef TEST
480  int completion_pipe = -1;
481  #endif
482  if (!Listener_RemoveSocket(l, ci->fd, &completion_pipe)) {
483  return; /* couldn't send msg to listener */
484  }
485 
486  bool completed = BusPoll_OnCompletion(b, completion_pipe);
487  if (!completed) {
488  return;
489  }
490 
491  free(ci);
492 }
493 
494 bool Bus_Shutdown(bus *b) {
495  for (;;) {
497  /* Another thread is already shutting things down. */
498  if (ss != SHUTDOWN_STATE_RUNNING) { return false; }
501  break;
502  }
503  }
504 
505  if (b->fd_set) {
506  BUS_LOG(b, 2, LOG_SHUTDOWN, "removing all connections", b->udata);
508  b->fd_set = NULL;
509  }
510 
511  #ifndef TEST
512  int completion_pipe = -1;
513  #endif
514 
515  BUS_LOG(b, 2, LOG_SHUTDOWN, "shutting down listener threads", b->udata);
516  for (int i = 0; i < b->listener_count; i++) {
517  if (!b->joined[i]) {
518  BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
519  "Listener_Shutdown -- %d", i);
520  if (!Listener_Shutdown(b->listeners[i], &completion_pipe)) {
522  return false;
523  }
524 
525  if (!BusPoll_OnCompletion(b, completion_pipe)) {
527  return false;
528  }
529 
530  BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
531  "Listener_Shutdown -- joining %d", i);
532  #ifndef TEST
533  void *unused = NULL;
534  #endif
535  int res = syscall_pthread_join(b->threads[i], &unused);
536  BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
537  "Listener_Shutdown -- joined %d", i);
538  if (res != 0) {
540  return false;
541  }
542  b->joined[i] = true;
543  }
544  }
545 
546  BUS_LOG(b, 2, LOG_SHUTDOWN, "done with shutdown", b->udata);
548  return true;
549 }
550 
551 void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift) {
552  /* Push back if message bus is too busy. */
553  backpressure >>= shift;
554 
555  if (backpressure > 0) {
556  BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 64,
557  "backpressure %zd", backpressure);
558  syscall_poll(NULL, 0, backpressure);
559  }
560 }
561 
562 static void box_execute_cb(void *udata) {
563  boxed_msg *box = (boxed_msg *)udata;
564 
565  void *out_udata = box->udata;
566  bus_msg_result_t res = box->result;
567  bus_msg_cb *cb = box->cb;
568 
569  free(box);
570  cb(&res, out_udata);
571 }
572 
573 static void box_cleanup_cb(void *udata) {
574  boxed_msg *box = (boxed_msg *)udata;
575  free(box);
576 }
577 
578 /* Deliver a boxed message to the thread pool to execute.
579  * The boxed message will be freed by the threadpool. */
581  struct boxed_msg *box, size_t *backpressure) {
582  assert(box);
583  assert(box->result.status != BUS_SEND_UNDEFINED);
584 
585  struct threadpool_task task = {
586  .task = box_execute_cb,
587  .cleanup = box_cleanup_cb,
588  .udata = box,
589  };
590 
591  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
592  "Scheduling boxed message -- %p -- where it will be freed", (void*)box);
593  return Threadpool_Schedule(b->threadpool, &task, backpressure);
594 }
595 
596 /* How many seconds should it give the thread pool to shut down? */
597 #define THREAD_SHUTDOWN_SECONDS 5
598 
599 void Bus_Free(bus *b) {
600  if (b == NULL) { return; }
601  while (b->shutdown_state != SHUTDOWN_STATE_HALTED) {
602  if (Bus_Shutdown(b)) { break; }
603  syscall_poll(NULL, 0, 10); // sleep 10 msec
604  }
605 
606  for (int i = 0; i < b->listener_count; i++) {
607  BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
608  "Listener_Free -- %d", i);
609  Listener_Free(b->listeners[i]);
610  }
611  free(b->listeners);
612 
613  int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
614  for (int i = 0; i < limit; i++) {
615  BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
616  "Threadpool_Shutdown -- %d", i);
617  if (Threadpool_Shutdown(b->threadpool, false)) { break; }
618  (void)syscall_poll(NULL, 0, 10);
619 
620  if (i == limit - 1) {
621  BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
622  "Threadpool_Shutdown -- %d (forced)", i);
624  }
625  }
626  BUS_LOG(b, 3, LOG_SHUTDOWN, "Threadpool_Free", b->udata);
628  free(b->joined);
629  free(b->threads);
630  pthread_mutex_destroy(&b->fd_set_lock);
631 
632  BusSSL_CtxFree(b);
633  free(b);
634 }
635 
636 static void noop_log_cb(log_event_t event,
637  int log_level, const char *msg, void *udata) {
638  (void)event;
639  (void)log_level;
640  (void)msg;
641  (void)udata;
642 }
643 
644 static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata) {
645  (void)result;
646  (void)socket_udata;
647 }
Bus_Init_res_t status
Definition: bus_types.h:210
time_t timeout_sec
Message send timeout.
void Threadpool_Free(struct threadpool *t)
Free a threadpool.
Definition: threadpool.c:200
size_t msg_size
Definition: bus_types.h:242
static void free_connection_cb(void *value, void *udata)
Definition: bus.c:472
bool Bus_Shutdown(bus *b)
Begin shutting the system down.
Definition: bus.c:494
static int listener_id_of_socket(struct bus *b, int fd)
Definition: bus.c:325
bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure)
Deliver a boxed message to the thread pool to execute.
Definition: bus.c:580
log_event_t
Definition: bus_types.h:95
int64_t largest_wr_seq_id_seen
Set by client thread.
static size_t limit
bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out)
Free metadata about a socket that has been disconnected.
Definition: bus.c:420
bool Listener_RemoveSocket(struct listener *l, int fd, int *notify_fd)
Definition: listener.c:105
const bus_socket_t type
bool BusSSL_Init(struct bus *b)
Initialize the SSL library internals for use by the messaging bus.
Definition: bus_ssl.c:36
void BusSSL_CtxFree(struct bus *b)
Free all internal data for using SSL (the SSL_CTX).
Definition: bus_ssl.c:80
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
bus_unpack_cb * unpack_cb
Definition: bus_types.h:170
threadpool_task_cb * task
Definition: threadpool.h:49
struct yacht * Yacht_Init(uint8_t sz2)
Init a hash table with approx.
Definition: yacht.c:41
void * bus_udata
Definition: bus_types.h:177
pthread_mutex_t fd_set_lock
Receiver of responses.
shutdown_state_t
static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata)
Definition: bus.c:636
static void box_execute_cb(void *udata)
Definition: bus.c:562
bus_unexpected_msg_cb * unexpected_msg_cb
bool Bus_SendRequest(struct bus *b, bus_user_msg *msg)
Send a request.
Definition: bus.c:297
static void box_cleanup_cb(void *udata)
Definition: bus.c:573
int listener_count
Definition: bus_types.h:165
struct listener * Listener_Init(struct bus *b, struct bus_config *cfg)
Initialize the listener.
Definition: listener.c:39
void Bus_Free(bus *b)
Free internal data structures for the bus.
Definition: bus.c:599
bool * joined
Which threads have joined.
SSL * ssl
SSL handle. Must be valid or BUS_NO_SSL.
bool Threadpool_Shutdown(struct threadpool *t, bool kill_all)
Notify the threadpool's threads that the system is going to shut down soon.
Definition: threadpool.c:163
bus_unpack_cb * unpack_cb
Message unpacking callback.
struct listener ** listeners
Listener array.
void * ListenerTask_MainLoop(void *arg)
Listener's main loop – function pointer for pthread start function.
Definition: listener_task.c:45
shutdown_state_t shutdown_state
Current shutdown state.
Internal threadpool state.
Message bus.
struct threadpool * threadpool
Thread pool.
bool BusSSL_Disconnect(struct bus *b, SSL *ssl)
Disconnect and free an individual SSL handle.
Definition: bus_ssl.c:73
static bool attempt_to_increase_resource_limits(struct bus *b)
Definition: bus.c:195
const char * Bus_LogEventStr(log_event_t event)
Get the string key for a log event ID.
Definition: bus.c:335
bool Listener_AddSocket(struct listener *l, connection_info *ci, int *notify_fd)
Add/remove sockets' metadata from internal info.
Definition: listener.c:94
void * udata
User data for callbacks.
uint8_t listener_count
Number of listeners.
uint8_t * out_msg
bus_unexpected_msg_cb * unexpected_msg_cb
Definition: bus_types.h:171
void * udata
Definition: bus_types.h:246
struct threadpool_config threadpool_cfg
Definition: bus_types.h:166
#define BUS_NO_SEQ_ID
Definition: bus_types.h:39
int64_t out_seq_id
bus_error_cb * error_cb
Error handling callback.
static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata)
Definition: bus.c:644
int log_level
Log level.
bus_msg_cb * cb
Definition: bus_types.h:245
bus_send_status_t status
Definition: bus_types.h:216
bool Yacht_Set(struct yacht *y, int key, void *value, void **old_value)
Set KEY to VALUE in the table.
Definition: yacht.c:97
void * udata
user connection data
void( bus_msg_cb)(bus_msg_result_t *res, void *udata)
Definition: bus_types.h:229
int syscall_pthread_join(pthread_t thread, void **value_ptr)
Wrapper for pthread calls.
Definition: syscall.c:77
SSL * ssl
valid pointer or BUS_BOXED_MSG_NO_SSL
#define THREAD_SHUTDOWN_SECONDS
Definition: bus.c:597
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift)
Provide backpressure by sleeping for (backpressure >> shift) msec, if the value is greater than 0...
Definition: bus.c:551
bus_msg_cb * cb
Callback and userdata to which the bus_msg_result_t above will be sunk.
#define DEF_FD_SET_SIZE2
Starting size^2 for file descriptor hash table.
pthread_t * threads
Threads.
bus_error_cb * error_cb
Definition: bus_types.h:172
bus_socket_t
Definition: bus_types.h:231
Per-socket connection context.
bool BusPoll_OnCompletion(struct bus *b, int fd)
Poll on fd until complete, return true on success or false on IO error.
Definition: bus_poll.c:35
int log_level
Definition: bus_types.h:174
struct threadpool * Threadpool_Init(struct threadpool_config *cfg)
Initialize a threadpool, according to a config.
Definition: threadpool.c:52
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
static boxed_msg * box_msg(struct bus *b, bus_user_msg *msg)
Definition: bus.c:233
#define BUS_DEFAULT_TIMEOUT_SEC
Definition: bus_types.h:36
A task.
Definition: threadpool.h:48
bool Listener_Shutdown(struct listener *l, int *notify_fd)
Shut down the listener.
Definition: listener.c:170
uint16_t timeout_sec
Definition: bus_types.h:243
bool Yacht_Get(struct yacht *y, int key, void **value)
Get KEY from the table, setting *value if found.
Definition: yacht.c:73
struct thread_info * threads
bool Bus_Init(bus_config *config, struct bus_result *res)
Initialize a bus, based on configuration in *config.
Definition: bus.c:64
bus_sink_cb * sink_cb
IO sink callback.
bus_sink_cb * sink_cb
Definition: bus_types.h:169
#define BUS_NO_SSL
Special "NO SSL" value, to distinguish from a NULL SSL handle.
size_t out_msg_size
void Listener_Free(struct listener *l)
Free the listener, which must already be shut down.
Definition: listener.c:179
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
Definition: atomic.h:27
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata)
Register a socket connected to an endpoint, and data that will be passed to all interactions on that ...
Definition: bus.c:350
bool Yacht_Remove(struct yacht *y, int key, void **old_value)
Remove KEY from the table.
Definition: yacht.c:174
struct bus * bus
Definition: bus_types.h:211
bus_log_cb * log_cb
Definition: bus_types.h:175
struct yacht * fd_set
Locked hash table for fd -> connection_info.
SSL * BusSSL_Connect(struct bus *b, int fd)
Do an SSL / TLS shake for a connection.
Definition: bus_ssl.c:51
static void set_defaults(bus_config *cfg)
Definition: bus.c:51
bool Send_DoBlockingSend(bus *b, boxed_msg *box)
Do a blocking send.
Definition: send.c:59
struct listener * Bus_GetListenerForSocket(struct bus *b, int fd)
For a given file descriptor, get the listener ID to use.
Definition: bus.c:330
uint8_t * msg
Definition: bus_types.h:241
int fd
Destination filename and message body.
void Yacht_Free(struct yacht *y, Yacht_Free_cb *cb, void *udata)
Free the table.
Definition: yacht.c:202
bus_log_cb * log_cb
Logging callback.
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
Definition: syscall.c:27
bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task, size_t *pushback)
Schedule a task in the threadpool.
Definition: threadpool.c:99
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)
Definition: bus_types.h:45
int64_t seq_id
Definition: bus_types.h:240