26 #include <sys/param.h>
32 #define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}
34 struct kinetic_thread_arg {
36 struct kinetic_put_arg* opArgs;
43 size_t maxOverlappedChunks;
46 pthread_mutex_t transferMutex;
47 pthread_mutex_t completeMutex;
48 pthread_cond_t completeCond;
50 KineticSession* session;
51 } FileTransferProgress;
58 FileTransferProgress* currentTransfer;
59 } AsyncWriteClosureData;
62 char const *
const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks);
69 int main(
int argc,
char** argv)
75 KineticSession* session;
81 if (client == NULL) {
return 1; }
82 const char HmacKeyString[] =
"asdfasdf";
92 fprintf(stderr,
"Failed connecting to the Kinetic device w/status: %s\n",
99 gettimeofday(&now, NULL);
100 uint64_t prefix = (uint64_t)now.tv_sec <<
sizeof(8);
103 const uint32_t maxOverlappedChunks = 4;
104 const char* dataFile =
"test/support/data/test.data";
105 FileTransferProgress* transfer =
start_file_transfer(session, dataFile, prefix, maxOverlappedChunks);
106 printf(
"Waiting for transfer to complete...\n");
112 printf(
"Transfer completed successfully!\n");
123 AsyncWriteClosureData* closureData = calloc(1,
sizeof(AsyncWriteClosureData));
124 transfer->opsInProgress++;
125 closureData->currentTransfer = transfer;
127 int bytesRead = read(transfer->fd, closureData->value,
sizeof(closureData->value));
129 transfer->currentChunk++;
132 &transfer->keyPrefix,
sizeof(transfer->keyPrefix)),
134 "some_value_tag..._%04d", transfer->currentChunk),
136 .value =
ByteBuffer_Create(closureData->value,
sizeof(closureData->value), (
size_t)bytesRead),
142 .callback = put_chunk_of_file_finished,
143 .clientData = closureData,
146 transfer->opsInProgress--;
148 fprintf(stderr,
"Failed writing chunk! PUT request reported status: %s\n",
152 else if (bytesRead == 0) {
153 transfer->opsInProgress--;
157 transfer->opsInProgress--;
159 fprintf(stderr,
"Failed reading data from file!\n");
168 AsyncWriteClosureData* closureData = clientData;
169 FileTransferProgress* currentTransfer = closureData->currentTransfer;
171 currentTransfer->opsInProgress--;
175 if (bytesPut <= 0 && currentTransfer->opsInProgress == 0) {
179 pthread_cond_signal(&currentTransfer->completeCond);
183 currentTransfer->status = kinetic_data->
status;
188 pthread_cond_signal(&currentTransfer->completeCond);
189 fprintf(stderr,
"Failed writing chunk! PUT response reported status: %s\n",
195 char const *
const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
197 FileTransferProgress * transferState = malloc(
sizeof(FileTransferProgress));
198 *transferState = (FileTransferProgress) {
200 .maxOverlappedChunks = maxOverlappedChunks,
201 .keyPrefix = keyPrefix,
202 .fd = open(filename, O_RDONLY),
204 pthread_mutex_init(&transferState->transferMutex, NULL);
205 pthread_mutex_init(&transferState->completeMutex, NULL);
206 pthread_cond_init(&transferState->completeCond, NULL);
209 for (
size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
212 return transferState;
217 pthread_mutex_lock(&transfer->completeMutex);
218 pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
219 pthread_mutex_unlock(&transfer->completeMutex);
223 pthread_mutex_destroy(&transfer->completeMutex);
224 pthread_cond_destroy(&transfer->completeCond);
This request is made persistent before returning.
KineticStatus KineticClient_CreateSession(KineticSessionConfig *const config, KineticClient *const client, KineticSession **session)
Creates a session with the Kinetic Device per specified configuration.
Structure used to specify the configuration for a session.
#define KINETIC_OBJ_SIZE
Max object/value size.
FileTransferProgress * start_file_transfer(KineticSession *session, char const *const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
KineticStatus wait_for_put_finish(FileTransferProgress *const transfer)
KineticStatus KineticClient_DestroySession(KineticSession *const session)
Closes the connection to a host.
int main(int argc, char **argv)
#define REPORT_ERRNO(en, msg)
static void put_chunk_of_file_finished(KineticCompletionData *kinetic_data, void *client_data)
Closure which can be specified for operations which support asynchronous mode.
char host[256]
Host name/IP address of Kinetic Device.
const char * Kinetic_GetStatusDescription(KineticStatus status)
Provides a string representation for a KineticStatus code.
ByteBuffer ByteBuffer_Create(void *data, size_t max_len, size_t used)
#define KINETIC_PORT
Default kinetic port.
Completion data which will be provided to KineticCompletionClosure for asynchronous operations...
No operation has been attempted.
const char * logFile
Path to log file. Specify 'stdout' to log to STDOUT or NULL to disable logging.
KineticStatus status
Resultant status of the operation.
ByteBuffer ByteBuffer_CreateAndAppendFormattedCString(void *data, size_t max_len, const char *format,...)
#define KINETIC_DEFAULT_KEY_LEN
Default key length.
KineticStatus
Kinetic status codes.
KineticStatus KineticClient_Put(KineticSession *const session, KineticEntry *const entry, KineticCompletionClosure *closure)
Executes a PUT operation to store/update an entry on the Kinetic Device.
static int put_chunk_of_file(FileTransferProgress *transfer)
Configuration values for the KineticClient connection.
void KineticClient_Shutdown(KineticClient *const client)
Performs shutdown/cleanup of the kinetic-c client library.
KineticClient * KineticClient_Init(KineticClientConfig *config)
Initializes the Kinetic API and configures logging.
ByteBuffer ByteBuffer_CreateAndAppend(void *data, size_t max_len, const void *value, size_t value_len)
ByteArray ByteArray_CreateWithCString(const char *str)