Python - 线程优先级



在 Python 中,目前 **threading** 模块不直接支持线程优先级。与 Java 不同,Python 不支持线程优先级、线程组或某些线程控制机制,例如销毁、停止、挂起、恢复或中断线程。

尽管 Python 线程设计简单,并且松散地基于 Java 的线程 模型。这是由于 Python 的全局解释器锁 (GIL),它管理着 Python 线程。

但是,您可以使用诸如睡眠时长、线程内的自定义调度逻辑或使用管理任务优先级的附加模块等技术来模拟基于优先级的行为。

使用 sleep() 设置线程优先级

您可以通过引入延迟或使用其他机制来控制线程的执行顺序来模拟线程优先级。模拟线程优先级的一种常用方法是调整线程的睡眠时长。

优先级较低的线程睡眠时间较长,而优先级较高的线程睡眠时间较短。

示例

以下是一个简单的示例,演示如何在 Python 线程中使用延迟来自定义线程优先级。在此示例中,Thread-2 在 Thread-1 之前完成,因为它具有较低的优先级值,导致睡眠时间较短。

import threading
import time

class DummyThread(threading.Thread):
   def __init__(self, name, priority):
      threading.Thread.__init__(self)
      self.name = name
      self.priority = priority

   def run(self):
      name = self.name
      time.sleep(1.0 * self.priority)
      print(f"{name} thread with priority {self.priority} is running")

# Creating threads with different priorities
t1 = DummyThread(name='Thread-1', priority=4)
t2 = DummyThread(name='Thread-2', priority=1)

# Starting the threads
t1.start()
t2.start()

# Waiting for both threads to complete
t1.join()
t2.join()

print('All Threads are executed')

输出

执行上述程序后,您将获得以下结果:

Thread-2 thread with priority 1 is running
Thread-1 thread with priority 4 is running
All Threads are executed

在 Windows 上调整 Python 线程优先级

在 Windows 操作系统上,您可以使用 **ctypes** 模块来操作线程优先级,这是 Python 的标准模块之一,用于与 Windows API 交互。

示例

此示例演示了如何在 Windows 系统上使用 **ctypes** 模块手动设置 Python 线程的优先级。

import threading
import ctypes
import time

# Constants for Windows API
w32 = ctypes.windll.kernel32
SET_THREAD = 0x20
PRIORITIZE_THE_THREAD = 1

class MyThread(threading.Thread):
   def __init__(self, start_event, name, iterations):
      super().__init__()
      self.start_event = start_event
      self.thread_id = None
      self.iterations = iterations
      self.name = name

   def set_priority(self, priority):
      if not self.is_alive():
         print('Cannot set priority for a non-active thread')
         return

      thread_handle = w32.OpenThread(SET_THREAD, False, self.thread_id)
      success = w32.SetThreadPriority(thread_handle, priority)
      w32.CloseHandle(thread_handle)
      if not success:
         print('Failed to set thread priority:', w32.GetLastError())

   def run(self):
      self.thread_id = w32.GetCurrentThreadId()
      self.start_event.wait()
      while self.iterations:
         print(f"{self.name} running")
         start_time = time.time()
         while time.time() - start_time < 1:
            pass
         self.iterations -= 1

# Create an event to synchronize thread start
start_event = threading.Event()

# Create threads
thread_normal = MyThread(start_event, name='normal', iterations=4)
thread_high = MyThread(start_event, name='high', iterations=4)

# Start the threads
thread_normal.start()
thread_high.start()

# Adjusting priority of 'high' thread
thread_high.set_priority(PRIORITIZE_THE_THREAD)

# Trigger thread execution
start_event.set()

输出

在 Python 解释器中执行此代码时,您将获得以下结果:

high running
normal running
high running
normal running
high running
normal running
high running
normal running

使用 Queue 模块优先处理 Python 线程

Python 标准库中的 **queue** 模块在多线程编程中非常有用,此时必须在线程之间安全地交换信息。此模块中的 Priority Queue 类实现了所有必需的锁定语义。

使用优先级队列,条目保持排序(使用 **heapq** 模块),并且首先检索值最低的条目。

Queue 对象具有以下方法来控制队列:

  • **get()** - get() 从队列中移除并返回一个项目。

  • **put()** - put() 将项目添加到队列中。

  • **qsize()** - qsize() 返回当前在队列中的项目数量。

  • **empty()** - empty() 如果队列为空,则返回 True;否则返回 False。

  • **full()** - full() 如果队列已满,则返回 True;否则返回 False。

queue.PriorityQueue(maxsize=0)

这是优先级队列的构造函数。maxsize 是一个整数,它设置可以放入队列中的项目的数量上限。如果 maxsize 小于或等于零,则队列大小无限。

值最低的条目首先被检索(值最低的条目是 min(entries) 将返回的条目)。条目的典型模式是以下形式的元组:

(priority_number, data)

示例

此示例演示了如何在queue模块中使用PriorityQueue类来管理两个线程之间的任务优先级。

from time import sleep
from random import random, randint
from threading import Thread
from queue import PriorityQueue

queue = PriorityQueue()

def producer(queue):
   print('Producer: Running')
   for i in range(5):

      # create item with priority
      value = random()
      priority = randint(0, 5)
      item = (priority, value)
      queue.put(item)
   # wait for all items to be processed
   queue.join()

   queue.put(None)
   print('Producer: Done')

def consumer(queue):
   print('Consumer: Running')

   while True:

      # get a unit of work
      item = queue.get()
      if item is None:
         break

      sleep(item[1])
      print(item)
      queue.task_done()
   print('Consumer: Done')

producer = Thread(target=producer, args=(queue,))
producer.start()

consumer = Thread(target=consumer, args=(queue,))
consumer.start()

producer.join()
consumer.join()

输出

执行后,它将产生以下输出:

Producer: Running
Consumer: Running
(0, 0.15332707626852804)
(2, 0.4730737391435892)
(2, 0.8679231358257962)
(3, 0.051924220435665025)
(4, 0.23945882716108446)
Producer: Done
Consumer: Done
广告