rpm  5.4.15
rpmzq.c
Go to the documentation of this file.
1 
6 /* pigz.c -- parallel implementation of gzip
7  * Copyright (C) 2007, 2008 Mark Adler
8  * Version 2.1.4 9 Nov 2008 Mark Adler
9  */
10 
11 /*
12  This software is provided 'as-is', without any express or implied
13  warranty. In no event will the author be held liable for any damages
14  arising from the use of this software.
15 
16  Permission is granted to anyone to use this software for any purpose,
17  including commercial applications, and to alter it and redistribute it
18  freely, subject to the following restrictions:
19 
20  1. The origin of this software must not be misrepresented; you must not
21  claim that you wrote the original software. If you use this software
22  in a product, an acknowledgment in the product documentation would be
23  appreciated but is not required.
24  2. Altered source versions must be plainly marked as such, and must not be
25  misrepresented as being the original software.
26  3. This notice may not be removed or altered from any source distribution.
27 
28  Mark Adler
29  madler@alumni.caltech.edu
30 
31  Mark accepts donations for providing this software. Donations are not
32  required or expected. Any amount that you feel is appropriate would be
33  appreciated. You can use this link:
34 
35  https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=536055
36 
37  */
38 
39 #include "system.h"
40 
41 #if defined(WITH_BZIP2)
42 
43 #include <rpmiotypes.h>
44 #include <rpmlog.h>
45 
46 #define _RPMBZ_INTERNAL
47 #include "rpmbz.h"
48 
49 /*@access rpmbz @*/
50 
51 #include "yarn.h"
52 
53 #define _RPMZLOG_INTERNAL
54 #include "rpmzlog.h"
55 
56 /*@access rpmzMsg @*/
57 /*@access rpmzLog @*/
58 
/* Add a record to the trace log when the queue verbosity is above 2.
 * The argument is a fully parenthesized rpmzLogAdd() argument list, and a
 * variable named `zq` must be in scope wherever the macro is used. */
#define Trace(x) \
    do { if (zq->verbosity > 2) rpmzLogAdd x; } while (0)
65 
66 #define _RPMZQ_INTERNAL
67 #include "rpmzq.h"
68 
69 /*@access rpmzSpace @*/
70 /*@access rpmzPool @*/
71 /*@access rpmzQueue @*/
72 /*@access rpmzJob @*/
73 
74 #include "debug.h"
75 
76 #ifdef __cplusplus
77 GENfree(rpmzJob)
78 GENfree(rpmzPool)
79 GENfree(rpmzSpace)
80 GENfree(rpmzFIFO)
81 GENfree(rpmzSEQ)
82 #endif /* __cplusplus */
83 
84 /*@unchecked@*/
85 int _rpmzq_debug = 0;
86 
87 /*@unchecked@*/
88 static struct rpmzQueue_s __zq;
89 
90 /*@unchecked@*/
91 rpmzQueue _rpmzq = &__zq;
92 
93 /*==============================================================*/
96 static void _rpmzqArgCallback(poptContext con,
97  /*@unused@*/ enum poptCallbackReason reason,
98  const struct poptOption * opt, /*@unused@*/ const char * arg,
99  /*@unused@*/ void * data)
100  /*@globals _rpmzq, fileSystem, internalState @*/
101  /*@modifies _rpmzq, fileSystem, internalState @*/
102 {
103  rpmzQueue zq = _rpmzq;
104 
105  /* XXX avoid accidental collisions with POPT_BIT_SET for flags */
106  if (opt->arg == NULL)
107  switch (opt->val) {
108  case 'q': zq->verbosity = 0; break;
109  case 'v': zq->verbosity++; break;
110  default:
111  /* XXX really need to display longName/shortName instead. */
112  fprintf(stderr, _("Unknown option -%c\n"), (char)opt->val);
113  poptPrintUsage(con, stderr, 0);
114  /*@-exitarg@*/ exit(2); /*@=exitarg@*/
115  /*@notreached@*/ break;
116  }
117 }
118 
119 #ifdef REFERENCE
120 Usage: pigz [options] [files ...]
121  will compress files in place, adding the suffix '.gz'. If no files are
122  specified, stdin will be compressed to stdout. pigz does what gzip does,
123  but spreads the work over multiple processors and cores when compressing.
124 
125 Options:
126  -0 to -9, --fast, --best Compression levels, --fast is -1, --best is -9
127  -b, --blocksize mmm Set compression block size to mmmK (default 128K)
128  -p, --processes n Allow up to n compression threads (default 8)
129  -i, --independent Compress blocks independently for damage recovery
130  -R, --rsyncable Input-determined block locations for rsync
131  -d, --decompress Decompress the compressed input
132  -t, --test Test the integrity of the compressed input
133  -l, --list List the contents of the compressed input
134  -f, --force Force overwrite, compress .gz, links, and to terminal
135  -r, --recursive Process the contents of all subdirectories
136  -s, --suffix .sss Use suffix .sss instead of .gz (for compression)
137  -z, --zlib Compress to zlib (.zz) instead of gzip format
138  -K, --zip Compress to PKWare zip (.zip) single entry format
139  -k, --keep Do not delete original file after processing
140  -c, --stdout Write all processed output to stdout (wont delete)
141  -N, --name Store/restore file name and mod time in/from header
142  -n, --no-name Do not store or restore file name in/from header
143  -T, --no-time Do not store or restore mod time in/from header
144  -q, --quiet Print no messages, even on error
145  -v, --verbose Provide more verbose output
146 #endif
147 
148 #ifdef REFERENCE
149 Parallel BZIP2 v1.0.5 - by: Jeff Gilchrist http://compression.ca
150 [Jan. 08, 2009] (uses libbzip2 by Julian Seward)
151 
152 Usage: ./pbzip2 [-1 .. -9] [-b#cdfhklp#qrtVz] <filename> <filename2> <filenameN>
153  -b# : where # is the file block size in 100k (default 9 = 900k)
154  -c : output to standard out (stdout)
155  -d : decompress file
156  -f : force, overwrite existing output file
157  -h : print this help message
158  -k : keep input file, dont delete
159  -l : load average determines max number processors to use
160  -p# : where # is the number of processors (default 2)
161  -q : quiet mode (default)
162  -r : read entire input file into RAM and split between processors
163  -t : test compressed file integrity
164  -v : verbose mode
165  -V : display version info for pbzip2 then exit
166  -z : compress file (default)
167  -1 .. -9 : set BWT block size to 100k .. 900k (default 900k)
168 
169 Example: pbzip2 -b15vk myfile.tar
170 Example: pbzip2 -p4 -r -5 myfile.tar second*.txt
171 Example: tar cf myfile.tar.bz2 --use-compress-prog=pbzip2 dir_to_compress/
172 Example: pbzip2 -d myfile.tar.bz2
173 
174 #endif
175 
176 /*@unchecked@*/ /*@observer@*/
177 struct poptOption rpmzqOptionsPoptTable[] = {
178 /*@-type@*/ /* FIX: cast? */
179  { NULL, '\0', POPT_ARG_CALLBACK | POPT_CBFLAG_INC_DATA | POPT_CBFLAG_CONTINUE,
180  (void *)_rpmzqArgCallback, 0, NULL, NULL },
181 /*@=type@*/
182 
183  { "fast", '\0', POPT_ARG_VAL, &__zq.level, 1,
184  N_("fast compression"), NULL },
185  { "best", '\0', POPT_ARG_VAL, &__zq.level, 9,
186  N_("best compression"), NULL },
187  { "extreme", 'e', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE|POPT_ARGFLAG_DOC_HIDDEN,
188  &__zq.flags, RPMZ_FLAGS_EXTREME,
189  N_("extreme compression"), NULL },
190  { NULL, '0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 0,
191  NULL, NULL },
192  { NULL, '1', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 1,
193  NULL, NULL },
194  { NULL, '2', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 2,
195  NULL, NULL },
196  { NULL, '3', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 3,
197  NULL, NULL },
198  { NULL, '4', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 4,
199  NULL, NULL },
200  { NULL, '5', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 5,
201  NULL, NULL },
202  { NULL, '6', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 6,
203  NULL, NULL },
204  { NULL, '7', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 7,
205  NULL, NULL },
206  { NULL, '8', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 8,
207  NULL, NULL },
208  { NULL, '9', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.level, 9,
209  NULL, NULL },
210 
211 #ifdef NOTYET /* XXX --blocksize/--processes callback to validate arg */
212  { "blocksize", 'b', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &__zq.blocksize, 0,
213  N_("Set compression block size to mmmK"), N_("mmm") },
214  /* XXX same as --threads */
215  { "processes", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &__zq.threads, 0,
216  N_("Allow up to n compression threads"), N_("n") },
217 #else
218  /* XXX show default is bogus with callback, can't find value. */
219  { "blocksize", 'b', POPT_ARG_VAL|POPT_ARGFLAG_SHOW_DEFAULT, NULL, 'b',
220  N_("Set compression block size to mmmK"), N_("mmm") },
221  /* XXX same as --threads */
222  { "processes", 'p', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, NULL, 'p',
223  N_("Allow up to n compression threads"), N_("n") },
224 #endif
225  /* XXX display toggle "-i,--[no]independent" bustage. */
226  { "independent", 'i', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &__zq.flags, RPMZ_FLAGS_INDEPENDENT,
227  N_("Compress blocks independently for damage recovery"), NULL },
228  /* XXX display toggle "-R,--[no]rsyncable" bustage. */
229  { "rsyncable", 'R', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &__zq.flags, RPMZ_FLAGS_RSYNCABLE,
230  N_("Input-determined block locations for rsync"), NULL },
231 #if defined(NOTYET)
232  /* XXX -T collides with pigz -T,--no-time */
233  { "threads", '\0', POPT_ARG_INT|POPT_ARGFLAG_SHOW_DEFAULT, &__zq.threads, 0,
234  N_("Allow up to n compression threads"), N_("n") },
235 #endif /* NOTYET */
236 
237  /* ===== Operation modes */
238  { "compress", 'z', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.mode, RPMZ_MODE_COMPRESS,
239  N_("force compression"), NULL },
240  { "uncompress", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &__zq.mode, RPMZ_MODE_DECOMPRESS,
241  N_("force decompression"), NULL },
242  { "decompress", 'd', POPT_ARG_VAL, &__zq.mode, RPMZ_MODE_DECOMPRESS,
243  N_("Decompress the compressed input"), NULL },
244  { "test", 't', POPT_ARG_VAL, &__zq.mode, RPMZ_MODE_TEST,
245  N_("Test the integrity of the compressed input"), NULL },
246  { "list", 'l', POPT_BIT_SET, &__zq.flags, RPMZ_FLAGS_LIST,
247  N_("List the contents of the compressed input"), NULL },
248  { "info", '\0', POPT_BIT_SET|POPT_ARGFLAG_DOC_HIDDEN, &__zq.flags, RPMZ_FLAGS_LIST,
249  N_("list block sizes, total sizes, and possible metadata"), NULL },
250  { "force", 'f', POPT_BIT_SET, &__zq.flags, RPMZ_FLAGS_FORCE,
251  N_("Force: --overwrite --recompress --symlinks --tty"), NULL },
252  { "overwrite", '\0', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,
253  &__zq.flags, RPMZ_FLAGS_OVERWRITE,
254  N_(" Permit overwrite of output files"), NULL },
255  { "recompress",'\0', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,
256  &__zq.flags, RPMZ_FLAGS_ALREADY,
257  N_(" Permit compress of already compressed files"), NULL },
258  { "symlinks",'\0', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,
259  &__zq.flags, RPMZ_FLAGS_SYMLINKS,
260  N_(" Permit symlink input file to be compressed"), NULL },
261  { "tty",'\0', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,
262  &__zq.flags, RPMZ_FLAGS_TTY,
263  N_(" Permit compressed output to terminal"), NULL },
264 
265  /* ===== Operation modifiers */
266  /* XXX display toggle "-r,--[no]recursive" bustage. */
267  { "recursive", 'r', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &__zq.flags, RPMZ_FLAGS_RECURSE,
268  N_("Process the contents of all subdirectories"), NULL },
269  { "suffix", 'S', POPT_ARG_STRING, &__zq.suffix, 0,
270  N_("Use suffix .sss instead of .gz (for compression)"), N_(".sss") },
271  { "ascii", 'a', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, NULL, 'a',
272  N_("Compress to LZW (.Z) instead of gzip format"), NULL },
273  { "bits", 'Z', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, NULL, 'Z',
274  N_("Compress to LZW (.Z) instead of gzip format"), NULL },
275  { "zlib", 'z', POPT_ARG_VAL, &__zq.format, RPMZ_FORMAT_ZLIB,
276  N_("Compress to zlib (.zz) instead of gzip format"), NULL },
277  { "zip", 'K', POPT_ARG_VAL, &__zq.format, RPMZ_FORMAT_ZIP2,
278  N_("Compress to PKWare zip (.zip) single entry format"), NULL },
279  { "keep", 'k', POPT_BIT_SET, &__zq.flags, RPMZ_FLAGS_KEEP,
280  N_("Do not delete original file after processing"), NULL },
281  { "stdout", 'c', POPT_BIT_SET, &__zq.flags, RPMZ_FLAGS_STDOUT,
282  N_("Write all processed output to stdout (won't delete)"), NULL },
283  { "to-stdout", 'c', POPT_BIT_SET|POPT_ARGFLAG_DOC_HIDDEN, &__zq.flags, RPMZ_FLAGS_STDOUT,
284  N_("write to standard output and don't delete input files"), NULL },
285 
286  /* ===== Metadata options */
287  /* XXX logic is reversed, disablers should clear with toggle. */
288  { "name", 'N', POPT_BIT_SET, &__zq.flags, (RPMZ_FLAGS_HNAME|RPMZ_FLAGS_HTIME),
289  N_("Store/restore file name and mod time in/from header"), NULL },
290  { "no-name", 'n', POPT_BIT_CLR, &__zq.flags, RPMZ_FLAGS_HNAME,
291  N_("Do not store or restore file name in/from header"), NULL },
292  /* XXX -T collides with xz -T,--threads */
293  { "no-time", 'T', POPT_BIT_CLR, &__zq.flags, RPMZ_FLAGS_HTIME,
294  N_("Do not store or restore mod time in/from header"), NULL },
295 
296  /* ===== Other options */
297 
298  POPT_TABLEEND
299 };
300 
/* Debug printf: forwards its arguments to fprintf() only when _rpmzq_debug
 * is non-zero.  The do/while(0) wrapper makes each invocation a single
 * statement, so it cannot capture an `else` that follows it. */
#define zqFprintf(...) \
    do { if (_rpmzq_debug) fprintf(__VA_ARGS__); } while (0)
302 
303 /*==============================================================*/
304 
305 /*@-mustmod@*/
306 int rpmbzCompressBlock(void * _bz, rpmzJob job)
307 {
308  rpmbz bz = (rpmbz) _bz;
309  unsigned int len = job->out->len;
310  int rc;
311  rc = BZ2_bzBuffToBuffCompress((char *)job->out->buf, &len,
312  (char *)job->in->buf, job->in->len, bz->B, bz->V, bz->W);
313  job->out->len = len;
314  if (rc != BZ_OK)
315  zqFprintf(stderr, "==> %s(%p,%p) rc %d\n", __FUNCTION__, bz, job, rc);
316  return rc;
317 }
318 /*@=mustmod@*/
319 
320 /*@-mustmod@*/
321 static int rpmbzDecompressBlock(rpmbz bz, rpmzJob job)
322  /*@globals fileSystem @*/
323  /*@modifies job, fileSystem @*/
324 {
325  unsigned int len = job->out->len;
326  int rc;
327  rc = BZ2_bzBuffToBuffDecompress((char *)job->out->buf, &len,
328  (char *)job->in->buf, job->in->len, bz->S, bz->V);
329  job->out->len = len;
330  if (rc != BZ_OK)
331  zqFprintf(stderr, "==> %s(%p,%p) rc %d\n", __FUNCTION__, bz, job, rc);
332  return rc;
333 }
334 /*@=mustmod@*/
335 
336 /*==============================================================*/
337 
338 /* -- pool of spaces for buffer management -- */
339 
340 /* These routines manage a pool of spaces. Each pool specifies a fixed size
341  buffer to be contained in each space. Each space has a use count, which
342  when decremented to zero returns the space to the pool. If a space is
343  requested from the pool and the pool is empty, a space is immediately
344  created unless a specified limit on the number of spaces has been reached.
345  Only if the limit is reached will it wait for a space to be returned to the
346  pool. Each space knows what pool it belongs to, so that it can be returned.
347  */
348 
349 /* initialize a pool (pool structure itself provided, not allocated) -- the
350  limit is the maximum number of spaces in the pool, or -1 to indicate no
351  limit, i.e., to never wait for a buffer to return to the pool */
352 rpmzPool rpmzqNewPool(size_t size, int limit)
353 {
354  rpmzPool pool = (rpmzPool) xcalloc(1, sizeof(*pool));
355 /*@=mustfreeonly@*/
356  pool->have = yarnNewLock(0);
357  pool->head = NULL;
358 /*@=mustfreeonly@*/
359  pool->size = size;
360  pool->limit = limit;
361  pool->made = 0;
362 zqFprintf(stderr, " ++ pool %p[%u,%d]\n", pool, (unsigned)size, limit);
363  return pool;
364 }
365 
366 /* get a space from a pool -- the use count is initially set to one, so there
367  is no need to call rpmzqUseSpace() for the first use */
368 rpmzSpace rpmzqNewSpace(rpmzPool pool, size_t len)
369 {
370  rpmzSpace space;
371 
372  if (pool != NULL) {
373  /* if can't create any more, wait for a space to show up */
374  yarnPossess(pool->have);
375  if (pool->limit == 0)
376  yarnWaitFor(pool->have, NOT_TO_BE, 0);
377 
378  /* if a space is available, pull it from the list and return it */
379  if (pool->head != NULL) {
380  space = pool->head;
381  yarnPossess(space->use);
382  pool->head = space->next;
383  yarnTwist(pool->have, BY, -1); /* one less in pool */
384  yarnTwist(space->use, TO, 1); /* initially one user */
385  return space;
386  }
387 
388  /* nothing available, don't want to wait, make a new space */
389 assert(pool->limit != 0);
390  if (pool->limit > 0)
391  pool->limit--;
392  pool->made++;
393  yarnRelease(pool->have);
394  }
395 
396  space = (rpmzSpace) xcalloc(1, sizeof(*space));
397 /*@-mustfreeonly@*/
398  space->use = yarnNewLock(1); /* initially one user */
399  space->len = (pool ? pool->size : len);
400  if (space->len > 0)
401  space->buf = (unsigned char *) xmalloc(space->len);
402  space->ptr = space->buf; /* XXX save allocated buffer */
403  space->ix = 0; /* XXX initialize to 0 */
404 /*@-assignexpose -temptrans @*/
405  space->pool = pool; /* remember the pool this belongs to */
406 /*@=assignexpose =temptrans @*/
407 /*@=mustfreeonly@*/
408 zqFprintf(stderr, " ++ space %p[%d] buf %p[%u]\n", space, 1, space->buf, (unsigned)space->len);
409 /*@-nullret@*/
410  return space;
411 /*@=nullret@*/
412 }
413 
414 /* increment the use count to require one more drop before returning this space
415  to the pool */
416 void rpmzqUseSpace(rpmzSpace space)
417 {
418  int use;
419  yarnPossess(space->use);
420  use = yarnPeekLock(space->use);
421 zqFprintf(stderr, " ++ space %p[%d] buf %p[%u]\n", space, use+1, space->buf, (unsigned)space->len);
422  yarnTwist(space->use, BY, 1);
423 }
424 
425 /* drop a space, returning it to the pool if the use count is zero */
427 {
428  int use;
429 
430  if (space == NULL)
431  return NULL;
432 
433  yarnPossess(space->use);
434  use = yarnPeekLock(space->use);
435 zqFprintf(stderr, " -- space %p[%d] buf %p[%u]\n", space, use, space->buf, (unsigned)space->len);
436 #ifdef NOTYET
437 assert(use > 0);
438 #else
439 if (use <= 0)
440 fprintf(stderr, "==> FIXME: %s: space %p[%d]\n", __FUNCTION__, space, use);
441 #endif
442  if (use == 1) {
443  rpmzPool pool = space->pool;
444  if (pool != NULL) {
445  yarnPossess(pool->have);
446  space->buf = (unsigned char *) space->ptr; /* XXX reset to original allocation. */
447  space->len = pool->size; /* XXX reset to pool->size */
448  space->ix = 0; /* XXX reset to 0 */
449 /*@-mustfreeonly@*/
450  space->next = pool->head;
451 /*@=mustfreeonly@*/
452  pool->head = space;
453  yarnTwist(pool->have, BY, 1);
454  } else {
455  yarnTwist(space->use, BY, -1);
456  space->ptr = _free(space->ptr);
457  space->use = yarnFreeLock(space->use);
458 /*@-compdestroy@*/
459  space = _free(space);
460 /*@=compdestroy@*/
461  return NULL;
462  }
463  }
464  yarnTwist(space->use, BY, -1);
465  return NULL;
466 }
467 
468 /* free the memory and lock resources of a pool -- return number of spaces for
469  debugging and resource usage measurement */
470 rpmzPool rpmzqFreePool(rpmzPool pool, int *countp)
471 {
472  rpmzSpace space;
473  int count;
474 
475  yarnPossess(pool->have);
476  count = 0;
477  while ((space = pool->head) != NULL) {
478  pool->head = space->next;
479  space->ptr = _free(space->ptr);
480  space->use = yarnFreeLock(space->use);
481 /*@-compdestroy@*/
482  space = _free(space);
483 /*@=compdestroy@*/
484  count++;
485  }
486  yarnRelease(pool->have);
487  pool->have = yarnFreeLock(pool->have);
488 #ifdef NOTYET
489 assert(count == pool->made);
490 #else
491 if (count != pool->made)
492 fprintf(stderr, "==> FIXME: %s: count %d pool->made %d\n", __FUNCTION__, count, pool->made);
493 #endif
494 zqFprintf(stderr, " -- pool %p count %d\n", pool, count);
495 /*@-compdestroy@*/
496  pool = _free(pool);
497 /*@=compdestroy@*/
498  if (countp != NULL)
499  *countp = count;
500  return NULL;
501 }
502 
503 rpmzJob rpmzqNewJob(long seq)
504 {
505  rpmzJob job = (rpmzJob) xcalloc(1, sizeof(*job));
506  job->use = yarnNewLock(1); /* initially one user */
507  job->seq = seq;
508  job->calc = yarnNewLock(0);
509 zqFprintf(stderr, " ++ job %p[%ld] use %d\n", job, seq, 1);
510  return job;
511 }
512 
514 {
515  int use;
516  if (job == NULL) return NULL;
517  yarnPossess(job->use);
518  use = yarnPeekLock(job->use);
519 zqFprintf(stderr, " ++ job %p[%ld] use %d\n", job, job->seq, use+1);
520  yarnTwist(job->use, BY, 1);
521  return job;
522 }
523 
524 /* drop a job, returning it to the pool if the use count is zero */
526 {
527  int use;
528 
529  if (job == NULL)
530  return NULL;
531 
532  yarnPossess(job->use);
533  use = yarnPeekLock(job->use);
534 zqFprintf(stderr, " -- job %p[%ld] use %d %p => %p\n", job, job->seq, use, job->in, job->out);
535 #ifdef NOTYET
536 assert(use > 0);
537 #else
538 if (use <= 0)
539 fprintf(stderr, "==> FIXME: %s: job %p[%ld] use %d\n", __FUNCTION__, job, job->seq, use);
540 #endif
541  if (use == 1) {
542 #ifdef NOTYET
543  rpmzJPool jpool = job->pool;
544  if (jpool != NULL) {
545  yarnPossess(jpool->have);
546  jpool->head = job;
547  yarnTwist(jpool->have, BY, 1);
548  } else
549 #endif
550  {
551  yarnTwist(job->use, BY, -1);
552  if (job->use != NULL)
553  job->use = yarnFreeLock(job->use);
554  if (job->calc != NULL)
555  job->calc = yarnFreeLock(job->calc);
556  job = _free(job);
557  return NULL;
558  }
559  }
560  yarnTwist(job->use, BY, -1);
561  return NULL;
562 }
563 
564 /* compress or write job (passed from compress list to write list) -- if seq is
565  equal to -1, rpmzqCompressThread() is instructed to return; if more is false then
566  this is the last chunk, which after writing tells write_thread to return */
567 
568 /* command the compress threads to all return, then join them all (call from
569  main thread), free all the thread-related resources */
570 void rpmzqFini(rpmzQueue zq)
571 {
572  rpmzLog zlog = zq->zlog;
573 
574  struct rpmzJob_s job;
575  int caught;
576 
577 zqFprintf(stderr, "--> %s(%p)\n", __FUNCTION__, zq);
578  /* only do this once */
579  if (zq->_zc.q == NULL)
580  return;
581 
582  /* command all of the extant compress threads to return */
583  yarnPossess(zq->_zc.q->have);
584  job.seq = -1;
585  job.next = NULL;
586 /*@-immediatetrans -mustfreeonly@*/
587  zq->_zc.q->head = &job;
588 /*@=immediatetrans =mustfreeonly@*/
589  zq->_zc.q->tail = &job.next;
590  yarnTwist(zq->_zc.q->have, BY, 1); /* will wake them all up */
591 
592  /* join all of the compress threads, verify they all came back */
593  caught = yarnJoinAll();
594  Trace((zlog, "-- joined %d compress threads", caught));
595 #ifdef NOTYET
596 assert(caught == zq->_zc.cthreads);
597 #else
598 if (caught != zq->_zc.cthreads)
599 fprintf(stderr, "==> FIXME: %s: caught %d z->_zc.cthreads %d\n", __FUNCTION__, caught, zq->_zc.cthreads);
600 #endif
601  zq->_zc.cthreads = 0;
602 
603  /* free the resources */
604  zq->_zw.pool = rpmzqFreePool(zq->_zw.pool, &caught);
605  Trace((zlog, "-- freed %d output buffers", caught));
606  zq->_zc.pool = rpmzqFreePool(zq->_zc.pool, &caught);
607  Trace((zlog, "-- freed %d input buffers", caught));
608  zq->_zc.q = rpmzqFiniFIFO(zq->_zc.q);
609  zq->_zw.q = rpmzqFiniSEQ(zq->_zw.q);
610 }
611 
612 /* setup job lists (call from main thread) */
613 void rpmzqInit(rpmzQueue zq)
614 {
615 zqFprintf(stderr, "--> %s(%p)\n", __FUNCTION__, zq);
616  /* set up only if not already set up*/
617  if (zq->_zc.q != NULL)
618  return;
619 
620  /* allocate locks and initialize lists */
621 /*@-mustfreeonly@*/
622  zq->_zc.q = rpmzqInitFIFO(0L);
623  zq->_zw.q = rpmzqInitSEQ(-1L);
624 
625  zq->_zc.pool = rpmzqNewPool(zq->iblocksize, zq->ilimit);
626 zqFprintf(stderr, "--> in_pool: %p[%u] blocksize %u\n", zq->_zc.pool, (unsigned)zq->ilimit, (unsigned)zq->iblocksize);
627  zq->_zw.pool = rpmzqNewPool(zq->oblocksize, zq->olimit);
628 zqFprintf(stderr, "--> out_pool: %p[%u] blocksize %u\n", zq->_zw.pool, (unsigned)zq->olimit, (unsigned)zq->oblocksize);
629 
630 }
631 
632 rpmzQueue rpmzqFree(/*@unused@*/ rpmzQueue zq)
633 {
634  return NULL;
635 }
636 
637 rpmzQueue rpmzqNew(rpmzQueue zq, rpmzLog zlog, int limit)
638 {
639  zq->_zinp.fn = NULL;
640  zq->_zinp.fdno = -1;
641  zq->_zout.fn = NULL;
642  zq->_zout.fdno = -1;
643  zq->iblocksize = zq->blocksize;
644  zq->ilimit = limit;
645  zq->oblocksize = zq->blocksize;
646  zq->olimit = limit;
647 /*@-assignexpose@*/
648  zq->zlog = zlog;
649 /*@=assignexpose@*/
650  return zq;
651 }
652 
653 rpmzFIFO rpmzqInitFIFO(long val)
654 {
655  rpmzFIFO zs = (rpmzFIFO) xcalloc(1, sizeof(*zs));
656  zs->have = yarnNewLock(val);
657  zs->head = NULL;
658  zs->tail = &zs->head;
659  return zs;
660 }
661 
663 {
664  if (zs->have != NULL)
665  zs->have = yarnFreeLock(zs->have);
666  zs->head = NULL;
667  zs->tail = &zs->head;
668  zs = _free(zs);
669  return NULL;
670 }
671 
672 void rpmzqVerifyFIFO(rpmzFIFO zs)
673 {
674 assert(zs != NULL);
675  yarnPossess(zs->have);
676 assert(zs->head == NULL && yarnPeekLock(zs->have) == 0);
677  yarnRelease(zs->have);
678 }
679 
681 {
682  rpmzJob job;
683 
684  /* get job from compress list, let all the compressors know */
685  yarnPossess(zs->have);
686  yarnWaitFor(zs->have, NOT_TO_BE, 0);
687  job = zs->head;
688 assert(job != NULL);
689  if (job->seq == -1) {
690  yarnRelease(zs->have);
691  return NULL;
692  }
693 
694 /*@-assignexpose -dependenttrans@*/
695  zs->head = job->next;
696 /*@=assignexpose =dependenttrans@*/
697  if (job->next == NULL)
698  zs->tail = &zs->head;
699  yarnTwist(zs->have, BY, -1);
700 
701  return job;
702 }
703 
704 void rpmzqAddFIFO(rpmzFIFO zs, rpmzJob job)
705 {
706  /* put job at end of compress list, let all the compressors know */
707  yarnPossess(zs->have);
708  job->next = NULL;
709 /*@-assignexpose@*/
710  *zs->tail = job;
711  zs->tail = &job->next;
712 /*@=assignexpose@*/
713  yarnTwist(zs->have, BY, 1);
714 }
715 
716 rpmzSEQ rpmzqInitSEQ(long val)
717 {
718  rpmzSEQ zs = (rpmzSEQ) xcalloc(1, sizeof(*zs));
719  zs->first = yarnNewLock(val);
720  zs->head = NULL;
721  return zs;
722 }
723 
725 {
726  if (zs->first != NULL)
727  zs->first = yarnFreeLock(zs->first);
728  zs->head = NULL;
729  zs = _free(zs);
730  return NULL;
731 }
732 
733 void rpmzqVerifySEQ(rpmzSEQ zs)
734 {
735 assert(zs != NULL);
736  yarnPossess(zs->first);
737 assert(zs->head == NULL && yarnPeekLock(zs->first) == -1);
738  yarnRelease(zs->first);
739 }
740 
741 rpmzJob rpmzqDelSEQ(rpmzSEQ zs, long seq)
742 {
743  rpmzJob job;
744 
745  /* get next read job in order */
746  yarnPossess(zs->first);
747  yarnWaitFor(zs->first, TO_BE, seq);
748  job = zs->head;
749 assert(job != NULL);
750 /*@-assignexpose -dependenttrans@*/
751  zs->head = job->next;
752 /*@=assignexpose =dependenttrans@*/
753  yarnTwist(zs->first, TO, zs->head == NULL ? -1 : zs->head->seq);
754  return job;
755 }
756 
757 void rpmzqAddSEQ(rpmzSEQ zs, rpmzJob job)
758 {
759  rpmzJob here; /* pointers for inserting in SEQ list */
760  rpmzJob * prior; /* pointers for inserting in SEQ list */
761 
762  yarnPossess(zs->first);
763 
764  /* insert read job in list in sorted order, alert read thread */
765  prior = &zs->head;
766  while ((here = *prior) != NULL) {
767  if (here->seq > job->seq)
768  break;
769  prior = &here->next;
770  }
771 /*@-assignexpose@*/
772  job->next = here;
773 /*@=assignexpose@*/
774  *prior = job;
775 
776  yarnTwist(zs->first, TO, zs->head->seq);
777 }
778 
780 {
781  rpmzJob job;
782 
783  /* get job from compress list, let all the compressors know */
784  yarnPossess(zq->_zc.q->have);
785  yarnWaitFor(zq->_zc.q->have, NOT_TO_BE, 0);
786  job = zq->_zc.q->head;
787 assert(job != NULL);
788  if (job->seq == -1) {
789  yarnRelease(zq->_zc.q->have);
790  return NULL;
791  }
792 
793 /*@-assignexpose -dependenttrans@*/
794  zq->_zc.q->head = job->next;
795 /*@=assignexpose =dependenttrans@*/
796  if (job->next == NULL)
797  zq->_zc.q->tail = &zq->_zc.q->head;
798  yarnTwist(zq->_zc.q->have, BY, -1);
799 
800  return job;
801 }
802 
803 void rpmzqAddCJob(rpmzQueue zq, rpmzJob job)
804 {
805  /* put job at end of compress list, let all the compressors know */
806  yarnPossess(zq->_zc.q->have);
807  job->next = NULL;
808 /*@-assignexpose@*/
809  *zq->_zc.q->tail = job;
810  zq->_zc.q->tail = &job->next;
811 /*@=assignexpose@*/
812  yarnTwist(zq->_zc.q->have, BY, 1);
813 }
814 
815 rpmzJob rpmzqDelWJob(rpmzQueue zq, long seq)
816 {
817  rpmzJob job;
818 
819  /* get next write job in order */
820  yarnPossess(zq->_zw.q->first);
821  yarnWaitFor(zq->_zw.q->first, TO_BE, seq);
822  job = zq->_zw.q->head;
823 assert(job != NULL);
824 /*@-assignexpose -dependenttrans@*/
825  zq->_zw.q->head = job->next;
826 /*@=assignexpose =dependenttrans@*/
827  yarnTwist(zq->_zw.q->first, TO, zq->_zw.q->head == NULL ? -1 : zq->_zw.q->head->seq);
828  return job;
829 }
830 
831 void rpmzqAddWJob(rpmzQueue zq, rpmzJob job)
832 {
833  rpmzLog zlog = zq->zlog;
834 
835  rpmzJob here; /* pointers for inserting in write list */
836  rpmzJob * prior; /* pointers for inserting in write list */
837  double pct;
838 
839  yarnPossess(zq->_zw.q->first);
840 
841  switch (zq->omode) {
842  default: assert(0); break;
843  case O_WRONLY:
844  pct = (100.0 * job->out->len) / job->in->len;
845  zqFprintf(stderr, " job %p[%ld]:\t%p[%u] => %p[%u]\t(%3.1f%%)\n",
846  job, job->seq, job->in->buf, (unsigned)job->in->len,
847  job->out->buf, (unsigned)job->out->len, pct);
848  Trace((zlog, "-- compressed #%ld %3.1f%%%s", job->seq, pct,
849  (job->more ? "" : " (last)")));
850  break;
851  case O_RDONLY:
852  pct = (100.0 * job->in->len) / job->out->len;
853  zqFprintf(stderr, " job %p[%ld]:\t%p[%u] <= %p[%u]\t(%3.1f%%)\n",
854  job, job->seq, job->in->buf, (unsigned)job->in->len,
855  job->out->buf, (unsigned)job->out->len, pct);
856  Trace((zlog, "-- decompressed #%ld %3.1f%%%s", job->seq, pct,
857  (job->more ? "" : " (last)")));
858  break;
859  }
860 
861  /* insert write job in list in sorted order, alert write thread */
862  prior = &zq->_zw.q->head;
863  while ((here = *prior) != NULL) {
864  if (here->seq > job->seq)
865  break;
866  prior = &here->next;
867  }
868 /*@-assignexpose@*/
869  job->next = here;
870 /*@=assignexpose@*/
871  *prior = job;
872 
873  yarnTwist(zq->_zw.q->first, TO, zq->_zw.q->head->seq);
874 }
875 
876 static rpmzJob rpmzqFillOut(rpmzQueue zq, /*@returned@*/rpmzJob job, rpmbz bz)
877  /*@globals fileSystem, internalState @*/
878  /*@modifies zq, job, fileSystem, internalState @*/
879 {
880  size_t outlen;
881  int ret;
882 
883  switch (zq->omode) {
884  default: assert(0); break;
885  case O_WRONLY:
886  /* set up input and output (the output size is assured to be big enough
887  * for the worst case expansion of the input buffer size, plus five
888  * bytes for the terminating stored block) */
889  outlen = ((job->in->len*1.01)+600);
890 /*@-mustfreeonly@*/
891  job->out = rpmzqNewSpace(zq->_zw.pool, zq->_zw.pool->size);
892 /*@=mustfreeonly@*/
893  if (job->out->len < outlen) {
894 fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] malloc(%u)\n", __FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, (unsigned)outlen);
895  job->out = rpmzqDropSpace(job->out);
896  job->out = (rpmzSpace) xcalloc(1, sizeof(*job->out));
897  job->out->len = outlen;
898  job->out->buf = (unsigned char *) xmalloc(job->out->len);
899  }
900 
901  /* compress job->in to job-out */
902  ret = rpmbzCompressBlock(bz, job);
903  break;
904  case O_RDONLY:
905  outlen = 6 * job->in->len;
906  job->out = rpmzqNewSpace(zq->_zw.pool, zq->_zw.pool->size);
907  if (job->out->len < outlen) {
908 fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] malloc(%u)\n", __FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, (unsigned)outlen);
909  job->out = rpmzqDropSpace(job->out);
910  job->out = (rpmzSpace) xcalloc(1, sizeof(*job->out));
911  job->out->len = outlen;
912  job->out->buf = (unsigned char *) xmalloc(job->out->len);
913  }
914 
915  for (;;) {
916  static int _grow = 2; /* XXX factor to scale malloc by. */
917 
918  outlen = job->out->len * _grow;
919  ret = rpmbzDecompressBlock(bz, job);
920  if (ret != BZ_OUTBUFF_FULL)
921  /*@loopbreak@*/ break;
922 fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] realloc(%u)\n", __FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len, (unsigned)outlen);
923  if (job->out->use != NULL)
924  job->out = rpmzqDropSpace(job->out);
925  else {
926 fprintf(stderr, "==> FIXME: %s: job->out %p %p[%u] free\n", __FUNCTION__, job->out, job->out->buf, (unsigned)job->out->len);
927  job->out->buf = _free(job->out->buf);
928  job->out = _free(job->out);
929  }
930  job->out = (rpmzSpace) xcalloc(1, sizeof(*job->out));
931  job->out->len = outlen;
932  job->out->buf = (unsigned char *) xmalloc(job->out->len);
933  }
934 assert(ret == BZ_OK);
935  break;
936  }
937  return job;
938 }
939 
940 /* get the next compression job from the head of the list, compress and compute
941  the check value on the input, and put a job in the write list with the
942  results -- keep looking for more jobs, returning when a job is found with a
943  sequence number of -1 (leave that job in the list for other incarnations to
944  find) */
945 static void rpmzqCompressThread (void *_zq)
946  /*@globals fileSystem, internalState @*/
947  /*@modifies _zq, fileSystem, internalState @*/
948 {
949  rpmzQueue zq = (rpmzQueue) _zq;
950  rpmbz bz = rpmbzInit(zq->level, -1, -1, zq->omode);
951  rpmzJob job;
952 
953 zqFprintf(stderr, "--> %s(%p) bz %p\n", __FUNCTION__, zq, bz);
954 
955  /* get job, insert write job in list in sorted order, alert write thread */
956 /*@-evalorder@*/
957  while ((job = rpmzqDelCJob(zq)) != NULL) {
958  rpmzqAddWJob(zq, rpmzqFillOut(zq, job, bz));
959  }
960 /*@=evalorder@*/
961 
962  bz = rpmbzFini(bz);
963 }
964 
965 static void rpmzqDecompressThread(void *_zq)
966  /*@globals fileSystem, internalState @*/
967  /*@modifies _zq, fileSystem, internalState @*/
968 {
969  rpmzQueue zq = (rpmzQueue) _zq;
970  rpmbz bz = rpmbzInit(zq->level, -1, -1, zq->omode);
971  rpmzJob job;
972 
973 zqFprintf(stderr, "--> %s(%p) bz %p\n", __FUNCTION__, zq, bz);
974 
975  /* get job, insert write job in list in sorted order, alert write thread */
976 /*@-evalorder@*/
977  while ((job = rpmzqDelCJob(zq)) != NULL) {
978  rpmzqAddWJob(zq, rpmzqFillOut(zq, job, bz));
979  }
980 /*@=evalorder@*/
981 
982  bz = rpmbzFini(bz);
983 }
984 
985 /* start another compress/decompress thread if needed */
986 void rpmzqLaunch(rpmzQueue zq, long seq, unsigned int threads)
987 {
988  if (zq->_zc.cthreads < seq && zq->_zc.cthreads < (int)threads) {
989  switch (zq->omode) {
990  default: assert(0); break;
991  case O_WRONLY: (void)yarnLaunch(rpmzqCompressThread, zq); break;
992  case O_RDONLY: (void)yarnLaunch(rpmzqDecompressThread, zq); break;
993  }
994  zq->_zc.cthreads++;
995  }
996 }
997 
998 /* verify no more jobs, prepare for next use */
999 void rpmzqVerify(rpmzQueue zq)
1000 {
1001  rpmzqVerifyFIFO(zq->_zc.q);
1002  rpmzqVerifySEQ(zq->_zw.q);
1003 }
1004 
1005 #endif /* WITH_BZIP2 */
const bson * b
Definition: bson.h:280
static const char * suffix[]
Definition: rpmgrep.c:188
const char const double d
Definition: bson.h:800
void yarnTwist(yarnLock bolt, yarnTwistOP op, long val)
Definition: yarn.c:279
rpmzFIFO rpmzqInitFIFO(long val)
static const char * prog
Definition: parseScript.c:58
void rpmzqAddCJob(rpmzQueue zq, rpmzJob job)
const char const char size_t len
Definition: bson.h:823
void rpmzqLaunch(rpmzQueue zq, long seq, unsigned int threads)
start another compress/decompress thread if needed
void yarnPossess(yarnLock bolt)
Definition: yarn.c:262
struct rpmzJob_s * rpmzJob
Definition: rpmzq.h:27
rpmzJob rpmzqDelWJob(rpmzQueue zq, long seq)
rpmzSEQ rpmzqFiniSEQ(rpmzSEQ zs)
Definition: yarn.h:171
long yarnPeekLock(yarnLock bolt)
Definition: yarn.c:325
rpmzSpace rpmzqNewSpace(rpmzPool pool, size_t len)
Get a space from a pool (or malloc if pool == NULL).
const char int time
Definition: bson.h:1005
struct rpmzQueue_s * rpmzQueue
Definition: rpmzq.h:23
rpmzPool rpmzqFreePool(rpmzPool pool, int *countp)
Definition: yarn.h:166
Job queue and buffer pool management.
rpmzQueue rpmzqNew(rpmzQueue zq, rpmzLog zlog, int limit)
int yarnJoinAll(void)
Definition: yarn.c:524
int _rpmzq_debug
static yarnThread threads
Definition: yarn.c:358
int rpmbzCompressBlock(void *_bz, rpmzJob job)
Yet Another syslog(3) API clone.
void yarnWaitFor(yarnLock bolt, yarnWaitOP op, long val)
Definition: yarn.c:295
void * xcalloc(size_t nmemb, size_t size)
Definition: rpmmalloc.c:300
const char const bson_bool_t v
Definition: bson.h:919
const char * mode
Definition: mongo.h:440
void rpmzqUseSpace(rpmzSpace space)
Increment the use count to require one more drop before returning this space to the pool...
#define N_(Text)
Definition: system.h:531
struct rpmzSEQ_s * rpmzSEQ
Definition: rpmzq.h:30
rpmzPool rpmzqNewPool(size_t size, int limit)
void rpmzqAddWJob(rpmzQueue zq, rpmzJob job)
const char const bson * data
Definition: mongo.h:463
struct rpmzLog_s * rpmzLog
trace log pointer
Definition: rpmzlog.h:11
#define POPT_ARGFLAG_TOGGLE
Definition: poptIO.c:69
yarnLock yarnNewLock(long initial)
Definition: yarn.c:248
const char const bson const bson int limit
Definition: mongo.h:569
yarnLock yarnFreeLock(yarnLock bolt)
Definition: yarn.c:330
void rpmzqVerifySEQ(rpmzSEQ zs)
static const char * file
Definition: parseFiles.c:20
rpmzJob rpmzqUseJob(rpmzJob job)
struct rpmbz_s * rpmbz
Definition: rpmbz.h:157
struct rpmzPool_s * rpmzPool
Definition: rpmzq.h:19
rpmzJob rpmzqDelCJob(rpmzQueue zq)
rpmzJob rpmzqDelFIFO(rpmzFIFO zs)
void rpmzqAddFIFO(rpmzFIFO zs, rpmzJob job)
struct rpmzSpace_s * rpmzSpace
Definition: rpmzq.h:15
const char const char int arg
Definition: mongo.h:777
void rpmzqAddSEQ(rpmzSEQ zs, rpmzJob job)
void yarnRelease(yarnLock bolt)
Definition: yarn.c:270
rpmzJob rpmzqNewJob(long seq)
void rpmzqFini(rpmzQueue zq)
command the compress threads to all return, then join them all (call from main thread), free all the thread-related resources
static void output(int indent, int *offset, const char *fmt,...)
Definition: rpmmtree.c:2497
#define L(CS)
Definition: fnmatch.c:161
rpmzQueue rpmzqFree(rpmzQueue zq)
struct poptOption rpmzqOptionsPoptTable[]
const char const int i
Definition: bson.h:778
rpmzJob rpmzqDropJob(rpmzJob job)
const char const bson const bson bson * out
Definition: mongo.h:678
static void set(char *t, NODE *ip)
Definition: rpmmtree.c:1408
const char const char int int max
Definition: mongo.h:749
yarnThread yarnLaunch(void(*probe)(void *), void *payload)
Definition: yarn.c:481
rpmzQueue _rpmzq
static int compression
Definition: rpmrepo.c:1652
rpmzSpace rpmzqDropSpace(rpmzSpace space)
Drop a space, returning to the pool (or free'ing if no pool) when the use count is zero...
const char const char size_t size
Definition: bson.h:895
rpmzJob rpmzqDelSEQ(rpmzSEQ zs, long seq)
static void * _free(const void *p)
Wrapper to free(3), hides const compilation noise, permit NULL, return NULL.
Definition: rpmiotypes.h:756
rpmzFIFO rpmzqFiniFIFO(rpmzFIFO zs)
const bson * in
Definition: bson.h:746
rpmzSEQ rpmzqInitSEQ(long val)
#define RPMZ_FLAGS_FORCE
Definition: rpmzq.h:102
static const char * name
Definition: yarn.h:166
#define _(Text)
Definition: system.h:29
#define xmalloc
Definition: system.h:32
struct rpmzFIFO_s * rpmzFIFO
Definition: rpmzq.h:29
void rpmzqVerify(rpmzQueue zq)
verify no more jobs, prepare for next use
void rpmzqInit(rpmzQueue zq)
setup job lists (call from main thread)
const char const bson const bson int int int options
Definition: mongo.h:569
void rpmzqVerifyFIFO(rpmzFIFO zs)