30 uint8_t
read_buf[
sizeof(uint8_t) +
sizeof(uint16_t)];
41 fds[0].events = POLLIN;
49 "poll_on_completion, polling %d", fd);
52 "poll_on_completion for %d, res %d (errno %d)", fd, res, errno);
56 "poll_on_completion, resumable IO error %d", errno);
61 "poll_on_completion, non-resumable IO error %d", errno);
64 }
else if (res == 1) {
67 uint8_t
read_buf[
sizeof(uint8_t) +
sizeof(uint16_t)];
70 if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
79 ssize_t sz =
syscall_read(fd, read_buf,
sizeof(read_buf));
81 if (sz ==
sizeof(read_buf)) {
85 msec = (read_buf[1] << 0) + (read_buf[2] << 8);
89 }
else if (sz == -1) {
92 "poll_on_completion read, resumable IO error %d", errno);
97 "poll_on_completion read, non-resumable IO error %d", errno);
103 "poll_on_completion bad read size %zd", sz);
111 "poll_on_completion, blocking forever returned %d, errno %d", res, errno);
bool Util_IsResumableIOError(int errno_)
#define LISTENER_BACKPRESSURE_SHIFT
How many bits to >> the backpressure value from commands delivered to the listener.
#define LISTENER_MSG_TAG
Arbitrary byte used to tag writes from the listener.
static uint8_t read_buf[(2 *1024L *1024)]
void * udata
User data for callbacks.
ssize_t syscall_read(int fildes, void *buf, size_t nbyte)
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift)
Provide backpressure by sleeping for (backpressure >> shift) msec, if the value is greater than 0...
bool BusPoll_OnCompletion(struct bus *b, int fd)
Poll on fd until complete, return true on success or false on IO error.
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)