Question 4
You are tasked with developing a multi-threaded system using Pthreads to manage a dynamic task queue. The program should start with a user-specified number of worker threads that initially sleep in a condition wait. The main thread generates blocks of tasks (using linked list operations) and wakes a worker thread with a condition signal. After completing its tasks, the worker returns to the waiting state. Once all tasks are generated, the main thread updates a global variable to signal completion and uses a condition broadcast to wake all threads for termination. Implement the system with proper synchronization, ensuring efficient task distribution and clean thread shutdown.
Solution
The full source is also downloadable from here.
Building a Multi-Threaded Task Queue Using Pthreads
Multi-threading lets a program work on several tasks in parallel. In this implementation, a Pthreads mutex and a condition variable guard a dynamic task queue. Worker threads sleep until tasks arrive, process them, and then return to the waiting state until more work shows up. The main thread generates the tasks, wakes the workers when work is available, and drives an orderly shutdown once all tasks have been processed.
Understanding the Problem
The requirement is to create a system in which the user specifies the number of worker threads. These threads start in a waiting state and should wake up only when tasks have been added to the queue. Tasks are generated dynamically and stored in a linked list, which serves as the task queue. A worker thread that picks up a task removes it from the queue, processes it, and then checks for more work; if no tasks are available, it returns to waiting. After all tasks have been processed, the main thread must send a termination signal that wakes every thread so that each one can exit cleanly.
To achieve this, mutex locks (pthread_mutex_t) are used. They ensure exclusive access to the task queue. This prevents multiple threads from modifying it at the same time. Condition variables (pthread_cond_t) allow worker threads to efficiently sleep while waiting for tasks. This approach prevents them from continuously polling for work.
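The wait-and-wake handshake described above can be sketched in isolation. This is a minimal illustration, not part of the solution: the names demo_lock, demo_cond, ready, seen, waiter() and wait_signal_demo() are made up for the example. It shows the usual pattern of re-checking a predicate in a loop around pthread_cond_wait(), since a thread may wake up without the condition actually being true.

```c
#include <pthread.h>

/* Hypothetical names for illustration only. */
static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  demo_cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;  /* predicate, protected by demo_lock */
static int seen  = 0;  /* set by the waiter once it observes ready */

static void *waiter(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&demo_lock);
    /* Re-check the predicate in a loop: wakeups may be spurious. */
    while (!ready) {
        /* Releases demo_lock while sleeping, re-acquires it before returning. */
        pthread_cond_wait(&demo_cond, &demo_lock);
    }
    seen = 1;
    pthread_mutex_unlock(&demo_lock);
    return NULL;
}

/* Returns 1 if the waiter woke up and saw the predicate, 0 otherwise. */
int wait_signal_demo(void)
{
    pthread_t tid;

    ready = 0;
    seen = 0;
    if (pthread_create(&tid, NULL, waiter, NULL) != 0)
        return 0;

    pthread_mutex_lock(&demo_lock);
    ready = 1;                        /* change the predicate under the lock */
    pthread_cond_signal(&demo_cond);  /* wake one waiting thread */
    pthread_mutex_unlock(&demo_lock);

    pthread_join(tid, NULL);          /* after join, reading seen is safe */
    return seen;
}
```

Because the predicate is changed while the mutex is held, the sketch works whether the waiter reaches pthread_cond_wait() before or after the signal is sent.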
Program Execution and Synchronization
At the beginning of execution, the main thread reads the number of worker threads from the first command-line argument and dynamically allocates an array of pthread_t handles for them. It then starts each worker with pthread_create(), passing thread_function() as the entry point. A worker that finds no work calls pthread_cond_wait(), so it sleeps instead of consuming CPU cycles while idle.
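The program below converts the argument with atoi(), which cannot distinguish "0" from non-numeric input. As a possible alternative, here is a small sketch that validates the worker count with strtol(). The function name parse_worker_count() and the MAX_WORKERS cap are assumptions made for this example, not part of the original solution.

```c
#include <errno.h>
#include <stdlib.h>

#define MAX_WORKERS 1024  /* hypothetical upper bound chosen for the example */

/* Returns the worker count in [1, MAX_WORKERS], or -1 if text is invalid. */
int parse_worker_count(const char *text)
{
    char *end = NULL;
    long value;

    if (text == NULL || *text == '\0')
        return -1;

    errno = 0;
    value = strtol(text, &end, 10);

    /* Reject overflow, trailing garbage such as "4x", and out-of-range values. */
    if (errno != 0 || *end != '\0' || value < 1 || value > MAX_WORKERS)
        return -1;

    return (int)value;
}
```

In main(), the result could be checked before allocating the thread array, printing the usage message when the function returns -1.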
The main thread then generates the tasks. While holding task_lock, it calls add_task() once per worker; each call allocates a new task node and appends it to the end of the linked list. After the whole block of tasks has been queued, the main thread calls pthread_cond_broadcast() and releases the lock, which wakes every waiting worker (pthread_cond_signal() would wake only one). A worker that acquires the lock calls remove_task(), which unlinks the node at the head of the queue, processes it, and decrements the work counter. When the counter reaches zero, the worker goes back to waiting.
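add_task() walks the whole list to find the last node, so each append costs time proportional to the queue length. One common variation, sketched here under assumptions, keeps a tail pointer so that appending is constant time. The types task_t and task_queue_t, the id field, and the functions queue_push() and queue_pop() are hypothetical names for this example; in a threaded program the caller would still need to hold the mutex around both calls.

```c
#include <stdlib.h>

/* Hypothetical task node; the id field is added only for illustration. */
typedef struct task {
    int id;
    struct task *next;
} task_t;

typedef struct {
    task_t *head;  /* oldest task, removed first */
    task_t *tail;  /* newest task, appended here */
    int count;     /* number of queued tasks */
} task_queue_t;

/* Appends a task in O(1). Returns 0 on success, -1 if malloc fails. */
int queue_push(task_queue_t *q, int id)
{
    task_t *n = malloc(sizeof *n);
    if (n == NULL)
        return -1;
    n->id = id;
    n->next = NULL;
    if (q->tail == NULL)
        q->head = n;        /* queue was empty */
    else
        q->tail->next = n;  /* link after the current last node */
    q->tail = n;
    q->count++;
    return 0;
}

/* Removes the oldest task. Returns its id, or -1 if the queue is empty
   (ids are assumed to be non-negative). */
int queue_pop(task_queue_t *q)
{
    task_t *n = q->head;
    int id;

    if (n == NULL)
        return -1;
    q->head = n->next;
    if (q->head == NULL)
        q->tail = NULL;     /* queue became empty */
    id = n->id;
    free(n);
    q->count--;
    return id;
}
```

Keeping the counter inside the queue structure also means it is updated in exactly one place per operation, which makes it harder for the count and the list to drift apart.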
Once all tasks are processed, the main thread sets a global flag (die = 1) while holding the lock, signaling that no new tasks will be added. It then broadcasts a final wake-up so that every waiting worker can check the flag and exit. Finally, the main thread calls pthread_join() on each worker, which guarantees that all threads have finished before the thread array is freed and the program terminates.
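The question asks for a condition signal per block of tasks and a broadcast only at shutdown. The following is a compact sketch of that flow under simplifying assumptions: a task is reduced to a counter instead of a linked-list node, and pool_t, pool_worker() and run_pool() are hypothetical names invented for the example. Workers leave only when shutdown has been requested and nothing is pending, so no task is dropped.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical pool state; all fields are protected by lock. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  available;
    int pending;        /* tasks queued but not yet taken */
    int done;           /* tasks completed by workers */
    int shutting_down;  /* set once no more tasks will be generated */
} pool_t;

static void *pool_worker(void *arg)
{
    pool_t *p = arg;

    pthread_mutex_lock(&p->lock);
    for (;;) {
        while (p->pending == 0 && !p->shutting_down)
            pthread_cond_wait(&p->available, &p->lock);
        if (p->pending == 0)
            break;                       /* shutting down and queue is empty */
        p->pending--;                    /* take one task */
        pthread_mutex_unlock(&p->lock);
        /* Real work would happen here, outside the lock. */
        pthread_mutex_lock(&p->lock);
        p->done++;
    }
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

/* Runs num_tasks tasks on num_workers threads.
   Returns the number of completed tasks, or -1 on invalid arguments. */
int run_pool(int num_workers, int num_tasks)
{
    pool_t p;
    pthread_t *tids;
    int i, started = 0, done;

    if (num_workers < 1 || num_tasks < 0)
        return -1;
    tids = malloc((size_t)num_workers * sizeof *tids);
    if (tids == NULL)
        return -1;

    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.available, NULL);
    p.pending = p.done = p.shutting_down = 0;

    for (i = 0; i < num_workers; ++i)
        if (pthread_create(&tids[started], NULL, pool_worker, &p) == 0)
            ++started;

    for (i = 0; i < num_tasks; ++i) {
        pthread_mutex_lock(&p.lock);
        p.pending++;
        pthread_cond_signal(&p.available);  /* wake one worker per task */
        pthread_mutex_unlock(&p.lock);
    }

    pthread_mutex_lock(&p.lock);
    p.shutting_down = 1;
    pthread_cond_broadcast(&p.available);   /* wake everyone for termination */
    pthread_mutex_unlock(&p.lock);

    for (i = 0; i < started; ++i)
        pthread_join(tids[i], NULL);

    done = p.done;
    pthread_cond_destroy(&p.available);
    pthread_mutex_destroy(&p.lock);
    free(tids);
    return done;
}
```

Because the shutdown flag is set unconditionally and workers drain the remaining tasks before exiting, this variant does not depend on sleep() calls to give the workers time to finish.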
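#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

pthread_mutex_t task_lock = PTHREAD_MUTEX_INITIALIZER; // protects head, work and die
pthread_cond_t available = PTHREAD_COND_INITIALIZER;   // signaled when work arrives or on shutdown
static int work = 0; // number of queued tasks
static int die = 0;  // set to 1 when the workers should exit

typedef struct node {
    struct node * next;
} node_t;

node_t * head = NULL; // front of the task queue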

// Append a new task to the end of the queue. The caller must hold task_lock.
void add_task(void)
{
    node_t * new_node = malloc(sizeof(node_t));
    if(NULL == new_node) {
        printf("malloc failed\n");
        return;
    }
    new_node->next = NULL;
    printf("Head @%p\n", (void *)head);
    printf("New task created at %p\n", (void *)new_node);
    if(NULL == head) {
        head = new_node;
    } else {
        node_t * temp = head;
        node_t * prev = NULL;
        while(temp)
        {
            // traverse to the last node of the list
            prev = temp;
            temp = temp->next;
        }
        printf("Adding new node after %p\n", (void *)prev);
        prev->next = new_node;
    }
    ++work; // count every queued task, including the first one
}
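
// Unlink the task at the head of the queue and process it.
// The caller must hold task_lock.
void remove_task(void)
{
    node_t * temp = head;
    printf("Head at %p\n", (void *)head);
    if(temp != NULL)
    {
        head = temp->next;
        // pthread_t is opaque; the cast is for printing only
        printf("Worker %lu Doing task @%p\n", (unsigned long)pthread_self(), (void *)temp);
        free(temp);
        --work;
    }
}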

void * thread_function(void *arg)
{
    int ret = 0;
    (void)arg; // unused
    while(1)
    {
        ret = pthread_mutex_lock(&task_lock);
        if(ret != 0) {
            printf("pthread_mutex_lock failed\n");
            break; // never touch the queue without holding the lock
        }
        // Re-check the predicate in a loop: wakeups may be spurious
        while(work == 0 && !die) {
            printf("Thread %lu going to conditional wait\n", (unsigned long)pthread_self());
            ret = pthread_cond_wait(&available, &task_lock);
            if (ret != 0) {
                printf("pthread_cond_wait failed\n");
            }
        }
        // Exit only when shutdown was requested and no work is left
        if(die && work == 0) {
            pthread_mutex_unlock(&task_lock);
            break;
        }
        if(work > 0) {
            remove_task(); // do the work
            printf("Work Done!\n");
        }
        pthread_mutex_unlock(&task_lock);
    }
    printf("Bye bye cruel world %lu\n", (unsigned long)pthread_self());
    return NULL;
}

int main(int argc, char **argv)
{
    int ret = 0;
    int i = 0;
    int num_workers = 0;
    if(argc < 2)
    {
        printf("\t Wrong usage\n");
        printf("Launch the program as below: \n");
        printf("\tpthread_question_4 <number of workers>\n");
        exit(1);
    }
    num_workers = atoi(argv[1]);
    if(num_workers <= 0) {
        printf("number of workers must be a positive integer\n");
        exit(1);
    }
    pthread_t *thread = malloc(num_workers * sizeof(pthread_t));
    if (!thread) {
        printf("malloc failed\n");
        exit(1);
    }
    for(i = 0; i < num_workers; ++i)
    {
        ret = pthread_create(&thread[i], NULL, thread_function, NULL);
        if(ret != 0)
        {
            printf("pthread_create failed\n");
            exit(1); // joining a thread that was never created is undefined
        }
    }
    // Queue one task per worker while holding the lock
    pthread_mutex_lock(&task_lock);
    for(i=0; i < num_workers; ++i)
    {
        add_task();
    }
    sleep(5); // workers stay blocked: the lock is still held
    if(work > 0) {
        printf("work remaining\n");
    }
    pthread_cond_broadcast(&available); // wake the workers
    pthread_mutex_unlock(&task_lock);
    sleep(5); // give the workers time to drain the queue
    pthread_mutex_lock(&task_lock);
    if(!work) {
        printf("All works done!\n");
    }
    die = 1; // no more tasks will be generated
    pthread_cond_broadcast(&available); // wake every worker so it can exit
    pthread_mutex_unlock(&task_lock);
    for(i=0; i < num_workers; ++i)
    {
        pthread_join(thread[i], NULL);
    }
    printf("Bye World\n");
    free(thread);
    return 0;
}
How to build the program
gcc pthread_question_4.c -o pthread_question_4 -lpthread
How to run the program
./pthread_question_4 6
Thread 133717876016704 going to conditional wait
Thread 133717886502464 going to conditional wait
Thread 133717865530944 going to conditional wait
Thread 133717764867648 going to conditional wait
Thread 133717844559424 going to conditional wait
Head @(nil)
New task created at 0x578f83b859a0
Head @0x578f83b859a0
New task created at 0x578f83b859c0
Adding new node after 0x578f83b859a0
Head @0x578f83b859a0
New task created at 0x578f83b859e0
Adding new node after 0x578f83b859c0
Head @0x578f83b859a0
New task created at 0x578f83b85a00
Adding new node after 0x578f83b859e0
Head @0x578f83b859a0
New task created at 0x578f83b85a20
Adding new node after 0x578f83b85a00
Head @0x578f83b859a0
New task created at 0x578f83b85a40
Adding new node after 0x578f83b85a20
work remaining
Head at 0x578f83b859a0
Worker 133717855045184 Doing task @0x578f83b859a0
Work Done!
Head at 0x578f83b859c0
Worker 133717855045184 Doing task @0x578f83b859c0
Work Done!
Head at 0x578f83b859e0
Worker 133717855045184 Doing task @0x578f83b859e0
Work Done!
Head at 0x578f83b85a00
Worker 133717855045184 Doing task @0x578f83b85a00
Work Done!
Head at 0x578f83b85a20
Worker 133717855045184 Doing task @0x578f83b85a20
Work Done!
Thread 133717855045184 going to conditional wait
Thread 133717876016704 going to conditional wait
Thread 133717764867648 going to conditional wait
Thread 133717865530944 going to conditional wait
Thread 133717844559424 going to conditional wait
Thread 133717886502464 going to conditional wait
All works done!
Bye bye cruel world 133717876016704
Bye bye cruel world 133717764867648
Bye bye cruel world 133717886502464
Bye bye cruel world 133717855045184
Bye bye cruel world 133717865530944
Bye bye cruel world 133717844559424
Bye World
