kinetic-c  v0.12.0
Seagate Kinetic Protocol Client Library for C
threadpool.c
1 /*
2 * kinetic-c
3 * Copyright (C) 2015 Seagate Technology.
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20 #include <stdio.h>
21 #include <pthread.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <assert.h>
25 #include <err.h>
26 #include <poll.h>
27 #include <errno.h>
28 
29 #include "threadpool_internals.h"
30 
31 #define MIN_DELAY 10 /* msec */
32 #define DEFAULT_MAX_DELAY 10000 /* msec */
33 #define INFINITE_DELAY -1 /* poll will only return upon an event */
34 #define DEFAULT_TASK_RINGBUF_SIZE2 8
35 #define DEFAULT_MAX_THREADS 8
36 
37 static void notify_new_task(struct threadpool *t);
38 static bool notify_shutdown(struct threadpool *t);
39 static bool spawn(struct threadpool *t);
40 static void *thread_task(void *thread_info);
41 static void commit_current_task(struct threadpool *t, struct marked_task *task, size_t wh);
42 static void release_current_task(struct threadpool *t, struct marked_task *task, size_t rh);
43 
44 static void set_defaults(struct threadpool_config *cfg) {
45  if (cfg->task_ringbuf_size2 == 0) {
46  cfg->task_ringbuf_size2 = DEFAULT_TASK_RINGBUF_SIZE2;
47  }
48 
49  if (cfg->max_threads == 0) { cfg->max_threads = DEFAULT_MAX_THREADS; }
50 }
51 
52 struct threadpool *Threadpool_Init(struct threadpool_config *cfg) {
53  set_defaults(cfg);
54 
55  if (cfg->task_ringbuf_size2 > THREADPOOL_MAX_RINGBUF_SIZE2) {
56  return NULL;
57  }
58  if (cfg->max_threads < 1) { return NULL; }
59 
60  struct threadpool *t = NULL;
61  struct marked_task *tasks = NULL;
62  struct thread_info *threads = NULL;
63 
64  t = malloc(sizeof(*t));
65  if (t == NULL) { goto cleanup; }
66 
67  size_t tasks_sz = (1 << cfg->task_ringbuf_size2) * sizeof(*tasks);
68  size_t threads_sz = cfg->max_threads * sizeof(struct thread_info);
69 
70  tasks = malloc(tasks_sz);
71  if (tasks == NULL) { goto cleanup; }
72 
73  threads = malloc(threads_sz);
74  if (threads == NULL) { goto cleanup; }
75 
76  memset(t, 0, sizeof(*t));
77  memset(threads, 0, threads_sz);
78 
79  /* Note: tasks is memset to a non-0 value so that the first slot,
80  * tasks[0].mark, will not match its ID and leave it in a
81  * prematurely commit-able state. */
82  memset(tasks, 0xFF, tasks_sz);
83 
84  t->tasks = tasks;
85  t->threads = threads;
86  t->task_ringbuf_size = 1 << cfg->task_ringbuf_size2;
87  t->task_ringbuf_size2 = cfg->task_ringbuf_size2;
88  t->task_ringbuf_mask = t->task_ringbuf_size - 1;
89  t->max_threads = cfg->max_threads;
90  return t;
91 
92 cleanup:
93  if (t) { free(t); }
94  if (tasks) { free(tasks); }
95  if (threads) { free(threads); }
96  return NULL;
97 }
98 
99 bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task,
100  size_t *pushback) {
101  if (t == NULL) { return false; }
102  if (task == NULL || task->task == NULL) { return false; }
103 
104  /* New tasks must not be scheduled after the threadpool starts
105  * shutting down. */
106  if (t->shutting_down) { return false; }
107 
108  size_t queue_size = t->task_ringbuf_size - 1;
109  size_t mask = queue_size;
110 
111  for (;;) {
112  size_t wh = t->task_reserve_head;
113  size_t rh = t->task_release_head;
114 
115  if (wh - rh >= queue_size - 1) {
116  if (pushback) { *pushback = wh - rh; }
117  //printf("FULL, %zd, %zd\n", wh - rh, t->task_commit_head - t->task_request_head);
118  return false; /* full, cannot schedule */
119  }
120  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_reserve_head, wh, wh + 1)) {
121  assert(t->task_reserve_head - t->task_release_head < queue_size);
122  struct marked_task *tbuf = &t->tasks[wh & mask];
123  tbuf->task = task->task;
124  tbuf->cleanup = task->cleanup;
125  tbuf->udata = task->udata;
126 
127  commit_current_task(t, tbuf, wh);
128  notify_new_task(t);
129  if (pushback) { *pushback = wh - rh; }
130  return true;
131  }
132  }
133 }
134 
135 static void commit_current_task(struct threadpool *t, struct marked_task *task, size_t wh) {
136  size_t mask = t->task_ringbuf_mask;
137  task->mark = wh;
138  for (;;) {
139  size_t ch = t->task_commit_head;
140  task = &t->tasks[ch & mask];
141  if (ch != task->mark) { break; }
142  assert(ch < t->task_reserve_head);
143  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_commit_head, ch, ch + 1)) {
144  assert(t->task_request_head <= t->task_commit_head);
145  }
146  }
147 }
148 
149 void Threadpool_Stats(struct threadpool *t, struct threadpool_info *info) {
150  if (info) {
151  uint8_t at = 0;
152  for (int i = 0; i < t->live_threads; i++) {
153  struct thread_info *ti = &t->threads[i];
154  if (ti->status == STATUS_AWAKE) { at++; }
155  }
156  info->active_threads = at;
157 
158  info->dormant_threads = t->live_threads - at;
159  info->backlog_size = t->task_commit_head - t->task_request_head;
160  }
161 }
162 
163 bool Threadpool_Shutdown(struct threadpool *t, bool kill_all) {
164  t->shutting_down = true;
165  size_t mask = t->task_ringbuf_mask;
166 
167  if (kill_all) {
168  for (int i = 0; i < t->live_threads; i++) {
169  struct thread_info *ti = &t->threads[i];
170  if (ti->status < STATUS_SHUTDOWN) {
171  ti->status = STATUS_SHUTDOWN;
172  int pcres = pthread_cancel(ti->t);
173  if (pcres != 0) {
174  /* If this fails, tolerate the case where the
175  * pthread has already shut down. */
176  assert(pcres == ESRCH);
177  }
178  }
179  }
180  }
181 
182  notify_shutdown(t);
183 
184  while (t->task_commit_head > t->task_request_head) {
185  size_t rh = t->task_request_head;
186 
187  struct marked_task *tbuf = &t->tasks[rh & mask];
188  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_request_head, rh, rh + 1)) {
189  if (tbuf->cleanup) {
190  tbuf->cleanup(tbuf->udata);
191  tbuf->udata = NULL;
192  }
193  SPIN_ADJ(t->task_release_head, 1);
194  }
195  }
196 
197  return notify_shutdown(t);
198 }
199 
200 void Threadpool_Free(struct threadpool *t) {
201  free(t->tasks);
202  t->tasks = NULL;
203  free(t->threads);
204  t->threads = NULL;
205  free(t);
206 }
207 
208 static void notify_new_task(struct threadpool *t) {
209  for (int i = 0; i < t->live_threads; i++) {
210  struct thread_info *ti = &t->threads[i];
211  if (ti->status == STATUS_ASLEEP) {
212  ssize_t res = write(ti->parent_fd,
213  NOTIFY_MSG, NOTIFY_MSG_LEN);
214  if (NOTIFY_MSG_LEN == res) {
215  return;
216  } else if (res == -1) {
217  err(1, "write");
218  } else {
219  ;
220  }
221  }
222  }
223 
224  if (t->live_threads < t->max_threads) { /* spawn */
225  if (spawn(t)) {
226  SPIN_ADJ(t->live_threads, 1);
227  }
228  } else {
229  /* all awake & busy, just keep out of the way & let them work */
230  }
231 }
232 
233 static bool notify_shutdown(struct threadpool *t) {
234  int done = 0;
235 
236  for (int i = 0; i < t->live_threads; i++) {
237  struct thread_info *ti = &t->threads[i];
238  if (ti->status == STATUS_JOINED) {
239  done++;
240  } else if (ti->status == STATUS_SHUTDOWN) {
241  void *v = NULL;
242  int joinres = pthread_join(ti->t, &v);
243  if (0 == joinres) {
244  ti->status = STATUS_JOINED;
245  done++;
246  } else {
247  fprintf(stderr, "pthread_join: %d\n", joinres);
248  assert(joinres == ESRCH);
249  }
250  } else {
251  close(ti->parent_fd);
252  }
253  }
254 
255  return (done == t->live_threads);
256 }
257 
258 static bool spawn(struct threadpool *t) {
259  int id = t->live_threads;
260  if (id >= t->max_threads) { return false; }
261  struct thread_info *ti = &t->threads[id];
262 
263  struct thread_context *tc = malloc(sizeof(*tc));
264  if (tc == NULL) { return false; }
265 
266  int pipe_fds[2];
267  if (0 != pipe(pipe_fds)) {
268  printf("pipe(2) failure\n");
269  free(tc);
270  return false;
271  }
272 
273  ti->child_fd = pipe_fds[0];
274  ti->parent_fd = pipe_fds[1];
275 
276  *tc = (struct thread_context){ .t = t, .ti = ti };
277 
278  int res = pthread_create(&ti->t, NULL, thread_task, tc);
279  if (res == 0) {
280  ti->status = STATUS_AWAKE;
281  return true;
282  } else if (res == EAGAIN) {
283  close(pipe_fds[0]);
284  close(pipe_fds[1]);
285  free(tc);
286  return false;
287  } else {
288  assert(false);
289  }
290 }
291 
292 static void *thread_task(void *arg) {
293  struct thread_context *tc = (struct thread_context *)arg;
294  struct threadpool *t = tc->t;
295  struct thread_info *ti = tc->ti;
296 
297  size_t mask = t->task_ringbuf_mask;
298  struct pollfd pfd[1] = { { .fd=ti->child_fd, .events=POLLIN }, };
299  uint8_t read_buf[NOTIFY_MSG_LEN*32];
300 
301  while (ti->status < STATUS_SHUTDOWN) {
302  if (t->task_request_head == t->task_commit_head) {
303  if (ti->status == STATUS_AWAKE) {
304  ti->status = STATUS_ASLEEP;
305  }
306  int res = poll(pfd, 1, -1);
307  if (res == 1) {
308  if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL)) {
309  /* TODO: HUP should be distinct from ERR -- hup is
310  * intentional shutdown, ERR probably isn't. */
311  ti->status = STATUS_SHUTDOWN;
312  break;
313  } else if (pfd[0].revents & POLLIN) {
314  if (ti->status == STATUS_ASLEEP) { ti->status = STATUS_AWAKE; }
315  //SPIN_ADJ(t->active_threads, 1);
316  ssize_t rres = read(ti->child_fd, read_buf, sizeof(read_buf));
317  if (rres < 0) {
318  assert(0);
319  }
320  }
321  }
322  }
323 
324  while (ti->status < STATUS_SHUTDOWN) {
325  size_t ch = t->task_commit_head;
326  size_t rh = t->task_request_head;
327  if (rh > ch - 1) {
328  break; /* nothing to do */
329  }
330  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_request_head, rh, rh + 1)) {
331  struct marked_task *ptask = &t->tasks[rh & mask];
332  assert(ptask->mark == rh);
333 
334  struct marked_task task = {
335  .task = ptask->task,
336  .cleanup = ptask->cleanup,
337  .udata = ptask->udata,
338  };
339 
340  release_current_task(t, ptask, rh);
341  ptask = NULL;
342  task.task(task.udata);
343  break;
344  }
345  }
346  }
347 
348  close(ti->child_fd);
349  free(tc);
350  return NULL;
351 }
352 
353 static void release_current_task(struct threadpool *t, struct marked_task *task, size_t rh) {
354  size_t mask = t->task_ringbuf_mask;
355  task->mark = ~rh;
356  for (;;) {
357  size_t relh = t->task_release_head;
358  task = &t->tasks[relh & mask];
359  if (task->mark != ~relh) { break; }
360  if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_release_head, relh, relh + 1)) {
361  assert(relh < t->task_commit_head);
362  }
363  }
364 }
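
The public entry points in this file (Threadpool_Init, Threadpool_Schedule, Threadpool_Shutdown, Threadpool_Free) fit together roughly as below. This is a minimal, hypothetical usage sketch rather than an official kinetic-c example; it assumes the threadpool.h declarations match what this file relies on, namely a task callback taking a single void *udata argument (as implied by task.task(task.udata) in thread_task) and config fields task_ringbuf_size2 and max_threads.

#include <stdio.h>
#include <unistd.h>
#include "threadpool.h"

/* Hypothetical task body: the pool calls this with the udata pointer
 * that was stored in the scheduled threadpool_task. */
static void print_task(void *udata) {
    printf("task: %s\n", (const char *)udata);
}

int main(void) {
    struct threadpool_config cfg = {
        .task_ringbuf_size2 = 8,   /* 2^8 == 256 task slots in the ring buffer */
        .max_threads = 4,          /* worker threads are spawned lazily, up to this cap */
    };
    struct threadpool *t = Threadpool_Init(&cfg);
    if (t == NULL) { return 1; }

    struct threadpool_task task = {
        .task = print_task,
        .cleanup = NULL,           /* would be called instead of .task if the pool shut down first */
        .udata = (void *)"hello",
    };
    size_t backpressure = 0;
    if (!Threadpool_Schedule(t, &task, &backpressure)) {
        /* ring buffer full or pool already shutting down; retry later or shed load */
    }

    /* Shutdown is cooperative: keep calling until every worker has been joined. */
    while (!Threadpool_Shutdown(t, false)) {
        usleep(10 * 1000);
    }
    Threadpool_Free(t);
    return 0;
}

The scheduling path itself is lock-free: a producer claims a slot by compare-and-swapping task_reserve_head, publishes it by advancing task_commit_head once the slot's mark matches its index, and workers consume through task_request_head and task_release_head, so the only blocking is the poll() a sleeping worker does while the queue is empty.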