Wednesday, 27 December 2023

Cleanly terminating threads nested in processes in Python

The following code cleanly terminates threads nested inside processes in Python. Processes are started using a ProcessPoolExecutor. To shut down the pool cleanly, the threads must be signaled to terminate via a threading.Event. Pressing ctrl + c sends SIGINT to the parent and to every child process in the foreground process group, so each of them installs a SIGINT handler that sets the event instead of letting Python raise KeyboardInterrupt, which could leave threads or processes terminating in an unclean state. Each process has its own copy of the event, which is why every process needs its own handler. All futures of processes containing threads are then waited on using wait. Finally, the process pool can be shut down cleanly using shutdown, now that all tasks submitted to the pool are complete.

import concurrent.futures
import os
import signal
import threading
import time

terminate = threading.Event()

def sigint(sig, frame):
    print(f"SIGINT received by {os.getpid()} -> setting terminate")
    terminate.set()

def thread_worker():
    tid = threading.get_ident()
    pid = os.getpid()
    while not terminate.is_set():
        print(f"sleeping in thread {tid} in process {pid}")
        time.sleep(1)
    print(f"{thread_worker} in thread {tid} in process {pid} finished cleanly")

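def process_worker():
    # Runs in the main thread of a pool worker process, where signal handlers can be installed.
    signal.signal(signal.SIGINT, sigint)
    t = threading.Thread(target=thread_worker, daemon=True)
    t.start()
    # Block until the thread sees the event and returns.
    t.join()
    print(f"{process_worker} in process {os.getpid()} finished cleanly")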

if __name__ == "__main__":
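    # Install the handler in the parent so ctrl + c sets the event instead of raising KeyboardInterrupt.
    signal.signal(signal.SIGINT, sigint)
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        # Three tasks on two workers: the third starts once a worker is free.
        futures = [pool.submit(process_worker) for _ in range(3)]
    # Leaving the with block calls pool.shutdown(wait=True), so every future is finished here.
    print(futures)
    # Both calls below return immediately; they are kept to make the intent explicit.
    concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
    pool.shutdown()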
    print("parent exited cleanly")
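
The worker thread in the listing sleeps for a full second between checks, so it can take up to a second to notice the event. The following is a minimal sketch of one alternative, assuming it is acceptable to replace time.sleep with Event.wait, which returns as soon as the event is set. The interval parameter and the run_demo helper are hypothetical names used only for illustration; they are not part of the listing.

```python
import threading
import time

terminate = threading.Event()

def thread_worker(interval=1.0):
    """Loop until terminate is set and return the number of full intervals waited."""
    iterations = 0
    # Event.wait returns True as soon as the event is set, or False on timeout.
    while not terminate.wait(timeout=interval):
        iterations += 1
    return iterations

def run_demo():
    """Hypothetical helper: start a worker with a long interval, then stop it early."""
    results = []
    t = threading.Thread(target=lambda: results.append(thread_worker(interval=5.0)))
    start = time.monotonic()
    t.start()
    time.sleep(0.1)   # give the thread time to start waiting
    terminate.set()   # stands in for the SIGINT handler setting the event
    t.join()
    return results[0], time.monotonic() - start
```

Calling run_demo() returns the iteration count and the elapsed time. Even with a 5 second interval, the thread should stop shortly after set() is called rather than after the full interval.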

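Running the full program as pool.py and pressing ctrl + c after the threads have printed twice produces the following output. Process 1786 is the parent. The third task only starts once a worker is free, by which time the event is already set in that process, so its thread finishes immediately (the second pair of "finished cleanly" lines from process 1788).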

$ python3 pool.py
sleeping in thread 6155104256 in process 1788
sleeping in thread 6185136128 in process 1789
sleeping in thread 6155104256 in process 1788
sleeping in thread 6185136128 in process 1789
^CSIGINT received by 1789 -> setting terminate
SIGINT received by 1788 -> setting terminate
SIGINT received by 1786 -> setting terminate
<function thread_worker at 0x1025d0790> in thread 6155104256 in process 1788 finished cleanly
<function thread_worker at 0x10092c790> in thread 6185136128 in process 1789 finished cleanly
<function process_worker at 0x1025d0820> in process 1788 finished cleanly
<function process_worker at 0x10092c820> in process 1789 finished cleanly
<function thread_worker at 0x1025d0790> in thread 6155104256 in process 1788 finished cleanly
<function process_worker at 0x1025d0820> in process 1788 finished cleanly
[<Future at 0x10078ea60 state=finished returned NoneType>, <Future at 0x1007a85b0 state=finished returned NoneType>, <Future at 0x1007a8a00 state=finished returned NoneType>]
parent exited cleanly
$
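
The handler and event pattern can also be exercised without pressing ctrl + c. The following is a rough sketch in a single process, assuming Python 3.8 or later (for signal.raise_signal) and that it runs in the main thread. The simulate_ctrl_c helper is a hypothetical name, and the shorter sleep is chosen only to keep the check quick.

```python
import signal
import threading
import time

terminate = threading.Event()

def sigint(sig, frame):
    # Same idea as the handler in the listing: record the request, do not raise.
    terminate.set()

def thread_worker():
    # Poll the event, as the listing does, but with a shorter sleep.
    while not terminate.is_set():
        time.sleep(0.05)

def simulate_ctrl_c():
    """Hypothetical helper: start a worker, raise SIGINT, report whether it stopped."""
    terminate.clear()
    previous = signal.signal(signal.SIGINT, sigint)  # must run in the main thread
    try:
        t = threading.Thread(target=thread_worker)
        t.start()
        signal.raise_signal(signal.SIGINT)  # stands in for ctrl + c
        t.join(timeout=5)
        return terminate.is_set() and not t.is_alive()
    finally:
        # Restore the earlier handler; fall back to the default if it was not set from Python.
        signal.signal(signal.SIGINT, previous if previous is not None else signal.SIG_DFL)
```

If the handler works as intended, simulate_ctrl_c() returns True and no KeyboardInterrupt is raised. This only covers one process, so it complements rather than replaces running the full program with a real ctrl + c.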
