52 if (read_from == available) {
break; }
58 "poll: l->fds[%d]->revents: 0x%04x",
68 bool is_closing = fd->events & (POLLERR | POLLNVAL | POLLHUP);
70 if (fd->revents & POLLIN) {
76 "reading %zd bytes from socket (buf is %zd)",
92 }
while (is_closing && cur_read > 0 && ci->
to_read_size > 0);
96 if (fd->revents & (POLLERR | POLLNVAL)) {
99 "pollfd: socket error (POLLERR | POLLNVAL)", b->
udata);
101 }
else if (fd->revents & POLLHUP) {
123 "read: size %zd, errno %d", size, errno);
124 if (errno == EAGAIN) {
132 "read: socket error reading, %d", errno);
152 unsigned long errval = ERR_get_error();
155 "%s -- ERROR on fd %d -- %s",
156 prefix, ci->
fd, ERR_error_string(errval, ebuf));
174 case SSL_ERROR_WANT_READ:
176 "SSL_read fd %d: WANT_READ", ci->
fd);
179 case SSL_ERROR_WANT_WRITE:
182 case SSL_ERROR_SYSCALL:
192 "SSL_read fd %d: errno %d", ci->
fd, errno);
199 case SSL_ERROR_ZERO_RETURN:
202 "SSL_read fd %d: ZERO_RETURN (HUP)", ci->
fd);
212 }
else if (size > 0) {
228 "read %zd bytes, calling sink CB", size);
232 for (
int i = 0; i < size; i++) {
233 if (i > 0 && (i & 15) == 0) { printf(
"\n"); }
244 "process_unpacked_message: ok? %d, seq_id:%lld",
252 "expecting next read to have %zd bytes", ci->
to_read_size);
258 "Read buffer realloc failure for %p (%zd to %zd)",
272 "set_error_for_socket %d, err %d", fd, err);
276 switch (info->
state) {
284 info->
u.
hold.error = err;
289 if (box && box->
fd == fd) {
297 "match fail %d on line %d", info->
state, __LINE__);
304 newly_inactive_ci->
error = err;
312 if (ci->
error < 0 && pfd->events & POLLIN) {
313 pfd->events &= ~POLLIN;
316 if (
id != last_active) {
317 fprintf(stderr,
"swapping %u and %u\n",
id, last_active);
327 l->
fd_info[id] = last_active_ci;
342 int64_t seq_id = result.
u.
success.seq_id;
343 void *opaque_msg = result.
u.
success.msg;
348 switch (info->
state) {
352 info->
u.
hold.has_result =
true;
353 info->
u.
hold.result = result;
358 "marking info %d, seq_id:%lld ready for delivery",
359 info->
id, (
long long)result.
u.
success.seq_id);
362 info->
u.
expect.has_result =
true;
363 info->
u.
expect.result = result;
375 "Couldn't find info for fd:%d, seq_id:%lld, msg %p",
376 ci->
fd, (
long long)seq_id, opaque_msg);
383 uintptr_t e_id = result.
u.
error.opaque_error_id;
385 "Got opaque_error_id of %"PRIuPTR
" (0x%08"PRIxPTR
")",
int syscall_SSL_get_error(const SSL *ssl, int ret)
bool Util_IsResumableIOError(int errno_)
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
struct pollfd fds[1000+1]
Tracked file descriptors, for polling.
union bus_unpack_cb_res_t::@0 u
bus_unexpected_msg_cb * unexpected_msg_cb
connection_info * fd_info[1000]
The connection info, corresponding to the the file descriptors tracked in l->fds. ...
SSL * ssl
SSL handle. Must be valid or BUS_NO_SSL.
bus_unpack_cb * unpack_cb
Message unpacking callback.
static ssize_t socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_info *ci)
static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci)
void * udata
User data for callbacks.
ssize_t syscall_read(int fildes, void *buf, size_t nbyte)
bool ListenerTask_GrowReadBuf(listener *l, size_t nsize)
Grow the listener's read buffer to NSIZE.
bus_error_cb * error_cb
Error handling callback.
uint16_t tracked_fds
FDs currently tracked by listener.
static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const char *prefix)
void * udata
user connection data
static void process_unpacked_message(listener *l, connection_info *ci, bus_unpack_cb_res_t result)
rx_info_t rx_info[(1024)]
void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)
Attempt delivery of the message boxed in INFO.
Per-socket connection context.
int syscall_SSL_read(SSL *ssl, void *buf, int num)
uint16_t rx_info_max_used
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
static bool sink_socket_read(struct bus *b, listener *l, connection_info *ci, ssize_t size)
struct rx_info_t::@12::@14 expect
bus_sink_cb * sink_cb
IO sink callback.
struct bus_unpack_cb_res_t::@0::@2 error
bool error_occured
Flag indicating post-poll handling is necessary.
#define INCOMING_MSG_PIPE
Offset to account for the first file descriptor being the incoming message pipe.
rx_info_t * ListenerHelper_FindInfoBySequenceID(listener *l, int fd, int64_t seq_id)
Try to find an RX_INFO record by a pair.
struct bus_unpack_cb_res_t::@0::@1 success
#define BUS_ASSERT(B, UDATA, COND)
static void move_errored_active_sockets_to_end(listener *l)
uint16_t inactive_fds
File descriptors that are inactive due to errors, but have not yet been explicitly removed/closed by ...
int fd
Destination filename and message body.
void ListenerIO_AttemptRecv(listener *l, int available)
static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err)
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)