38 uint8_t reply_buf[
sizeof(uint8_t) +
sizeof(uint16_t)];
43 if (fd == -1) {
return; }
45 uint8_t reply_buf[
sizeof(uint8_t) +
sizeof(uint16_t)];
51 reply_buf[1] = (uint8_t)(backpressure & 0xff);
52 reply_buf[2] = (uint8_t)((backpressure >> 8) & 0xff);
57 "NotifyCaller on %d with backpressure %u",
60 ssize_t wres =
syscall_write(fd, reply_buf,
sizeof(reply_buf));
61 if (wres ==
sizeof(reply_buf)) {
break; }
77 if (events & (POLLERR | POLLHUP | POLLNVAL)) {
79 "hangup on listener incoming command pipe: %d", events);
83 if (events & POLLIN) {
95 "check_and_flush_incoming_msg_pipe: %s", strerror(errno));
100 for (ssize_t i = 0; i < rd; i++) {
101 uint8_t msg_id = cmd_buf[i];
115 "Handling message -- %p, type %d", (
void*)pmsg, pmsg->
type);
130 msg.
u.
hold.timeout_sec, msg.
u.
hold.notify_fd);
190 swap(l,
id, first_inactive);
221 "removing socket %d", fd);
227 if (removing_pfd.fd == fd) {
228 bool is_active = (removing_pfd.events & POLLIN) > 0;
233 if (is_active &&
id != last_active) {
234 assert(
id < last_active);
235 swap(l,
id, last_active);
259 int16_t timeout_sec,
int notify_fd) {
263 "hold_response <fd:%d, seq_id:%lld>", fd, (
long long)seq_id);
268 "failed to get free rx_info for <fd:%d, seq_id:%lld>, dropping it",
269 fd, (
long long)seq_id);
276 "setting info %p(+%d) to hold response <fd:%d, seq_id:%lld>",
277 (
void *)info, info->
id, fd, (
long long)seq_id);
281 info->
u.
hold.fd = fd;
282 info->
u.
hold.seq_id = seq_id;
283 info->
u.
hold.has_result =
false;
284 memset(&info->
u.
hold.result, 0,
sizeof(info->
u.
hold.result));
292 "notifying to expect response <box:%p, fd:%d, seq_id:%lld>",
303 "converting HOLD to EXPECT for info %d (%p) with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>",
304 info->
id, (
void *)info,
305 (
void *)box, info->
u.
hold.fd, (
long long)info->
u.
hold.seq_id);
310 info->
u.
expect.has_result =
true;
311 info->
u.
expect.result = result;
318 "info %p (%d) with <box:%p, fd:%d, seq_id:%lld> has error %d",
319 (
void *)info, info->
id,
320 (
void *)box, info->
u.
hold.fd, (
long long)info->
u.
hold.seq_id, info->
u.
hold.error);
325 info->
u.
expect.result = result;
330 "converting HOLD to EXPECT info %d (%p), attempting delivery <box:%p, fd:%d, seq_id:%lld>",
331 info->
id, (
void *)info,
332 (
void *)box, info->
u.
hold.fd, (
long long)info->
u.
hold.seq_id);
336 info->
u.
expect.has_result =
false;
time_t timeout_sec
Message send timeout.
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
struct pollfd fds[1000+1]
Tracked file descriptors, for polling.
static void shutdown(listener *l, int notify_fd)
struct listener_msg::@6::@7 add_socket
#define LISTENER_MSG_TAG
Arbitrary byte used to tag writes from the listener.
static void msg_handler(listener *l, listener_msg *pmsg)
static void remove_socket(listener *l, int fd, int notify_fd)
#define LISTENER_CMD_BUF_SIZE
A queue message, with a command in the tagged union.
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
connection_info * fd_info[1000]
The connection info, corresponding to the file descriptors tracked in l->fds. ...
#define INCOMING_MSG_PIPE_ID
ID of the struct pollfd for the listener's incoming command pipe.
uint16_t ListenerTask_GetBackpressure(struct listener *l)
Get the current backpressure from the listener.
struct listener_msg::@6::@10 expect
void * udata
User data for callbacks.
ssize_t syscall_read(int fildes, void *buf, size_t nbyte)
struct listener_msg::@6::@9 hold
bool ListenerTask_GrowReadBuf(listener *l, size_t nsize)
Grow the listener's read buffer to NSIZE.
void ListenerCmd_CheckIncomingMessages(listener *l, int *res)
Process incoming commands, if any.
uint16_t tracked_fds
FDs currently tracked by listener.
void * udata
user connection data
void ListenerTask_ReleaseMsg(struct listener *l, listener_msg *msg)
Release a message to the listener's message pool.
void ListenerCmd_NotifyCaller(listener *l, int fd)
Notify the listener's caller that a command has completed.
void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)
Attempt delivery of the message boxed in INFO.
Per-socket connection context.
static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int notify_fd)
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
#define MAX_FDS
Max number of sockets to monitor.
struct rx_info_t::@12::@14 expect
bus_sink_cb * sink_cb
IO sink callback.
static void add_socket(listener *l, connection_info *ci, int notify_fd)
rx_info_t * ListenerHelper_GetFreeRXInfo(struct listener *l)
Get a free RX_INFO record, if any are available.
#define INCOMING_MSG_PIPE
Offset to account for the first file descriptor being the incoming message pipe.
void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info, bus_send_status_t status)
Notify the client that the event in INFO has failed with STATUS.
rx_info_t * ListenerHelper_FindInfoBySequenceID(listener *l, int fd, int64_t seq_id)
Try to find an RX_INFO record by a (fd, seq_id) pair.
#define BUS_ASSERT(B, UDATA, COND)
ssize_t syscall_write(int fildes, const void *buf, size_t nbyte)
static void swap(listener *l, int a, int b)
struct listener_msg::@6::@11 shutdown
static void expect_response(listener *l, boxed_msg *box)
uint16_t inactive_fds
File descriptors that are inactive due to errors, but have not yet been explicitly removed/closed by ...
struct listener_msg::@6::@8 remove_socket
int fd
Destination filename and message body.
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)