28 #include <sys/socket.h>
31 #include <netinet/tcp.h>
/* Byte bound for queued data: the enqueue path asserts write_size <= BUF_SZ and
 * computes free space as BUF_SZ - out_bytes. The expression has type long. */
39 #define BUF_SZ (2 * 1024L * 1024)
/* Number of entries allocated for both the client pollfd array and out_bufs. */
40 #define MAX_CLIENTS 10
/* fd value marking a client_fds slot as unused; the slot search stops at the
 * first entry whose fd equals this. */
42 #define NO_CLIENT ((int)-1)
/* LOG(VERBOSITY, fmt, ...): printf the message when VERBOSITY <= cfg->verbosity.
 * The expansion names `cfg` directly, so a pointer called cfg with a `verbosity`
 * member must be in scope at every call site.
 * NOTE(review): VERBOSITY is expanded without parentheses; call sites that pass
 * a conditional expression wrap it themselves, e.g. LOG((res == 0 ? 6 : 3), ...).
 * NOTE(review): the lines that open and close this macro body are not visible
 * in this excerpt. */
46 #define LOG(VERBOSITY, ...) \
48 if (VERBOSITY <= cfg->verbosity) { \
49 printf(__VA_ARGS__); \
/* Count of completely flushed output buffers; incremented when written_bytes
 * reaches out_bytes in the client write path. */
70 int successful_writes;
/* Copy of successful_writes taken right after the stats log, used to print the delta. */
71 int last_successful_writes;
/* pollfd array for the sockets that accept() is called on; polled over port_count entries. */
73 struct pollfd *accept_fds;
/* pollfd array for connected clients; polled over client_count entries. */
74 struct pollfd *client_fds;
/* Tail of a forward declaration whose start is not visible here; these
 * parameters match the tail of register_client's signature. */
86 struct sockaddr *addr, socklen_t addr_len);
/* Tail of a forward declaration whose start is not visible here; these
 * parameters match the tail of enqueue_write's signature. */
89 uint8_t *buf,
size_t write_size);
/* Help text: -l / -h select the port range and -v raises verbosity. The call
 * that prints this string starts on a line not visible in this excerpt. */
93 "Usage: echosrv [-l LOW_PORT] [-h HIGH_PORT] [-v] \n"
94 " If only one of -l or -h are specified, it will use just that one port.\n"
95 " -v can be used multiple times to increase verbosity.\n");
/* Parse command-line options into cfg.
 * The getopt string "l:h:v" accepts -l and -h with an argument and -v without one.
 * cfg->port_low and cfg->port_high are filled from optarg; the case labels that
 * select between them are not visible in this excerpt.
 * Calls usage() when no port was given or when the range is inverted. */
99 static void parse_args(
int argc,
char **argv, config *cfg) {
102 while ((a = getopt(argc, argv,
"l:h:v")) != -1) {
/* NOTE(review): atoi() returns 0 for non-numeric text and cannot report errors;
 * a 0 here is handled below as "not supplied". */
105 cfg->port_low = atoi(optarg);
108 cfg->port_high = atoi(optarg);
114 fprintf(stderr,
"illegal option: -- %c\n", a);
/* If only one bound was supplied, reuse it for the other end (single port). */
119 if (cfg->port_low == 0) { cfg->port_low = cfg->port_high; }
120 if (cfg->port_high == 0) { cfg->port_high = cfg->port_low; }
/* Reject an inverted range, or no port at all (both still 0). */
121 if (cfg->port_high < cfg->port_low || cfg->port_low == 0) {
usage(); }
/* Echo the verbosity level when it is non-zero. */
122 if (cfg->verbosity > 0) { printf(
"verbosity: %d\n", cfg->verbosity); }
/* Program entry point. Only its start is visible in this excerpt: the cfg
 * object is cleared to all-zero bytes before anything else touches it. */
125 int main(
int argc,
char **argv) {
127 memset(&cfg, 0,
sizeof(cfg));
/* NOTE(review): the enclosing function header is not visible in this excerpt;
 * cfg is a pointer here, so this is presumably init_polling(config *cfg) — confirm.
 * Sizes and allocates the poll arrays and the per-client output buffers. */
/* Inclusive range: low..high covers high - low + 1 ports. */
138 cfg->port_count = cfg->port_high - cfg->port_low + 1;
/* One pollfd per port in the range, zero-filled. */
140 size_t accept_fds_sz = cfg->port_count *
sizeof(
struct pollfd);
141 struct pollfd *accept_fds = malloc(accept_fds_sz);
/* NOTE(review): any NULL check on accept_fds sits on a line not shown here. */
143 memset(accept_fds, 0, accept_fds_sz);
144 cfg->accept_fds = accept_fds;
/* MAX_CLIENTS pollfd entries for connected clients, zero-filled. */
146 size_t client_fds_sz =
MAX_CLIENTS *
sizeof(
struct pollfd);
147 struct pollfd *client_fds = malloc(client_fds_sz);
/* NOTE(review): any NULL check on client_fds sits on a line not shown here. */
149 memset(client_fds, 0, client_fds_sz);
150 cfg->client_fds = client_fds;
/* NOTE(review): after the memset every fd is 0, not NO_CLIENT; code elsewhere
 * asserts client_fds[0].fd == NO_CLIENT, so the slots are presumably set to
 * NO_CLIENT on lines not shown here — confirm. */
/* MAX_CLIENTS output buffers, checked for NULL and zero-filled. */
156 size_t out_bufs_sz = MAX_CLIENTS *
sizeof(out_buf);
157 cfg->out_bufs = malloc(out_bufs_sz);
158 assert(cfg->out_bufs);
159 memset(cfg->out_bufs, 0, out_bufs_sz);
/* NOTE(review): the enclosing function header is not visible in this excerpt;
 * presumably open_ports(config *cfg) — confirm.
 * Opens one socket per port in [port_low, port_high] through socket99_open and
 * records each resulting fd in accept_fds. The scfg initializer fields are not
 * visible here. */
163 socket99_config scfg = {
172 for (
int i = 0; i < cfg->port_count; i++) {
/* Port for this iteration: offset i from the low end of the range. */
173 scfg.port = i + cfg->port_low;
174 bool ok = socket99_open(&scfg, &res);
/* Failure report; the condition guarding these lines is not visible here. */
176 fprintf(stderr,
"Error opening port %d: ", i + cfg->port_low);
177 socket99_fprintf(stderr, &res);
/* Record the opened fd and ask poll() to report readability on it. */
180 cfg->accept_fds[i].fd = res.fd;
181 cfg->accept_fds[i].events = (POLLIN);
182 LOG(2,
" -- Accepting on %s:%d\n", scfg.host, scfg.port);
/* NOTE(review): the use of this constant is not visible in this excerpt;
 * presumably an upper bound, in milliseconds, on the poll() delay — confirm. */
187 #define MAX_TIMEOUT 1000
/* NOTE(review): the enclosing function header is not visible in this excerpt;
 * presumably tick_handler(config *cfg) — confirm.
 * Logs, at verbosity 1, the client count, total successful writes, the average
 * per tick, and the change since the previous log.
 * NOTE(review): the average divides by cfg->ticks; confirm ticks is non-zero
 * by the time this runs (any increment is on a line not shown here). */
191 LOG(1,
"%ld -- client_count: %d, successful writes: %d (avg %d/sec, delta %d)\n",
192 cfg->last_second, cfg->client_count, cfg->successful_writes,
193 cfg->successful_writes / cfg->ticks,
194 cfg->successful_writes - cfg->last_successful_writes);
/* Remember the current total so the next log can print a delta. */
195 cfg->last_successful_writes = cfg->successful_writes;
/* NOTE(review): the enclosing function header is not visible in this excerpt;
 * presumably listen_loop_poll(config *cfg) — confirm.
 * Polls the accepting sockets and then, if any clients exist, the client
 * sockets. Loop constructs and several branches are on lines not shown here. */
202 cfg->last_second = tv.tv_sec;
/* The first client slot is expected to be free at this point. */
204 assert(cfg->client_fds[0].fd ==
NO_CLIENT);
/* Detect that the wall-clock second has changed and record the new one. */
210 if (tv.tv_sec != cfg->last_second) {
212 cfg->last_second = tv.tv_sec;
/* Only one of the two polls gets the delay: the client poll when clients
 * exist; the accept poll presumably in the else branch (not visible here). */
215 int accept_delay = 0;
216 int client_delay = 0;
218 if (cfg->client_count > 0) {
219 client_delay = delay;
221 accept_delay = delay;
/* Check every listening socket for pending connections. */
228 int res = poll(cfg->accept_fds, cfg->port_count, accept_delay);
/* A result of 0 is logged only at verbosity 6; other results at verbosity 3. */
229 LOG((res == 0 ? 6 : 3),
"accept poll res %d\n", res);
237 }
else if (res == 0) {
245 if (cfg->client_count > 0) {
/* NOTE(review): this `res` appears to shadow the accept-poll `res` declared above. */
/* Only the first client_count entries of client_fds are polled. */
246 int res = poll(cfg->client_fds, cfg->client_count, client_delay);
248 LOG((res == 0 ? 6 : 3),
"client poll res %d\n", res);
256 }
else if (res == 0) {
259 LOG(3,
"poll(client_fds, %d) => res of %d\n",
260 cfg->client_count, res);
/* NOTE(review): the enclosing function header is not visible in this excerpt;
 * presumably handle_incoming_connections(config *cfg, int available) — confirm.
 * Walks the listening sockets and accepts a connection on each one that
 * reports POLLIN. */
277 for (
int i = 0; i < cfg->port_count; i++) {
/* Stop once as many entries have been handled as `available` allows. */
278 if (checked == available) {
break; }
279 struct pollfd *fd = &cfg->accept_fds[i];
/* Error or hang-up on the listening socket; its handling is not visible here. */
280 if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) {
282 }
else if (fd->revents & POLLIN) {
/* NOTE(review): struct sockaddr may be smaller than the peer's address for
 * some address families (struct sockaddr_storage is the portable choice);
 * the initialisation of addr_len is not visible here — confirm. */
284 struct sockaddr address;
286 int client_fd = accept(fd->fd, &address, &addr_len);
287 if (client_fd == -1) {
/* EWOULDBLOCK is singled out from other accept() failures; its handling is
 * not visible here. */
288 if (errno == EWOULDBLOCK) {
298 LOG(2,
"accepting client %d\n", client_fd);
/* Tail of the register_client signature; the start is on a line not shown here.
 * Finds a free client slot, sets TCP_NODELAY on cfd, and sets the slot's poll
 * events to POLLIN. */
305 struct sockaddr *addr, socklen_t addr_len) {
/* Linear search for the first slot whose fd is NO_CLIENT. */
308 int client_index = 0;
309 for (client_index = 0; client_index <
MAX_CLIENTS; client_index++) {
310 LOG(4,
" -- cfg->client_fds[%d].fd == %d\n", client_index, cfg->client_fds[client_index].fd);
311 if (cfg->client_fds[client_index].fd ==
NO_CLIENT) {
break; }
/* NOTE(review): a full table is caught only by this assert; if asserts are
 * compiled out (NDEBUG), client_index == MAX_CLIENTS would index past the
 * end of the arrays below. */
313 assert(client_index != MAX_CLIENTS);
314 LOG(3,
" -- assigning client in slot %d\n", client_index);
/* The output buffer and the pollfd share the same slot index. */
316 out_buf *out = &cfg->out_bufs[client_index];
/* Set TCP_NODELAY on the new socket; err(1, ...) is called if setsockopt fails. */
321 if (0 != setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, &flag,
sizeof(
int))) {
322 err(1,
"setsockopt");
325 struct pollfd *fd = &cfg->client_fds[client_index];
/* Start by waiting for input from the client. */
327 fd->events = (POLLIN);
/* NOTE(review): the enclosing function header is not visible in this excerpt;
 * presumably handle_client_io(config *cfg, int available) — confirm.
 * Walks the first client_count slots and, per slot, handles the first matching
 * condition in this order: error/hang-up, writable, readable. */
334 for (
int i = 0; i < cfg->client_count; i++) {
/* Stop once as many entries have been handled as `available` allows. */
335 if (checked == available) {
break; }
336 struct pollfd *fd = &cfg->client_fds[i];
338 LOG(4,
"fd[%d]->events 0x%08x ==> revents: 0x%08x\n", i, fd->events, fd->revents);
340 if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) {
341 LOG(3,
"Disconnecting client %d\n", fd->fd);
343 }
else if (fd->revents & POLLOUT) {
/* Socket is writable: try to send the not-yet-written tail of the buffer. */
345 out_buf *buf = &cfg->out_bufs[i];
/* NOTE(review): %zd is the signed-size conversion; the type of out_bytes is
 * not visible here — if it is size_t, %zu is the matching specifier. */
346 LOG(2,
"writing %zd bytes to %d\n", buf->out_bytes, buf->fd);
347 size_t wr_size = buf->out_bytes - buf->written_bytes;
348 ssize_t wres = write(buf->fd, &buf->buf[buf->written_bytes], wr_size);
352 }
else if (errno == EPIPE) {
/* Advance past the bytes just written; the guard that excludes a failed
 * write is on lines not shown here. */
358 buf->written_bytes += wres;
/* Buffer fully sent: reset the write offset and count a successful write. */
359 if (buf->written_bytes == buf->out_bytes) {
361 buf->written_bytes = 0;
362 cfg->successful_writes++;
366 }
else if (fd->revents & POLLIN) {
/* Socket is readable; the read call itself is on a line not shown here. */
369 out_buf *buf = &cfg->out_bufs[i];
375 }
else if (errno == EPIPE) {
380 }
else if (rres > 0) {
382 LOG(2,
"%ld -- got %zd bytes\n",
383 cfg->last_second, rres);
386 LOG(2,
"else, rres %zd\n", rres);
/* Tail of the enqueue_write signature; the start is on a line not shown here.
 * Appends write_size bytes from buf to client output buffers and switches the
 * matching pollfd entries to POLLOUT.
 * NOTE(review): the loop covers every slot below client_count; any per-client
 * filter (e.g. on fd) would be on a line not shown here. */
393 uint8_t *buf,
size_t write_size) {
394 assert(write_size <=
BUF_SZ);
396 for (
int i = 0; i < cfg->client_count; i++) {
397 out_buf *out = &cfg->out_bufs[i];
/* NOTE(review): this stores one byte past the payload. The assert above
 * permits write_size == BUF_SZ, and if buf is the BUF_SZ-byte read_buf the
 * index is then out of bounds — confirm the reader leaves at least one spare
 * byte. */
399 buf[write_size] =
'\0';
400 LOG(2,
"%ld -- enqueing write of %zd bytes\n",
401 cfg->last_second, write_size);
/* Append after the bytes already queued; the assert rejects overflow of the
 * BUF_SZ-sized output buffer. */
403 size_t free_space =
BUF_SZ - out->out_bytes;
404 assert(free_space >= write_size);
405 memcpy(&out->buf[out->out_bytes], buf, write_size);
406 out->out_bytes += write_size;
/* Replace the event mask so poll() reports writability for this client. */
408 cfg->client_fds[i].events = POLLOUT;
/* NOTE(review): the enclosing function header and loop are not visible in this
 * excerpt; presumably disconnect_client(config *cfg, int fd) — confirm.
 * Locates the client slot whose fd matches and discards its queued output. */
418 if (cfg->client_fds[i].fd == fd) {
419 LOG(3,
"disconnecting client %d\n", fd);
/* Drop any bytes still queued for this client. */
423 cfg->out_bufs[i].out_bytes = 0;
static void disconnect_client(config *cfg, int fd)
bool Util_IsResumableIOError(int errno_)
static void register_client(config *cfg, int cfd, struct sockaddr *addr, socklen_t addr_len)
static void listen_loop_poll(config *cfg)
static uint8_t read_buf[(2 *1024L *1024)]
static void tick_handler(config *cfg)
static void handle_client_io(config *cfg, int available)
static void handle_incoming_connections(config *cfg, int available)
static void enqueue_write(config *cfg, int fd, uint8_t *buf, size_t write_size)
int main(int argc, char **argv)
static void parse_args(int argc, char **argv, config *cfg)
#define LOG(VERBOSITY,...)
static void open_ports(config *cfg)
static void init_polling(config *cfg)
bool Util_Timestamp(struct timeval *tv, bool relative)