Ruby - 多线程



传统的程序只有一个执行线程,程序包含的语句或指令按顺序执行,直到程序终止。

多线程程序具有多个执行线程。在每个线程中,语句按顺序执行,但线程本身可以在多核 CPU 上并行执行,例如。通常在单 CPU 机器上,多个线程并非真正并行执行,而是通过交错执行线程来模拟并行性。

Ruby 使用Thread类简化了多线程程序的编写。Ruby 线程是一种轻量级且高效的方式,可在代码中实现并发。

创建 Ruby 线程

要启动一个新线程,只需将一个代码块与Thread.new调用关联。将创建一个新线程来执行代码块中的代码,而原始线程将立即从Thread.new返回并继续执行下一条语句。

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

示例

这是一个示例,展示了如何使用多线程 Ruby 程序。

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

这将产生以下结果:

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

线程生命周期

使用Thread.new创建新线程。您还可以使用同义词Thread.startThread.fork

创建线程后无需启动它,当 CPU 资源可用时,它会自动开始运行。

Thread 类定义了许多方法来查询和操作正在运行的线程。线程运行与Thread.new调用关联的代码块中的代码,然后停止运行。

该代码块中最后一个表达式的值就是线程的值,可以通过调用 Thread 对象的value方法获得。如果线程已运行完成,则value会立即返回线程的值。否则,value方法会阻塞,直到线程完成。

类方法Thread.current返回表示当前线程的 Thread 对象。这允许线程操作自身。类方法Thread.main返回表示主线程的 Thread 对象。这是在启动 Ruby 程序时开始的初始执行线程。

您可以通过调用该线程的Thread.join方法来等待特定线程完成。调用线程将阻塞,直到给定线程完成。

线程和异常

如果在主线程中引发异常,并且未在任何地方处理,则 Ruby 解释器会打印一条消息并退出。在除主线程之外的线程中,未处理的异常会导致线程停止运行。

如果线程t由于未处理的异常而退出,而另一个线程s调用t.join或t.value,则在t中发生的异常将在线程s中引发。

如果Thread.abort_on_exceptionfalse(默认条件),未处理的异常只会终止当前线程,其余线程将继续运行。

如果您希望任何线程中的任何未处理异常都导致解释器退出,请将类方法Thread.abort_on_exception设置为true

t = Thread.new { ... }
t.abort_on_exception = true

线程变量

线程通常可以访问创建线程时作用域内的任何变量。线程代码块中的局部变量是线程局部变量,不会共享。

Thread 类提供了一种特殊机制,允许通过名称创建和访问线程局部变量。您只需将线程对象视为哈希表,使用[]=写入元素,使用[]读取元素。

在此示例中,每个线程都将变量count的当前值记录在键为mycount的线程局部变量中。

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

这将产生以下结果:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主线程等待子线程完成,然后打印出每个子线程捕获的count值。

线程优先级

影响线程调度的第一个因素是线程优先级:高优先级线程优先于低优先级线程调度。更准确地说,只有在没有更高优先级的线程等待运行时,线程才能获得 CPU 时间。

您可以使用priority =priority设置和查询 Ruby Thread 对象的优先级。新创建的线程以创建它的线程相同的优先级启动。主线程的优先级从 0 开始。

无法在线程开始运行之前设置其优先级。但是,线程可以将其自身优先级作为其执行的第一个操作进行提高或降低。

线程互斥

如果两个线程共享对相同数据的访问,并且至少一个线程修改该数据,则必须特别注意确保任何线程都无法看到不一致状态的数据。这称为线程互斥

Mutex是一个实现简单信号量锁的类,用于对某些共享资源进行互斥访问。也就是说,在给定时间内,只有一个线程可以持有锁。其他线程可以选择等待锁可用,或者可以选择立即获得错误,指示锁不可用。

通过将对共享数据的全部访问置于mutex的控制之下,我们确保了一致性和原子操作。让我们尝试两个示例,第一个没有 mutax,第二个有 mutax:

无 Mutex 示例

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果:

count1 :  1583766
count2 :  1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果:

count1 :  696591
count2 :  696591
difference : 0

处理死锁

当我们开始使用Mutex对象进行线程互斥时,必须小心避免死锁。死锁是指所有线程都在等待获取另一个线程持有的资源的情况。因为所有线程都被阻塞,所以它们无法释放它们持有的锁。并且因为它们无法释放锁,所以没有其他线程可以获取这些锁。

这就是条件变量发挥作用的地方。条件变量只是一个与资源关联的信号量,用于在特定mutex的保护下使用。当您需要一个不可用的资源时,您会在条件变量上等待。该操作会释放相应mutex上的锁。当其他线程发出资源可用的信号时,原始线程将停止等待并同时重新获得关键区域的锁。

示例

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

这将产生以下结果:

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

线程状态

有五个可能的返回值,对应于下表所示的五个可能状态。status方法返回线程的状态。

线程状态 返回值
可运行 run
睡眠 睡眠
中止 aborting
正常终止 false
异常终止 nil

Thread 类方法

Thread类提供了以下方法,它们适用于程序中所有可用的线程。这些方法将使用Thread类名如下调用:

Thread.abort_on_exception = true

线程实例方法

这些方法适用于线程实例。这些方法将使用方法如下所示的Thread实例进行调用:

#!/usr/bin/ruby

thr = Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join
广告