41 uint32_t magic_number;
46 #define MAGIC_NUMBER 3
48 #define MAX_SOCKETS 1000
49 #define DEFAULT_BUF_SIZE (1024 * 1024 + sizeof(prot_header_t))
50 #define PRINT_RESPONSES 0
60 size_t cur_payload_size;
77 size_t completed_deliveries;
85 static void run_bus(example_state *s,
struct bus *b);
86 static void parse_args(
int argc,
char **argv, example_state *s);
90 example_state *s = (example_state *)udata;
92 fprintf(stdout,
"%ld -- %s[%d] -- %s\n",
93 s->last_second, event_str, log_level, msg);
96 #define LOG(VERBOSITY, ...) \
97 do { if (state.verbosity >= VERBOSITY) { printf(__VA_ARGS__); } } while(0)
113 size_t read_size,
void *socket_udata) {
115 socket_info *si = (socket_info *)socket_udata;
121 assert(read_size == 0);
126 bool valid_header =
true;
127 bool split_header =
false;
129 size_t header_rem =
sizeof(prot_header_t) - si->used;
130 if (read_size > header_rem) {
131 printf(
"surplus read_size %zd\n", read_size);
132 printf(
"header_rem %zd (sizeof(prot_header_t) %zd)\n", header_rem,
sizeof(prot_header_t));
134 }
else if (read_size <
sizeof(prot_header_t)) {
139 size_t copied = read_size;
140 if (copied > header_rem) { copied = header_rem; }
142 memcpy(&si->buf[si->used], read_buf, copied);
145 if (si->used <
sizeof(prot_header_t)) {
147 .
next_read =
sizeof(prot_header_t) - si->used,
153 assert(si->used ==
sizeof(prot_header_t));
155 prot_header_t *header = (prot_header_t *)&si->buf[0];
157 if (si->used <
sizeof(prot_header_t)) {
158 printf(
"INVALID HEADER A: read_size %zd\n", si->used);
159 valid_header =
false;
161 printf(
"INVALID HEADER B: magic number 0x%08x\n", header->magic_number);
162 valid_header =
false;
166 prot_header_t *header = (prot_header_t *)&si->buf[0];
167 si->cur_payload_size = header->size;
168 memcpy(&si->buf[si->used], read_buf + copied, read_size - copied);
169 si->used += read_size - copied;
185 memcpy(&si->buf[si->used], read_buf, read_size);
186 si->used += read_size;
187 assert(si->used <= si->cur_payload_size +
sizeof(prot_header_t));
188 size_t rem = si->cur_payload_size +
sizeof(prot_header_t) - si->used;
193 .full_msg_buffer = read_buf,
212 socket_info *si = (socket_info *)socket_udata;
213 prot_header_t *header = (prot_header_t *)&si->buf[0];
214 uint8_t *payload = (uint8_t *)&si->buf[
sizeof(prot_header_t)];
217 for (
int i = 0; i < si->used; i++) {
218 if ((i & 15) == 0 && i > 0) { printf(
"\n"); }
219 printf(
"%02x ", si->buf[i]);
226 .seq_id = header->seq_id,
234 int64_t seq_id,
void *bus_udata,
void *socket_udata) {
235 printf(
"\n\n\nUNEXPECTED MESSAGE: %p, seq_id %lld, bus_udata %p, socket_udata %p\n\n\n\n",
236 msg, (
long long)seq_id, bus_udata, socket_udata);
241 int main(
int argc,
char **argv) {
249 .log_level =
state.verbosity,
257 LOG(0,
"failed to init bus: %d\n", res.
status);
266 LOG(1,
"shutting down\n");
280 LOG(3,
"-- caught signal %d\n", arg);
282 signal(arg, old_sigint_handler);
289 if (old_handler == SIG_ERR) { err(1,
"signal"); }
300 "Usage: %s [-b BUF_SIZE] [-l LOW_PORT] [-h HIGH_PORT] [-s STOP_AT_SEQUENCE_ID] [-v] \n"
301 " If only one of -l or -h are specified, it will use just that one port.\n"
302 " -v can be used multiple times to increase verbosity.\n"
307 static void parse_args(
int argc,
char **argv, example_state *s) {
312 while ((a = getopt(argc, argv,
"b:l:h:s:v")) != -1) {
315 s->buf_size = atol(optarg);
318 s->port_low = atoi(optarg);
321 s->port_high = atoi(optarg);
324 s->max_seq_id = atoi(optarg);
330 fprintf(stderr,
"illegal option: -- %c\n", a);
335 if (s->port_low == 0) { s->port_low = s->port_high; }
336 if (s->port_high == 0) { s->port_high = s->port_low; }
337 if (s->port_high < s->port_low || s->port_low == 0) {
usage(); }
339 if (s->verbosity > 0) {
340 printf(
"bus_size: %zd\n", s->buf_size);
341 printf(
"port_low: %d\n", s->port_low);
342 printf(
"port_high: %d\n", s->port_high);
343 printf(
"verbosity: %d\n", s->verbosity);
348 int socket_count = s->port_high - s->port_low + 1;
350 size_t info_size =
sizeof(socket_info) + s->buf_size;
353 for (
int i = 0; i < socket_count; i++) {
354 int port = i + s->port_low;
356 socket99_config cfg = {
363 if (!socket99_open(&cfg, &res)) {
364 socket99_fprintf(stderr, &res);
368 s->sockets[i] = res.fd;
370 socket_info *si = malloc(info_size);
372 memset(si, 0, info_size);
377 static size_t construct_msg(uint8_t *buf,
size_t buf_size,
size_t payload_size, int64_t seq_id) {
378 size_t header_size =
sizeof(prot_header_t);
379 assert(buf_size > header_size);
380 prot_header_t *header = (prot_header_t *)buf;
382 uint8_t *payload = &buf[header_size];
383 for (
int i = 0; i < payload_size; i++) {
384 payload[i] = (uint8_t)(0xFF & i);
388 header->seq_id = seq_id;
389 header->size = payload_size;
391 return header_size + payload_size;
396 #define INCREMENT_COMPLETION_COUNTER 1
399 example_state *s = &
state;
400 socket_info *si = (socket_info *)udata;
405 #if INCREMENT_COMPLETION_COUNTER
407 size_t cur = s->completed_deliveries;
409 LOG(3,
" -- ! got %zd bytes, seq_id 0x%08llx, %p\n",
410 si->cur_payload_size, res->
u.
response.seq_id,
422 LOG(1,
"BUS_SEND_TIMEOUT\n");
425 LOG(1,
"BUS_SEND_TX_FAILURE\n");
428 LOG(1,
"BUS_SEND_RX_FAILURE\n");
431 LOG(1,
"BUS_SEND_RX_TIMEOUT\n");
434 fprintf(stderr,
"match fail: %d\n", res->
status);
441 LOG(1,
"%ld -- %zd ticks, %zd requests, %zd responses (delta %zd)\n",
442 s->last_second, s->ticks, s->sent_msgs, s->completed_deliveries,
443 s->completed_deliveries - s->sent_msgs);
458 for (
int i = 0; i < s->sockets_used; i++) {
462 int cur_socket_i = 0;
466 size_t buf_size =
sizeof(msg_buf);
467 size_t payload_size = seq_id;
471 int sleep_counter = 0;
476 if (cur_second != s->last_second) {
478 s->last_second = cur_second;
480 if (sleep_counter > 0) {
482 if (sleep_counter == 0) {
483 printf(
" -- resuming\n");
485 }
else if ((cur_second & 0x3f) == 0x00) {
486 printf(
" -- sleeping for 10 seconds\n");
491 if (sleep_counter == 0) {
493 100 * 1024L, seq_id);
494 LOG(3,
" @@ sending message with %zd bytes\n", msg_size);
496 .
fd = s->sockets[cur_socket_i],
499 .msg_size = msg_size,
501 .udata = s->info[cur_socket_i],
507 LOG(1,
" @@@ Bus_SendRequest failed!\n");
509 if (dropped >= 100) {
510 LOG(1,
" @@@ more than 100 send failures, halting\n");
516 if (cur_socket_i == s->sockets_used) {
520 if (seq_id == s->max_seq_id) {
static void completion_cb(bus_msg_result_t *res, void *udata)
static void run_bus(example_state *s, struct bus *b)
bool Bus_Shutdown(bus *b)
Begin shutting the system down.
static void log_cb(log_event_t event, int log_level, const char *msg, void *udata)
static const char * executable_name
static sig_t register_signal_handler(int sig)
static void parse_args(int argc, char **argv, example_state *s)
static uint8_t read_buf[(2 *1024L *1024)]
bool Bus_SendRequest(struct bus *b, bus_user_msg *msg)
Send a request.
static bus_sink_cb_res_t sink_cb(uint8_t *read_buf, size_t read_size, void *socket_udata)
void Bus_Free(bus *b)
Free internal data structures for the bus.
static void open_sockets(example_state *s)
const char * Bus_LogEventStr(log_event_t event)
Get the string key for a log event ID.
static void register_signal_handlers(void)
void * udata
User data for callbacks.
static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata)
int main(int argc, char **argv)
static time_t get_cur_second(void)
struct bus_msg_result_t::@3::@5 response
static void signal_handler(int arg)
bool Bus_Init(bus_config *config, struct bus_result *res)
Initialize a bus, based on configuration in *config.
static bus_sink_cb_res_t reset_transfer(socket_info *si)
#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata)
Register a socket connected to an endpoint, and data that will be passed to all interactions on that ...
static sig_t old_sigint_handler
union bus_msg_result_t::@3 u
static void unexpected_msg_cb(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata)
#define LOG(VERBOSITY,...)
bool Util_Timestamp(struct timeval *tv, bool relative)
static size_t construct_msg(uint8_t *buf, size_t buf_size, size_t payload_size, int64_t seq_id)
static void tick_handler(example_state *s)