33 size_t backpressure = 0;
48 struct bus *b =
self->bus;
53 time_t last_sec = (time_t)-1;
65 "timestamp failure: %d", errno);
67 time_t cur_sec = now.tv_sec;
68 if (cur_sec != last_sec) {
82 "poll res %d", poll_res);
92 }
else if (poll_res > 0) {
106 if (self->tracked_fds > 0) {
108 "%d connections still open!", self->tracked_fds);
121 bool any_work =
false;
124 "tick... %p: %d of %d msgs in use, %d of %d rx_info in use, %d tracked_fds",
130 printf(
" -- msg %d: type %d\n", i, l->
msgs[i].
type);
139 switch (info->
state) {
151 "gettimeofday failure in tick_handler!", b->
udata);
158 "timing out hold info %p -- <fd:%d, seq_id:%lld> at (%ld.%ld)",
159 (
void*)info, info->
u.
hold.fd, (
long long)info->
u.
hold.seq_id,
160 (
long)cur.tv_sec, (
long)cur.tv_usec);
165 "decrementing countdown on info %p [%u]: %ld",
174 "retrying RX event delivery", b->
udata);
178 "cleaning up completed RX event at info %p", (
void*)info);
182 "notifying of rx failure -- error %d (info %p)",
183 info->
u.
expect.error, (
void*)info);
191 "gettimeofday failure in tick_handler!", b->
udata);
196 "notifying of rx failure -- timeout (info %p) -- "
197 "<fd:%d, seq_id:%lld>, from time (queued:%ld.%ld) to (sent:%ld.%ld) to (now:%ld.%ld)",
201 (
long)cur.tv_sec, (
long)cur.tv_usec);
207 "decrementing countdown on info %p [%u]: %ld",
214 "match fail %d on line %d", info->
state, __LINE__);
218 if (!any_work) { l->
is_idle =
true; }
225 printf(
" -- state: %d, info[%d]: timeout %ld",
229 printf(
", fd %d, seq_id %lld, has_result? %d\n",
230 info->
u.
hold.fd, (
long long)info->
u.
hold.seq_id, info->
u.
hold.has_result);
235 printf(
", box %p (fd:%d, seq_id:%lld), error %d, has_result? %d\n",
236 (
void *)box, box ? box->
fd : -1, box ? (
long long)box->
out_seq_id : -1,
241 printf(
", INACTIVE (next: %d)\n", info->
next ? info->
next->
id : -1);
256 "releasing box %p at line %d", (
void*)box, __LINE__);
260 size_t backpressure = 0;
264 "successfully delivered box %p (seq_id %lld) from info %d at line %d (retry)",
265 (
void*)box, (
long long)box->
out_seq_id, info->
id, __LINE__);
270 "returning box %p at line %d", (
void*)box, __LINE__);
282 "info %p, box is %p at line %d", (
void*)info,
283 (
void*)info->
u.
expect.box, __LINE__);
286 size_t backpressure = 0;
291 printf(
"*** info %d: info->timeout %ld\n",
293 printf(
" info->error %d\n", info->
u.
expect.error);
294 printf(
" info->box == %p\n", (
void*)box);
295 printf(
" info->box->result.status == %d\n", box->
result.
status);
296 printf(
" info->box->fd %d\n", box->
fd);
297 printf(
" info->box->out_seq_id %lld\n", (
long long)box->
out_seq_id);
298 printf(
" info->box->out_msg %p\n", (
void*)box->
out_msg);
303 "releasing box %p at line %d", (
void*)box, __LINE__);
309 "returning box %p at line %d", (
void*)box, __LINE__);
322 size_t backpressure = 0;
329 info->
u.
expect.box->result.status = status;
334 "releasing box %p at line %d", (
void*)box, __LINE__);
337 "delivered box %p with failure message %d at line %d (info %p)",
338 (
void*)box, status, __LINE__, (
void*)info);
354 if (ci->
fd == fd) {
return ci; }
363 "releasing RX info %d (%p), state %d", info->
id, (
void *)info, info->
state);
367 switch (info->
state) {
370 " -- releasing HOLD: has result? %d", info->
u.
hold.has_result);
371 if (info->
u.
hold.has_result) {
377 if (info->
u.
hold.result.ok) {
378 void *msg = info->
u.
hold.result.u.success.msg;
379 int64_t seq_id = info->
u.
hold.result.u.success.seq_id;
384 "CALLING UNEXPECTED_MSG_CB ON RESULT %p", (
void *)&info->
u.
hold.result);
388 "LEAKING RESULT %p", (
void *)&info->
u.
hold.result);
404 "releasing rx_info_t %d (%p), was %d",
405 info->
id, (
void *)info, info->
state);
409 memset(&info->
u, 0,
sizeof(info->
u));
415 "rx_info_max_used--, from %d to %d",
449 if (nsize < l->read_buf_size) {
return true; }
451 uint8_t *nbuf = realloc(l->
read_buf, nsize);
455 "Read buffer realloc success, %p (%zd) to %p (%zd)",
472 "attempting delivery of %p", (
void*)box);
474 "releasing box %p at line %d", (
void*)box, __LINE__);
482 "unexpected status for completed RX event at info +%d, box %p, status %d",
483 info->
id, (
void *)box, result->
status);
487 switch (info->
state) {
490 unpacked_result = info->
u.
expect.result;
499 int64_t seq_id = unpacked_result.
u.
success.seq_id;
500 void *opaque_msg = unpacked_result.
u.
success.msg;
502 result->
u.
response.opaque_msg = opaque_msg;
505 size_t backpressure = 0;
510 "successfully delivered box %p (seq_id:%lld), marking info %d as DONE",
511 (
void*)box, (
long long)seq_id, info->
id);
514 "initial clean-up attempt for completed RX event at info +%d", info->
id);
519 "returning box %p at line %d", (
void*)box, __LINE__);
532 uint16_t msg_fill_pressure = 0;
535 msg_fill_pressure = 0;
544 uint16_t rx_info_fill_pressure = 0;
546 rx_info_fill_pressure = 0;
559 "lbp: %u, %u (iu %u), %u",
560 msg_fill_pressure, rx_info_fill_pressure, l->
rx_info_in_use, threadpool_fill_pressure);
562 return msg_fill_pressure + rx_info_fill_pressure
563 + threadpool_fill_pressure;
bool Util_IsResumableIOError(int errno_)
#define MAX_QUEUE_MESSAGES
Max number of unprocessed queue messages.
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
listener_msg * msg_freelist
#define LISTENER_TASK_TIMEOUT_DELAY
How long the listener should wait for responses before becoming idle and blocking.
bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure)
Deliver a boxed message to the thread pool to execute.
static connection_info * get_connection_info(struct listener *l, int fd)
A queue message, with a command in the tagged union.
#define MSG_BP_1QTR
Coefficients for backpressure based on certain conditions.
union bus_unpack_cb_res_t::@0 u
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
static void observe_backpressure(listener *l, size_t backpressure)
bus_unexpected_msg_cb * unexpected_msg_cb
connection_info * fd_info[1000]
The connection info, corresponding to the file descriptors tracked in l->fds. ...
uint16_t ListenerTask_GetBackpressure(struct listener *l)
Get the current backpressure from the listener.
void * ListenerTask_MainLoop(void *arg)
Listener's main loop – function pointer for pthread start function.
struct timeval tv_send_start
Event timestamps to track timeouts.
void * udata
User data for callbacks.
struct listener_msg * next
bool ListenerTask_GrowReadBuf(listener *l, size_t nsize)
Grow the listener's read buffer to NSIZE.
void ListenerTask_ReleaseRXInfo(struct listener *l, rx_info_t *info)
Release an INFO to the listener's info pool.
void ListenerCmd_CheckIncomingMessages(listener *l, int *res)
Process incoming commands, if any.
rx_info_t * rx_info_freelist
#define LISTENER_SHUTDOWN_COMPLETE_FD
uint16_t tracked_fds
FDs currently tracked by listener.
void * udata
user connection data
void ListenerTask_ReleaseMsg(struct listener *l, listener_msg *msg)
Release a message to the listener's message pool.
void ListenerCmd_NotifyCaller(listener *l, int fd)
Notify the listener's caller that a command has completed.
static void tick_handler(listener *l)
rx_info_t rx_info[(1024)]
void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)
Attempt delivery of the message boxed in INFO.
struct bus_msg_result_t::@3::@5 response
Per-socket connection context.
uint16_t rx_info_max_used
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
void ListenerTask_DumpRXInfoTable(listener *l)
Dump the RX info table.
struct rx_info_t::@12::@14 expect
struct timeval tv_send_done
static void clean_up_completed_info(listener *l, rx_info_t *info)
#define INCOMING_MSG_PIPE
Offset to account for the first file descriptor being the incoming message pipe.
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info, bus_send_status_t status)
Notify the client that the event in INFO has failed with STATUS.
#define INFINITE_DELAY
Special value meaning poll should block indefinitely.
union bus_msg_result_t::@3 u
struct bus_unpack_cb_res_t::@0::@1 success
#define BUS_ASSERT(B, UDATA, COND)
static void retry_delivery(listener *l, rx_info_t *info)
#define LISTENER_NO_FD
Sentinel values used for listener.shutdown_notify_fd.
bool Util_Timestamp(struct timeval *tv, bool relative)
int fd
Destination filename and message body.
size_t upstream_backpressure
void ListenerIO_AttemptRecv(listener *l, int available)
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
#define MAX_PENDING_MESSAGES
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)