tags: multithreading multiprocessing thread busy-waiting synchronization

Condition Variable

A condition variable is a synchronisation primitive that lets a process/thread block until some condition holds. The blocked thread is not resumed until another thread signals that the condition may have become true.

It is commonly used together with a mutex.

Operations

The main operations associated with a condition variable are:

  1. wait
    1. Suspends the calling process/thread until some other process/thread sends a signal to wake it up. It also atomically unlocks the associated (locked) mutex while waiting and re-locks it before returning.
  2. signal
    1. Resumes one of the processes/threads suspended on the condition variable.
  3. broadcast
    1. Resumes all of the processes/threads waiting on the particular condition variable. The scheduler can then choose which one to restart first.

The following is a C program using pthreads.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
 
/* Loop bound: producer and consumer each iterate for i = 1 .. MAX-1. */
#define MAX 10
 
/* Mutex locked around every access to buffer and passed to each pthread_cond_wait. */
pthread_mutex_t mutex;
/* condc: waited on by the consumer, signalled by the producer.
   condp: waited on by the producer, signalled by the consumer. */
pthread_cond_t condc, condp;
/* Single shared slot: 0 means empty, any other value is a produced item. */
int buffer = 0;
 
/*
 * Producer thread entry point.
 *
 * Stores the values 1 .. MAX-1 into the shared one-slot buffer, one at a
 * time. Before each store it waits on condp until the slot is empty
 * (buffer == 0); after each store it signals condc.
 *
 * arg: unused. It is present because pthread_create requires a start
 *      routine of type void *(*)(void *).
 * Returns NULL; no result is handed back to pthread_join.
 */
void *producer(void *arg) {
    int i;

    (void)arg; /* silence the unused-parameter warning */

    for (i = 1; i < MAX; i++) {
        pthread_mutex_lock(&mutex);

        /* Loop, not "if": the predicate must be re-checked after every
         * wake-up because a wake-up alone does not guarantee it holds.
         * pthread_cond_wait releases the mutex while blocked and holds it
         * again when it returns. */
        while (buffer != 0) {
            printf("Producer waiting\n");
            pthread_cond_wait(&condp, &mutex);
        }

        printf("Producer producing\n");
        buffer = i;
        pthread_cond_signal(&condc);
        pthread_mutex_unlock(&mutex);
    }

    /* Returning from a start routine is equivalent to pthread_exit(value). */
    return NULL;
}
 
/*
 * Consumer thread entry point.
 *
 * Empties the shared one-slot buffer MAX-1 times. Before each removal it
 * waits on condc until the slot is filled (buffer != 0); after each removal
 * it resets the slot to 0 and signals condp.
 *
 * arg: unused. It is present because pthread_create requires a start
 *      routine of type void *(*)(void *).
 * Returns NULL; no result is handed back to pthread_join.
 */
void *consumer(void *arg) {
    int i;

    (void)arg; /* silence the unused-parameter warning */

    for (i = 1; i < MAX; i++) {
        pthread_mutex_lock(&mutex);

        /* Loop, not "if": the predicate must be re-checked after every
         * wake-up because a wake-up alone does not guarantee it holds.
         * pthread_cond_wait releases the mutex while blocked and holds it
         * again when it returns. */
        while (buffer == 0) {
            printf("Consumer waiting\n");
            pthread_cond_wait(&condc, &mutex);
        }

        printf("Consumer consuming buffer\n");
        buffer = 0;
        pthread_cond_signal(&condp);
        pthread_mutex_unlock(&mutex);
    }

    /* Returning from a start routine is equivalent to pthread_exit(value). */
    return NULL;
}
 
/*
 * Program entry point.
 *
 * Initialises the mutex and both condition variables, starts the consumer
 * and producer threads, waits for both to finish and then destroys the
 * synchronisation objects.
 *
 * Returns 0 on success, EXIT_FAILURE if a thread could not be created or
 * joined.
 */
int main(void) {
    pthread_t con, prod;
    int rc;

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&condc, NULL);
    pthread_cond_init(&condp, NULL);

    /* pthread functions return an error number instead of setting errno.
     * A failed create leaves the pthread_t unusable, so joining it would
     * be undefined; bail out instead. */
    rc = pthread_create(&con, NULL, consumer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(consumer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_create(&prod, NULL, producer, NULL);
    if (rc != 0) {
        /* Without a producer the consumer would wait forever; returning
         * from main ends the whole process, including that thread. */
        fprintf(stderr, "pthread_create(producer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_join(prod, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join(producer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_join(con, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join(consumer) failed: error %d\n", rc);
        return EXIT_FAILURE;
    }

    /* Safe to destroy: both threads have finished, so nothing is waiting. */
    pthread_cond_destroy(&condc);
    pthread_cond_destroy(&condp);
    pthread_mutex_destroy(&mutex);
    return 0;
}

A similar producer/consumer program in Python.

import threading
from random import randint 
 
# Maximum number of items the producer lets accumulate in shared_buffer.
MAX = 5
 
# Queue of produced items: the producer appends, the consumer pops from the front.
shared_buffer = []
 
# Single condition variable shared by both threads; it carries its own lock.
cond = threading.Condition()
 
def producer():
    """Produce 19 random integers, blocking while the shared buffer is full.

    Each item is appended to ``shared_buffer`` while holding ``cond``.
    After every append one thread waiting on ``cond`` is notified.
    """
    for _ in range(1, 20):
        with cond:
            # Re-test the predicate after every wake-up: being notified does
            # not by itself guarantee that there is room in the buffer.
            while not len(shared_buffer) < MAX:
                print("===> Producer waiting <===")
                cond.wait()

            value = randint(0, 100)
            print("Producing item ", value)
            shared_buffer.append(value)

            # Wake one thread blocked in cond.wait(), if any.
            cond.notify()
 
 
 
def consumer():
    """Consume 19 items from the shared buffer, blocking while it is empty.

    Each item is removed from the front of ``shared_buffer`` while holding
    ``cond``. After every removal one thread waiting on ``cond`` is notified.
    """
    for _ in range(1, 20):
        with cond:
            # Re-test the predicate after every wake-up: being notified does
            # not by itself guarantee that an item is available.
            while not shared_buffer:
                print("===> Consumer waiting <===")
                cond.wait()

            taken = shared_buffer.pop(0)
            print("Consuming item ", taken)

            # Wake one thread blocked in cond.wait(), if any.
            cond.notify()
 
 
 
def main():
    """Run one producer and one consumer thread and wait for both to finish."""
    prod_thread = threading.Thread(target=producer)
    con_thread = threading.Thread(target=consumer)

    # Start order matches the original: producer first, then consumer.
    prod_thread.start()
    con_thread.start()

    prod_thread.join()
    con_thread.join()


# Only run the demo when executed as a script, so importing this module
# does not start threads as a side effect.
if __name__ == "__main__":
    main()
 

Here, the condition variable is more abstract, as it internally manages its own mutex (the `with` context manager is what acquires and releases that lock).