kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
listener.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdio.h>
21 #include <stdint.h>
22 #include <string.h>
23 #include <unistd.h>
24 #include <errno.h>
25 #include <sys/time.h>
26 #include <assert.h>
27 #include <err.h>
28 #include <time.h>
29 
30 #include "bus_internal_types.h"
31 #include "listener.h"
32 #include "listener_helper.h"
33 #include "listener_cmd.h"
34 #include "listener_task.h"
35 #include "listener_internal.h"
36 #include "syscall.h"
37 #include "util.h"
38 
/*
 * Allocate and initialize a listener attached to bus `b`.
 *
 * Returns the new listener, or NULL if the allocation or any pipe() call
 * fails. `cfg` is accepted but not used by the statements shown here.
 *
 * NOTE(review): this listing skips original source lines 55, 57 and 80-81
 * (the embedded line numbers jump over them), so whatever those lines do is
 * neither shown nor described here -- check the real file before editing.
 */
39 struct listener *Listener_Init(struct bus *b, struct bus_config *cfg) {
    /* calloc() zero-fills the listener, so every field not assigned below
     * starts out as 0 / NULL. */
40  struct listener *l = calloc(1, sizeof(*l));
41  if (l == NULL) { return NULL; }
42 
    /* The bus pointer is mandatory; note the check runs after allocation. */
43  assert(b);
44  l->bus = b;
45  BUS_LOG(b, 2, LOG_LISTENER, "init", b->udata);
46 
    /* Create one pipe for the listener itself: its write end (pipes[1]) is
     * kept as commit_pipe and its read end (pipes[0]) as incoming_msg_pipe. */
47  int pipes[2];
48  if (0 != pipe(pipes)) {
49  free(l);
50  return NULL;
51  }
52 
53  l->commit_pipe = pipes[1];
54  l->incoming_msg_pipe = pipes[0];
    /* The INCOMING_MSG_PIPE_ID poll slot is watched for readability. */
56  l->fds[INCOMING_MSG_PIPE_ID].events = POLLIN;
58 
    /* Mark every rx_info slot inactive and push it onto the free list.
     * Walking from the highest index down leaves slot 0 at the list head. */
59  for (int i = MAX_PENDING_MESSAGES - 1; i >= 0; i--) {
60  rx_info_t *info = &l->rx_info[i];
61  info->state = RIS_INACTIVE;
62 
    /* The id is written through a cast pointer -- presumably because the
     * field is declared const; confirm against the rx_info_t definition. */
63  uint16_t *p_id = (uint16_t *)&info->id;
64  info->next = l->rx_info_freelist;
65  l->rx_info_freelist = info;
66  *p_id = i;
67  }
68 
    /* Give each queue message its id and its own pipe, then push it onto
     * the message free list. */
69  for (int pipe_count = 0; pipe_count < MAX_QUEUE_MESSAGES; pipe_count++) {
70  listener_msg *msg = &l->msgs[pipe_count];
71  uint8_t *p_id = (uint8_t *)&msg->id;
72  *p_id = pipe_count; /* Set (const) ID. */
73 
74  if (0 != pipe(msg->pipes)) {
    /* pipe() failed for message `pipe_count`: close both ends of the
     * pipes already opened for messages 0..pipe_count-1, then give up. */
75  for (int i = 0; i < pipe_count; i++) {
76  msg = &l->msgs[i];
77  syscall_close(msg->pipes[0]);
78  syscall_close(msg->pipes[1]);
79  }
    /* NOTE(review): original lines 80-81 are missing from this listing;
     * the visible code does not close commit_pipe / incoming_msg_pipe on
     * this path -- verify that the omitted lines do. */
82  free(l);
83  return NULL;
84  }
85  msg->next = l->msg_freelist;
86  l->msg_freelist = msg;
87  }
88  l->rx_info_max_used = 0;
89 
    /* cfg is unused; the cast silences the unused-parameter warning. */
90  (void)cfg;
91  return l;
92 }
93 
95  connection_info *ci, int *notify_fd) {
    /*
     * Listener_AddSocket: queue an "add socket" command for the listener.
     *
     * Per this file's index entry the full signature is
     *   bool Listener_AddSocket(struct listener *l, connection_info *ci,
     *                           int *notify_fd)
     *
     * NOTE(review): original lines 94 and 96 are missing from this listing
     * (the start of the signature and, presumably, the statement that
     * obtains `msg` from the free-message pool -- confirm in the real file).
     *
     * The message carries `ci` plus the write end of the message's own pipe
     * (msg->pipes[1]) as the fd to notify. Returns false when no message
     * was available; otherwise returns whatever
     * ListenerHelper_PushMessage() returns, passing `notify_fd` through.
     */
97  if (msg == NULL) { return false; }
98 
99  msg->type = MSG_ADD_SOCKET;
100  msg->u.add_socket.info = ci;
101  msg->u.add_socket.notify_fd = msg->pipes[1];
102  return ListenerHelper_PushMessage(l, msg, notify_fd);
103 }
104 
/*
 * Queue a "remove socket" command for socket `fd` on listener `l`.
 *
 * The message records `fd` and uses the write end of the message's own pipe
 * (msg->pipes[1]) as the fd to notify. Returns false when no message was
 * available; otherwise returns the result of ListenerHelper_PushMessage(),
 * which also receives `notify_fd`.
 *
 * NOTE(review): original line 106 is missing from this listing; it
 * presumably declares `msg` and fetches it from the free-message pool.
 */
105 bool Listener_RemoveSocket(struct listener *l, int fd, int *notify_fd) {
107  if (msg == NULL) { return false; }
108 
109  msg->type = MSG_REMOVE_SOCKET;
110  msg->u.remove_socket.fd = fd;
111  msg->u.remove_socket.notify_fd = msg->pipes[1];
112  return ListenerHelper_PushMessage(l, msg, notify_fd);
113 }
114 
/*
 * Queue a "hold response" command: record <fd, seq_id> together with a
 * timeout in seconds, so the listener can act on it.
 *
 * Returns false (after logging "OUT OF MESSAGES") when no message was
 * available; otherwise returns the result of ListenerHelper_PushMessage(),
 * logging at level 0 if that push fails.
 *
 * NOTE(review): original line 117 is missing from this listing; it
 * presumably declares `msg` and fetches it from the free-message pool.
 */
115 bool Listener_HoldResponse(struct listener *l, int fd,
116  int64_t seq_id, int16_t timeout_sec, int *notify_fd) {
118  struct bus *b = l->bus;
119  if (msg == NULL) {
120  BUS_LOG(b, 1, LOG_LISTENER, "OUT OF MESSAGES", b->udata);
121  return false;
122  }
123 
    /* seq_id is cast to long long so it matches the %lld conversion. */
124  BUS_LOG_SNPRINTF(b, 5, LOG_MEMORY, b->udata, 128,
125  "Listener_HoldResponse with <fd:%d, seq_id:%lld>",
126  fd, (long long)seq_id);
127 
    /* Fill in the hold command; the fd to notify is the write end of the
     * message's own pipe. */
128  msg->type = MSG_HOLD_RESPONSE;
129  msg->u.hold.fd = fd;
130  msg->u.hold.seq_id = seq_id;
131  msg->u.hold.timeout_sec = timeout_sec;
132  msg->u.hold.notify_fd = msg->pipes[1];
133 
134  bool pm_res = ListenerHelper_PushMessage(l, msg, notify_fd);
135  if (!pm_res) {
136  BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128,
137  "Listener_HoldResponse with <fd:%d, seq_id:%lld> FAILED",
138  fd, (long long)seq_id);
139  }
140  return pm_res;
141 }
142 
144  uint16_t *backpressure) {
    /*
     * Listener_ExpectResponse: queue an "expect response" command for the
     * boxed message `box`.
     *
     * Per this file's index entry the full signature is
     *   bool Listener_ExpectResponse(struct listener *l, boxed_msg *box,
     *                                uint16_t *backpressure)
     *
     * NOTE(review): original lines 143, 145 and 160 are missing from this
     * listing (the start of the signature, presumably the statement that
     * obtains `msg`, and one statement just before the push) -- confirm in
     * the real file.
     *
     * Returns false when no message was available; in that case
     * *backpressure is not written by the code shown. Otherwise returns
     * the result of ListenerHelper_PushMessage().
     */
146  struct bus *b = l->bus;
147  if (msg == NULL) {
148  BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128,
149  "! ListenerHelper_GetFreeMsg fail %p", (void*)box);
150  return false;
151  }
152 
153  BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
154  "Listener_ExpectResponse with box of %p, seq_id:%lld",
155  (void*)box, (long long)box->out_seq_id);
156 
157  msg->type = MSG_EXPECT_RESPONSE;
158  msg->u.expect.box = box;
    /* Report the listener's current backpressure to the caller; this is
     * sampled before the message is pushed. */
159  *backpressure = ListenerTask_GetBackpressure(l);
161 
    /* No reply fd is requested for this command (NULL is passed). */
162  bool pm = ListenerHelper_PushMessage(l, msg, NULL);
163  if (!pm) {
164  BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128,
165  "! ListenerHelper_PushMessage fail %p", (void*)box);
166  }
167  return pm;
168 }
169 
/*
 * Queue a "shutdown" command for listener `l`.
 *
 * The fd to notify is the write end of the message's own pipe
 * (msg->pipes[1]). Returns false when no message was available; otherwise
 * returns the result of ListenerHelper_PushMessage(), which also receives
 * `notify_fd`.
 *
 * NOTE(review): original line 171 is missing from this listing; it
 * presumably declares `msg` and fetches it from the free-message pool.
 */
170 bool Listener_Shutdown(struct listener *l, int *notify_fd) {
172  if (msg == NULL) { return false; }
173 
174  msg->type = MSG_SHUTDOWN;
175  msg->u.shutdown.notify_fd = msg->pipes[1];
176  return ListenerHelper_PushMessage(l, msg, notify_fd);
177 }
178 
/*
 * Release a listener and the resources it still holds. A NULL `l` is a
 * no-op.
 *
 * Steps visible in this listing:
 *   1. free the boxed message of every rx_info slot still in RIS_EXPECT;
 *   2. for every queue message: notify the waiting caller of a pending
 *      add/remove-socket command, free the box of a pending
 *      expect-response command, and close both ends of the message's pipe;
 *   3. free the read buffer and the listener itself.
 *
 * NOTE(review): original lines 183 and 232-233 are missing from this
 * listing, so any further cleanup on those lines is not described here.
 */
179 void Listener_Free(struct listener *l) {
180  if (l) {
181  struct bus *b = l->bus;
182  /* Thread has joined but data has not been freed yet. */
184  for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
185  rx_info_t *info = &l->rx_info[i];
186 
187  switch (info->state) {
    /* Inactive and hold entries need no cleanup in this switch. */
188  case RIS_INACTIVE:
189  break;
190  case RIS_HOLD:
191  break;
192  case RIS_EXPECT:
193  if (info->u.expect.box) {
194  /* TODO: This can leak memory, since the caller's
195  * callback is not being called. It should be called
196  * with BUS_SEND_RX_FAILURE, if it's safe to do so. */
197  free(info->u.expect.box);
198  info->u.expect.box = NULL;
199  }
200  break;
201  default:
    /* Unknown state: log it and fail the bus assertion. */
202  BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
203  "match fail %d on line %d", info->state, __LINE__);
204  BUS_ASSERT(b, b->udata, false);
205  }
206  }
207 
    /* Every message slot is visited, whatever its type currently is;
     * types other than the three below need no per-command cleanup. */
208  for (int i = 0; i < MAX_QUEUE_MESSAGES; i++) {
209  listener_msg *msg = &l->msgs[i];
210  switch (msg->type) {
211  case MSG_ADD_SOCKET:
212  ListenerCmd_NotifyCaller(l, msg->u.add_socket.notify_fd);
213  break;
214  case MSG_REMOVE_SOCKET:
215  ListenerCmd_NotifyCaller(l, msg->u.remove_socket.notify_fd);
216  break;
217  case MSG_EXPECT_RESPONSE:
218  if (msg->u.expect.box) { free(msg->u.expect.box); }
219  break;
220  default:
221  break;
222  }
223 
    /* Each message has its own pipe; close both ends. */
224  syscall_close(msg->pipes[0]);
225  syscall_close(msg->pipes[1]);
226  }
227 
228  if (l->read_buf) {
229  free(l->read_buf);
230  }
231 
234 
235  free(l);
236  }
237 }
238 
const uint16_t id
bool Listener_HoldResponse(struct listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int *notify_fd)
The client is about to start a write, the listener should hold on to the response (with timeout) if i...
Definition: listener.c:115
#define MAX_QUEUE_MESSAGES
Max number of unprocessed queue messages.
Record in table for partially processed messages.
listener_msg * msg_freelist
struct pollfd fds[1000+1]
Tracked file descriptors, for polling.
struct listener_msg::@6::@7 add_socket
A queue message, with a command in the tagged union.
bool Listener_RemoveSocket(struct listener *l, int fd, int *notify_fd)
Definition: listener.c:105
bus_msg_result_t result
Result message, constructed in place after the request/response cycle has completed or failed due to ...
int syscall_close(int fd)
Definition: syscall.c:31
Receiver of responses.
listener_msg msgs[(32)]
struct listener * Listener_Init(struct bus *b, struct bus_config *cfg)
Initialize the listener.
Definition: listener.c:39
#define INCOMING_MSG_PIPE_ID
ID of the struct pollfd for the listener's incoming command pipe.
uint16_t ListenerTask_GetBackpressure(struct listener *l)
Get the current backpressure from the listener.
bool ListenerHelper_PushMessage(struct listener *l, listener_msg *msg, int *reply_fd)
Push a message into the listener's message queue.
struct listener_msg::@6::@10 expect
Message bus.
bool Listener_AddSocket(struct listener *l, connection_info *ci, int *notify_fd)
Add/remove sockets' metadata from internal info.
Definition: listener.c:94
void * udata
User data for callbacks.
struct listener_msg * next
struct bus * bus
struct listener_msg::@6::@9 hold
union rx_info_t::@12 u
int64_t out_seq_id
struct rx_info_t * next
rx_info_t * rx_info_freelist
#define LISTENER_SHUTDOWN_COMPLETE_FD
bus_send_status_t status
Definition: bus_types.h:216
void ListenerCmd_NotifyCaller(listener *l, int fd)
Notify the listener's caller that a command has completed.
Definition: listener_cmd.c:42
rx_info_t rx_info[(1024)]
Per-socket connection context.
uint16_t rx_info_max_used
#define BUS_LOG_SNPRINTF(B, LEVEL, EVENT_KEY, UDATA, MAX_SZ, FMT,...)
Definition: bus_types.h:59
bool Listener_Shutdown(struct listener *l, int *notify_fd)
Shut down the listener.
Definition: listener.c:170
struct rx_info_t::@12::@14 expect
union listener_msg::@6 u
void Listener_Free(struct listener *l)
Free the listener, which must already be shut down.
Definition: listener.c:179
bool Listener_ExpectResponse(struct listener *l, boxed_msg *box, uint16_t *backpressure)
The client has finished a write, the listener should expect a response.
Definition: listener.c:143
#define BUS_ASSERT(B, UDATA, COND)
Definition: bus_types.h:83
struct listener_msg::@6::@11 shutdown
rx_info_state state
#define LISTENER_NO_FD
Sentinel values used for listener.shutdown_notify_fd.
struct listener_msg::@6::@8 remove_socket
listener_msg * ListenerHelper_GetFreeMsg(listener *l)
Get a free message from the listener's message pool.
#define MAX_PENDING_MESSAGES
#define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA)
Definition: bus_types.h:45