kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
listener_cmd.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <unistd.h>
21 #include <err.h>
22 #include <assert.h>
23 #include "syscall.h"
24 
25 #include "listener_cmd.h"
26 #include "listener_cmd_internal.h"
27 #include "listener_task.h"
28 #include "listener_helper.h"
29 
30 static void msg_handler(listener *l, listener_msg *pmsg);
31 static void add_socket(listener *l, connection_info *ci, int notify_fd);
32 static void remove_socket(listener *l, int fd, int notify_fd);
33 static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int notify_fd);
34 static void expect_response(listener *l, boxed_msg *box);
35 static void shutdown(listener *l, int notify_fd);
36 
37 #ifdef TEST
38 uint8_t reply_buf[sizeof(uint8_t) + sizeof(uint16_t)];
39 uint8_t cmd_buf[LISTENER_CMD_BUF_SIZE];
40 #endif
41 
43  if (fd == -1) { return; }
44  #ifndef TEST
45  uint8_t reply_buf[sizeof(uint8_t) + sizeof(uint16_t)];
46  #endif
47  reply_buf[0] = LISTENER_MSG_TAG;
48 
49  /* reply_buf[1:2] is little-endian backpressure. */
50  uint16_t backpressure = ListenerTask_GetBackpressure(l);
51  reply_buf[1] = (uint8_t)(backpressure & 0xff);
52  reply_buf[2] = (uint8_t)((backpressure >> 8) & 0xff);
53  struct bus *b = l->bus;
54 
55  for (;;) {
56  BUS_LOG_SNPRINTF(b, 6, LOG_LISTENER, b->udata, 128,
57  "NotifyCaller on %d with backpressure %u",
58  fd, backpressure);;
59 
60  ssize_t wres = syscall_write(fd, reply_buf, sizeof(reply_buf));
61  if (wres == sizeof(reply_buf)) { break; }
62  if (wres == -1) {
63  if (errno == EINTR) {
64  errno = 0;
65  continue;
66  } else {
67  err(1, "write");
68  }
69  }
70  }
71 }
72 
74  struct bus *b = l->bus;
75  short events = l->fds[INCOMING_MSG_PIPE_ID].revents;
76 
77  if (events & (POLLERR | POLLHUP | POLLNVAL)) { /* hangup/error */
78  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
79  "hangup on listener incoming command pipe: %d", events);
80  return;
81  }
82 
83  if (events & POLLIN) {
84  #ifndef TEST
85  char cmd_buf[LISTENER_CMD_BUF_SIZE];
86  #endif
87  for (;;) {
88  ssize_t rd = syscall_read(l->fds[INCOMING_MSG_PIPE_ID].fd, cmd_buf, sizeof(cmd_buf));
89  if (rd == -1) {
90  if (errno == EINTR) {
91  errno = 0;
92  continue;
93  } else {
94  BUS_LOG_SNPRINTF(b, 6, LOG_LISTENER, b->udata, 128,
95  "check_and_flush_incoming_msg_pipe: %s", strerror(errno));
96  errno = 0;
97  break;
98  }
99  } else {
100  for (ssize_t i = 0; i < rd; i++) {
101  uint8_t msg_id = cmd_buf[i];
102  listener_msg *msg = &l->msgs[msg_id];
103  msg_handler(l, msg);
104  }
105  (*res)--;
106  break;
107  }
108  }
109  }
110 }
111 
112 static void msg_handler(listener *l, listener_msg *pmsg) {
113  struct bus *b = l->bus;
114  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128,
115  "Handling message -- %p, type %d", (void*)pmsg, pmsg->type);
116 
117  l->is_idle = false;
118 
119  listener_msg msg = *pmsg;
120  switch (msg.type) {
121 
122  case MSG_ADD_SOCKET:
123  add_socket(l, msg.u.add_socket.info, msg.u.add_socket.notify_fd);
124  break;
125  case MSG_REMOVE_SOCKET:
126  remove_socket(l, msg.u.remove_socket.fd, msg.u.remove_socket.notify_fd);
127  break;
128  case MSG_HOLD_RESPONSE:
129  hold_response(l, msg.u.hold.fd, msg.u.hold.seq_id,
130  msg.u.hold.timeout_sec, msg.u.hold.notify_fd);
131  break;
132  case MSG_EXPECT_RESPONSE:
133  expect_response(l, msg.u.expect.box);
134  break;
135  case MSG_SHUTDOWN:
136  shutdown(l, msg.u.shutdown.notify_fd);
137  break;
138 
139  case MSG_NONE:
140  default:
141  BUS_ASSERT(b, b->udata, false);
142  break;
143  }
144  ListenerTask_ReleaseMsg(l, pmsg);
145 }
146 
147 /* Swap poll and connection info for tracked sockets, by array offset. */
148 static void swap(listener *l, int a, int b) {
149  struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE];
150  struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE];
151  connection_info *a_ci = l->fd_info[a];
152  connection_info *b_ci = l->fd_info[b];
153 
154  l->fds[b + INCOMING_MSG_PIPE] = a_pfd;
155  l->fds[a + INCOMING_MSG_PIPE] = b_pfd;
156 
157  l->fd_info[a] = b_ci;
158  l->fd_info[b] = a_ci;
159 }
160 
161 static void add_socket(listener *l, connection_info *ci, int notify_fd) {
162  /* TODO: if epoll, just register with the OS. */
163  struct bus *b = l->bus;
164  BUS_LOG(b, 3, LOG_LISTENER, "adding socket", b->udata);
165 
166  if (l->tracked_fds == MAX_FDS) {
167  /* error: full */
168  BUS_LOG(b, 3, LOG_LISTENER, "FULL", b->udata);
169  ListenerCmd_NotifyCaller(l, notify_fd);
170  return;
171  }
172  for (int i = 0; i < l->tracked_fds; i++) {
173  if (l->fds[i + INCOMING_MSG_PIPE].fd == ci->fd) {
174  free(ci);
175  ListenerCmd_NotifyCaller(l, notify_fd);
176  return; /* already present */
177  }
178  }
179 
180  int id = l->tracked_fds;
181  l->fd_info[id] = ci;
182  l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd;
183  l->fds[id + INCOMING_MSG_PIPE].events = POLLIN;
184 
185  /* If there are any inactive FDs, we need to swap the new last FD
186  * and the first inactive FD so that the active and inactive FDs
187  * remain contiguous. */
188  if (l->inactive_fds > 0) {
189  int first_inactive = l->tracked_fds - l->inactive_fds;
190  swap(l, id, first_inactive);
191  }
192 
193  l->tracked_fds++;
194 
195  for (int i = 0; i < l->tracked_fds; i++) {
196  if (l->fds[i + INCOMING_MSG_PIPE].events & POLLIN) {
197  assert(i < l->tracked_fds - l->inactive_fds);
198  } else {
199  assert(i >= l->tracked_fds - l->inactive_fds);
200  }
201  }
202 
203  /* Prime the pump by sinking 0 bytes and getting a size to expect. */
204  bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata);
205  BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet
206  ci->to_read_size = sink_res.next_read;
207 
208  if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) {
209  free(ci);
210  ListenerCmd_NotifyCaller(l, notify_fd);
211  return; /* alloc failure */
212  }
213 
214  BUS_LOG(b, 3, LOG_LISTENER, "added socket", b->udata);
215  ListenerCmd_NotifyCaller(l, notify_fd);
216 }
217 
218 static void remove_socket(listener *l, int fd, int notify_fd) {
219  struct bus *b = l->bus;
220  BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128,
221  "removing socket %d", fd);
222 
223  /* Don't really close it, just drop info about it in the listener.
224  * The client thread will actually free the structure, close SSL, etc. */
225  for (int id = 0; id < l->tracked_fds; id++) {
226  struct pollfd removing_pfd = l->fds[id + INCOMING_MSG_PIPE];
227  if (removing_pfd.fd == fd) {
228  bool is_active = (removing_pfd.events & POLLIN) > 0;
229  if (l->tracked_fds > 1) {
230  int last_active = l->tracked_fds - l->inactive_fds - 1;
231 
232  /* If removing active node and it isn't the last active one, swap them */
233  if (is_active && id != last_active) {
234  assert(id < last_active);
235  swap(l, id, last_active);
236  id = last_active;
237  }
238 
239  /* If node (which is either last active node or inactive) is not at the end,
240  * and there are inactive nodes, swap it with the last.*/
241  int last = l->tracked_fds - 1;
242  if (id < last) {
243  swap(l, id, last);
244  id = last;
245  }
246 
247  /* The node is now at the end of the array. */
248  }
249 
250  l->tracked_fds--;
251  if (!is_active) { l->inactive_fds--; }
252  }
253  }
254  /* CI will be freed by the client thread. */
255  ListenerCmd_NotifyCaller(l, notify_fd);
256 }
257 
258 static void hold_response(listener *l, int fd, int64_t seq_id,
259  int16_t timeout_sec, int notify_fd) {
260  struct bus *b = l->bus;
261 
262  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
263  "hold_response <fd:%d, seq_id:%lld>", fd, (long long)seq_id);
264 
266  if (info == NULL) {
267  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
268  "failed to get free rx_info for <fd:%d, seq_id:%lld>, dropping it",
269  fd, (long long)seq_id);
270  ListenerCmd_NotifyCaller(l, notify_fd);
271  return;
272  }
273  BUS_ASSERT(b, b->udata, info);
274  BUS_ASSERT(b, b->udata, info->state == RIS_INACTIVE);
275  BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
276  "setting info %p(+%d) to hold response <fd:%d, seq_id:%lld>",
277  (void *)info, info->id, fd, (long long)seq_id);
278 
279  info->state = RIS_HOLD;
280  info->timeout_sec = timeout_sec;
281  info->u.hold.fd = fd;
282  info->u.hold.seq_id = seq_id;
283  info->u.hold.has_result = false;
284  memset(&info->u.hold.result, 0, sizeof(info->u.hold.result));
285  ListenerCmd_NotifyCaller(l, notify_fd);
286 }
287 
288 static void expect_response(listener *l, struct boxed_msg *box) {
289  struct bus *b = l->bus;
290  BUS_ASSERT(b, b->udata, box);
291  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128,
292  "notifying to expect response <box:%p, fd:%d, seq_id:%lld>",
293  (void *)box, box->fd, (long long)box->out_seq_id);
294 
295  /* If there's a pending HOLD message, convert it. */
297  if (info && info->state == RIS_HOLD) {
298  BUS_ASSERT(b, b->udata, info->state == RIS_HOLD);
299  if (info->u.hold.error == RX_ERROR_NONE && info->u.hold.has_result) {
300  bus_unpack_cb_res_t result = info->u.hold.result;
301 
302  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256,
303  "converting HOLD to EXPECT for info %d (%p) with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>",
304  info->id, (void *)info,
305  (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id);
306 
307  info->state = RIS_EXPECT;
308  info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY;
309  info->u.expect.box = box;
310  info->u.expect.has_result = true;
311  info->u.expect.result = result;
312  BUS_ASSERT(b, b->udata,
314 
316  } else if (info->u.hold.error != RX_ERROR_NONE) {
317  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256,
318  "info %p (%d) with <box:%p, fd:%d, seq_id:%lld> has error %d",
319  (void *)info, info->id,
320  (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id, info->u.hold.error);
321  rx_error_t error = info->u.hold.error;
322  bus_unpack_cb_res_t result = info->u.hold.result;
323  info->state = RIS_EXPECT;
324  info->u.expect.error = error;
325  info->u.expect.result = result;
326  info->u.expect.box = box;
328  } else {
329  BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256,
330  "converting HOLD to EXPECT info %d (%p), attempting delivery <box:%p, fd:%d, seq_id:%lld>",
331  info->id, (void *)info,
332  (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id);
333  info->state = RIS_EXPECT;
334  info->u.expect.box = box;
335  info->u.expect.error = RX_ERROR_NONE;
336  info->u.expect.has_result = false;
337  /* Switch over to client's transferred timeout */
338  info->timeout_sec = box->timeout_sec;
339  }
340  } else if (info && info->state == RIS_EXPECT) {
341  /* Multiple identical EXPECTs should never happen, outside of
342  * memory corruption in the queue. */
343  assert(false);
344  } else {
345  /* should never happen; just drop the message */
346  }
347 }
348 
349 static void shutdown(listener *l, int notify_fd) {
350  l->shutdown_notify_fd = notify_fd;
351 }
const uint16_t id
time_t timeout_sec
Message send timeout.
Record in table for partially processed messages.
struct rx_info_t::@12::@13 hold
struct pollfd fds[1000+1]
Tracked file descriptors, for polling.
static void shutdown(listener *l, int notify_fd)
Definition: listener_cmd.c:349
struct listener_msg::@6::@7 add_socket
#define LISTENER_MSG_TAG
Arbitrary byte used to tag writes from the listener.
static void msg_handler(listener *l, listener_msg *pmsg)
Definition: listener_cmd.c:112
static void remove_socket(listener *l, int fd, int notify_fd)
Definition: listener_cmd.c:218
#define LISTENER_CMD_BUF_SIZE
A queue message, with a command in the tagged union.
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
Receiver of responses.
listener_msg msgs[(32)]
connection_info * fd_info[1000]
The connection info, corresponding to the the file descriptors tracked in l->fds. ...
#define INCOMING_MSG_PIPE_ID
ID of the struct pollfd for the listener's incoming command pipe.
uint16_t ListenerTask_GetBackpressure(struct listener *l)
Get the current backpressure from the listener.
struct listener_msg::@6::@10 expect
Message bus.
void * udata
User data for callbacks.
struct bus * bus
ssize_t syscall_read(int fildes, void *buf, size_t nbyte)
Definition: syscall.c:39
struct listener_msg::@6::@9 hold
bool ListenerTask_GrowReadBuf(listener *l, size_t nsize)
Grow the listener's read buffer to NSIZE.
union rx_info_t::@12 u
int64_t out_seq_id
void ListenerCmd_CheckIncomingMessages(listener *l, int *res)
Process incoming commands, if any.
Definition: listener_cmd.c:73
uint16_t tracked_fds
FDs currently tracked by listener.
bus_send_status_t status
Definition: bus_types.h:216
void * udata
user connection data
void ListenerTask_ReleaseMsg(struct listener *l, listener_msg *msg)
Release a message to the listener's message pool.
void ListenerCmd_NotifyCaller(listener *l, int fd)
Notify the listener's caller that a command has completed.
Definition: listener_cmd.c:42
void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info)
Attempt delivery of the message boxed in INFO.
Per-socket connection context.
static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int notify_fd)
Definition: listener_cmd.c:258
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
#define MAX_FDS
Max number of sockets to monitor.
struct rx_info_t::@12::@14 expect
bus_sink_cb * sink_cb
IO sink callback.
union listener_msg::@6 u
static void add_socket(listener *l, connection_info *ci, int notify_fd)
Definition: listener_cmd.c:161
rx_info_t * ListenerHelper_GetFreeRXInfo(struct listener *l)
Get a free RX_INFO record, if any are available.
#define INCOMING_MSG_PIPE
Offset to account for the first file descriptor being the incoming message pipe.
void * full_msg_buffer
Definition: bus_types.h:115
void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info, bus_send_status_t status)
Notify the client that the event in INFO has failed with STATUS.
rx_info_t * ListenerHelper_FindInfoBySequenceID(listener *l, int fd, int64_t seq_id)
Try to find an RX_INFO record by a pair.
#define BUS_ASSERT(B, UDATA, COND)
Definition: bus_types.h:83
ssize_t syscall_write(int fildes, const void *buf, size_t nbyte)
Definition: syscall.c:35
static void swap(listener *l, int a, int b)
Definition: listener_cmd.c:148
struct listener_msg::@6::@11 shutdown
rx_info_state state
static void expect_response(listener *l, boxed_msg *box)
Definition: listener_cmd.c:288
uint16_t inactive_fds
File descriptors that are inactive due to errors, but have not yet been explicitly removed/closed by ...
struct listener_msg::@6::@8 remove_socket
int fd
Destination filename and message body.
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)
Definition: bus_types.h:45