Erlang - 并发



Erlang 中的并发编程需要遵循以下基本原则或流程。

列表包含以下原则:

piD = spawn(Fun)

创建一个新的并发进程来评估 Fun。新进程与调用者并行运行。示例如下:

示例

-module(helloworld). 
-export([start/0]). 

start() ->
   spawn(fun() -> server("Hello") end). 

server(Message) ->
   io:fwrite("~p",[Message]).

上述程序的输出为:

输出

“Hello”

Pid ! Message

将消息发送到标识符为 Pid 的进程。消息发送是异步的。发送者不会等待,而是继续执行其正在执行的操作。‘!’ 称为发送运算符。

示例如下:

示例

-module(helloworld). 
-export([start/0]). 
start() -> 
   Pid = spawn(fun() -> server("Hello") end), 
   Pid ! {hello}. 

server(Message) ->
   io:fwrite("~p",[Message]).

Receive…end

接收已发送到进程的消息。它具有以下语法:

语法

receive
Pattern1 [when Guard1] ->
Expressions1;
Pattern2 [when Guard2] ->
Expressions2;
...
End

当消息到达进程时,系统尝试将其与 Pattern1(可能带有保护条件 Guard1)进行匹配;如果成功,则评估 Expressions1。如果第一个模式不匹配,则尝试 Pattern2,依此类推。如果没有任何模式匹配,则该消息将保存以供以后处理,并且进程将等待下一条消息。

包含所有 3 个命令的整个过程示例如下程序所示。

示例

-module(helloworld). 
-export([loop/0,start/0]). 

loop() ->
   receive 
      {rectangle, Width, Ht} -> 
         io:fwrite("Area of rectangle is ~p~n" ,[Width * Ht]), 
         loop(); 
      {circle, R} ->
      io:fwrite("Area of circle is ~p~n" , [3.14159 * R * R]), 
      loop(); 
   Other ->
      io:fwrite("Unknown"), 
      loop() 
   end. 

start() ->
   Pid = spawn(fun() -> loop() end), 
   Pid ! {rectangle, 6, 10}.

关于上述程序,需要注意以下几点:

  • loop 函数具有 receive end 循环。因此,当发送消息时,它将由 receive end 循环处理。

  • 产生一个新进程,该进程转到 loop 函数。

  • 通过 Pid ! message 命令将消息发送到生成的进程。

上述程序的输出为:

输出

Area of the Rectangle is 60

最大进程数

在并发中,确定系统允许的最大进程数非常重要。然后,您应该能够了解系统上可以并发执行多少进程。

让我们来看一个如何确定系统上可以执行的最大进程数的示例。

-module(helloworld). 
-export([max/1,start/0]). 

max(N) -> 
   Max = erlang:system_info(process_limit), 
   io:format("Maximum allowed processes:~p~n" ,[Max]), 
   
   statistics(runtime), 
   statistics(wall_clock), 
   
   L = for(1, N, fun() -> spawn(fun() -> wait() end) end), 
   {_, Time1} = statistics(runtime), 
   {_, Time2} = statistics(wall_clock), lists:foreach(fun(Pid) -> Pid ! die end, L), 
   
   U1 = Time1 * 1000 / N, 
   U2 = Time2 * 1000 / N, 
   io:format("Process spawn time=~p (~p) microseconds~n" , [U1, U2]).
   wait() -> 
   
   receive 
      die -> void 
   end. 
 
for(N, N, F) -> [F()]; 
for(I, N, F) -> [F()|for(I+1, N, F)]. 

start()->
   max(1000), 
   max(100000).

在任何具有良好处理能力的机器上,上述两个 max 函数都将通过。以下是上述程序的示例输出。

Maximum allowed processes:262144
Process spawn time=47.0 (16.0) microseconds
Maximum allowed processes:262144
Process spawn time=12.81 (10.15) microseconds

带超时的接收

有时,receive 语句可能会无限期地等待永远不会到达的消息。这可能是由于多种原因造成的。例如,我们的程序中可能存在逻辑错误,或者将要向我们发送消息的进程可能在发送消息之前已崩溃。为了避免此问题,我们可以向 receive 语句添加超时。这将设置进程等待接收消息的最大时间。

以下是带有指定超时的 receive 消息的语法

语法

receive 
Pattern1 [when Guard1] -> 
Expressions1; 

Pattern2 [when Guard2] ->
Expressions2; 
... 
after Time -> 
Expressions 
end

最简单的示例是创建一个休眠函数,如下面的程序所示。

示例

-module(helloworld). 
-export([sleep/1,start/0]). 

sleep(T) ->
   receive 
   after T -> 
      true 
   end. 
   
start()->
   sleep(1000).

上述代码将在实际退出之前休眠 1000 毫秒。

选择性接收

Erlang 中的每个进程都有一个关联的邮箱。当您向进程发送消息时,该消息将放入邮箱中。只有当您的程序评估 receive 语句时,才会检查此邮箱。

以下是选择性 receive 语句的一般语法。

语法

receive 
Pattern1 [when Guard1] ->
Expressions1; 

Pattern2 [when Guard1] ->
Expressions1; 
... 
after 
Time ->
ExpressionTimeout 
end

这就是上述 receive 语句的工作方式:

  • 当我们进入 receive 语句时,我们启动一个计时器(但仅当表达式中存在 after 部分时)。

  • 获取邮箱中的第一条消息,并尝试将其与 Pattern1、Pattern2 等进行匹配。如果匹配成功,则从邮箱中删除该消息,并评估模式后面的表达式。

  • 如果 receive 语句中的任何模式都不匹配邮箱中的第一条消息,则从邮箱中删除第一条消息并将其放入“保存队列”。然后尝试邮箱中的第二条消息。重复此过程,直到找到匹配的消息或检查完邮箱中的所有消息。

  • 如果邮箱中的任何消息都不匹配,则进程将被挂起,并在下次将新消息放入邮箱时重新安排执行。请注意,当新消息到达时,保存队列中的消息不会重新匹配;仅匹配新消息。

  • 一旦消息匹配,则所有已放入保存队列的消息都将按照它们到达进程的顺序重新输入邮箱。如果设置了计时器,则将其清除。

  • 如果我们在等待消息时计时器超时,则评估 ExpressionsTimeout 表达式并将任何已保存的消息按照它们到达进程的顺序放回邮箱。

广告