I encountered the concurrency-zip project in OSTEP by Remzi Arpaci-Dusseau. I've built a simple compression program in C that uses run-length encoding (RLE) and multi-threading. Since I couldn't find any reference solution, I'm posting it here for a code review.
Essentially, the idea is based on the producer-consumer problem:
- One producer thread loads the files and splits them into equal-size pages. The page size is defined by the Linux kernel via sysconf(_SC_PAGE_SIZE). The splitting uses mmap to load the files into memory for high performance. All the split pages are put into a work queue. The queue is an unbounded buffer, unlike the traditional bounded-buffer variant of the problem.
- Many consumer threads each take a work item out of the queue and compress it using RLE. The number of threads is defined by get_nprocs from the C library. After taking and compressing one work item, each consumer puts its result into the result queue, waiting for the previous pieces of work to finish so the output stays in order.
- The main thread takes all the results from the result queue and writes them to stdout. Use shell redirection to send the output to a destination file.
More detail can be found in the project owner's README.
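Before the full program below, here is a minimal, single-threaded sketch of just the mapping-and-splitting step (no queue, no threads; the file name is hypothetical), to make that part of the design concrete. The actual producer does the same walk but packages each chunk as a work item:

#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

int main(void)
{
    long pagesz = sysconf(_SC_PAGE_SIZE); // kernel-defined page size
    int fd = open("test.txt", O_RDONLY);  // hypothetical input file
    if (fd == -1)
        return 1;

    struct stat sb;
    if (fstat(fd, &sb) == -1 || sb.st_size == 0)
        return 1;

    // Map the whole file read-only; pages are faulted in lazily.
    char *addr = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    if (addr == MAP_FAILED)
        return 1;

    // Walk the mapping in page-size chunks; the last chunk may be shorter.
    for (long off = 0; off < sb.st_size; off += pagesz)
    {
        long chunk = (sb.st_size - off < pagesz) ? sb.st_size - off : pagesz;
        printf("chunk at offset %ld, size %ld\n", off, chunk);
    }

    munmap(addr, sb.st_size);
    close(fd);
    return 0;
}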
Code:
#include <stdio.h>       // stderr, stdout, perror, fprintf, fwrite
#include <stdlib.h>      // exit, EXIT_FAILURE, EXIT_SUCCESS, malloc
#include <pthread.h>     // pthread_t, pthread_create, pthread_exit, pthread_join
#include <fcntl.h>       // sysconf, open, O_*
#include <unistd.h>      // _SC_*
#include <semaphore.h>   // sem_t, sem_init, sem_wait, sem_post
#include <sys/sysinfo.h> // get_nprocs
#include <sys/stat.h>    // stat, fstat
#include <sys/mman.h>    // mmap
#include <string.h>      // memset

#define handle_error(msg)   \
    do                      \
    {                       \
        perror(msg);        \
        exit(EXIT_FAILURE); \
    } while (0)

/* Global objects */

// Argument object for producer thread
typedef struct _arg
{
    int argc;
    char **argv;
} arg_t;

// Page object for munmap
typedef struct _page
{
    char *addr;
    long size;
} page_t;

// Work produced by producer
typedef struct _work
{
    char *addr;
    long pagesz;
    long pagenm;
    long filenm;
    struct _work *next;
} work_t;

// Result after a work is consumed by a consumer
typedef struct _rle
{
    char c;
    int count;
    struct _rle *next;
} result_t;

/* Global variables */
long nprocs; // Number of processes
long nfiles; // Number of files
long pagesz; // Page size
long pagenm; // Page number
long filenm; // File number

static int done = 0;
static int curr_page = 0;
static int *npage_onfile;
static work_t *works, *work_head, *work_tail;
static result_t *results, *result_tail;
static sem_t mutex, filled, page;
static sem_t *order;

/* Global functions */
void *producer(void *args);
void *consumer(void *args);
void wenqueue(work_t work);
work_t *wdequeue();
result_t *compress(work_t work);
void renqueue(result_t *result);

int main(int argc, char **argv)
{
    if (argc < 2)
    {
        fprintf(stderr, "pzip: file1 [file2 ...]\n");
        exit(EXIT_FAILURE);
    }

    arg_t *args = malloc(sizeof(arg_t));
    if (args == NULL)
        handle_error("Malloc args");
    args->argc = argc - 1;
    args->argv = argv + 1; // Remove the first argument

    nprocs = get_nprocs();           // Let the system decide how many threads will be used
    nfiles = argc - 1;               // Remove the first argument
    pagesz = sysconf(_SC_PAGE_SIZE); // Let the system decide how big a page is

    order = malloc(sizeof(sem_t) * nprocs);
    npage_onfile = malloc(sizeof(int) * nfiles);
    memset(npage_onfile, 0, sizeof(int) * nfiles);

    sem_init(&mutex, 0, 1);
    sem_init(&filled, 0, 0);
    sem_init(&page, 0, 0);

    pthread_t pid, cid[nprocs];
    pthread_create(&pid, NULL, producer, (void *)args);
    for (int i = 0; i < nprocs; i++)
    {
        pthread_create(&cid[i], NULL, consumer, (void *)args);
        sem_init(&order[i], 0, i ? 0 : 1);
    }

    for (int i = 0; i < nprocs; i++)
    {
        pthread_join(cid[i], NULL);
    }
    pthread_join(pid, NULL);

    for (result_t *curr = results; curr != NULL; curr = curr->next)
    {
        fwrite((char *)&(curr->count), sizeof(int), 1, stdout);
        fwrite((char *)&(curr->c), sizeof(char), 1, stdout);
    }

    sem_destroy(&filled);
    sem_destroy(&mutex);
    sem_destroy(&page);
    for (int i = 0; i < nprocs; i++)
    {
        sem_destroy(&order[i]);
    }

    for (result_t *curr = results; curr != NULL; curr = results)
    {
        results = results->next;
        free(curr);
        curr = NULL;
    }

    for (work_t *curr = works; curr != NULL; curr = works)
    {
        munmap(curr->addr, curr->pagesz);
        works = works->next;
        free(curr);
        curr = NULL;
    }

    free(order);
    free(npage_onfile);
    return 0;
}

void wenqueue(work_t work)
{
    if (works == NULL)
    {
        works = malloc(sizeof(work_t));
        if (works == NULL)
        {
            handle_error("malloc work");
            sem_post(&mutex);
        }
        works->addr = work.addr;
        works->filenm = work.filenm;
        works->pagenm = work.pagenm;
        works->pagesz = work.pagesz;
        works->next = NULL;
        work_head = works;
        work_tail = works;
    }
    else
    {
        work_tail->next = malloc(sizeof(work_t));
        if (work_tail->next == NULL)
        {
            handle_error("malloc work");
            sem_post(&mutex);
        }
        work_tail->next->addr = work.addr;
        work_tail->next->filenm = work.filenm;
        work_tail->next->pagenm = work.pagenm;
        work_tail->next->pagesz = work.pagesz;
        work_tail->next->next = NULL;
        work_tail = work_tail->next;
    }
}

void *producer(void *args)
{
    arg_t *arg = (arg_t *)args;
    char **fnames = arg->argv;
    for (int i = 0; i < arg->argc; i++)
    {
        int fd = open(fnames[i], O_RDONLY);
        if (fd == -1)
            handle_error("open error");
        struct stat sb;
        if (fstat(fd, &sb) == -1)
            handle_error("fstat error");
        if (sb.st_size == 0)
            continue;
        int p4f = sb.st_size / pagesz;
        if ((double)sb.st_size / pagesz > p4f)
            p4f++;
        int offset = 0;
        npage_onfile[i] = p4f;
        char *addr = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
        for (int j = 0; j < p4f; j++)
        {
            // it should be less than or equal to the default page size
            int curr_pagesz = (j < p4f - 1) ? pagesz : sb.st_size - ((p4f - 1) * pagesz);
            offset += curr_pagesz;
            work_t work;
            work.addr = addr;
            work.filenm = i;
            work.pagenm = j;
            work.pagesz = curr_pagesz;
            work.next = NULL;
            sem_wait(&mutex);
            wenqueue(work);
            sem_post(&mutex);
            sem_post(&filled);
            addr += curr_pagesz;
        }
        close(fd);
    }
    done = 1;
    for (int i = 0; i < nprocs; i++)
    {
        sem_post(&filled);
    }
    pthread_exit(NULL);
}

work_t *wdequeue()
{
    if (work_head == NULL)
        return NULL;
    work_t *tmp = work_head;
    work_head = work_head->next;
    return tmp;
}

result_t *compress(work_t work)
{
    result_t *result = malloc(sizeof(result_t));
    if (result == NULL)
        handle_error("malloc result");
    result_t *tmp = result;
    int count = 0;
    char c, last;
    for (int i = 0; i < work.pagesz; i++)
    {
        c = work.addr[i];
        if (count && last != c)
        {
            tmp->c = last;
            tmp->count = count;
            tmp->next = malloc(sizeof(result_t));
            tmp = tmp->next;
            count = 0;
        }
        last = c;
        count++;
    }
    if (count)
    {
        tmp->c = last;
        tmp->count = count;
        tmp->next = NULL;
    }
    return result;
}

void renqueue(result_t *result)
{
    if (results == NULL)
    {
        results = result;
    }
    else
    {
        if (result_tail->c == result->c)
        {
            result_tail->count += result->count;
            result = result->next;
        }
        result_tail->next = result;
    }
    result_t *curr = result;
    for (; curr->next != NULL; curr = curr->next)
    {
    }
    result_tail = curr;
}

void *consumer(void *args)
{
    work_t *work;
    while (!done || work_head != NULL)
    {
        sem_wait(&filled);
        sem_wait(&mutex);
        if (work_head == work_tail && !done)
        {
            sem_post(&mutex);
            continue;
        }
        else if (work_head == NULL)
        {
            sem_post(&mutex);
            return NULL;
        }
        else
        {
            work = work_head;
            work_head = work_head->next;
            sem_post(&mutex);
        }
        result_t *result = compress(*work);
        if (work->filenm == 0 && work->pagenm == 0)
        {
            sem_wait(&order[0]);
            renqueue(result);
            if (work->pagenm == npage_onfile[work->filenm] - 1)
            {
                sem_post(&order[0]);
                curr_page++;
            }
            else
                sem_post(&order[1]);
            sem_post(&page);
        }
        else
        {
            while (1)
            {
                sem_wait(&page);
                if (curr_page != work->filenm)
                {
                    sem_post(&page);
                    continue;
                }
                if (curr_page == nfiles)
                {
                    sem_post(&page);
                    return NULL;
                }
                sem_post(&page);
                sem_wait(&order[work->pagenm % nprocs]);
                sem_wait(&page);
                renqueue(result);
                if (work->filenm == curr_page && work->pagenm < npage_onfile[work->filenm] - 1)
                {
                    sem_post(&order[(work->pagenm + 1) % nprocs]);
                }
                else if (work->filenm == curr_page && work->pagenm == npage_onfile[work->filenm] - 1)
                {
                    sem_post(&order[0]);
                    curr_page++;
                }
                sem_post(&page);
                break;
            }
        }
    }
    return NULL;
}
What I want to check:
- Correctness
- Performance
- Memory leaks
- Syntax and code organization
I've already tested with small files. The parallel zip is supposed to outperform the sequential zip (the non-multi-threaded version), but my program doesn't run significantly faster, and sometimes it even runs slower. The only cause I can think of is the freeing of the lists at the end, which may slow things down.
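To test that hypothesis, I could time the cleanup phase on its own with clock_gettime. This is only a sketch of a fragment to wrap around the free/munmap loops at the end of main (it assumes <time.h> is included):

// Sketch: wrap the final free()/munmap() loops with a monotonic clock
// to see how much of the total runtime the cleanup actually takes.
struct timespec t0, t1;
clock_gettime(CLOCK_MONOTONIC, &t0);

/* ... the free()/munmap() loops from main() go here ... */

clock_gettime(CLOCK_MONOTONIC, &t1);
fprintf(stderr, "cleanup: %.3f ms\n",
        (t1.tv_sec - t0.tv_sec) * 1e3 + (t1.tv_nsec - t0.tv_nsec) / 1e6);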
EDIT
I want to clarify some of my questions. The output of the parallel zip is correct. Although I don't have a matching decompression program for the parallel zip, I tested its output against the sequential zip's output with Linux diff. The standard sequential zip can also be found here. It's acceptable to lose the file boundaries when compressing multiple files, which means the result cannot be decompressed back into the correct original files.
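A matching decompressor should be short, though, since the output is just repeated (4-byte count, 1-byte char) records, as written by the fwrite calls in main. A minimal sketch of what one could look like (the name punzip is made up), reading from stdin:

#include <stdio.h>

// Read (int count, char c) records and expand each run.
int main(void)
{
    int count;
    char c;
    while (fread(&count, sizeof(int), 1, stdin) == 1 &&
           fread(&c, sizeof(char), 1, stdin) == 1)
    {
        for (int i = 0; i < count; i++)
            putchar(c);
    }
    return 0;
}

With something like that, ./punzip < pzip.z could be diffed against the concatenation of the input files.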
What I'm asking about regarding correctness is mostly the multi-threading and the memory mapping. Is what I'm doing correct, and is there anything I can improve?
For performance testing, I use Linux time and compare execution times. I created the test file myself; it just contains many long runs so that RLE can work at its best. The size of the test file is around 150 MB. For one input file the difference is negligible, and for multiple files the parallel zip actually starts to lose performance.
$ time ./pzip test.txt test.txt test.txt > pzip.z

real    0m0.042s
user    0m0.061s
sys     0m0.009s

$ time ./wzip test.txt test.txt test.txt > wzip.z

real    0m0.025s
user    0m0.017s
sys     0m0.008s

$ time ./wzip test.txt > wzip.z

real    0m0.026s
user    0m0.019s
sys     0m0.007s

$ time ./pzip test.txt > pzip.z

real    0m0.020s
user    0m0.014s
sys     0m0.018s
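For reference, a runs-heavy test file like the one described can be generated with a short program. This is just a sketch; the run length and alphabet are arbitrary choices:

#include <stdio.h>
#include <stdlib.h>

// Emit ~150 MB of long single-character runs so RLE compresses well.
// Run as: ./gen > test.txt
int main(void)
{
    const long target = 150L * 1024 * 1024; // ~150 MB total
    const int runlen = 4096;                // length of each run
    for (long written = 0; written < target; written += runlen)
    {
        char c = 'a' + rand() % 26;
        for (int i = 0; i < runlen; i++)
            putchar(c);
    }
    return 0;
}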
For memory leaks, I don't really know how to check for them efficiently. I just make a note wherever I've used malloc or mmap, and free or unmap that memory later.
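I've also heard that Valgrind can report unfreed heap allocations at exit, for example:

$ valgrind --leak-check=full ./pzip test.txt > pzip.z

though as I understand it this only covers the malloc family, so mmap'd regions that are never unmapped would still have to be audited by hand.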
Thanks in advance!