Pipes, queues, and lock in multiprocessing in Python
The previous shot covered the Process and Pool classes. In this shot, we will be learning about Pipes and Queues and Locks.
Pipes and Queues
The multiprocessing module provides two ways to communicate between the process.
Pipes
The Pipe(), by default, returns a pair of connection objects connected by a
Each connection object has send() and recv() methods to send and receive messages.
The following example illustrates how one process produces messages and another process listens to the messages.
import multiprocessingdef process1_send_function(conn, events):for event in events:conn.send(event)print(f"Event Sent: {event}")def process2_recv_function(conn):while True:event = conn.recv()if event == "eod":print("Event Received: End of Day")returnprint(f"Event Received: {event}")def run():events = ["get up", "brush your teeth", "shower", "work", "eod"]conn1, conn2 = multiprocessing.Pipe()process_1 = multiprocessing.Process(target=process1_send_function, args=(conn1, events))process_2 = multiprocessing.Process(target=process2_recv_function, args=(conn2,))process_1.start()process_2.start()process_1.join()process_2.join()if __name__ == "__main__":run()
Queues
The Queue() returns a process shared queue. Think of a queue as a data structure where the producer produces messages and the consumer consumes it. Queues are thread and process safe.
We can modify the above example used for pipes to make use of the queue.
import multiprocessingdef process1_send_function(queue, events):for event in events:queue.put(event)print(f"Event Sent: {event}")def process2_recv_function(queue):while True:event = queue.get()if event == "eod":print("Event Received: End of Day")returnprint(f"Event Received: {event}")def run():events = ["get up", "brush your teeth", "shower", "work", "eod"]queue = multiprocessing.Queue()process_1 = multiprocessing.Process(target=process1_send_function, args=(queue, events))process_2 = multiprocessing.Process(target=process2_recv_function, args=(queue,))process_1.start()process_2.start()process_1.join()process_2.join()if __name__ == "__main__":run()
Locks
Process synchronization makes sure that no two processes execute the same part of the program, called critical section, at the same time.
To achieve this, before executing the critical section, the process has to acquire the lock. Once, the work in the critical section is over, the process has to release the lock.
Take a look at following example.
import multiprocessingdef process_function(lock):lock.acquire()# CRITICAL SECTIONprint("CRITICAL SECTION")print("Only One Process has to access at a given time")lock.release()def run():lock = multiprocessing.Lock()process_1 = multiprocessing.Process(target=process_function, args=(lock,))process_2 = multiprocessing.Process(target=process_function, args=(lock,))process_1.start()process_2.start()process_1.join()process_2.join()if __name__ == "__main__":run()