Python 如何優雅地限制執行緒數量

當有一段工作需要平行化的時候,通常我們會很直覺的使用 threading.Thread 來做
但取決於各位機器的條件,有時候並不能無限制地狂開
因為只會提高工作出錯的機率
這時候就需要限制 thread 數量

這篇文章分享了幾種簡單又優雅地限制執行緒數量的方式

Producer & Consumer

第一種是使用 生產者消費者問題 的概念實作的

實作範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading, time, random
from queue import Queue

jobs = Queue()

def do_stuff(q):
while not q.empty():
value = q.get()
time.sleep(random.randint(1, 10))
print(value)
q.task_done()

for i in range(10):
jobs.put(i)

for i in range(3):
worker = threading.Thread(target=do_stuff, args=(jobs,))
worker.start()

print("waiting for queue to complete", jobs.qsize(), "tasks")
jobs.join()
print("all done")

Semaphore

第二種是採用 號誌 的概念
Python 當然也有支援,詳細可參考 Semaphore Objects

實作範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time
import random


thread_semaphore = threading.Semaphore(2)

def count_func(index):

thread_semaphore.acquire()
try:
print(f'Hi, I am NO {index}!')
time.sleep(random.uniform(1.5, 3))
finally:
print(f'Hi, I am NO {index}! Done!')
thread_semaphore.release()

threads = []
for i in range(5):
t = threading.Thread(target=count_func, args=(i,))
t.start()

threads.append(t)

for t in threads:
t.join()

threading.active_count

最後一種就是 threading 內建的 active_count
但這個會把其他不一樣的 thread 也計算在一起
所以較不適用相對複雜的應用情境

實作範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time
import random

max_thread = 2


def do_stuff(index):
print(f'Hi, I am NO {index}!')
time.sleep(random.uniform(1.5, 3))
print(f'Hi, I am NO {index}! Done!')


threads = []
for i in range(5):
while threading.active_count() > max_thread:
time.sleep(0.1)

t = threading.Thread(target=do_stuff, args=(i,))
t.start()

threads.append(t)

for t in threads:
t.join()

從此就過著幸福快樂的開發生活了

怎麼可能

也許你也會想看看