Python多线程和锁
# 进程和线程
进程是执行中的计算机程序。每个进程都拥有自己的地址空间、内存、数据栈及其它的辅助数据。操作系统管理着所有的进程,并为这些进程合理分配时间。进程可以通过派生新的进程来执行其它任务,不过每个进程都拥有自己的内存和数据栈等,进程之间的数据交换采用 进程间通信(IPC) 方式。
线程在进程之下执行,一个进程下可以运行多个线程,它们之间共享相同上下文。线程包括开始、执行顺序和结束三部分。它有一个指针,用于记录当前运行的上下文。当其它线程执行时,它可以被抢占(中断)和临时挂起(也称睡眠) ——这种做法叫做 让步(yielding)。
一个进程中的各个线程与主进程共享同一片数据空间,与独立进程相比,线程之间信息共享和通信更加容易。线程一般以并发执行,正是由于这种并发和数据共享机制,使多任务间的协作成为可能。当然,这种共享也并不是没有风险的,如果多个线程访问同一数据空间,由于访问顺序不同,可能导致结果不一致,这种情况通常称为竞态条件(race condition),不过大多数线程库都有同步原语,以允许线程管理器的控制执行和访问;另一个要注意的问题是,线程无法给予公平执行时间,CPU 时间分配会倾向那些阻塞更少的函数。
# 全局解释器锁(GIL)
Python 代码执行由 Python 虚拟机 (又名解释器主循环) 进行控制。Python 在设计时是这样考虑的,在主循环中同时只能有一个控制线程在执行。对 Python 虚拟机的访问由 全局解释器(GIL) 控制,这个锁用于,当有多个线程时保证同一时刻只能有一个线程在运行。
由于 Python 的 GIL 的限制,多线程更适合 I/O 密集型应用( I/O 释放了 GIL,可以允许更多的并发),对于计算密集型应用,为了实现更好的并行性,适合使用多进程,已便利用 CPU 的多核优势。Python 的多进程相关模块:subprocess、multiprocessing、concurrent.futures
# threading 模块
threading 是 Python 高级别的多线程模块。
# threading 模块的函数
active_count() 当前活动的 Thread 对象个数
current_thread() 返回当前 Thread 对象
get_ident() 返回当前线程
enumerater() 返回当前活动 Thread 对象列表
main_thread() 返回主 Thread 对象
settrace(func) 为所有线程设置一个 trace 函数
setprofile(func) 为所有线程设置一个 profile 函数
stack_size([size]) 返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size
TIMEOUT_MAX Lock.acquire() , RLock.acquire() , Condition.wait() 允许的最大值
# threading 可用对象列表
Thread 表示执行线程的对象
Lock 锁原语对象
RLock 可重入锁对象,使单一进程再次获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值
Semaphore 为线程间共享的有限资源提供一个"计数器",如果没有可用资源会被阻塞
Event 条件变量的通用版本,任意数量的线程等待某个时间的发生,在改事件发生后所有线程被激活
Timer 与 Thread 相识,不过它要在运行前等待一段时间
Barrier 创建一个"阻碍",必须达到指定数量的线程后才可以继续
# Thread 类
# Thread 对象的属性
- Thread.name
- Thread.ident
- Thread.daemon
详见(The Python Standard Library)
# Thread 对象方法
- Thread.start()
- Thread.run()
- Thread.join(timeout=None)
- Thread.getName
- Thread.setName
- Thread.is_alive()
- Thread.isDaemon()
- Thread.setDaemon()
详见(The Python Standard Library)
# 使用 Thread 类
很多种方法来创建线程,这里使用常见的两种:
- 创建 Thread 实例,传给它一个函数。
- 派生 Thread 子类,并创建子类的实例。
# 一个单线程栗子
#!/usr/bin/env python3
import threading
from random import randint
from time import sleep, ctime
def hi(n):
sleep(n)
print("ZzZzzz, sleep: ", n) # 打印 Sleep 的秒数
def main():
print("### Start at: ", ctime())
for i in range(10):
hi(randint(1,2)) # 调用十次,每次 Sleep 1秒或2秒
print("### Done at: ", ctime())
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 运行结果
### Start at: Thu Sep 1 14:11:00 2016
ZzZzzz, sleep: 1
ZzZzzz, sleep: 2
ZzZzzz, sleep: 2
ZzZzzz, sleep: 2
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 2
### Done at: Thu Sep 1 14:11:14 2016
2
3
4
5
6
7
8
9
10
11
12
一共是用了14秒。
# 多线程:创建 Thread 实例
# 传给它一个函数栗子
#!/usr/bin/env python3
import threading
from random import randint
from time import sleep, ctime
def hi(n):
sleep(n)
print("ZzZzzz, sleep: ", n)
def main():
print("### Start at: ", ctime())
threads = []
for i in range(10):
rands = randint(1,2)
# 实例化每个 Thread 对象,把函数和参数传递进去,返回 Thread 实例
t = threading.Thread(target=hi, args=(rands,))
threads.append(t) # 分配线程
for i in range(10):
threads[i].start() # 开始执行多线程
for i in range(10):
threads[i].join() # (自旋锁)等待线程结束或超时,然后再往下执行
print("### Done at: ", ctime())
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 运行结果
### Start at: Thu Sep 1 14:18:00 2016
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
ZzZzzz, sleep: 2
ZzZzzz, sleep: 2
ZzZzzz, sleep: 2
ZzZzzz, sleep: 2
### Done at: Thu Sep 1 14:18:02 2016
2
3
4
5
6
7
8
9
10
11
12
使用多线程,只用了2秒。
# 多线程:派生 Thread 子类
# 并创建子类的实例栗子
#!/usr/bin/env python3
import threading
from random import randint
from time import sleep, ctime
class MyThread(threading.Thread):
def __init__(self, func, args, times):
super(MyThread, self).__init__()
self.func = func
self.args = args
self.times = times
def run(self):
print("begin thread......", self.times)
self.res = self.func(*self.args)
print("end threads......", self.times)
def hi(n):
sleep(n)
print("ZzZzzz, sleep: ", n)
def main():
print("### Start at: ", ctime())
threads = []
for i in range(10):
rands = randint(1,2)
t = MyThread(hi, (rands,), i+1)
threads.append(t)
for i in range(10):
threads[i].start()
for i in range(10):
threads[i].join()
print("### Done at: ", ctime())
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 执行结果
### Start at: Thu Sep 1 14:47:09 2016
begin thread...... 1
begin thread...... 2
begin thread...... 3
begin thread...... 4
begin thread...... 5
begin thread...... 6
begin thread...... 7
begin thread...... 8
begin thread...... 9
begin thread...... 10
ZzZzzz, sleep: 1
ZzZzzz, sleep: 1
end threads...... 1
end threads...... 4
ZzZzzz, sleep: 1
end threads...... 7
ZzZzzz, sleep: 1
end threads...... 3
ZzZzzz, sleep: 1
end threads...... 9
ZzZzzz, sleep: 2
end threads...... 2
ZzZzzz, sleep: 2
end threads...... 5
ZzZzzz, sleep: 2
ZzZzzz, sleep: 2
end threads...... 10
end threads...... 6
ZzZzzz, sleep: 2
end threads...... 8
### Done at: Thu Sep 1 14:47:11 2016
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
这个栗子对 Thread 子类化,而不是对其实例化,使得定制线程对象更具灵活性,同时也简化线程创建的调用过程。
# 线程锁
当多线程争夺锁时,允许第一个获得锁的线程进入临街区,并执行代码。所有之后到达的线程将被阻塞,直到第一个线程执行结束,退出临街区,并释放锁。需要注意,那些阻塞的线程是没有顺序的。
# 举个栗子
#!/usr/bin/env python3
import threading
from random import randint
from time import sleep, ctime
L = threading.Lock() # 引入锁
def hi(n):
L.acquire() # 加锁
for i in [1,2]:
print(i)
sleep(n)
print("ZzZzzz, sleep: ", n)
L.release() # 释放锁
def main():
print("### Start at: ", ctime())
threads = []
for i in range(10):
rands = randint(1,2)
t = threading.Thread(target=hi, args=(rands,))
threads.append(t)
for i in range(10):
threads[i].start()
for i in range(10):
threads[i].join()
print("### Done at: ", ctime())
if __name__ == '__main__':
main()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
运行上面的代码,再将锁的代码注释掉,对比下输出。