python多线程中,主线程中如果捕获子线程的异常,笔者查阅了相关资料,有一种方式是使用队列(queue)将子线程的异常写入队列,然后主进程中去遍历异常消息队列,这种方式不近需要额外引入一个q对象,要同时遍历队列和判断线程状态,实现上上非常丑陋,后来发现如下方式,通过继承threading.Thread后,重写run和join方法,优雅地实现了线程方法的异常“上抛”,可以在主线程中轻松捕获子线程的异常信息。
以下是译文:
https://www.geeksforgeeks.org/handling-a-threads-exception-in-the-caller-thread-in-python/
Python中的多线程可以通过使用threading库来实现。为了调用线程,调用者线程创建一个线程对象并在其上调用 start 方法。一旦调用了 join 方法,就会启动它的执行并执行类对象的 run 方法。
对于异常处理,使用 try-except 块来捕获在 try 块中引发的异常,并在 except 块中进行相应处理
比如:
# Importing the modules import threading import sys # Custom Thread Class class MyThread(threading.Thread): def someFunction(self): print("Hello World") def run(self): self.someFunction() def join(self): threading.Thread.join(self) # Driver function def main(): t = MyThread() t.start() t.join() # Driver code if __name__ == '__main__': main()
#输出:Hello World
为了在调用者线程中捕获和处理线程的异常,我们使用一个变量来存储被调用线程中引发的异常(如果有),当被调用线程被连接时,连接函数检查 exc 的值是否为 None,
如果是则不生成异常,否则再次引发存储在 exc 中的生成异常。这发生在调用者线程中,因此可以在调用者线程本身中处理。
示例:
该示例创建了一个 MyThread 类型的线程 t,该线程的 run() 方法调用了 someFunction() 方法,该方法引发了 MyException,因此每当线程运行时,它都会引发异常。
为了在调用者线程中捕获异常,我们维护了一个单独的变量 exc,它设置为当被调用线程引发异常时引发的异常。
最后在 join() 方法中检查此 exc,如果不是 None,则 join 只会引发相同的异常。因此,在调用者线程中引发了捕获的异常,因为 join 在调用者线程(这里是主线程)中返回,因此被相应地处理。
# Importing the modules import threading import sys # Custom Exception Class class MyException(Exception): pass # Custom Thread Class class MyThread(threading.Thread): # Function that raises the custom exception def someFunction(self): name = threading.current_thread().name raise MyException("An error in thread "+ name) def run(self): # Variable that stores the exception, if raised by someFunction self.exc = None try: self.someFunction() except BaseException as e: self.exc = e def join(self): threading.Thread.join(self) # Since join() returns in caller thread # we re-raise the caught exception # if any was caught if self.exc: raise self.exc # Driver function def main(): # Create a new Thread t # Here Main is the caller thread t = MyThread() t.start() # Exception handled in Caller thread try: t.join() except Exception as e: print("Exception Handled in Main, Details of the Exception:", str(e)) # Driver code if __name__ == '__main__': main()
#输出:Exception Handled in Main, Details of the Exception: An error in thread Thread-1
译者补充:
补充1:理解重新run和join方法的目的
按照常规的方式,通过继承threading.Thread重写run方法不作任何异常处理,如下:执行该代码,主线程中无法捕获子线程的异常,也即主线程中没有任何反应。
由此可以体会到重写run和join方法的作用。
# Importing the modules import threading import sys # Custom Exception Class class MyException(Exception): pass # Custom Thread Class class MyThread(threading.Thread): # Function that raises the custom exception def someFunction(self): name = threading.current_thread().name raise MyException("An error in thread " + name) def run(self): self.someFunction() # Driver function def main(): # Create a new Thread t # Here Main is the caller thread t = MyThread() t.start() # Exception handled in Caller thread try: t.join() except Exception as e: print("Exception Handled in Main, Details of the Exception:",str(e))
) # Driver code if __name__ == '__main__': main()
补充2:上述case只有一个线程,如果开启了多个线程,且不希望某个线程异常之后打断主线程,该怎么处理?此时需要在线程join的时候依次捕获器异常,而不是在线程join的时候捕获异常。
# Importing the modules import concurrent import threading import sys # Custom Exception Class import time from concurrent.futures import ThreadPoolExecutor class MyException(Exception): pass # Custom Thread Class class MyThread(threading.Thread): # Function that raises the custom exception def someFunction(self): name = threading.current_thread().name raise MyException("An error in thread " + name) def run(self): self.someFunction() # Driver function def main(): # Create a new Thread t # Here Main is the caller thread t1 = MyThread() t2 = MyThread() list_thread = [t1,t2] for t in list_thread: t.start() # Exception handled in Caller thread for t in list_thread: try: t.join() except Exception as e: print("Exception Handled in Main, Details of the Exception:",str(e))
) # Driver code if __name__ == '__main__': main()
补充3:如果使用线程池的模式开启多线程,不需要重写线程类,同时也就也不需要重写run和join方法,只需要遍历ThreadPoolExecutor的submit方法返回的futuer对象即可。可见ThreadPoolExecutor对象替我们做了很多工作。
# Importing the modules import concurrent import threading import sys # Custom Exception Class import time from concurrent.futures import ThreadPoolExecutor class MyException(Exception): pass # Custom Thread Class class MyThread(): # Function that raises the custom exception def someFunction(self): time.sleep(1) name = threading.current_thread().name raise MyException("An error in thread " + name) # Driver function def main(): # Create a new Thread t # Here Main is the caller thread t1 = MyThread() t2 = MyThread() list_thread = [t1,t2] list_task = [] ''' for t in list_thread: t.start() # Exception handled in Caller thread for t in list_thread: try: t.join() except Exception as e: print("Exception Handled in Main, Details of the Exception:", str(e)) ''' with ThreadPoolExecutor(max_workers=3) as executor: for obj in list_thread: future = executor.submit(obj.someFunction) list_task.append(future) for future in list_task: try: future.result() except Exception as e: print("Exception Handled in Main, Details of the Exception:", str(e)) # Driver code if __name__ == '__main__': main()
补充4:如何让子线程在异常的时候打断,或者不打断主线程?
其实上述代码中已经写明了,在异常捕获的中,如果想要子线程不打断主线程,需要在单个子线程join的时候进行异常处理,这种情况下单个子线程正常与否,不影响主线程。、
如果try except在线程join的外层(线程池中的list_task外层),那么任何一个线程的异常将中断主线程
for t in list_thread: try: t.join() except Exception as e: print("Exception Handled in Main, Details of the Exception:", str(e)) for future in list_task: try: future.result() except Exception as e: print("Exception Handled in Main, Details of the Exception:", str(e))