kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
listener_io.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include "listener_io.h"
21 #include "listener_helper.h"
22 
23 #include <unistd.h>
24 #include <assert.h>
25 #include <inttypes.h>
26 
27 #include "listener_task.h"
28 #include "syscall.h"
29 #include "util.h"
30 
31 static ssize_t socket_read_plain(struct bus *b,
32  listener *l, int pfd_i, connection_info *ci);
33 static ssize_t socket_read_ssl(struct bus *b,
34  listener *l, int pfd_i, connection_info *ci);
35 static bool sink_socket_read(struct bus *b,
36  listener *l, connection_info *ci, ssize_t size);
37 static void print_SSL_error(struct bus *b,
38  connection_info *ci, int lvl, const char *prefix);
39 static void set_error_for_socket(listener *l, int id,
40  int fd, rx_error_t err);
41 static void process_unpacked_message(listener *l,
44 
45 void ListenerIO_AttemptRecv(listener *l, int available) {
46  /* --> failure --> set 'closed' error on socket, don't die */
47  struct bus *b = l->bus;
48  int read_from = 0;
49  BUS_LOG(b, 3, LOG_LISTENER, "attempting receive", b->udata);
50 
51  for (int i = 0; i < l->tracked_fds; i++) {
52  if (read_from == available) { break; }
53  struct pollfd *fd = &l->fds[i + INCOMING_MSG_PIPE];
54  connection_info *ci = l->fd_info[i];
55  BUS_ASSERT(b, b->udata, ci->fd == fd->fd);
56 
57  BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 64,
58  "poll: l->fds[%d]->revents: 0x%04x", // NOCOMMIT
59  i + INCOMING_MSG_PIPE, fd->revents);
60 
61  /* If a socket is about to be shut down, we want to get a
62  * complete read from it if possible, because it's likely to be
63  * an UNSOLICITEDSTATUS message with a reason for the hangup.
64  * Only do single reads otherwise, though, otherwise the
65  * listener can end up blocking too long handling consecutive
66  * reads on a busy connection and causing the incoming command
67  * queue to get backed up. */
68  bool is_closing = fd->events & (POLLERR | POLLNVAL | POLLHUP);
69 
70  if (fd->revents & POLLIN) {
71  // Try to read what we can (possibly before hangup)
72  ssize_t cur_read = 0;
73  size_t to_read = ci->to_read_size;
74  do {
75  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
76  "reading %zd bytes from socket (buf is %zd)",
77  ci->to_read_size, l->read_buf_size);
78  BUS_ASSERT(b, b->udata, l->read_buf_size >= to_read);
79 
80  switch (ci->type) {
81  case BUS_SOCKET_PLAIN:
82  cur_read = socket_read_plain(b, l, i, ci);
83  break;
84  case BUS_SOCKET_SSL:
85  cur_read = socket_read_ssl(b, l, i, ci);
86  break;
87  default:
88  BUS_ASSERT(b, b->udata, false);
89  }
90  // -1: socket error
91  // 0: no more to read
92  } while (is_closing && cur_read > 0 && ci->to_read_size > 0);
93  read_from++;
94  }
95 
96  if (fd->revents & (POLLERR | POLLNVAL)) {
97  read_from++;
98  BUS_LOG(b, 2, LOG_LISTENER,
99  "pollfd: socket error (POLLERR | POLLNVAL)", b->udata);
101  } else if (fd->revents & POLLHUP) {
102  read_from++;
103  BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP",
104  b->udata);
106  }
107  }
108 
109  if (l->error_occured) { // only conditionally do this to avoid wasting CPU
110  /* This is done outside of the polling loop, to avoid erroneously repeat-polling
111  * or skipping any individual file descriptors. */
113  l->error_occured = false;
114  }
115 }
116 
117 static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
118  ssize_t accum = 0;
119  while (ci->to_read_size > 0) {
120  ssize_t size = syscall_read(ci->fd, l->read_buf, ci->to_read_size);
121  if (size == -1) {
122  BUS_LOG_SNPRINTF(b, 6, LOG_LISTENER, b->udata, 64,
123  "read: size %zd, errno %d", size, errno);
124  if (errno == EAGAIN) {
125  errno = 0;
126  return accum;
127  } else if (Util_IsResumableIOError(errno)) {
128  errno = 0;
129  continue;
130  } else {
131  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
132  "read: socket error reading, %d", errno);
134  errno = 0;
135  return -1;
136  }
137  }
138 
139  if (size > 0) {
140  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 64,
141  "read: %zd", size);
142  sink_socket_read(b, l, ci, size);
143  accum += size;
144  } else {
145  return accum;
146  }
147  }
148  return accum;
149 }
150 
151 static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const char *prefix) {
152  unsigned long errval = ERR_get_error();
153  char ebuf[256];
154  BUS_LOG_SNPRINTF(b, lvl, LOG_LISTENER, b->udata, 64,
155  "%s -- ERROR on fd %d -- %s",
156  prefix, ci->fd, ERR_error_string(errval, ebuf));
157  (void)ci;
158  (void)lvl;
159  (void)errval;
160  (void)ebuf;
161  (void)prefix;
162 }
163 
164 static ssize_t socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
165  BUS_ASSERT(b, b->udata, ci->ssl);
166  ssize_t accum = 0;
167  while (ci->to_read_size > 0) {
168  // ssize_t pending = SSL_pending(ci->ssl);
169  ssize_t size = (ssize_t)syscall_SSL_read(ci->ssl, l->read_buf, ci->to_read_size);
170 
171  if (size == -1) {
172  int reason = syscall_SSL_get_error(ci->ssl, size);
173  switch (reason) {
174  case SSL_ERROR_WANT_READ:
175  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
176  "SSL_read fd %d: WANT_READ", ci->fd);
177  return accum;
178 
179  case SSL_ERROR_WANT_WRITE:
180  BUS_ASSERT(b, b->udata, false);
181 
182  case SSL_ERROR_SYSCALL:
183  {
184  if (errno == 0) {
185  print_SSL_error(b, ci, 1, "SSL_ERROR_SYSCALL errno 0");
186  BUS_ASSERT(b, b->udata, false);
187  } else if (Util_IsResumableIOError(errno)) {
188  errno = 0;
189  continue;
190  } else {
191  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
192  "SSL_read fd %d: errno %d", ci->fd, errno);
193  print_SSL_error(b, ci, 1, "SSL_ERROR_SYSCALL");
195  return -1;
196  }
197  break;
198  }
199  case SSL_ERROR_ZERO_RETURN:
200  {
201  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
202  "SSL_read fd %d: ZERO_RETURN (HUP)", ci->fd);
203  set_error_for_socket(l, pfd_i, ci->fd, RX_ERROR_POLLHUP);
204  return -1;
205  }
206 
207  default:
208  print_SSL_error(b, ci, 1, "SSL_ERROR UNKNOWN");
210  BUS_ASSERT(b, b->udata, false);
211  }
212  } else if (size > 0) {
213  sink_socket_read(b, l, ci, size);
214  accum += size;
215  if ((size_t)accum == ci->to_read_size) { break; }
216  } else {
217  break;
218  }
219  }
220  return accum;
221 }
222 
223 #define DUMP_READ 0
224 
225 static bool sink_socket_read(struct bus *b,
226  listener *l, connection_info *ci, ssize_t size) {
227  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
228  "read %zd bytes, calling sink CB", size);
229 
230 #if DUMP_READ
231  printf("\n");
232  for (int i = 0; i < size; i++) {
233  if (i > 0 && (i & 15) == 0) { printf("\n"); }
234  printf("%02x ", l->read_buf[i]);
235  }
236  printf("\n\n");
237 #endif
238 
239  bus_sink_cb_res_t sres = b->sink_cb(l->read_buf, size, ci->udata);
240  if (sres.full_msg_buffer) {
241  BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata);
242  bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata);
243  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
244  "process_unpacked_message: ok? %d, seq_id:%lld",
245  ures.ok, (long long)ures.u.success.seq_id);
246  process_unpacked_message(l, ci, ures);
247  }
248 
249  ci->to_read_size = sres.next_read;
250 
251  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
252  "expecting next read to have %zd bytes", ci->to_read_size);
253 
254  /* Grow read buffer if necessary. */
255  if (ci->to_read_size > l->read_buf_size) {
256  if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) {
257  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
258  "Read buffer realloc failure for %p (%zd to %zd)",
259  l->read_buf, l->read_buf_size, ci->to_read_size);
260  BUS_ASSERT(b, b->udata, false);
261  }
262  }
263  return true;
264 }
265 
266 static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
267  l->error_occured = true;
268 
269  /* Mark all pending messages on this socket as being failed due to error. */
270  struct bus *b = l->bus;
271  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
272  "set_error_for_socket %d, err %d", fd, err);
273 
274  for (int i = 0; i <= l->rx_info_max_used; i++) {
275  rx_info_t *info = &l->rx_info[i];
276  switch (info->state) {
277  case RIS_INACTIVE:
278  break;
279  case RIS_HOLD:
280  /* We should set an error on the info, but let the timeout
281  * or a pending EXPECT message handle the error. That way,
282  * it can be handled via the status callback whenever
283  * possible. */
284  info->u.hold.error = err;
285  break;
286  case RIS_EXPECT:
287  {
288  struct boxed_msg *box = info->u.expect.box;
289  if (box && box->fd == fd) {
290  info->u.expect.error = err;
291  }
292  break;
293  }
294  default:
295  {
296  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
297  "match fail %d on line %d", info->state, __LINE__);
298  BUS_ASSERT(b, b->udata, false);
299  }
300  }
301  }
302 
303  connection_info *newly_inactive_ci = l->fd_info[id];
304  newly_inactive_ci->error = err;
305 }
306 
308  for (uint16_t id = 0; id < l->tracked_fds - l->inactive_fds; id++) {
309  connection_info *ci = l->fd_info[id];
310  struct pollfd *pfd = &l->fds[id + INCOMING_MSG_PIPE];
311  int fd = pfd->fd;
312  if (ci->error < 0 && pfd->events & POLLIN) {
313  pfd->events &= ~POLLIN;
314  /* move socket to end, so it won't be poll'd and get repeated POLLHUP. */
315  int last_active = l->tracked_fds - l->inactive_fds - 1;
316  if (id != last_active) {
317  fprintf(stderr, "swapping %u and %u\n", id, last_active);
318  assert(l->fds[last_active + INCOMING_MSG_PIPE].fd != fd);
319  struct pollfd newly_inactive_fd = l->fds[id + INCOMING_MSG_PIPE];
320  struct pollfd last_active_fd = l->fds[last_active + INCOMING_MSG_PIPE];
321  connection_info *last_active_ci = l->fd_info[last_active];
322  /* Swap pollfds */
323  l->fds[id + INCOMING_MSG_PIPE] = last_active_fd;
324  l->fds[last_active + INCOMING_MSG_PIPE] = newly_inactive_fd;
325  /* Swap connection_info pointers */
326  l->fd_info[last_active] = ci;
327  l->fd_info[id] = last_active_ci;
328  }
329  l->inactive_fds++;
330  assert(l->inactive_fds <= l->tracked_fds);
331  }
332  }
333 }
334 
336  connection_info *ci, bus_unpack_cb_res_t result) {
337  struct bus *b = l->bus;
338 
339  /* NOTE: message may be an unsolicited status message */
340 
341  if (result.ok) {
342  int64_t seq_id = result.u.success.seq_id;
343  void *opaque_msg = result.u.success.msg;
344 
345  rx_info_t *info = ListenerHelper_FindInfoBySequenceID(l, ci->fd, seq_id);
346 
347  if (info) {
348  switch (info->state) {
349  case RIS_HOLD:
350  /* Just save result, to match up later. */
351  BUS_ASSERT(b, b->udata, !info->u.hold.has_result);
352  info->u.hold.has_result = true;
353  info->u.hold.result = result;
354  break;
355  case RIS_EXPECT:
356  {
357  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128,
358  "marking info %d, seq_id:%lld ready for delivery",
359  info->id, (long long)result.u.success.seq_id);
360  info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY;
361  BUS_ASSERT(b, b->udata, !info->u.hold.has_result);
362  info->u.expect.has_result = true;
363  info->u.expect.result = result;
365  break;
366  }
367  case RIS_INACTIVE:
368  default:
369  BUS_ASSERT(b, b->udata, false);
370  }
371  } else {
372  /* We received a response that we weren't expecting. */
373  if (seq_id != BUS_NO_SEQ_ID) {
374  BUS_LOG_SNPRINTF(b, 2 - 2, LOG_LISTENER, b->udata, 128,
375  "Couldn't find info for fd:%d, seq_id:%lld, msg %p",
376  ci->fd, (long long)seq_id, opaque_msg);
377  }
378  if (b->unexpected_msg_cb) {
379  b->unexpected_msg_cb(opaque_msg, seq_id, b->udata, ci->udata);
380  }
381  }
382  } else {
383  uintptr_t e_id = result.u.error.opaque_error_id;
384  BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128,
385  "Got opaque_error_id of %"PRIuPTR" (0x%08"PRIxPTR")",
386  e_id, e_id);
387  (void)e_id;
388 
389  /* Timeouts will clean up after it; give user code a chance to
390  * clean up after it here, though technically speaking they
391  * could in the unpack_cb above. */
392  b->error_cb(result, ci->udata);
393  }
394 }
int syscall_SSL_get_error(const SSL *ssl, int ret)
Definition: syscall.c:52
const uint16_t id
bool Util_IsResumableIOError(int errno_)
Definition: util.c:26
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
struct pollfd fds[1000+1]
Tracked file descriptors, for polling.
const bus_socket_t type
union bus_unpack_cb_res_t::@0 u
Receiver of responses.
bus_unexpected_msg_cb * unexpected_msg_cb
connection_info * fd_info[1000]
The connection info, corresponding to the file descriptors tracked in l->fds. ...
SSL * ssl
SSL handle. Must be valid or BUS_NO_SSL.
bus_unpack_cb * unpack_cb
Message unpacking callback.
static ssize_t socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_info *ci)
Definition: listener_io.c:164
Message bus.
static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci)
Definition: listener_io.c:117
void * udata
User data for callbacks.
struct bus * bus
ssize_t syscall_read(int fildes, void *buf, size_t nbyte)
Definition: syscall.c:39
bool ListenerTask_GrowReadBuf(listener *l, size_t nsize)
Grow the listener's read buffer to NSIZE.
union rx_info_t::@12 u
#define BUS_NO_SEQ_ID
Definition: bus_types.h:39
bus_error_cb * error_cb
Error handling callback.
uint16_t tracked_fds
FDs currently tracked by listener.
static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const char *prefix)
Definition: listener_io.c:151
void * udata
user connection data
static void process_unpacked_message(listener *l, connection_info *ci, bus_unpack_cb_res_t result)
Definition: listener_io.c:335
rx_info_t rx_info[(1024)]
void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)
Attempt delivery of the message boxed in INFO.
Per-socket connection context.
int syscall_SSL_read(SSL *ssl, void *buf, int num)
Definition: syscall.c:48
uint16_t rx_info_max_used
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
static bool sink_socket_read(struct bus *b, listener *l, connection_info *ci, ssize_t size)
Definition: listener_io.c:225
struct rx_info_t::@12::@14 expect
bus_sink_cb * sink_cb
IO sink callback.
struct bus_unpack_cb_res_t::@0::@2 error
bool error_occured
Flag indicating post-poll handling is necessary.
#define INCOMING_MSG_PIPE
Offset to account for the first file descriptor being the incoming message pipe.
void * full_msg_buffer
Definition: bus_types.h:115
rx_info_t * ListenerHelper_FindInfoBySequenceID(listener *l, int fd, int64_t seq_id)
Try to find an RX_INFO record by a <file descriptor, sequence ID> pair.
struct bus_unpack_cb_res_t::@0::@1 success
#define BUS_ASSERT(B, UDATA, COND)
Definition: bus_types.h:83
static void move_errored_active_sockets_to_end(listener *l)
Definition: listener_io.c:307
rx_info_state state
uint16_t inactive_fds
File descriptors that are inactive due to errors, but have not yet been explicitly removed/closed by ...
int fd
Destination filename and message body.
void ListenerIO_AttemptRecv(listener *l, int available)
Definition: listener_io.c:45
static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err)
Definition: listener_io.c:266
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)
Definition: bus_types.h:45