kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
test_threadpool_sequencing.c
Go to the documentation of this file.
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <unistd.h>
23 #include <assert.h>
24 #include <err.h>
25 #include <poll.h>
26 #include <sys/time.h>
27 #include <string.h>
28 #include <pthread.h>
29 
30 #include "threadpool.h"
31 
32 /* Start a task that starts another task that starts another task that ... */
33 
34 static size_t completed_count = 0;
35 #define MAX_TASKS 32
36 
37 static void dump_stats(const char *prefix, struct threadpool_info *stats, size_t ticks) {
38  printf("%s -- %8ld thread tasks / sec -- (at %d, dt %d, bl %zd) -- %zd\n",
39  prefix, completed_count / ticks,
40  stats->active_threads, stats->dormant_threads, stats->backlog_size,
42 }
43 
44 #define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) \
45  (__sync_bool_compare_and_swap(PTR, OLD, NEW))
46 
47 /* Spin attempting to atomically adjust F by ADJ until successful */
48 #define SPIN_ADJ(F, ADJ) \
49  do { \
50  for (;;) { \
51  size_t v = F; \
52  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&F, v, v + ADJ)) { \
53  break; \
54  } \
55  } \
56  } while (0)
57 
58 typedef struct {
59  struct threadpool *t;
60  struct threadpool_task *task;
61  size_t count;
62 } env;
63 
64 static size_t limit = 1000000;
65 
66 static void task_cb(void *udata) {
67  env *e = (env *)udata;
68  if (e->count == limit) {
70  } else {
71  e->count++;
72  if ((e->count & ((1 << 16) - 1)) == 0) {
73  printf("count: %zd on %p\n", e->count, (void *)pthread_self());
74  }
75  for (;;) {
76  if (Threadpool_Schedule(e->t, e->task, NULL)) {
77  break;
78  }
79  usleep(1 * 1000);
80  }
81  }
82 }
83 
84 int main(int argc, char **argv) {
85  uint8_t sz2 = 8;
86  uint8_t max_threads = 8;
87 
88  char *sz2_env = getenv("SZ2");
89  char *max_threads_env = getenv("MAX_THREADS");
90  char *limit_env = getenv("LIMIT");
91  if (sz2_env) { sz2 = atoi(sz2_env); }
92  if (max_threads_env) { max_threads = atoi(max_threads_env); }
93  if (limit_env) { limit = atol(limit_env); }
94 
95  if (max_threads > MAX_TASKS) {
96  printf("too many threads\n");
97  exit(1);
98  }
99 
100  struct threadpool_config cfg = {
101  .task_ringbuf_size2 = sz2,
102  .max_threads = max_threads,
103  };
104  struct threadpool *t = Threadpool_Init(&cfg);
105  assert(t);
106 
107  struct threadpool_info stats;
108 
109  struct timeval tv;
110  gettimeofday(&tv, NULL);
111  time_t last_sec = tv.tv_sec;
112  size_t counterpressure = 0;
113  size_t ticks = 0;
114 
115  struct threadpool_task tasks[MAX_TASKS];
116  env envs[MAX_TASKS];
117  memset(&tasks, 0, sizeof(tasks));
118 
119  for (int i = 0; i < max_threads; i++) {
120  envs[i] = (env){ .t = t, .task = &tasks[i], .count = 0, };
121  tasks[i] = (struct threadpool_task){ .task = task_cb, .udata = &envs[i], };
122 
123  for (;;) {
124  if (Threadpool_Schedule(t, &tasks[i], &counterpressure)) {
125  break;
126  }
127  }
128  }
129 
130  printf("waiting...\n");
131 
132  while (completed_count < max_threads) {
133  gettimeofday(&tv, NULL);
134  if (tv.tv_sec > last_sec) {
135  last_sec = tv.tv_sec;
136  Threadpool_Stats(t, &stats);
137  ticks++;
138  dump_stats("tick...", &stats, ticks);
139  }
140  }
141 
142  return 0;
143 }
static size_t limit
size_t backlog_size
Definition: threadpool.h:58
threadpool_task_cb * task
Definition: threadpool.h:49
Configuration for thread pool.
Definition: threadpool.h:34
#define MAX_TASKS
#define SPIN_ADJ(F, ADJ)
static void dump_stats(const char *prefix, struct threadpool_info *stats, size_t ticks)
Internal threadpool state.
static void task_cb(void *udata)
uint8_t active_threads
Definition: threadpool.h:56
int main(int argc, char **argv)
uint8_t task_ringbuf_size2
Definition: threadpool.h:35
struct threadpool * Threadpool_Init(struct threadpool_config *cfg)
Initialize a threadpool, according to a config.
Definition: threadpool.c:52
Statistics about the current state of the threadpool.
Definition: threadpool.h:55
A task.
Definition: threadpool.h:48
static size_t completed_count
void Threadpool_Stats(struct threadpool *t, struct threadpool_info *info)
If TI is non-NULL, fill out some statistics about the operating state of the thread pool...
Definition: threadpool.c:149
uint8_t dormant_threads
Definition: threadpool.h:57
uint8_t max_threads
Definition: threadpool.h:37
bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task, size_t *pushback)
Schedule a task in the threadpool.
Definition: threadpool.c:99