Python之线程介绍(1)

广告位

Python 中的线程允许程序的不同部分并发运行,并且简化了设计。如果你已经有了一些 Python 编程经验,…

Python 中的线程允许程序的不同部分并发运行,并且简化了设计。如果你已经有了一些 Python 编程经验,并且希望通过使用线程来提高你的程序运行速度,这篇教程会对你有所帮助。

在本教程中,你将学到:

  • 线程是什么
  • 如何创建线程并等待它们结束
  • 如何使用 ThreadPoolExecutor
  • 如何避免竞争条件
  • 如何使用 Python 中 threading 提供的一般工具

本文假定你已经了解 Python 的基础知识,并且至少可以使用 Python 3.6 版本运行示例程序。如果你需要复习,可以从 Python 学习路径快速开始。

如果你不确定是否需要使用 Python 中的线程(threading),异步IO(asyncio)或者多线程(multiprocessing),那么可以参考使用并发为你的 Python 程序提速。

你可以从 Real Python Github 仓库获取本教程中使用的全部源代码。

线程是什么?

一个线程是一个单独的执行流程。这意味着你的程序将会在同一时间发生两件事。但是对于 Python 3 的大部分实现,不同的线程实际上并不是同时执行的:它们只是看起来像同时执行。您好,可以添加Python群,扣扣群:【881228581】,获取学习资源和安装包教程,有大佬指导学习 解答问题等等。

线程很容易被理解为程序上运行的两个(或更多)不同的处理器,每个处理器都同时执行一项独立的任务。这基本上是正确的。线程可能是运行在不同的处理器上的,但是它们同一时间只有一个在运行。

同时运行多个任务需要其他非标准的 Python 实现,如使用不同的语言编写你的代码,或者使用带有额外开销的 multiprocessing。

而由 CPython 实现的 Python 运行方式,线程可能无法加速运行所有的任务。这是由于与 GIL(全局解释器锁) 的交互,在根本上限制了同一时间只有一个 Python 线程在运行。

需要花费大量时间等待外部事件的任务很适合使用线程。而对于需要大量 CPU 计算且只需要很少时间等待外部事件的问题,使用线程可能根本不会提升运行速度。

使用 Python 编写代码,并在 CPython 的标准实现中运行,这是完全可行的。如果你的线程代码部分是 C 编写的,它能够释放 GIL 且并发运行。如果你当前使用的是其它不同的 Python 实现,请查阅文档了解其线程处理方式。

如果你正在运行的是 Python 的标准实现(且代码仅由 Python 编写),并且遇到了 CPU 密集型问题,那么应该考虑替换为 multiprocessing 模块。

使用线程构建程序也可以提高程序的设计透明度。本教程中将要学习的示例都使用了线程,所以它们可能在设计上不需要有较高的运行效率。使用线程有助于使设计更清晰,同时也更容易读懂。

所以,废话少说,现在开始使用线程吧!

开始写一个线程

现在你已经知道了线程是什么,我们来学习一下如何创建一个线程。Python 标准库提供了 threading 包,其中囊括了你在本篇文章中看到的大部分用法。Thread 模块中也很好的封装了线程,并提供了一个简洁的接口来实现功能。

想要启动一个单独的线程,你需要创建一个 Thread 的实例,然后调用 .start():

Python之线程介绍(1)

如果你注意到日志记录语句,可以看到上面主要功能是创建和启动这个线程:

Python之线程介绍(1)

当你创建了一个线程(Thread),会向它传递一个函数以及包含该函数参数的列表。这里,将会告知线程运行 thread_function() 函数并且将 1 作为其参数。

本文中将使用有序整数作为线程的名称。有一个 threading.get_ident() 方法,可以返回每一个线程的唯一名称,但是这个函数名不具有很高的可读性。

thread_function() 本身并没有执行太多功能,它只是夹杂着执行 time.sleep() 并记录了一些日志消息。

如果你按上面的代码,原封不动的执行的话(注释掉第 20 行),会输出像这样的结果:

Python之线程介绍(1)

可以看到,代码中写的是,线程执行到 Main 部分之后才结束。下一节我们将回顾其原因,并讨论一下被注释掉的第 20 行。

守护线程

在计算机科学中,守护(deamon)代表后台运行的进程。

守护线程 deamon 对于 Python 的线程 threading 有着更具体的意义。程序退出时,守护线程会立即关闭。理解这种定义的另一种方式是将守护线程视为在后台运行的线程,而且不用手动去关闭它。您好,可以添加Python群,扣扣群:【881–228–581】,获取学习资源和安装包教程,有大佬指导学习 解答问题等等。

如果程序正在运行的线程并不是守护线程,则程序需要等待这些线程完成之后才能终止。然而,当线程是守护进程时,程序关闭时该线程就会被直接杀死,而不会考虑它运行到了哪里。

我们来仔细看一下上面程序的输出内容,比较有趣的是最后两行。当你运行程序的时候,可以看到,在 __main__打印了 all done 消息之后,所有线程结束之前有一个停顿(大约 2 秒)。

这个停顿是 Python 在等待非守护线程完成。当 Python 程序结束时,清理线程实例也是关闭过程的一部分。

如果你去查看 Python 线程的源码,你可以看到 threading._shutdown() 方法会遍历所有正在运行的线程,并且会在所有没有 deamon 标记的线程上调用 .join() 方法。

所以,当线程本身正处于休眠等待状态时,程序会等待线程完成然后退出。而一旦线程完成并打印了消息,.join() 方法则会返回并退出程序。

通常你希望看到的就是这样,但是有时候我们还希望看到其他的结果。我们首先使用守护线程重写这个程序,你可以通过修改 Thread 的构造方式实现,这里在构造方法中添加一个 deamon=True 标记:

Python之线程介绍(1)

现在运行这个程序,你看到的输出结果应该是这样的:

Python之线程介绍(1)

与之前不同的点在于,这里没有了最后一行的输出内容。thread_function() 没有机会完成运行,它是一个守护线程,所以当 __main__ 运行到代码末尾并且要结束程序时,它被杀掉了。

.join() 一个线程

守护线程用起来很方便,但是当你想要等待一个线程结束又要怎么办呢?想要实现这种行为而不退出程序又要怎么做呢?现在我们回来看看最初的程序,看一下注释掉的第 20 行:

Python之线程介绍(1)

如果需要告诉一个线程去等待另一个线程结束,可以使用 .join() 函数。如果取消注释这一行,main 线程会停下来,并等待线程 x 结束运行。

你是不是已经在代码上使用守护线程或常规线程测试过这种实现?实际上这两种没有太大区别。如果你使用 .join() 加入了一个线程,该语句将会等待这个线程结束。

使用多个线程

目前为止,示例代码中只是用到了两个线程:main 线程和一个用 threading.Thread 对象启动的线程。

通常,你会想要启动许多线程来做点有趣的事情。我们先聚焦于更困难的线程操作方式,然后再转向简单的方法。

相较繁琐的启动多线程的方式其实你已经知道了:

Python之线程介绍(1)

这里的代码使用的启动线程的方式,与你在上面看到的机制相同,即创建了一个 Thread 对象,然后调用 .start() 方法。程序会持有一个 Thread 对象列表,之后使用 .join() 的时候会等待它们完成运行。

多次运行这段代码,产生的结果可能比较有趣。在我的机器上的输出示例是这样的:

Python之线程介绍(1)

如果你仔细查看输出内容,可以看到三个线程都是按照预期的顺序启动的,但是,完成的顺序却恰恰相反!多次运行还会看到不同的排序结果。注意 Thread x: finishing 消息,可以知道每个线程是在何时运行完成的。

线程运行的顺序由操作系统决定,且难以预测。它可能(且很可能)视运行而定,因此在设计使用线程的算法时需要关注这一点。

幸运的是,Python 提供了几个原生的功能,稍后你会看到它们如何协调线程,并与之一起运行。在此之前,我们先看看如何更轻松地管理一组线程。

使用 ThreadPoolExecutor

还有一种比上面你看到的更简单的方式来启动线程,它被称为 ThreadPoolExecutor,在标准库 concurrent.futures 中) (从 Python 3.2开始)的一部分实现。

创建使用 ThreadPoolExecutor 最简单的方法,是将其作为上下文管理器,使用 with 语句来管理线程池的创建与销毁。

这里用 ThreadPoolExecutor 重写之前最后一个示例的 __main__ 部分:

Python之线程介绍(1)

代码中创建了一个 ThreadPoolExecutor 作为上下文管理器,并告诉它池中需要多少个工作线程。然后使用 .map() 来遍历可迭代对象 range(3),并将其元素逐个传给线程中的每一个线程。

with 代码块结束时会通过 ThreadPoolExecutor 在线程池中的每一个线程上执行 .join() 方法。强烈建议你尽可能的将 ThreadPoolExecutor 作为上下文管理器使用,如此,便永远不会忘记执行 .join() 线程操作。

注意:使用 ThreadPoolExecutor 可能会导致未知的错误。

例如,如果调用一个不带参数的函数,但是在使用 .map() 时传入了参数,线程将会抛出异常。

不幸的是,ThreadPoolExecutor 会屏蔽该异常,并且(在上面的情况下)导致程序终止而没有输出。首先调试发现这个问题非常困难。

运行更正后的示例代码,将输出如下内容:

Python之线程介绍(1)

再注意一点,为什么 Thread 1 会在 Thread 0 之前结束呢。线程的调度由操作系统完成,而且并不遵循某种看似理所应当的顺序逻辑。

竞争条件

继续介绍 Python 线程中其它的隐藏功能之前,我们先讨论一下,编写线程程序时遇到的一个更难缠的问题:竞争条件。

如果你已经了解竞争条件是什么,并且遇到过一例这样的问题,你可能需要标准库提供的一些基础功能来防止竞争条件的发生。

当两个或多个线程访问共享数据或资源时,就可能会发生竞争条件。在下面这个例子中,将会创建一个必然发生的竞争条件,但是要注意大多数的竞争条件都并不这么明显。它们通常很少会发生,但每次发生都令人头疼不已。可以想象的到,竞争条件很难调试。

还好,这里代码中的竞争条件每次都必然发生,并且你可以细致地理解其中发生了什么。

对于这个例子,还需要编写一个用于更新数据库的类。好吧,这里并没有一个实际的数据库:只是假装有这么一个数据库,这不是本文的重点。

你的 FakeDatabase 类会有 __init__() 和 .update() 方法:

Python之线程介绍(1)

FakeDatabase 类持有一个简单的数字:.value。它是将会发生竞争条件的共享数据。

__init__() 方法简单的初始化了 .value。目前看来还一切顺利。

.update() 方法看起来则有点奇怪。它模拟从数据库中读取数据,并对其进行一些计算,然后将数据写回数据库。

这种情况下,从数据库读取数据仅仅表示将 .value 复制到局部变量。计算也只是将其数值加一,然后调用 .sleep() 休眠一点时间。最后,再将局部变量数值复制到 .value 写回值。

以下是对 FakeDatabase 的使用:

Python之线程介绍(1)

该程序创建了带有两个线程的 ThreadPoolExecutor 实例,然后在每个线程上调用 .submit() 方法,告诉它们运行 database.update() 方法。

.submit() 方法的函数定义,允许将位置参数和命名参数传递给线程中正在运行的函数:

Python之线程介绍(1)

上面的调用中,index 是传递给 database.update() 的第一个也是唯一一个位置参数。稍后在本文中,你将看到使用类似的方式传递多个参数。

由于每个线程都运行了 .update() 方法,而且 .update() 会将 .value 的值加一。因此,你可能希望在最后打印输出的时候, database.value 的值为 2。但如果是这样的话,你就不会看到这个例子了。运行上面的代码,输出内容如下:

Python之线程介绍(1)

可能你已经预料到会发生这种情况,但是为了方便理解问题的解决方案,我们来看一下这里究竟发生了什么。

一个线程

在你深入研究这两个线程的问题之前,我们先回过头来讨论一下线程工作的细节。

你不一定要深入了解这里所有的细节,因为这对目前水平来说不重要。我们还将对此使用一种技术上来说并不准确的方式,来做一些简化,它让你对当前正在发生的事情有更清晰的认识。

当你通知 ThreadPoolExecutor 运行每一个线程时,还会通知它要运行哪个函数以及向函数传递什么参数:如 executor.submit(database.update, index)。

结果就是线程池中的每一个线程都会调用 database.update(index)。注意,database 是一个 __main__ 线程中创建的 FakeDatabase 对象的引用。在这个对象上调用 .update() 方法会调用该对象的实例方法。

每一个线程都会引用一个相同的 FakeDatabase 对象(database),同时还具有一个唯一值(index),这就使得日志记录语句更具有可读性:

Python之线程介绍(1)

线程开始运行 .update() 时,在函数内部拥有自己的局部数据版本。这里的 .update() 是一个 local_copy。这一点非常棒。不然,两个线程运行相同的函数必定会彼此混淆。这也意味着函数范围内(或局部)的所有变量都是线程安全的。

现在如果你使用单个线程运行上面的程序,并且只调用一次 .update(),你可以看看这里面发生了什么。

如果只运行一个线程,下面的图像展示了 .update() 执行的每一步。执行语句显示在图左侧,后面是一个图表,展示线程的 local_value 变量和共享的 database.value 中的值:

Python之线程介绍(1)

从图表的结构上来看,从上到下移动,时间逐渐推移。自创建 Thread 1 开始,到末尾终止。

Thread 1 启动时,FakeDatabase.value 值为零。函数的第一行代码 local_copy = self.value 将这个零值复制到局部变量。接下来,使用 local_copy += 1 语句增加 local_copy 的值。你可以看到 Thread 1 中的 .value被设置为 1。

之后调用 time.sleep() 方法,使当前线程暂停并允许其它线程运行。由于此示例中只有一个线程,这一步没有效果。

当 Thread 1 唤醒并继续时,它会将新值从 local_copy 复制到 FakeDatabase.value,然后结束这个线程。你可以看到 database.value 设置为 1。

目前为止,一切还很顺利。你运行了一次 .update() 方法,并且 FakeDatabase.value 值增加 1。

两个线程

回到竞争条件的问题,两个线程会并发运行但却不是同时运行的。它们每一个都有自己的 local_copy 版本,并且每个版本都指向了同一个 database 对象实例。正是这个共享的 database 对象导致了这个问题。

程序从 Thread 1 运行 .update() 开始:

Python之线程介绍(1)

当 Thread 1 调用 time.sleep() 方法时,允许其它线程开始运行。从这里,一切就变得很有趣了。

Thread 2 启动后做了上面同样的操作。它还将 database.value 复制到其私有的 local_copy 中,并且此时共享的 database.value 还没有更新:

Python之线程介绍(1)

当 Thread 2 最终进入休眠状态时,共享的 database.value 仍然处于未被修改的零值,并且 local_copy 的两个私有版本的值都是 1。

Thread 1 唤醒后保存了自己的 local_copy 变量,然后结束运行,为之后 Thread 2 的运行提供机会。Thread 2在休眠时不知道 Thread 1 运行了,并且还更新了 database.value。它将自己的 local_copy 变量存储到 database.value,并将其设置为 1:

Python之线程介绍(1)

这两个线程,具有对单一共享对象的交叉访问,从而覆盖了彼此的结果。同样,当一个线程在另一个线程结束访问之前释放共享内存或关闭文件句柄时,可能会出现类似的竞争条件问题。

为何用这样一个牵强的例子

上面的例子勉强地确保了每次调用程序的时都会出现竞争条件。因为操作系统可以随时切换线程,所以可能在执行像 x = x + 1 这样的语句时,在读取 x 的值后,对值增加 1 并写回之前变量之前,发生中断。

有关这种情况的详细信息非常有趣,但对本文的其它部分来说并不需要,所以先可以跳过此后续部分。

实际工作原理

上面的代码并不像你原先想象的那么完美。它设计的目的是在每次运行时强制出现竞争条件,但这种竞争条件比大多数竞争条件都更容易解决。

在考虑竞争条件时,要记住两件事:

1. 即使像 x += 1 这样的操作处理器也需要很多步骤。这些步骤中的每一步对处理器来说都是单独的指令。

2. 操作系统可以随时交换正在运行的线程。在任何一个微小指令之后,都可能会交换一个线程。这意味着在 Python 的一个语句中间都可以让一个线程休眠,然后运行另一个线程。

我们来详细看一下。这个 REPL (交互时解释器)展示了一个函数,它接受一个参数并将其递增:

Python之线程介绍(1)

REPL 的这个示例,使用 Python 标准库中的 dis 模块来展示处理器在执行函数时的每一个微小步骤。它对数据 x 执行 LOAD_FAST,执行 LOAD_CONST 1,然后使用 INPLACE_ADD 将这些值加到一起。

由于特定的原因,我们会先停在这里。这也是上面 .update() 方法中的 time.sleep() 的运行结果,在这里进行强制切换线程。操作系统不可能在每隔一段时间就在一个确切的点切换线程,但是调用 sleep() 则能够实现这一点。

如上所述,操作系统可以随时切换线程。你已经运行到这个列表中标记为 4 的语句,如果操作系统交换了这个线程并运行了另一个线程,且修改了 x 的值,那么当这个线程恢复时,将会以一个错误的值覆盖 x。

从技术上讲,这个例子实际上并没有发生竞争条件,因为 x 是属于 inc() 的内部变量。但它确实说明了在单个 Python 程序执行期间线程是如何中断的。同样,还有 LOAD,MODIFY,STORE 操作集,也可以作用于全局和共享变量。

上面这种竞争条件很难出现,但记住,数百万次迭代中不常发生的事情其实很可能会发生。同样,这些竞争条件的罕见性,也使得它们比常规错误更难调试。

现在回到预先安排的课程!

目前你已经了解了竞争条件,下面来看看如何解决它们!

程序员

关于作者: 程序员

为您推荐