RxPy - 示例



在本章中,我们将详细讨论以下主题:

  • 展示 Observable、操作符和订阅 Observer 工作的基本示例。
  • Observable 和 Subject 之间的区别。
  • 了解冷 Observable 和热 Observable。

下面是一个基本示例,展示了 Observable、操作符和订阅 Observer 的工作原理。

示例

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

这是一个非常简单的示例,我从以下 URL 获取用户数据:

https://jsonplaceholder.typicode.com/users。

过滤数据,以显示名称以“C”开头的用户,然后使用 map 函数仅返回名称。以下是输出:

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Observable 和 Subject 之间的区别

在本示例中,我们将了解 Observable 和 Subject 之间的区别。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

输出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的示例中,每次订阅 Observable 时,它都会为您提供新的值。

Subject 示例

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

输出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果您看到这些值是通过 Subject 共享给两个订阅者的。

了解冷 Observable 和热 Observable

Observable 可分为

  • 冷 Observable
  • 热 Observable

当多个订阅者订阅时,会注意到 Observable 的差异。

冷 Observable

冷 Observable 是一种每次订阅时都会执行并生成数据的 Observable。当它被订阅时,Observable 会执行并提供新的值。

以下示例说明了冷 Observable 的理解。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

输出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的示例中,每次订阅 Observable 时,它都会执行 Observable 并发出值。如上例所示,这些值在不同的订阅者之间也可能有所不同。

热 Observable

对于热 Observable,它们将在准备好时发出值,并且不会总是等待订阅。当值发出时,所有订阅者都将获得相同的值。

当您希望在 Observable 准备好时发出值,或希望将相同的值共享给所有订阅者时,可以使用热 Observable。

热 Observable 的示例包括 Subject 和可连接操作符。

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

输出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果您看到,相同的值在订阅者之间共享。您可以使用 publish() 可连接 Observable 操作符来实现相同的效果。

广告

© . All rights reserved.