kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
kinetic_client.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 
21 #include "kinetic_types_internal.h"
22 #include "kinetic_version_info.h"
23 #include "kinetic_client.h"
24 #include "kinetic_allocator.h"
25 #include "kinetic_session.h"
26 #include "kinetic_controller.h"
27 #include "kinetic_operation.h"
28 #include "kinetic_builder.h"
29 #include "kinetic_logger.h"
30 #include "kinetic_response.h"
31 #include "kinetic_bus.h"
32 #include "kinetic_memory.h"
33 #include <stdlib.h>
34 #include <sys/time.h>
35 
38  .protocolVersion = KINETIC_C_PROTOCOL_VERSION,
39  .repoCommitHash = KINETIC_C_REPO_HASH,
40 };
41 
43 {
44  return VersionInfo;
45 }
46 
47 KineticClient * KineticClient_Init(KineticClientConfig *config)
48 {
49  KineticLogger_Init(config->logFile, config->logLevel);
50  KineticClient * client = KineticCalloc(1, sizeof(*client));
51  if (client == NULL) { return NULL; }
52 
53  /* Use defaults if set to 0. */
54  if (config->readerThreads == 0) {
56  }
57  if (config->maxThreadpoolThreads == 0) {
59  }
60 
61  bool success = KineticBus_Init(client, config);
62  if (!success) {
63  KineticFree(client);
64  return NULL;
65  }
66  return client;
67 }
68 
69 void KineticClient_Shutdown(KineticClient * const client)
70 {
71  KineticBus_Shutdown(client);
72  KineticFree(client);
74 }
75 
77  KineticClient * const client, KineticSession** session)
78 {
79  if (config == NULL) {
80  LOG0("KineticSessionConfig is NULL!");
82  }
83 
84  if (session == NULL) {
85  LOG0("Pointer to KineticSession pointer is NULL!");
87  }
88 
89  if (strlen(config->host) == 0) {
90  LOG0("Host is empty!");
92  }
93 
94  if (config->hmacKey.len < 1 || config->hmacKey.data == NULL)
95  {
96  LOG0("HMAC key is NULL or empty!");
98  }
99 
100  // Create a new session
101  KineticSession* s = KineticAllocator_NewSession(client->bus, config);
102  if (s == NULL) {
103  LOG0("Failed to create session instance!");
105  }
106  KineticStatus status = KineticSession_Create(s, client);
107  if (status != KINETIC_STATUS_SUCCESS) {
108  LOG0("Failed to create session instance!");
110  return status;
111  }
112 
113  // Establish the connection
114  status = KineticSession_Connect(s);
115  if (status != KINETIC_STATUS_SUCCESS) {
116  LOGF0("Failed creating connection to %s:%d", config->host, config->port);
118  return status;
119  }
120 
121  *session = s;
122 
123  return status;
124 }
125 
126 KineticStatus KineticClient_DestroySession(KineticSession* const session)
127 {
128  if (session == NULL) {
129  LOG0("KineticSession is NULL!");
131  }
132 
133  KineticStatus status = KineticSession_Disconnect(session);
134  if (status != KINETIC_STATUS_SUCCESS) {LOG0("Disconnection failed!");}
135  KineticSession_Destroy(session);
136 
137  return status;
138 }
139 
140 KineticStatus KineticClient_GetTerminationStatus(KineticSession * const session)
141 {
142  return KineticSession_GetTerminationStatus(session);
143 }
144 
145 KineticStatus KineticClient_NoOp(KineticSession* const session)
146 {
147  KINETIC_ASSERT(session);
148 
149  KineticOperation* operation = KineticAllocator_NewOperation(session);
150  if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;}
151 
152  KineticBuilder_BuildNoop(operation);
153  return KineticController_ExecuteOperation(operation, NULL);
154 }
155 
156 KineticStatus KineticClient_Put(KineticSession* const session,
157  KineticEntry* const entry,
158  KineticCompletionClosure* closure)
159 {
160  KINETIC_ASSERT(session);
161  KINETIC_ASSERT(entry);
162 
163  // Assert non-NULL value upon non-zero length
164  if (entry->value.array.len > 0) {
165  KINETIC_ASSERT(entry->value.array.data);
166  }
167 
168  KineticOperation* operation = KineticAllocator_NewOperation(session);
169  if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;}
170  KINETIC_ASSERT(operation->session == session);
171 
172  // Initialize request
173  KineticStatus status = KineticBuilder_BuildPut(operation, entry);
174  if (status != KINETIC_STATUS_SUCCESS) {
176  return status;
177  }
178 
179  // Execute the operation
180  KineticStatus res = KineticController_ExecuteOperation(operation, closure);
181  return res;
182 }
183 
184 KineticStatus KineticClient_Flush(KineticSession* const session,
185  KineticCompletionClosure* closure)
186 {
187  KINETIC_ASSERT(session);
188 
189  KineticOperation* operation = KineticAllocator_NewOperation(session);
190  if (operation == NULL) { return KINETIC_STATUS_MEMORY_ERROR; }
191 
192  // Initialize request
193  KineticBuilder_BuildFlush(operation);
194 
195  // Execute the operation
196  return KineticController_ExecuteOperation(operation, closure);
197 }
198 
199 static bool has_key(KineticEntry* const entry)
200 {
201  return entry->key.array.data != NULL;
202 }
203 
204 static bool has_value_buffer(KineticEntry* const entry)
205 {
206  return entry->value.array.data != NULL;
207 }
208 
209 typedef enum {
213 } GET_COMMAND;
214 
216  KineticSession* const session,
217  KineticEntry* const entry,
218  KineticCompletionClosure* closure)
219 {
220  KINETIC_ASSERT(session);
221  KINETIC_ASSERT(entry);
222 
223  if (!has_key(entry)) {return KINETIC_STATUS_MISSING_KEY;}
224  if (!has_value_buffer(entry) && !entry->metadataOnly) {
226  }
227 
228  KineticOperation* operation = KineticAllocator_NewOperation(session);
229  if (operation == NULL) {
231  }
232 
233  // Initialize request
234  switch (cmd)
235  {
236  case CMD_GET:
237  KineticBuilder_BuildGet(operation, entry);
238  break;
239  case CMD_GET_NEXT:
240  KineticBuilder_BuildGetNext(operation, entry);
241  break;
242  case CMD_GET_PREVIOUS:
243  KineticBuilder_BuildGetPrevious(operation, entry);
244  break;
245  default:
246  KINETIC_ASSERT(false);
247  }
248 
249  // Execute the operation
250  return KineticController_ExecuteOperation(operation, closure);
251 }
252 
253 KineticStatus KineticClient_Get(KineticSession* const session,
254  KineticEntry* const entry,
255  KineticCompletionClosure* closure)
256 {
257  return handle_get_command(CMD_GET, session, entry, closure);
258 }
259 
260 KineticStatus KineticClient_GetPrevious(KineticSession* const session,
261  KineticEntry* const entry,
262  KineticCompletionClosure* closure)
263 {
264  return handle_get_command(CMD_GET_PREVIOUS, session, entry, closure);
265 }
266 
267 KineticStatus KineticClient_GetNext(KineticSession* const session,
268  KineticEntry* const entry,
269  KineticCompletionClosure* closure)
270 {
271  return handle_get_command(CMD_GET_NEXT, session, entry, closure);
272 }
273 
274 KineticStatus KineticClient_Delete(KineticSession* const session,
275  KineticEntry* const entry,
276  KineticCompletionClosure* closure)
277 {
278  KINETIC_ASSERT(session);
279  KINETIC_ASSERT(entry);
280 
281  KineticOperation* operation = KineticAllocator_NewOperation(session);
282  if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;}
283 
284  // Initialize request
285  KineticBuilder_BuildDelete(operation, entry);
286 
287  // Execute the operation
288  return KineticController_ExecuteOperation(operation, closure);
289 }
290 
291 KineticStatus KineticClient_GetKeyRange(KineticSession* const session,
292  KineticKeyRange* range,
293  ByteBufferArray* keys,
294  KineticCompletionClosure* closure)
295 {
296  KINETIC_ASSERT(session);
297  KINETIC_ASSERT(range);
298  KINETIC_ASSERT(keys);
299  KINETIC_ASSERT(keys->buffers);
300  KINETIC_ASSERT(keys->count > 0);
301 
302  KineticOperation* operation = KineticAllocator_NewOperation(session);
303  if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;}
304 
305  // Initialize request
306  KineticBuilder_BuildGetKeyRange(operation, range, keys);
307 
308  // Execute the operation
309  return KineticController_ExecuteOperation(operation, closure);
310 }
311 
312 KineticStatus KineticClient_P2POperation(KineticSession* const session,
313  KineticP2P_Operation* const p2pOp,
314  KineticCompletionClosure* closure)
315 {
316  KINETIC_ASSERT(session);
317  KINETIC_ASSERT(p2pOp);
318 
319  KineticOperation* operation = KineticAllocator_NewOperation(session);
320  if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;}
321 
322  // Initialize request
323  KineticStatus status = KineticBuilder_BuildP2POperation(operation, p2pOp);
324  if (status != KINETIC_STATUS_SUCCESS) {
325  // TODO we need to find a more generic way to handle errors on command construction
326  if (closure != NULL) {
327  operation->closure = *closure;
328  }
329  return status;
330  }
331 
332  // Execute the operation
333  return KineticController_ExecuteOperation(operation, closure);
334 }
#define KINETIC_C_VERSION
uint8_t maxThreadpoolThreads
Max number of threads to use for the threadpool that handles response callbacks.
void * KineticCalloc(size_t count, size_t size)
static bool has_value_buffer(KineticEntry *const entry)
KineticStatus KineticClient_DestroySession(KineticSession *const session)
Closes the connection to a host.
KineticStatus KineticClient_Put(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a PUT operation to store/update an entry on the Kinetic Device.
bool KineticBus_Init(KineticClient *client, KineticClientConfig *config)
Definition: kinetic_bus.c:242
int logLevel
Logging level (-1:none, 0:error, 1:info, 2:verbose, 3:full)
Operation successful.
KineticStatus KineticBuilder_BuildGetPrevious(KineticOperation *const op, KineticEntry *const entry)
static KineticStatus handle_get_command(GET_COMMAND cmd, KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
GET_COMMAND
KineticStatus KineticSession_Connect(KineticSession *const session)
Structure used to specify the configuration for a session.
KineticStatus KineticClient_Get(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a GET operation to retrieve an entry from the Kinetic Device.
void KineticAllocator_FreeOperation(KineticOperation *operation)
KineticStatus KineticSession_Create(KineticSession *const session, KineticClient *const client)
KineticStatus KineticClient_GetNext(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a GETNEXT operation to retrieve the next entry from the Kinetic Device.
ByteBuffer * buffers
Definition: byte_array.h:59
KineticStatus KineticClient_NoOp(KineticSession *const session)
Executes a NOOP operation to test whether the Kinetic Device is operational.
Host was empty in request.
bool metadataOnly
If set for a GET request, will return only the metadata for the specified object (value will not be r...
void KineticAllocator_FreeSession(KineticSession *session)
KineticStatus KineticClient_Flush(KineticSession *const session, KineticCompletionClosure *closure)
Executes a FLUSHALLDATA operation to flush pending PUTs or DELETEs.
Failed allocating/deallocating memory.
#define KINETIC_CLIENT_DEFAULT_MAX_THREADPOOL_THREADS
const char * version
kinetic-c library version
Definition: kinetic_types.h:65
ByteArray array
ByteArray holding allocated array w/length = allocated size.
Definition: byte_array.h:54
Closure which can be specified for operations which support asynchronous mode.
Kinetic object instance.
char host[256]
Host name/IP address of Kinetic Device.
void KineticClient_Shutdown(KineticClient *const client)
Performs shutdown/cleanup of the kinetic-c client library.
KineticStatus KineticBuilder_BuildPut(KineticOperation *const op, KineticEntry *const entry)
kinetic-c library version info (returned from KineticClient_Version())
Definition: kinetic_types.h:64
void KineticBus_Shutdown(KineticClient *const client)
Definition: kinetic_bus.c:268
Kinetic Key Range request structure.
KineticStatus KineticBuilder_BuildGet(KineticOperation *const op, KineticEntry *const entry)
static bool has_key(KineticEntry *const entry)
#define KINETIC_ASSERT(cond)
KineticStatus KineticBuilder_BuildNoop(KineticOperation *const op)
KineticVersionInfo KineticClient_Version(void)
Gets current version info of kinetic-c library.
KineticStatus KineticClient_Delete(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a DELETE operation to delete an entry from the Kinetic Device.
uint8_t readerThreads
Number of threads used for handling incoming responses and status messages.
KineticStatus KineticClient_P2POperation(KineticSession *const session, KineticP2P_Operation *const p2pOp, KineticCompletionClosure *closure)
Executes a PEER2PEERPUSH operation allows a client to instruct a Kinetic Device to copy a set of keys...
void KineticFree(void *pointer)
KineticStatus KineticBuilder_BuildGetKeyRange(KineticOperation *const op, KineticKeyRange *range, ByteBufferArray *buffers)
#define LOG0(message)
KineticClient * KineticClient_Init(KineticClientConfig *config)
Initializes the Kinetic API and configures logging.
int port
Port for Kinetic Device session.
size_t len
Number of bytes in the data field.
Definition: byte_array.h:35
Session was NULL in request.
#define LOGF0(message,...)
KineticOperation * KineticAllocator_NewOperation(KineticSession *const session)
const char * logFile
Path to log file. Specify 'stdout' to log to STDOUT or NULL to disable logging.
#define KINETIC_CLIENT_DEFAULT_READER_THREADS
KineticStatus KineticClient_CreateSession(KineticSessionConfig *const config, KineticClient *const client, KineticSession **session)
Creates a session with the Kinetic Device per specified configuration.
KineticStatus KineticBuilder_BuildGetNext(KineticOperation *const op, KineticEntry *const entry)
ByteBuffer key
Key associated with the object stored on disk.
uint8_t * data
Pointer to an allocated array of data bytes.
Definition: byte_array.h:36
An operation is missing a required key.
KineticStatus KineticSession_Disconnect(KineticSession *const session)
KineticStatus KineticSession_GetTerminationStatus(KineticSession const *const session)
static const KineticVersionInfo VersionInfo
KineticStatus KineticClient_GetPrevious(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a GETPREVIOUS operation to retrieve the next entry from the Kinetic Device.
KineticStatus
Kinetic status codes.
#define KINETIC_C_REPO_HASH
KineticSession * KineticAllocator_NewSession(struct bus *b, KineticSessionConfig *config)
void KineticLogger_Close(void)
#define KINETIC_C_PROTOCOL_VERSION
Configuration values for the KineticClient connection.
KineticStatus KineticController_ExecuteOperation(KineticOperation *operation, KineticCompletionClosure *const closure)
KineticStatus KineticClient_GetTerminationStatus(KineticSession *const session)
Returns the reason reported in the case of the Kinetic device terminating a session in the case of a ...
KineticStatus KineticBuilder_BuildP2POperation(KineticOperation *const op, KineticP2P_Operation *const p2pOp)
KineticStatus KineticBuilder_BuildDelete(KineticOperation *const op, KineticEntry *const entry)
KineticStatus KineticBuilder_BuildFlush(KineticOperation *const op)
KineticStatus KineticSession_Destroy(KineticSession *const session)
An operation is missing a required value buffer.
HMAC key is empty or NULL.
void KineticLogger_Init(const char *log_file, int log_level)
KineticStatus KineticClient_GetKeyRange(KineticSession *const session, KineticKeyRange *range, ByteBufferArray *keys, KineticCompletionClosure *closure)
Executes a GETKEYRANGE operation to retrieve a set of keys in the range specified range from the Kine...
ByteBuffer value
Value data associated with the key.
Session configuration was invalid or NULL.