kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
write_file_blocking_threads.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 
#include "kinetic_client.h"
#include "kinetic_types.h"
#include "byte_array.h"
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
32 
/* Number of copies of the input file to store, one writer thread per copy. */
#define NUM_FILES (3)

/*
 * Reports a pthread-style error code through perror().
 *
 * pthread functions return the error number instead of setting errno, so the
 * code is copied into errno before perror() is called. Nothing is printed
 * when `en` is 0.
 *
 * The do/while(0) wrapper makes the macro behave as a single statement (safe
 * in an unbraced if/else), and `en` is evaluated exactly once.
 */
#define REPORT_ERRNO(en, msg) \
    do { \
        int report_errno_code_ = (en); \
        if (report_errno_code_ != 0) { \
            errno = report_errno_code_; \
            perror(msg); \
        } \
    } while (0)
36 
37 typedef struct {
38  pthread_t threadID;
39  char ip[16];
40  KineticSession* session;
41  char keyPrefix[KINETIC_DEFAULT_KEY_LEN];
42  uint8_t key[KINETIC_DEFAULT_KEY_LEN];
43  uint8_t version[KINETIC_DEFAULT_KEY_LEN];
44  uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
45  uint8_t value[KINETIC_OBJ_SIZE];
46  KineticEntry entry;
47  ByteBuffer data;
48  KineticStatus status;
49 } write_args;
50 
51 void* store_data(void* args)
52 {
53  write_args* thread_args = (write_args*)args;
54  KineticEntry* entry = &(thread_args->entry);
55  int32_t objIndex;
56  for (objIndex = 0; ByteBuffer_BytesRemaining(thread_args->data) > 0; objIndex++) {
57 
58  // Configure meta-data
59  char keySuffix[8];
60  snprintf(keySuffix, sizeof(keySuffix), "%02d", objIndex);
61  entry->key.bytesUsed = strlen(thread_args->keyPrefix);
62  ByteBuffer_AppendCString(&entry->key, keySuffix);
64 
65  // Prepare the next chunk of data to store
66  ByteBuffer_AppendArray(&entry->value, ByteBuffer_Consume(&thread_args->data, KINETIC_OBJ_SIZE));
67 
68  // Store the data slice
69  KineticStatus status = KineticClient_Put(thread_args->session, entry, NULL);
70  if (status != KINETIC_STATUS_SUCCESS) {
71  fprintf(stderr, "Failed writing entry %d to disk w/status: %s",
72  objIndex+1, Kinetic_GetStatusDescription(status));
73  return (void*)NULL;
74  }
75  }
76  printf("File stored successfully to Kinetic device across %d entries!\n", objIndex);
77  return (void*)NULL;
78 }
79 
80 int main(int argc, char** argv)
81 {
82  (void)argc;
83  (void)argv;
84 
85  // Initialize kinetic-c and configure sessions
86  KineticSession* session;
87  KineticClientConfig clientConfig = {
88  .logFile = "stdout",
89  .logLevel = 1,
90  };
91  KineticClient * client = KineticClient_Init(&clientConfig);
92  if (client == NULL) { return 1; }
93  const char HmacKeyString[] = "asdfasdf";
94  KineticSessionConfig sessionConfig = {
95  .host = "localhost",
96  .port = KINETIC_PORT,
97  .clusterVersion = 0,
98  .identity = 1,
99  .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
100  };
101  KineticStatus status = KineticClient_CreateSession(&sessionConfig, client, &session);
102  if (status != KINETIC_STATUS_SUCCESS) {
103  fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
105  return -1;
106  }
107 
108  // Read in file contents to store
109  const char* dataFile = "test/support/data/test.data";
110  struct stat st;
111  stat(dataFile, &st);
112  char* buf = malloc(st.st_size);
113  int fd = open(dataFile, O_RDONLY);
114  long dataLen = read(fd, buf, st.st_size);
115  close(fd);
116  if (dataLen <= 0) {
117  fprintf(stderr, "Failed reading data file to store: %s\n", dataFile);
118  exit(-1);
119  }
120 
121  write_args* writeArgs = calloc(NUM_FILES, sizeof(write_args));
122  if (writeArgs == NULL) {
123  fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
124  }
125 
126  // Kick off a thread for each file to store
127  for (int i = 0; i < NUM_FILES; i++) {
128 
129  // Establish connection
130  status = KineticClient_CreateSession(&sessionConfig, client, &writeArgs[i].session);
131  if (status != KINETIC_STATUS_SUCCESS) {
132  fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
134  return -1;
135  }
136  strcpy(writeArgs[i].ip, sessionConfig.host);
137 
138  // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
139  writeArgs[i].data = ByteBuffer_Create(buf, dataLen, 0);
140 
141  // Configure common entry attributes
142  struct timeval now;
143  gettimeofday(&now, NULL);
144  snprintf(writeArgs[i].keyPrefix, sizeof(writeArgs[i].keyPrefix), "%010llu_%02d_",
145  (unsigned long long)now.tv_sec, i);
146  ByteBuffer valBuf = ByteBuffer_Create(writeArgs[i].value, sizeof(writeArgs[i].value), 0);
147  writeArgs[i].entry = (KineticEntry) {
149  writeArgs[i].key, sizeof(writeArgs[i].key), writeArgs[i].keyPrefix),
151  writeArgs[i].tag, sizeof(writeArgs[i].tag), "some_value_tag..."),
152  .algorithm = KINETIC_ALGORITHM_SHA1,
153  .value = valBuf,
154  };
155 
156  // Store the entry
157  int threadCreateStatus = pthread_create(&writeArgs[i].threadID, NULL, store_data, &writeArgs[i]);
158  REPORT_ERRNO(threadCreateStatus, "pthread_create");
159  if (threadCreateStatus != 0) {
160  fprintf(stderr, "pthread create failed!\n");
161  exit(-2);
162  }
163  }
164 
165  // Wait for all PUT operations to complete and cleanup
166  for (int i = 0; i < NUM_FILES; i++) {
167  int joinStatus = pthread_join(writeArgs[i].threadID, NULL);
168  if (joinStatus != 0) {
169  fprintf(stderr, "pthread join failed!\n");
170  }
171  KineticClient_DestroySession(writeArgs[i].session);
172  }
173 
174  // Shutdown client connection and cleanup
175  KineticClient_Shutdown(client);
176  free(writeArgs);
177  free(buf);
178 
179  return 0;
180 }
Structure for an embedded ByteArray as a buffer.
Definition: byte_array.h:53
Operation successful.
KineticStatus KineticClient_CreateSession(KineticSessionConfig *const config, KineticClient *const client, KineticSession **session)
Creates a session with the Kinetic Device per specified configuration.
Structure used to specify the configuration for a session.
#define KINETIC_OBJ_SIZE
Max object/value size.
Definition: kinetic_types.h:47
ByteBuffer * ByteBuffer_AppendArray(ByteBuffer *buffer, const ByteArray array)
Definition: byte_array.c:149
#define REPORT_ERRNO(en, msg)
KineticStatus KineticClient_DestroySession(KineticSession *const session)
Closes the connection to a host.
Kinetic object instance.
char host[256]
Host name/IP address of Kinetic Device.
ByteBuffer * ByteBuffer_AppendCString(ByteBuffer *buffer, const char *data)
Definition: byte_array.c:176
const char * Kinetic_GetStatusDescription(KineticStatus status)
Provides a string representation for a KineticStatus code.
Definition: kinetic_types.c:67
ByteBuffer ByteBuffer_Create(void *data, size_t max_len, size_t used)
Definition: byte_array.c:68
#define KINETIC_PORT
Default kinetic port.
Definition: kinetic_types.h:40
They can be made persistent when the drive chooses, or when a subsequent FLUSH is sent to the drive...
Definition: kinetic_types.h:97
#define NUM_FILES
long ByteBuffer_BytesRemaining(const ByteBuffer buffer)
Definition: byte_array.c:111
const char * logFile
Path to log file. Specify 'stdout' to log to STDOUT or NULL to disable logging.
ByteBuffer key
Key associated with the object stored on disk.
ByteArray ByteBuffer_Consume(ByteBuffer *buffer, size_t max_len)
Definition: byte_array.c:117
#define KINETIC_DEFAULT_KEY_LEN
Default key length.
Definition: kinetic_types.h:45
KineticStatus
Kinetic status codes.
KineticStatus KineticClient_Put(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a PUT operation to store/update an entry on the Kinetic Device.
Configuration values for the KineticClient connection.
void KineticClient_Shutdown(KineticClient *const client)
Performs shutdown/cleanup of the kinetic-c client library.
KineticClient * KineticClient_Init(KineticClientConfig *config)
Initializes the Kinetic API and configures logging.
size_t bytesUsed
Reflects the number of bytes used from the array
Definition: byte_array.h:55
int main(int argc, char **argv)
KineticSynchronization synchronization
Synchronization method to use for PUT/DELETE requests.
void * store_data(void *args)
ByteArray ByteArray_CreateWithCString(const char *str)
Definition: byte_array.c:38
ByteBuffer ByteBuffer_CreateAndAppendCString(void *data, size_t max_len, const char *value)
Definition: byte_array.c:97
ByteBuffer value
Value data associated with the key.