kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
write_file_nonblocking.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
#include "kinetic_client.h"
#include "kinetic_types.h"
#include "byte_array.h"
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <sys/file.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
31 
/*
 * Report an error number: when `en` is non-zero, store it in errno and print
 * `msg` followed by the matching system error text via perror().
 *
 * `en` is evaluated exactly once, and the do/while(0) wrapper makes the macro
 * expand to a single statement, so it is safe inside an unbraced if/else.
 */
#define REPORT_ERRNO(en, msg) \
    do { \
        const int report_errno_value_ = (en); \
        if (report_errno_value_ != 0) { \
            errno = report_errno_value_; \
            perror(msg); \
        } \
    } while (0)
33 
/*
 * Argument bundle for a worker thread.
 * NOTE(review): nothing in this file references this struct; it looks like a
 * leftover from a threaded variant of the example - confirm before removing.
 */
struct kinetic_thread_arg {
    char ip[16];                      /* host address text; presumably a dotted-quad IPv4 string */
    struct kinetic_put_arg* opArgs;   /* PUT arguments; presumably an array of opCount entries */
    int opCount;                      /* number of operations */
};
39 
40 typedef struct {
41  size_t opsInProgress;
42  size_t currentChunk;
43  size_t maxOverlappedChunks;
44  int fd;
45  uint64_t keyPrefix;
46  pthread_mutex_t transferMutex;
47  pthread_mutex_t completeMutex;
48  pthread_cond_t completeCond;
49  KineticStatus status;
50  KineticSession* session;
51 } FileTransferProgress;
52 
53 typedef struct {
54  KineticEntry entry;
55  uint8_t key[KINETIC_DEFAULT_KEY_LEN];
56  uint8_t value[KINETIC_OBJ_SIZE];
57  uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
58  FileTransferProgress* currentTransfer;
59 } AsyncWriteClosureData;
60 
61 FileTransferProgress * start_file_transfer(KineticSession* session,
62  char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks);
63 KineticStatus wait_for_put_finish(FileTransferProgress* const transfer);
64 
65 static int put_chunk_of_file(FileTransferProgress* transfer);
66 static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);
67 
68 
69 int main(int argc, char** argv)
70 {
71  (void)argc;
72  (void)argv;
73 
74  // Initialize kinetic-c and establish session
75  KineticSession* session;
76  KineticClientConfig client_config = {
77  .logFile = "stdout",
78  .logLevel = 0,
79  };
80  KineticClient * client = KineticClient_Init(&client_config);
81  if (client == NULL) { return 1; }
82  const char HmacKeyString[] = "asdfasdf";
83  KineticSessionConfig config = {
84  .host = "localhost",
85  .port = KINETIC_PORT,
86  .clusterVersion = 0,
87  .identity = 1,
88  .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
89  };
90  KineticStatus status = KineticClient_CreateSession(&config, client, &session);
91  if (status != KINETIC_STATUS_SUCCESS) {
92  fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
94  return -1;
95  }
96 
97  // Create a unique/common key prefix
98  struct timeval now;
99  gettimeofday(&now, NULL);
100  uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8);
101 
102  // Kick off the chained write/PUT operations and wait for completion
103  const uint32_t maxOverlappedChunks = 4;
104  const char* dataFile = "test/support/data/test.data";
105  FileTransferProgress* transfer = start_file_transfer(session, dataFile, prefix, maxOverlappedChunks);
106  printf("Waiting for transfer to complete...\n");
107  status = wait_for_put_finish(transfer);
108  if (status != KINETIC_STATUS_SUCCESS) {
109  fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(status));
110  return -2;
111  }
112  printf("Transfer completed successfully!\n");
113 
114  // Shutdown client connection and cleanup
116  KineticClient_Shutdown(client);
117 
118  return 0;
119 }
120 
121 static int put_chunk_of_file(FileTransferProgress* transfer)
122 {
123  AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
124  transfer->opsInProgress++;
125  closureData->currentTransfer = transfer;
126 
127  int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
128  if (bytesRead > 0) {
129  transfer->currentChunk++;
130  closureData->entry = (KineticEntry){
131  .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
132  &transfer->keyPrefix, sizeof(transfer->keyPrefix)),
133  .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
134  "some_value_tag..._%04d", transfer->currentChunk),
135  .algorithm = KINETIC_ALGORITHM_SHA1,
136  .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
137  .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
138  };
139  KineticStatus status = KineticClient_Put(transfer->session,
140  &closureData->entry,
142  .callback = put_chunk_of_file_finished,
143  .clientData = closureData,
144  });
145  if (status != KINETIC_STATUS_SUCCESS) {
146  transfer->opsInProgress--;
147  free(closureData);
148  fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
150  }
151  }
152  else if (bytesRead == 0) { // EOF reached
153  transfer->opsInProgress--;
154  free(closureData);
155  }
156  else {
157  transfer->opsInProgress--;
158  free(closureData);
159  fprintf(stderr, "Failed reading data from file!\n");
160  REPORT_ERRNO(bytesRead, "read");
161  }
162 
163  return bytesRead;
164 }
165 
166 static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
167 {
168  AsyncWriteClosureData* closureData = clientData;
169  FileTransferProgress* currentTransfer = closureData->currentTransfer;
170  free(closureData);
171  currentTransfer->opsInProgress--;
172 
173  if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
174  int bytesPut = put_chunk_of_file(currentTransfer);
175  if (bytesPut <= 0 && currentTransfer->opsInProgress == 0) {
176  if (currentTransfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
177  currentTransfer->status = KINETIC_STATUS_SUCCESS;
178  }
179  pthread_cond_signal(&currentTransfer->completeCond);
180  }
181  }
182  else {
183  currentTransfer->status = kinetic_data->status;
184  // only signal when finished
185  // keep track of outstanding operations
186  // if there is no more data to read (or error), and no outstanding operations,
187  // then signal
188  pthread_cond_signal(&currentTransfer->completeCond);
189  fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
190  Kinetic_GetStatusDescription(kinetic_data->status));
191  }
192 }
193 
194 FileTransferProgress * start_file_transfer(KineticSession* session,
195  char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
196 {
197  FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
198  *transferState = (FileTransferProgress) {
199  .session = session,
200  .maxOverlappedChunks = maxOverlappedChunks,
201  .keyPrefix = keyPrefix,
202  .fd = open(filename, O_RDONLY),
203  };
204  pthread_mutex_init(&transferState->transferMutex, NULL);
205  pthread_mutex_init(&transferState->completeMutex, NULL);
206  pthread_cond_init(&transferState->completeCond, NULL);
207 
208  // Start max overlapped PUT operations
209  for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
210  put_chunk_of_file(transferState);
211  }
212  return transferState;
213 }
214 
215 KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
216 {
217  pthread_mutex_lock(&transfer->completeMutex);
218  pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
219  pthread_mutex_unlock(&transfer->completeMutex);
220 
221  KineticStatus status = transfer->status;
222 
223  pthread_mutex_destroy(&transfer->completeMutex);
224  pthread_cond_destroy(&transfer->completeCond);
225 
226  close(transfer->fd);
227 
228  free(transfer);
229 
230  return status;
231 }
This request is made persistent before returning.
Definition: kinetic_types.h:93
Operation successful.
KineticStatus KineticClient_CreateSession(KineticSessionConfig *const config, KineticClient *const client, KineticSession **session)
Creates a session with the Kinetic Device per specified configuration.
Structure used to specify the configuration for a session.
#define KINETIC_OBJ_SIZE
Max object/value size.
Definition: kinetic_types.h:47
FileTransferProgress * start_file_transfer(KineticSession *session, char const *const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
KineticStatus wait_for_put_finish(FileTransferProgress *const transfer)
KineticStatus KineticClient_DestroySession(KineticSession *const session)
Closes the connection to a host.
int main(int argc, char **argv)
#define REPORT_ERRNO(en, msg)
static void put_chunk_of_file_finished(KineticCompletionData *kinetic_data, void *client_data)
Closure which can be specified for operations which support asynchronous mode.
Kinetic object instance.
char host[256]
Host name/IP address of Kinetic Device.
const char * Kinetic_GetStatusDescription(KineticStatus status)
Provides a string representation for a KineticStatus code.
Definition: kinetic_types.c:67
ByteBuffer ByteBuffer_Create(void *data, size_t max_len, size_t used)
Definition: byte_array.c:68
#define KINETIC_PORT
Default kinetic port.
Definition: kinetic_types.h:40
Completion data which will be provided to KineticCompletionClosure for asynchronous operations...
No operation has been attempted.
const char * logFile
Path to log file. Specify 'stdout' to log to STDOUT or NULL to disable logging.
KineticStatus status
Resultant status of the operation.
ByteBuffer ByteBuffer_CreateAndAppendFormattedCString(void *data, size_t max_len, const char *format,...)
Definition: byte_array.c:221
#define KINETIC_DEFAULT_KEY_LEN
Default key length.
Definition: kinetic_types.h:45
KineticStatus
Kinetic status codes.
KineticStatus KineticClient_Put(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a PUT operation to store/update an entry on the Kinetic Device.
static int put_chunk_of_file(FileTransferProgress *transfer)
Configuration values for the KineticClient connection.
void KineticClient_Shutdown(KineticClient *const client)
Performs shutdown/cleanup of the kinetic-c client library.
KineticClient * KineticClient_Init(KineticClientConfig *config)
Initializes the Kinetic API and configures logging.
ByteBuffer ByteBuffer_CreateAndAppend(void *data, size_t max_len, const void *value, size_t value_len)
Definition: byte_array.c:83
ByteArray ByteArray_CreateWithCString(const char *str)
Definition: byte_array.c:38