kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
send.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdio.h>
21 #include <string.h>
22 #include <unistd.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <assert.h>
26 #include <errno.h>
27 
28 #include "bus.h"
29 #include "bus_poll.h"
30 #include "bus_types.h"
31 #include "bus_internal_types.h"
32 #include "listener.h"
33 #include "syscall.h"
34 #include "util.h"
35 #include "atomic.h"
36 #include "yacht.h"
37 #include "send_helper.h"
38 #include "send_internal.h"
39 
40 #ifdef TEST
41 struct timeval start;
42 struct timeval now;
43 struct pollfd fds[1];
44 size_t backpressure = 0;
45 int poll_errno = 0;
46 int write_errno = 0;
47 int completion_pipe = -1;
48 #endif
49 
51  int fd, int64_t seq_id, int16_t timeout_sec);
52 /* Do a blocking send.
53  *
54  * RetuBus_RegisterSocketing true indicates that the message has been queued up for
55  * delivery, but the request or response may still fail. Those errors
56  * are handled by giving an error status code to the callback.
57  * RetuBus_RegisterSocketing false means that the send was rejected outright, and
58  * the callback-based error handling will not be used. */
60  /* Note: assumes that all locking and thread-safe seq_id allocation
61  * has been handled upstream. If multiple client requests are queued
62  * up to go out at the same time, they must go out in monotonic order,
63  * with only a single thread writing to the socket at once. */
64  assert(b);
65  assert(box);
66 
67  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
68  "doing blocking send of box %p, with <fd:%d, seq_id %lld>, msg[%zd]: %p",
69  (void *)box, box->fd, (long long)box->out_seq_id,
70  box->out_msg_size, (void *)box->out_msg);
71 
72  int timeout_msec = box->timeout_sec * 1000;
73 
74 #ifndef TEST
75  struct timeval start;
76  struct timeval now;
77  struct pollfd fds[1];
78 #endif
79  if (Util_Timestamp(&start, true)) {
80  box->tv_send_start = start;
81  } else {
82  BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 128,
83  "gettimeofday failure: %d", errno);
84  return false;
85  }
86 
87  fds[0].fd = box->fd;
88  fds[0].events = POLLOUT;
89 
90  /* Notify the listener that we're about to start writing to a drive,
91  * because (in rare cases) the response may arrive between finishing
92  * the write and the listener processing the notification. In that
93  * case, it should hold onto the unrecognized response until the
94  * client notifies it (and passes it the callback).
95  *
96  * This timeout is several extra seconds so that we don't have
97  * a window where the HOLD message has timed out, but the
98  * EXPECT hasn't, leading to ambiguity about what to do with
99  * the response (which may or may not have arrived).
100  * */
102  box->fd, box->out_seq_id, box->timeout_sec + 5)) {
103  return false;
104  }
105  assert(box->out_sent_size == 0);
106 
107  int rem_msec = timeout_msec;
108 
109  while (rem_msec > 0) {
110  if (Util_Timestamp(&now, true)) {
111  size_t usec_elapsed = (((now.tv_sec - start.tv_sec) * 1000000)
112  + (now.tv_usec - start.tv_usec));
113  size_t msec_elapsed = usec_elapsed / 1000;
114 
115  rem_msec = timeout_msec - msec_elapsed;
116  } else {
117  /* If gettimeofday fails here, the listener's hold message has
118  * already been sent; it will time out later. We need to treat
119  * this like a TX failure (including closing the socket) because
120  * we don't know what state the connection was left in. */
121  BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 128,
122  "gettimeofday failure in poll loop: %d", errno);
124  return true;
125  }
126 
127  #ifdef TEST
128  errno = poll_errno;
129  #endif
130  int res = syscall_poll(fds, 1, rem_msec);
131  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
132  "handle_write: poll res %d", res);
133  if (res == -1) {
134  if (errno == EINTR || errno == EAGAIN) { /* interrupted/try again */
135  errno = 0;
136  continue;
137  } else {
139  return true;
140  }
141  } else if (res == 1) {
142  short revents = fds[0].revents;
143  if (revents & POLLNVAL) {
144  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
145  "do_blocking_send on %d: POLLNVAL => UNREGISTERED_SOCKET", box->fd);
147  return true;
148  } else if (revents & (POLLERR | POLLHUP)) {
149  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
150  "do_blocking_send on %d: POLLERR/POLLHUP => TX_FAILURE (%d)",
151  box->fd, revents);
153  return true;
154  } else if (revents & POLLOUT) {
156 
157  BUS_LOG_SNPRINTF(b, 4, LOG_SENDER, b->udata, 256,
158  "SendHelper_HandleWrite res %d", hw_res);
159 
160  switch (hw_res) {
161  case SHHW_OK:
162  continue;
163  case SHHW_DONE:
164  return true;
165  case SHHW_ERROR:
166  return true;
167  }
168  } else {
169  BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 256,
170  "match fail %d", revents);
171  assert(false); /* match fail */
172  }
173  } else if (res == 0) { /* timeout */
174  break;
175  }
176  }
177  BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
178  "do_blocking_send on <fd:%d, seq_id:%lld>: TX_TIMEOUT",
179  box->fd, (long long)box->out_seq_id);
181  return true;
182 }
183 
185  int fd, int64_t seq_id, int16_t timeout_sec) {
186  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 128,
187  "telling listener to HOLD response, with <fd:%d, seq_id:%lld>",
188  fd, (long long)seq_id);
189 
190  struct listener *l = Bus_GetListenerForSocket(b, fd);
191 
192  const int max_retries = SEND_NOTIFY_LISTENER_RETRIES;
193  for (int try = 0; try < max_retries; try++) {
194  #ifndef TEST
195  int completion_pipe = -1;
196  #endif
197  if (Listener_HoldResponse(l, fd, seq_id, timeout_sec, &completion_pipe)) {
198  return BusPoll_OnCompletion(b, completion_pipe);
199  } else {
200  /* Don't apply much backpressure here since the client
201  * thread will get it when the message is done sending. */
203  }
204  }
205  return false;
206 }
207 
208 void Send_HandleFailure(struct bus *b, boxed_msg *box, bus_send_status_t status) {
209  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
210  "Send_HandleFailure: box %p, <fd:%d, seq_id:%lld>, status %d",
211  (void*)box, box->fd, (long long)box->out_seq_id, status);
212  BUS_ASSERT(b, b->udata, status != BUS_SEND_UNDEFINED);
213 
214  box->result = (bus_msg_result_t){
215  .status = status,
216  };
217 
218  #ifndef TEST
219  size_t backpressure = 0;
220  #endif
221 
222  /* Retry until it succeeds. */
223  size_t retries = 0;
224  for (;;) {
225  if (Bus_ProcessBoxedMessage(b, box, &backpressure)) {
226  BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
227  "deleted box %p", (void*)box);
228  Bus_BackpressureDelay(b, backpressure,
230  return;
231  } else {
232  retries++;
234  if (retries > 0 && (retries & 255) == 0) {
235  BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 64,
236  "looping on handle_failure retry: %zd", retries);
237  }
238  }
239  }
240 }
time_t timeout_sec
Message send timeout.
bool Listener_HoldResponse(struct listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int *notify_fd)
The client is about to start a write, the listener should hold on to the response (with timeout) if i...
Definition: listener.c:115
bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure)
Deliver a boxed message to the thread pool to execute.
Definition: bus.c:580
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
Receiver of responses.
static bool attempt_to_enqueue_HOLD_message_to_listener(struct bus *b, int fd, int64_t seq_id, int16_t timeout_sec)
Definition: send.c:184
SendHelper_HandleWrite_res
Definition: send_helper.h:26
#define LISTENER_EXPECT_BACKPRESSURE_SHIFT
How many bits to >> the backpressure value from the listener when a send has completed.
Definition: listener.h:32
size_t out_sent_size
struct timeval tv_send_start
Event timestamps to track timeouts.
Message bus.
void * udata
User data for callbacks.
uint8_t * out_msg
int64_t out_seq_id
bus_send_status_t status
Definition: bus_types.h:216
#define SEND_NOTIFY_LISTENER_RETRY_DELAY
Definition: send_internal.h:26
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift)
Provide backpressure by sleeping for (backpressure >> shift) msec, if the value is greater than 0...
Definition: bus.c:551
bool BusPoll_OnCompletion(struct bus *b, int fd)
Poll on fd until complete, return true on success or false on IO error.
Definition: bus_poll.c:35
bus_send_status_t
Definition: bus_types.h:193
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
#define SEND_NOTIFY_LISTENER_RETRIES
Definition: send_internal.h:25
size_t out_msg_size
SendHelper_HandleWrite_res SendHelper_HandleWrite(bus *b, boxed_msg *box)
Definition: send_helper.c:39
void Send_HandleFailure(struct bus *b, boxed_msg *box, bus_send_status_t status)
Definition: send.c:208
#define BUS_ASSERT(B, UDATA, COND)
Definition: bus_types.h:83
bool Send_DoBlockingSend(bus *b, boxed_msg *box)
Do a blocking send.
Definition: send.c:59
bool Util_Timestamp(struct timeval *tv, bool relative)
Definition: util.c:30
struct listener * Bus_GetListenerForSocket(struct bus *b, int fd)
For a given file descriptor, get the listener ID to use.
Definition: bus.c:330
int fd
Destination filename and message body.
int syscall_poll(struct pollfd fds[], nfds_t nfds, int timeout)
Wrappers for syscalls, to allow mocking for testing.
Definition: syscall.c:27