39 if (session == NULL) {
40 LOG0(
"Session is NULL");
45 LOG0(
"Client is NULL");
49 session->connected =
false;
53 if (pthread_mutex_init(&session->sendMutex, NULL) != 0) {
54 LOG0(
"Failed initializing session send mutex!");
58 session->outstandingOperations =
60 if (session->outstandingOperations == NULL) {
61 LOG0(
"Failed creating session counting semaphore!");
70 if (session == NULL) {
81 if (session == NULL) {
88 session->config.host, session->config.port);
90 LOG0(
"Session connection failed!");
92 session->connected =
false;
95 session->connected =
true;
100 bool success =
Bus_RegisterSocket(session->messageBus, socket_type, session->socket, session);
102 LOG0(
"Failed registering connection with client!");
103 goto connection_error_cleanup;
109 LOG0(
"Timed out waiting for connection ID from device!");
110 goto connection_error_cleanup;
112 LOGF1(
"Received connection ID %lld for session %p",
117 connection_error_cleanup:
119 if (session->si != NULL) {
127 session->connected =
false;
133 if (session == NULL) {
136 if (!session->connected || session->socket < 0) {
145 session->connected =
false;
146 pthread_mutex_destroy(&session->sendMutex);
153 if (session == NULL) {
156 return session->terminationStatus;
162 session->terminationStatus = status;
165 #define ATOMIC_FETCH_AND_INCREMENT(P) __sync_fetch_and_add(P, 1)
177 return session->config.clusterVersion;
183 session->config.clusterVersion = cluster_version;
189 return session->connectionID;
195 session->connectionID = id;
int64_t KineticSession_GetConnectionID(KineticSession const *const session)
#define LOGF1(message,...)
#define KINETIC_SOCKET_INVALID
Invalid socket file descriptor value.
bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out)
Free metadata about a socket that has been disconnected.
KineticStatus KineticSession_Connect(KineticSession *const session)
void KineticCountingSemaphore_Destroy(KineticCountingSemaphore *const sem)
KineticStatus KineticSession_Create(KineticSession *const session, KineticClient *const client)
void KineticSession_SetClusterVersion(KineticSession *const session, int64_t cluster_version)
#define ATOMIC_FETCH_AND_INCREMENT(P)
#define KINETIC_MAX_OUTSTANDING_OPERATIONS_PER_SESSION
void KineticSession_SetTerminationStatus(KineticSession *const session, KineticStatus status)
void KineticAllocator_FreeSession(KineticSession *session)
int KineticSocket_Connect(const char *host, int port)
Failed allocating/deallocating memory.
#define KINETIC_SOCKET_DESCRIPTOR_INVALID
#define KINETIC_ASSERT(cond)
No connection/disconnected.
bool KineticResourceWaiter_WaitTilAvailable(KineticResourceWaiter *const waiter, uint32_t max_wait_sec)
Session was NULL in request.
int64_t KineticSession_GetNextSequenceCount(KineticSession *const session)
int64_t KineticSession_GetClusterVersion(KineticSession const *const session)
void KineticSocket_Close(int socket)
KineticStatus KineticSession_Disconnect(KineticSession *const session)
KineticStatus KineticSession_GetTerminationStatus(KineticSession const *const session)
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata)
Register a socket connected to an endpoint, and data that will be passed to all interactions on that ...
KineticStatus
Kinetic status codes.
#define KINETIC_CONNECTION_TIMEOUT_SECS
#define PDU_PROTO_MAX_LEN
KineticCountingSemaphore * KineticCountingSemaphore_Create(uint32_t counts)
void KineticSession_SetConnectionID(KineticSession *const session, int64_t id)
KineticStatus KineticSession_Destroy(KineticSession *const session)
Session configuration was invalid or NULL.