响应式编程
响应式编程是一种处理数据流和变化传播的编程范式。这意味着当一个组件发出数据流时,响应式编程库会将变化传播到其他组件。变化的传播将持续进行,直到到达最终接收者。事件驱动编程和响应式编程的区别在于,事件驱动编程围绕事件展开,而响应式编程围绕数据展开。
用于响应式编程的 ReactiveX 或 RX
ReactiveX 或 Reactive Extension 是响应式编程最著名的实现。ReactiveX 的工作依赖于以下两个类:
Observable 类
此类是数据流或事件的来源,它打包传入数据,以便数据可以从一个线程传递到另一个线程。在某些观察者订阅它之前,它不会提供数据。
Observer 类
此类使用Observable发出的数据流。可以有多个观察者与 Observable 相关联,每个观察者都会收到发出的每个数据项。观察者可以通过订阅 Observable 来接收三种类型的事件:
on_next() 事件 - 表示数据流中存在一个元素。
on_completed() 事件 - 表示发射结束,不再有项目到来。
on_error() 事件 - 也表示发射结束,但在Observable抛出错误的情况下。
RxPY – Python 响应式编程模块
RxPY 是一个可用于响应式编程的 Python 模块。我们需要确保该模块已安装。可以使用以下命令安装 RxPY 模块:
pip install RxPY
示例
以下是一个 Python 脚本,它使用RxPY模块及其Observable和Observe for类进行响应式编程。基本上有两个类:
get_strings() - 用于从观察者获取字符串。
PrintObserver() - 用于打印观察者中的字符串。它使用观察者类的所有三个事件。它还使用 subscribe() 类。
from rx import Observable, Observer def get_strings(observer): observer.on_next("Ram") observer.on_next("Mohan") observer.on_next("Shyam") observer.on_completed() class PrintObserver(Observer): def on_next(self, value): print("Received {0}".format(value)) def on_completed(self): print("Finished") def on_error(self, error): print("Error: {0}".format(error)) source = Observable.create(get_strings) source.subscribe(PrintObserver())
输出
Received Ram Received Mohan Received Shyam Finished
用于响应式编程的 PyFunctional 库
PyFunctional是另一个可用于响应式编程的 Python 库。它使我们能够使用 Python 编程语言创建函数式程序。它很有用,因为它允许我们通过使用链接的函数运算符来创建数据管道。
RxPY 和 PyFunctional 之间的区别
这两个库都用于响应式编程,并以类似的方式处理流,但两者之间主要的区别取决于数据的处理方式。RxPY处理系统中的数据和事件,而PyFunctional专注于使用函数式编程范式转换数据。
安装 PyFunctional 模块
在使用此模块之前,我们需要安装它。可以使用 pip 命令安装它,如下所示:
pip install pyfunctional
示例
以下示例使用PyFunctional模块及其seq类,该类充当流对象,我们可以使用它进行迭代和操作。在此程序中,它使用 lambda 函数将序列映射为每个值的双倍,然后过滤 x 大于 4 的值,最后将序列减少为所有剩余值的总和。
from functional import seq result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y) print ("Result: {}".format(result))
输出
Result: 6