
I encountered the concurrency-zip project in OSTEP by Remzi Arpaci-Dusseau. Basically, I've built a simple compression program in C that uses run-length encoding (RLE) and multi-threading. I couldn't find any reference solution, so I'm using this forum for a code review.

Essentially, the idea is based on the producer-consumer problem:

  • One producer thread loads the files and splits them into equal-sized pages. The page size is whatever the Linux kernel reports via sysconf(_SC_PAGE_SIZE). The splitting uses mmap to load the files into memory for high performance. Every page is put into a work queue. Unlike the traditional bounded-buffer formulation of the problem, this queue is an unbounded buffer. (A simplified sketch of the splitting step follows this list.)

  • Many consumer threads each take a unit of work off the queue and compress it with RLE. The number of threads is whatever the C library reports via get_nprocs(). After taking and compressing one unit of work, each consumer puts its result into the result queue, waiting until the results of earlier pages have been finished.

  • The main thread takes all the results from the result queue and writes them to stdout. Use Linux redirection if a destination file is wanted.

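To make the splitting step concrete, here is a simplified sketch of what the producer does for one already-opened file descriptor fd (error handling and the actual queuing are stripped out; the full program is below):

    // Ask the system for the page size and the file size.
    long pagesz = sysconf(_SC_PAGE_SIZE);
    struct stat sb;
    fstat(fd, &sb);

    // Number of pages: the file size rounded up to whole pages.
    long npages = (sb.st_size + pagesz - 1) / pagesz;

    // Map the whole file once; each unit of work is one page-sized slice.
    char *addr = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    for (long j = 0; j < npages; j++)
    {
        // Every page is full-sized except possibly the last one.
        long curr = (j < npages - 1) ? pagesz : sb.st_size - (npages - 1) * pagesz;
        // ... enqueue { addr, curr, j, file index } as one unit of work ...
        addr += curr;
    }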
More detail can be found in the owner's README.

Code:

    #include <stdio.h>       // stderr, stdout, perror, fprintf, fwrite
    #include <stdlib.h>      // exit, EXIT_FAILURE, EXIT_SUCCESS, malloc
    #include <pthread.h>     // pthread_t, pthread_create, pthread_exit, pthread_join
    #include <fcntl.h>       // sysconf, open, O_*
    #include <unistd.h>      // _SC_*
    #include <semaphore.h>   // sem_t, sem_init, sem_wait, sem_post
    #include <sys/sysinfo.h> // get_nprocs
    #include <sys/stat.h>    // stat, fstat
    #include <sys/mman.h>    // mmap
    #include <string.h>      // memset

    #define handle_error(msg) \
        do                    \
        {                     \
            perror(msg);      \
            exit(EXIT_FAILURE); \
        } while (0)

    /* Global objects */
    // Argument object for producer thread
    typedef struct _arg
    {
        int argc;
        char **argv;
    } arg_t;

    // Page object for munmap
    typedef struct _page
    {
        char *addr;
        long size;
    } page_t;

    // Work produced by producer
    typedef struct _work
    {
        char *addr;
        long pagesz;
        long pagenm;
        long filenm;
        struct _work *next;
    } work_t;

    // Result after a work is consumed by a consumer
    typedef struct _rle
    {
        char c;
        int count;
        struct _rle *next;
    } result_t;

    /* Global variables */
    long nprocs; // Number of processes
    long nfiles; // Number of files
    long pagesz; // Page size
    long pagenm; // Page number
    long filenm; // File number

    static int done = 0;
    static int curr_page = 0;
    static int *npage_onfile;
    static work_t *works, *work_head, *work_tail;
    static result_t *results, *result_tail;
    static sem_t mutex, filled, page;
    static sem_t *order;

    /* Global functions */
    void *producer(void *args);
    void *consumer(void *args);
    void wenqueue(work_t work);
    work_t *wdequeue();
    result_t *compress(work_t work);
    void renqueue(result_t *result);

    int main(int argc, char **argv)
    {
        if (argc < 2)
        {
            fprintf(stderr, "pzip: file1 [file2 ...]\n");
            exit(EXIT_FAILURE);
        }

        arg_t *args = malloc(sizeof(arg_t));
        if (args == NULL)
            handle_error("Malloc args");
        args->argc = argc - 1;
        args->argv = argv + 1; // Remove the first argument

        nprocs = get_nprocs();           // Let the system decide how many threads will be used
        nfiles = argc - 1;               // Remove the first argument
        pagesz = sysconf(_SC_PAGE_SIZE); // Let the system decide how big a page is

        order = malloc(sizeof(sem_t) * nprocs);
        npage_onfile = malloc(sizeof(int) * nfiles);
        memset(npage_onfile, 0, sizeof(int) * nfiles);

        sem_init(&mutex, 0, 1);
        sem_init(&filled, 0, 0);
        sem_init(&page, 0, 0);

        pthread_t pid, cid[nprocs];
        pthread_create(&pid, NULL, producer, (void *)args);
        for (int i = 0; i < nprocs; i++)
        {
            pthread_create(&cid[i], NULL, consumer, (void *)args);
            sem_init(&order[i], 0, i ? 0 : 1);
        }

        for (int i = 0; i < nprocs; i++)
        {
            pthread_join(cid[i], NULL);
        }
        pthread_join(pid, NULL);

        for (result_t *curr = results; curr != NULL; curr = curr->next)
        {
            fwrite((char *)&(curr->count), sizeof(int), 1, stdout);
            fwrite((char *)&(curr->c), sizeof(char), 1, stdout);
        }

        sem_destroy(&filled);
        sem_destroy(&mutex);
        sem_destroy(&page);
        for (int i = 0; i < nprocs; i++)
        {
            sem_destroy(&order[i]);
        }

        for (result_t *curr = results; curr != NULL; curr = results)
        {
            results = results->next;
            free(curr);
            curr = NULL;
        }

        for (work_t *curr = works; curr != NULL; curr = works)
        {
            munmap(curr->addr, curr->pagesz);
            works = works->next;
            free(curr);
            curr = NULL;
        }

        free(order);
        free(npage_onfile);
        return 0;
    }

    void wenqueue(work_t work)
    {
        if (works == NULL)
        {
            works = malloc(sizeof(work_t));
            if (works == NULL)
            {
                handle_error("malloc work");
                sem_post(&mutex);
            }
            works->addr = work.addr;
            works->filenm = work.filenm;
            works->pagenm = work.pagenm;
            works->pagesz = work.pagesz;
            works->next = NULL;
            work_head = works;
            work_tail = works;
        }
        else
        {
            work_tail->next = malloc(sizeof(work_t));
            if (work_tail->next == NULL)
            {
                handle_error("malloc work");
                sem_post(&mutex);
            }
            work_tail->next->addr = work.addr;
            work_tail->next->filenm = work.filenm;
            work_tail->next->pagenm = work.pagenm;
            work_tail->next->pagesz = work.pagesz;
            work_tail->next->next = NULL;
            work_tail = work_tail->next;
        }
    }

    void *producer(void *args)
    {
        arg_t *arg = (arg_t *)args;
        char **fnames = arg->argv;
        for (int i = 0; i < arg->argc; i++)
        {
            int fd = open(fnames[i], O_RDONLY);
            if (fd == -1)
                handle_error("open error");
            struct stat sb;
            if (fstat(fd, &sb) == -1)
                handle_error("fstat error");
            if (sb.st_size == 0)
                continue;
            int p4f = sb.st_size / pagesz;
            if ((double)sb.st_size / pagesz > p4f)
                p4f++;
            int offset = 0;
            npage_onfile[i] = p4f;
            char *addr = mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
            for (int j = 0; j < p4f; j++)
            {
                // it should be less than or equal to the default page size
                int curr_pagesz = (j < p4f - 1) ? pagesz : sb.st_size - ((p4f - 1) * pagesz);
                offset += curr_pagesz;
                work_t work;
                work.addr = addr;
                work.filenm = i;
                work.pagenm = j;
                work.pagesz = curr_pagesz;
                work.next = NULL;
                sem_wait(&mutex);
                wenqueue(work);
                sem_post(&mutex);
                sem_post(&filled);
                addr += curr_pagesz;
            }
            close(fd);
        }
        done = 1;
        for (int i = 0; i < nprocs; i++)
        {
            sem_post(&filled);
        }
        pthread_exit(NULL);
    }

    work_t *wdequeue()
    {
        if (work_head == NULL)
            return NULL;
        work_t *tmp = work_head;
        work_head = work_head->next;
        return tmp;
    }

    result_t *compress(work_t work)
    {
        result_t *result = malloc(sizeof(result_t));
        if (result == NULL)
            handle_error("malloc result");
        result_t *tmp = result;
        int count = 0;
        char c, last;
        for (int i = 0; i < work.pagesz; i++)
        {
            c = work.addr[i];
            if (count && last != c)
            {
                tmp->c = last;
                tmp->count = count;
                tmp->next = malloc(sizeof(result_t));
                tmp = tmp->next;
                count = 0;
            }
            last = c;
            count++;
        }
        if (count)
        {
            tmp->c = last;
            tmp->count = count;
            tmp->next = NULL;
        }
        return result;
    }

    void renqueue(result_t *result)
    {
        if (results == NULL)
        {
            results = result;
        }
        else
        {
            if (result_tail->c == result->c)
            {
                result_tail->count += result->count;
                result = result->next;
            }
            result_tail->next = result;
        }
        result_t *curr = result;
        for (; curr->next != NULL; curr = curr->next)
        {
        }
        result_tail = curr;
    }

    void *consumer(void *args)
    {
        work_t *work;
        while (!done || work_head != NULL)
        {
            sem_wait(&filled);
            sem_wait(&mutex);
            if (work_head == work_tail && !done)
            {
                sem_post(&mutex);
                continue;
            }
            else if (work_head == NULL)
            {
                sem_post(&mutex);
                return NULL;
            }
            else
            {
                work = work_head;
                work_head = work_head->next;
                sem_post(&mutex);
            }
            result_t *result = compress(*work);
            if (work->filenm == 0 && work->pagenm == 0)
            {
                sem_wait(&order[0]);
                renqueue(result);
                if (work->pagenm == npage_onfile[work->filenm] - 1)
                {
                    sem_post(&order[0]);
                    curr_page++;
                }
                else
                    sem_post(&order[1]);
                sem_post(&page);
            }
            else
            {
                while (1)
                {
                    sem_wait(&page);
                    if (curr_page != work->filenm)
                    {
                        sem_post(&page);
                        continue;
                    }
                    if (curr_page == nfiles)
                    {
                        sem_post(&page);
                        return NULL;
                    }
                    sem_post(&page);
                    sem_wait(&order[work->pagenm % nprocs]);
                    sem_wait(&page);
                    renqueue(result);
                    if (work->filenm == curr_page && work->pagenm < npage_onfile[work->filenm] - 1)
                    {
                        sem_post(&order[(work->pagenm + 1) % nprocs]);
                    }
                    else if (work->filenm == curr_page && work->pagenm == npage_onfile[work->filenm] - 1)
                    {
                        sem_post(&order[0]);
                        curr_page++;
                    }
                    sem_post(&page);
                    break;
                }
            }
        }
        return NULL;
    }

What I want to check:

  1. Correctness
  2. Performance
  3. Memory leak
  4. Syntax and code organization

I've already tested with small files. The parallel zip is supposed to perform better than the sequential zip (no multi-threading), but my program isn't significantly faster, and sometimes it even runs slower. The only cause I can think of is the freeing pass at the end slowing things down.

EDITED UPDATE

I want to clarify some of my questions. The output of the parallel zip's compression is correct. Although I don't have a matching decompression program for the parallel zip, I compare its result against the sequential zip using Linux diff. The standard sequential zip can also be found here. Note that losing information when compressing multiple files is acceptable, meaning the result cannot necessarily be decompressed back into the original separate files.

What I'm asking about under correctness is mainly the multi-threading and the memory mapping: is what I'm doing correct, and is there anything I can improve?

For performance testing, I use Linux time and compare execution times. I created the test file myself; it just contains many long runs so that RLE can work best. The test file is around 150 MB. For one input file the difference is negligible, and for multiple files the parallel zip actually starts to perform worse.

    $ time ./pzip test.txt test.txt test.txt > pzip.z

    real    0m0.042s
    user    0m0.061s
    sys     0m0.009s

    $ time ./wzip test.txt test.txt test.txt > wzip.z

    real    0m0.025s
    user    0m0.017s
    sys     0m0.008s

    $ time ./wzip test.txt > wzip.z

    real    0m0.026s
    user    0m0.019s
    sys     0m0.007s

    $ time ./pzip test.txt > pzip.z

    real    0m0.020s
    user    0m0.014s
    sys     0m0.018s

For memory leaks, I don't really know how to check for them efficiently. I simply make sure that wherever I've used malloc or mmap, I free or unmap it later.

Thanks in advance!

  • Welcome to Code Review! If you really want to attract some users that follow the tag performance then you could consider replacing one of the existing tags with that one. – Commented Feb 22, 2022 at 21:46
  • Thank you for your tip! I updated with the tag performance. – Commented Feb 22, 2022 at 22:30
  • It's probably worth showing what testing you've done (for example, have you executed it using a memory checker to help identify memory issues? Do you have a matching uncompress program to test the correctness of the output?). – Commented Feb 23, 2022 at 10:56
  • I will update more detail about what I've been testing. The output is correct. Although I don't have a matching uncompress program, I use Linux diff to compare with sequential compress. – Commented Feb 23, 2022 at 15:09
  • Wait, why is it acceptable to lose data? – CWallach, Commented Feb 24, 2022 at 6:27

1 Answer


Overall the code is very readable and maintainable. The vertical spacing definitely helps the readability. The function name wenqueue() makes it clear that it does some kind of queuing, but what kind of queuing is it?

Answers to your questions

  • Correctness

I don't do formal proofs of correctness as part of a review, sorry. The functional testing and unit testing should imply the correctness of the code. You already have functional testing covered; you might want to develop some unit tests to exercise each of the functions.
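For example, a first unit test for compress() could feed it a buffer with known runs and walk the resulting list (a rough sketch; the buffer and the expected runs are invented for illustration):

    #include <assert.h>

    // Sketch of a unit test for compress(): compress a buffer with known
    // runs ("aaabbc") and check each (count, character) pair in the list.
    static void test_compress(void)
    {
        char buf[] = "aaabbc";
        work_t w = { .addr = buf, .pagesz = 6, .pagenm = 0, .filenm = 0, .next = NULL };
        result_t *r = compress(w);
        assert(r && r->count == 3 && r->c == 'a');
        assert(r->next && r->next->count == 2 && r->next->c == 'b');
        assert(r->next->next && r->next->next->count == 1 && r->next->next->c == 'c');
    }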

  • Performance

I'll try to come back to answer this later.

  • Memory leak

I don't see any memory leaks, but the code is a little too complex to check quickly.
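If you want an empirical answer rather than inspection, running the program under a memory checker is an efficient way to look for leaks; for example, valgrind --leak-check=full ./pzip test.txt > /dev/null will report any heap blocks that were never freed.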

  • Syntax and code organization

A comment about the code organization: the more code you write, the more errors creep into the code. Rather than creating function prototypes and having the main() function first, it would be better to organize the code so that the main() function is last. Organize the other functions to reduce the need for function prototypes as much as possible, and try to restrict function prototypes to header files only. You might want to break the program into separate modules (files) so that each module does one thing. You might also want to think about creating a queue mechanism that the producer can add to and the consumer can consume.

Capitalize the first letter of your typedef names to make the structs or objects clearer in the code.
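For example (a sketch of just the renaming, nothing else changed):

    // Work instead of work_t: the capital letter marks it as a type at a glance.
    typedef struct Work
    {
        char *addr;
        long pagesz;
        long pagenm;
        long filenm;
        struct Work *next;
    } Work;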

Memory Allocation Issues

There are several places in the code where the result of a memory allocation is not tested. For instance, in the main() function the allocation of args is tested, but the allocations for the variables order and npage_onfile are not. The code needs to be consistent: all memory allocations should be tested. Search for malloc() in the code; there are other places where the result of malloc() is not tested, such as in the function compress().

In this code the malloc() and memset() combination could be replaced by the function void *calloc(size_t num, size_t size) instead:

    npage_onfile = malloc(sizeof(int) * nfiles);
    memset(npage_onfile, 0, sizeof(int) * nfiles);
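That is, the allocate-and-zero pair above could become a single call, whose result should still be tested:

    npage_onfile = calloc(nfiles, sizeof(int)); // calloc zero-fills the block
    if (npage_onfile == NULL)
        handle_error("calloc npage_onfile");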

Prefer Functions Over Macros

While you have separated out your error handling, it would be better as a function rather than a macro. Functions are easier to debug and maintain than macros. If you want to reuse the error handling in multiple files, a function is also the better fit; perhaps the error handling should live in its own C source file with an accompanying header.
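A function version might look like this (a sketch that mirrors the existing macro):

    #include <stdio.h>  // perror
    #include <stdlib.h> // exit, EXIT_FAILURE

    // Function equivalent of the handle_error macro. Unlike the macro, it
    // can be stepped into with a debugger and shared across source files
    // through a header.
    void handle_error(const char *msg)
    {
        perror(msg);
        exit(EXIT_FAILURE);
    }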

Avoid Global Variables

It is very difficult to read, write, debug and maintain programs that use global variables. A global variable can be modified by any function within the program, so every function has to be examined before making changes to the code. In C and C++, global variables also pollute the namespace, and they can cause linking errors if they are defined in multiple files. The answers to this Stack Overflow question provide a fuller explanation.

DRY Code

There is a programming principle called the Don't Repeat Yourself principle, often referred to as DRY code. If you find yourself repeating the same code multiple times, it is better to encapsulate it in a function; if it is possible to loop through the code, that can reduce repetition as well. The function void wenqueue(work_t work) can be simplified by reducing the amount of replication in it. There are a couple of ways to accomplish this: one would be to create a function that creates a work node, the other would be to create the node first and then assign it either as the head or as the tail of the work queue.
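For instance, a small node-creating helper collapses the two nearly identical branches into one path (a sketch using the globals from the question; wnode_create is a hypothetical name):

    // Hypothetical helper: allocate one queue node and copy the work into it.
    static work_t *wnode_create(work_t work)
    {
        work_t *node = malloc(sizeof(work_t));
        if (node == NULL)
            handle_error("malloc work");
        *node = work;      // copies addr, filenm, pagenm and pagesz in one go
        node->next = NULL;
        return node;
    }

    void wenqueue(work_t work)
    {
        work_t *node = wnode_create(work);
        if (works == NULL) // empty queue: the node is both head and tail
        {
            works = work_head = work_tail = node;
        }
        else               // otherwise append after the current tail
        {
            work_tail->next = node;
            work_tail = node;
        }
    }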

Complexity

There are at least two functions that are too complex (do too much): the main() function and the void *consumer(void *args) function. Both should be broken into smaller functions that perform single actions where possible. As programs grow in size, the use of main() should be limited to calling functions that parse the command line, functions that set up for processing, functions that execute the desired behavior of the program, and functions that clean up after the main portion of the program. A general rule in programming is that if a function is larger than a single screen in an editor or IDE, it is too large and too complex to understand.

Cyclomatic complexity is a measurement developed by Thomas McCabe to determine the stability and level of confidence in a program. It measures the number of linearly-independent paths through a program module. Programs with lower Cyclomatic complexity are easier to understand and less risky to modify.

There is also a programming principle called the Single Responsibility Principle that applies here. The Single Responsibility Principle states:

that every module, class, or function should have responsibility over a single part of the functionality provided by the software, and that responsibility should be entirely encapsulated by that module, class or function.

One of the helpers for main() that should definitely be created is a function that cleans up all the allocated memory.
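For example, the teardown currently inlined at the end of main() could move into something like this (a sketch over the question's globals; cleanup is a hypothetical name):

    // Hypothetical cleanup helper: free the result list, unmap and free the
    // work list, then release the remaining allocations.
    static void cleanup(void)
    {
        while (results != NULL)
        {
            result_t *curr = results;
            results = results->next;
            free(curr);
        }
        while (works != NULL)
        {
            work_t *curr = works;
            munmap(curr->addr, curr->pagesz);
            works = works->next;
            free(curr);
        }
        free(order);
        free(npage_onfile);
    }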

  • Thank you for your thorough review. To be honest, I'm surprised by what you have pointed out, because I'm still learning C and pretty much every line of code is from the man pages, especially the macro for handling errors. I thought the code from the man page was sort of standard. – Commented Feb 24, 2022 at 20:28
  • @RichardNguyen C programming has a very long history, and that man page may have been written long ago. Macros are still used a lot in C, but they are frowned upon in C++. Well-written C can easily be ported to C++, but the reverse is not true. In the long run, functions are better than macros. The calloc link is one of the two references I use for C and C++. – pacmaninbw, Commented Feb 24, 2022 at 21:16
