kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
kinetic_controller.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 
21 #include "kinetic_controller.h"
22 #include "kinetic_session.h"
23 #include "kinetic_operation.h"
24 #include "kinetic_bus.h"
25 #include "kinetic_response.h"
26 #include "kinetic_auth.h"
27 #include "kinetic_socket.h"
28 #include "kinetic_allocator.h"
29 #include "kinetic_resourcewaiter.h"
30 #include "kinetic_logger.h"
31 #include <pthread.h>
32 #include "bus.h"
33 
34 typedef struct {
35  pthread_mutex_t receiveCompleteMutex;
36  pthread_cond_t receiveComplete;
37  bool completed;
38  KineticStatus status;
39 } DefaultCallbackData;
40 
41 static void DefaultCallback(KineticCompletionData* kinetic_data, void* client_data)
42 {
43  DefaultCallbackData * data = client_data;
44  pthread_mutex_lock(&data->receiveCompleteMutex);
45  data->status = kinetic_data->status;
46  data->completed = true;
47  pthread_cond_signal(&data->receiveComplete);
48  pthread_mutex_unlock(&data->receiveCompleteMutex);
49 }
50 
51 STATIC KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data)
52 {
53  return (KineticCompletionClosure) {
55  .clientData = data,
56  };
57 }
58 
/*
 * Body of KineticController_ExecuteOperation. The signature (original line 59)
 * is missing from this listing; the symbol index at the end of the page gives:
 *   KineticStatus KineticController_ExecuteOperation(KineticOperation *operation,
 *                                                    KineticCompletionClosure *const closure)
 *
 * Behavior visible here:
 *  - closure != NULL: the caller's closure is copied into the operation and the
 *    result of KineticOperation_SendRequest() is returned right away.
 *  - closure == NULL: a stack-allocated DefaultCallbackData is installed through
 *    DefaultClosure(); if the send succeeds, the calling thread waits on the
 *    condition variable until `completed` is set and takes the stored status.
 *    A non-success status then goes through the failure handling at the end.
 *
 * NOTE(review): original lines 59, 63, 66-67, 99 and 102 are absent (the
 * listing numbering skips them). Restore them from the real source before
 * relying on this text.
 */
60 {
61  KINETIC_ASSERT(operation != NULL);
62  KINETIC_ASSERT(operation->session != NULL);
// NOTE(review): `status` is assigned and returned below, but its declaration is
// not visible; presumably it is on the omitted line 63.
64  KineticSession *session = operation->session;
65 
// NOTE(review): the brace below closes a block whose opening (lines 66-67) is
// missing. KineticSession_GetTerminationStatus appears in the page's symbol
// index and may be what is tested here; confirm against the real source.
68  }
69 
70  if (closure != NULL) {
// Caller supplied a closure: copy it into the operation and return the send result.
71  operation->closure = *closure;
72  return KineticOperation_SendRequest(operation);
73  }
74  else {
// No closure supplied: set up rendezvous state on this stack frame. On a
// successful send this branch does not return before the wait below finishes.
// NOTE(review): assumes no callback fires after a failed send; confirm.
75  DefaultCallbackData data;
76  pthread_mutex_init(&data.receiveCompleteMutex, NULL);
77  pthread_cond_init(&data.receiveComplete, NULL);
78  data.status = KINETIC_STATUS_INVALID;
79  data.completed = false;
80 
81  operation->closure = DefaultClosure(&data);
82 
83  // Send the request
84  status = KineticOperation_SendRequest(operation);
85 
86  if (status == KINETIC_STATUS_SUCCESS) {
87  pthread_mutex_lock(&data.receiveCompleteMutex);
// Re-check the flag after every wakeup; pthread_cond_wait() may return spuriously.
88  while(data.completed == false) {
89  pthread_cond_wait(&data.receiveComplete, &data.receiveCompleteMutex);
90  }
// The result is read while the mutex is still held.
91  status = data.status;
92  pthread_mutex_unlock(&data.receiveCompleteMutex);
93  }
94 
95  pthread_cond_destroy(&data.receiveComplete);
96  pthread_mutex_destroy(&data.receiveCompleteMutex);
97 
// Failure handling. The (void) cast discards the disconnect result on purpose.
// NOTE(review): lines 99 and 102 are missing. By the brace count, line 99 opens
// a nested condition guarding the disconnect, and line 102 is the body of the
// KINETIC_STATUS_SOCKET_ERROR check.
98  if (status != KINETIC_STATUS_SUCCESS) {
100  (void)KineticSession_Disconnect(session);
101  if (status == KINETIC_STATUS_SOCKET_ERROR) {
103  }
104  }
105  }
106 
107  return status;
108  }
109 }
110 
/*
 * Body of bus_to_kinetic_status. The signature (original line 111) is missing
 * from this listing; the symbol index at the end of the page gives:
 *   KineticStatus bus_to_kinetic_status(bus_send_status_t const status)
 *
 * Translates a message-bus send status into a KineticStatus held in `res`,
 * logs the mapping and returns it. BUS_SEND_UNDEFINED and any unlisted value
 * are logged, trip KINETIC_ASSERT(false) and return KINETIC_STATUS_INVALID.
 *
 * NOTE(review): this listing omits the declaration of `res` (presumably line
 * 113), the assignment under every visible case label (lines 118, 121, 124,
 * 127, 130) and three whole label+assignment pairs (lines 132-133, 135-136,
 * 138-139). The actual mappings cannot be read from this text.
 */
112 {
114 
115  switch(status) {
116  // TODO scrutinize all these mappings
117  case BUS_SEND_SUCCESS:
119  break;
120  case BUS_SEND_TX_TIMEOUT:
122  break;
123  case BUS_SEND_TX_FAILURE:
125  break;
126  case BUS_SEND_RX_TIMEOUT:
128  break;
129  case BUS_SEND_RX_FAILURE:
131  break;
// NOTE(review): the next three `break`s belong to case labels missing from the listing.
134  break;
137  break;
140  break;
141  case BUS_SEND_UNDEFINED:
142  default:
143  {
// Unmatched bus status: log at level 0, assert, and return directly, skipping
// the mapping log below.
144  LOGF0("bus_to_kinetic_status: UNMATCHED %d", status);
145  KINETIC_ASSERT(false);
146  return KINETIC_STATUS_INVALID;
147  }
148  }
149 
150  LOGF3("bus_to_kinetic_status: mapping status %d => %d",
151  status, res);
152  return res;
153 }
154 
/*
 * Returns a short, static, human-readable name for a bus send status.
 * BUS_SEND_UNDEFINED and any value without its own case yield "undefined"
 * (the `default` label is placed first and shares that return).
 *
 * NOTE(review): the case labels before the last three returns (original lines
 * 170, 172, 174) are missing from this listing. As printed, those returns are
 * unreachable; restore the labels from the real source.
 */
155 static const char *bus_error_string(bus_send_status_t t) {
156  switch (t) {
157  default:
158  case BUS_SEND_UNDEFINED:
159  return "undefined";
160  case BUS_SEND_SUCCESS:
161  return "success";
162  case BUS_SEND_TX_TIMEOUT:
163  return "tx_timeout";
164  case BUS_SEND_TX_FAILURE:
165  return "tx_failure";
166  case BUS_SEND_RX_TIMEOUT:
167  return "rx_timeout";
168  case BUS_SEND_RX_FAILURE:
169  return "rx_failure";
171  return "bad_response";
173  return "unregistered socket";
175  return "internal timeout";
176  }
177 }
178 
/*
 * Remainder of the definition of KineticController_HandleUnexpectedResponse.
 * The first signature line (original line 179) is missing from this listing;
 * the symbol index at the end of the page gives the full prototype:
 *   void KineticController_HandleUnexpectedResponse(void *msg, int64_t seq_id,
 *                                                   void *bus_udata, void *socket_udata)
 *
 * Handles a response that arrived without a matching request (presumably
 * invoked by the message bus; confirm the registration site). `msg` is used as
 * a KineticResponse and `socket_udata` as the KineticSession. `bus_udata` is
 * unused.
 *
 * Visible behavior:
 *  - a status PDU with a non-zero connection ID stores that ID in the session
 *    and, after logging, marks session->connectionReady available;
 *  - a status PDU with connection ID 0 is logged as a remote termination and
 *    the response's status is recorded as the session's termination status;
 *  - anything else is logged as an unexpected response.
 *
 * NOTE(review): original lines 179, 195, 229-230 and 233 are absent.
 */
180  int64_t seq_id,
181  void *bus_udata,
182  void *socket_udata)
183 {
184  KineticResponse * response = msg;
185  KineticSession* session = socket_udata;
// (sic) identifier is spelled "connetion" in the source.
186  bool connetionInfoReceived = false;
187  char const * statusTag = "[PDU RX STATUS]";
188  char const * unexpectedTag = "[PDU RX UNEXPECTED]";
189  char const * logTag = unexpectedTag;
// Both levels are assigned in every visible branch before use.
190  int logAtLevel, protoLogAtLevel;
191 
192  (void)bus_udata;
193 
194  // Handle unsolicited status PDUs
// NOTE(review): the `if (...) {` that opens this branch (line 195) is missing;
// its condition cannot be read from this listing.
196  int64_t connectionID = KineticResponse_GetConnectionID(response);
197  if (connectionID != 0)
198  {
199  // Store connectionID from unsolicited status message in the session for future requests
200  KineticSession_SetConnectionID(session, connectionID);
// NOTE(review): `connectionID` is int64_t but is passed to %lld without the
// (long long) cast that `seq_id` gets further down.
201  LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)", connectionID);
202  connetionInfoReceived = true;
203  logTag = statusTag;
204  logAtLevel = 2;
205  protoLogAtLevel = 3;
206  }
207  else {
// Connection ID 0: logged as the remote terminating the connection; the
// response's status is saved as the session's termination status.
208  LOG0("WARNING: Unsolicited status received. Connection being terminated by remote!");
209  logTag = statusTag;
210  logAtLevel = 0;
211  protoLogAtLevel = 0;
212  KineticStatus status = KineticResponse_GetStatus(response);
213  KineticSession_SetTerminationStatus(session, status);
214  }
215  }
216  else {
217  LOG0("WARNING: Received unexpected response!");
218  logTag = unexpectedTag;
219  logAtLevel = 0;
220  protoLogAtLevel = 0;
221  }
222 
// NOTE(review): the trailing arguments and closing of this call (lines 229-230)
// are missing. The format string still expects protoLen and valueLen values,
// presumably from KineticResponse_GetProtobufLength/GetValueLength, which are
// listed in the page's symbol index.
223  KineticLogger_LogPrintf(logAtLevel, "%s pdu: %p, session: %p, bus: %p, "
224  "fd: %6d, seq: %8lld, protoLen: %8u, valueLen: %8u",
225  logTag,
226  (void*)response, (void*)session,
227  (void*)session->messageBus,
228  session->socket, (long long)seq_id,
231  KineticLogger_LogProtobuf(protoLogAtLevel, response->proto);
232 
// NOTE(review): line 233 is missing. KineticAllocator_FreeKineticResponse is in
// the symbol index and may be called here; confirm who frees `response`.
234 
// The waiter is signaled only after the logging above is done.
235  if (connetionInfoReceived) {
236  KineticResourceWaiter_SetAvailable(&session->connectionReady);
237  }
238 }
239 
/*
 * Body of KineticController_HandleResult. The signature (original line 240) is
 * missing from this listing; the symbol index at the end of the page gives:
 *   void KineticController_HandleResult(bus_msg_result_t *res, void *udata)
 *
 * Finishes an operation once the bus reports a result (presumably the bus
 * completion callback; confirm the registration site). `udata` is used as the
 * KineticOperation.
 *
 * On success the response is taken from the bus result, its status becomes
 * the operation status, and the response is attached to the operation if none
 * is attached yet. Otherwise the bus error is logged. In both cases the
 * optional per-operation callback may replace the status before
 * KineticOperation_Complete() is called with it.
 *
 * NOTE(review): original lines 240, 246, 258-259 and 261 are absent.
 */
241 {
242  KineticOperation* op = udata;
243  KINETIC_ASSERT(op);
244  KINETIC_ASSERT(op->session);
245 
// NOTE(review): `status` is used below but its declaration (line 246) is
// missing; presumably it starts as bus_to_kinetic_status(res->status). Confirm.
247 
248  if (status == KINETIC_STATUS_SUCCESS) {
249  KineticResponse * response = res->u.response.opaque_msg;
250 
// From here on `status` is the status carried in the response.
251  status = KineticResponse_GetStatus(response);
252 
// NOTE(review): some arguments and the closing of this LOGF2 call (lines
// 258-259 and 261) are missing; the format still expects protoLen, valueLen
// and a status string.
253  LOGF2("[PDU RX] pdu: %p, session: %p, bus: %p, "
254  "fd: %6d, seq: %8lld, protoLen: %8u, valueLen: %8u, op: %p, status: %s",
255  (void*)response,
256  (void*)op->session, (void*)op->session->messageBus,
257  op->session->socket, response->command->header->acksequence,
260  (void*)op,
262  KineticLogger_LogHeader(3, &response->header);
263  KineticLogger_LogProtobuf(3, response->proto);
264 
// A response already attached to the operation is not overwritten.
265  if (op->response == NULL) {
266  op->response = response;
267  }
268  } else {
269  LOGF0("Error receiving response, got message bus error: %s", bus_error_string(res->status));
270  if (res->status == BUS_SEND_RX_TIMEOUT) {
271  LOG0("RX_TIMEOUT");
272  }
273  }
274 
275  // Call operation-specific callback, if configured
276  if (op->opCallback != NULL) {
277  status = op->opCallback(op, status);
278  }
279 
280  KineticOperation_Complete(op, status);
281 }
282 
void KineticLogger_LogHeader(int log_level, const KineticPDUHeader *header)
Operation successful.
void KineticResourceWaiter_SetAvailable(KineticResourceWaiter *const waiter)
void KineticController_HandleResult(bus_msg_result_t *res, void *udata)
Device did not respond to the operation in time.
static KineticCompletionClosure DefaultClosure(DefaultCallbackData *const data)
Com__Seagate__Kinetic__Proto__Message * proto
uint32_t KineticResponse_GetValueLength(KineticResponse *response)
uint32_t KineticResponse_GetProtobufLength(KineticResponse *response)
void KineticSession_SetTerminationStatus(KineticSession *const session, KineticStatus status)
KineticStatus KineticResponse_GetStatus(KineticResponse *response)
Closure which can be specified for operations which support asynchronous mode.
const char * Kinetic_GetStatusDescription(KineticStatus status)
Provides a string representation for a KineticStatus code.
Definition: kinetic_types.c:67
void KineticOperation_Complete(KineticOperation *op, KineticStatus status)
bus_send_status_t status
Definition: bus_types.h:216
#define KINETIC_ASSERT(cond)
A timeout occurred while waiting for a socket operation.
KineticCompletionCallback callback
Function to be called upon completion.
struct bus_msg_result_t::@3::@5 response
#define LOG0(message)
Completion data which will be provided to KineticCompletionClosure for asynchronous operations...
Com__Seagate__Kinetic__Proto__Command * command
#define LOGF3(message,...)
bus_send_status_t
Definition: bus_types.h:193
KineticPDUHeader header
#define LOGF0(message,...)
#define STATIC
The session has been terminated by the Kinetic device.
KineticStatus bus_to_kinetic_status(bus_send_status_t const status)
void KineticAllocator_FreeKineticResponse(KineticResponse *response)
void KineticLogger_LogProtobuf(int log_level, const Com__Seagate__Kinetic__Proto__Message *msg)
KineticStatus status
Resultant status of the operation.
KineticStatus KineticSession_Disconnect(KineticSession *const session)
KineticStatus KineticSession_GetTerminationStatus(KineticSession const *const session)
Status not available (no response/status available)
KineticStatus
Kinetic status codes.
union bus_msg_result_t::@3 u
void KineticLogger_LogPrintf(int log_level, const char *format,...)
An I/O error occurred during a socket operation.
static const char * bus_error_string(bus_send_status_t t)
static void DefaultCallback(KineticCompletionData *kinetic_data, void *client_data)
KineticStatus KineticController_ExecuteOperation(KineticOperation *operation, KineticCompletionClosure *const closure)
void KineticSession_SetConnectionID(KineticSession *const session, int64_t id)
void KineticController_HandleUnexpectedResponse(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
int64_t KineticResponse_GetConnectionID(KineticResponse *response)
KineticStatus KineticOperation_SendRequest(KineticOperation *const op)
#define LOGF2(message,...)