RxPY - 创建 Observable
创建
此方法用于创建 Observable。它将包含 observer 方法,即:
on_next() - 当 Observable 发射一个项目时,此函数会被调用。
on_completed() - 当 Observable 完成时,此函数会被调用。
on_error() - 当 Observable 发生错误时,此函数会被调用。
下面是一个工作示例:
testrx.py
from rx import create
def test_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error occured")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
下面是创建的 Observable 的输出:
E:\pyrx>python testrx.py Got - Hello Job Done!
空
此 Observable 不会输出任何内容,并直接发射完成状态。
语法
empty()
返回值
它将返回一个不包含任何元素的 Observable。
示例
from rx import empty
test = empty()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py Job Done!
永不完成
此方法创建一个永远不会到达完成状态的 Observable。
语法
never()
返回值
它将返回一个永远不会完成的 Observable。
示例
from rx import never
test = never()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
It does not show any output.
抛出异常
此方法将创建一个会抛出错误的 Observable。
语法
throw(exception)
参数
exception:包含错误详细信息的对象。
返回值
返回一个包含错误详细信息的 Observable。
示例
from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py Error: There is an Error!
从迭代器创建
此方法将给定的数组或对象转换为 Observable。
语法
from_(iterator)
参数
iterator:对象或数组。
返回值
这将为给定的迭代器返回一个 Observable。
示例
from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py The value is 1 The value is 2 The value is 3 The value is 4 The value is 5 The value is 6 The value is 7 The value is 8 The value is 9 The value is 10 Job Done!
间隔
此方法将在超时后产生一系列值。
语法
interval(period)
参数
period:启动整数序列。
返回值
它返回一个包含所有按顺序排列的值的 Observable。
示例
import rx
from rx import operators as ops
rx.interval(1).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py Press any key to exit The value is 0 The value is 1 The value is 4 The value is 9 The value is 16 The value is 25 The value is 36 The value is 49 The value is 64 The value is 81 The value is 100 The value is 121 The value is 144 The value is 169 The value is 196 The value is 225 The value is 256 The value is 289 The value is 324 The value is 361 The value is 400
仅值
此方法将给定值转换为 Observable。
语法
just(value)
参数
value:要转换为 Observable 的值。
返回值
它将返回一个包含给定值的 Observable。
示例
from rx import just
test = just([15, 25,50, 55])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py The value is [15, 25, 50, 55] Job Done!
范围
此方法根据给定的输入提供一系列整数。
语法
range(start, stop=None)
参数
start:范围开始的第一个值。
stop:可选,范围停止的最后一个值。
返回值
这将返回一个根据给定输入包含整数值的 Observable。
示例
from rx import range
test = range(0,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py The value is 0 The value is 1 The value is 2 The value is 3 The value is 4 The value is 5 The value is 6 The value is 7 The value is 8 The value is 9 Job Done!
重复值
此方法将创建一个 Observable,该 Observable 将根据给定的次数重复给定值。
语法
repeat_value(value=None, repeat_count=None)
参数
value:可选。要重复的值。
repeat_count:可选。要重复给定值的次数。
返回值
它将返回一个 Observable,该 Observable 将根据给定的次数重复给定值。
示例
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 Job Done!
启动
此方法以函数作为输入,并返回一个 Observable,该 Observable 将返回输入函数中的值。
语法
start(func)
参数
func:将被调用的函数。
返回值
它返回一个 Observable,该 Observable 将包含输入函数的返回值。
示例
from rx import start
test = start(lambda : "Hello World")
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
输出
E:\pyrx>python testrx.py The value is Hello World Job Done!
计时器
此方法将在超时完成后按顺序发射值。
语法
timer(duetime)
参数
duetime:发射第一个值之后的时间。
返回值
它将返回一个在 duetime 之后发射值的 Observable。
示例
import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py Press any key to exit The value is 0 The value is 1 The value is 4 The value is 9 The value is 16 The value is 25 The value is 36 The value is 49 The value is 64