44 size_t backpressure = 0;
47 int completion_pipe = -1;
51 int fd, int64_t seq_id, int16_t timeout_sec);
68 "doing blocking send of box %p, with <fd:%d, seq_id %lld>, msg[%zd]: %p",
83 "gettimeofday failure: %d", errno);
88 fds[0].events = POLLOUT;
107 int rem_msec = timeout_msec;
109 while (rem_msec > 0) {
111 size_t usec_elapsed = (((now.tv_sec - start.tv_sec) * 1000000)
112 + (now.tv_usec - start.tv_usec));
113 size_t msec_elapsed = usec_elapsed / 1000;
115 rem_msec = timeout_msec - msec_elapsed;
122 "gettimeofday failure in poll loop: %d", errno);
132 "handle_write: poll res %d", res);
134 if (errno == EINTR || errno == EAGAIN) {
141 }
else if (res == 1) {
142 short revents = fds[0].revents;
143 if (revents & POLLNVAL) {
145 "do_blocking_send on %d: POLLNVAL => UNREGISTERED_SOCKET", box->
fd);
148 }
else if (revents & (POLLERR | POLLHUP)) {
150 "do_blocking_send on %d: POLLERR/POLLHUP => TX_FAILURE (%d)",
154 }
else if (revents & POLLOUT) {
158 "SendHelper_HandleWrite res %d", hw_res);
170 "match fail %d", revents);
173 }
else if (res == 0) {
178 "do_blocking_send on <fd:%d, seq_id:%lld>: TX_TIMEOUT",
185 int fd, int64_t seq_id, int16_t timeout_sec) {
187 "telling listener to HOLD response, with <fd:%d, seq_id:%lld>",
188 fd, (
long long)seq_id);
193 for (
int try = 0;
try < max_retries;
try++) {
195 int completion_pipe = -1;
210 "Send_HandleFailure: box %p, <fd:%d, seq_id:%lld>, status %d",
211 (
void*)box, box->
fd, (
long long)box->
out_seq_id, status);
219 size_t backpressure = 0;
227 "deleted box %p", (
void*)box);
234 if (retries > 0 && (retries & 255) == 0) {
236 "looping on handle_failure retry: %zd", retries);
time_t timeout_sec
Message send timeout.
bool Listener_HoldResponse(struct listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int *notify_fd)
The client is about to start a write, the listener should hold on to the response (with timeout) if i...
bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure)
Deliver a boxed message to the thread pool to execute.
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
static bool attempt_to_enqueue_HOLD_message_to_listener(struct bus *b, int fd, int64_t seq_id, int16_t timeout_sec)
SendHelper_HandleWrite_res
#define LISTENER_EXPECT_BACKPRESSURE_SHIFT
How many bits to >> the backpressure value from the listener when a send has completed.
struct timeval tv_send_start
Event timestamps to track timeouts.
void * udata
User data for callbacks.
#define SEND_NOTIFY_LISTENER_RETRY_DELAY
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift)
Provide backpressure by sleeping for (backpressure >> shift) msec, if the value is greater than 0...
bool BusPoll_OnCompletion(struct bus *b, int fd)
Poll on fd until complete, return true on success or false on IO error.
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
#define SEND_NOTIFY_LISTENER_RETRIES
SendHelper_HandleWrite_res SendHelper_HandleWrite(bus *b, boxed_msg *box)
void Send_HandleFailure(struct bus *b, boxed_msg *box, bus_send_status_t status)
#define BUS_ASSERT(B, UDATA, COND)
bool Send_DoBlockingSend(bus *b, boxed_msg *box)
Do a blocking send.
bool Util_Timestamp(struct timeval *tv, bool relative)
struct listener * Bus_GetListenerForSocket(struct bus *b, int fd)
For a given file descriptor, get the listener ID to use.
int fd
Destination filename and message body.
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.