RxPY - Connectable Operators
-
publish
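This method converts the observable into a connectable observable. A connectable observable does not start emitting until connect() is called on it.
Syntax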
publish(mapper=None)
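Parameters
mapper: optional. A function that can use the multicast source sequence as many times as needed without causing multiple subscriptions to the source.
Example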
from rx import create, operators as op
import random

def test_observable(observer, scheduler):
    # Emit a single random value, then complete.
    observer.on_next(random.random())
    observer.on_completed()

# publish() returns a connectable observable; nothing is emitted until connect().
source = create(test_observable).pipe(op.publish())

test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))

# Start the source; both subscribers receive the same value.
source.connect()
Output
E:\pyrx>python testrx.py
From subscriber 1 - 0.1475160727331849
From subscriber 2 - 0.1475160727331849
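To make the idea of "subscribe first, connect later" concrete, here is a minimal plain-Python sketch of the behavior. It is a toy model, not RxPY's implementation: the class ConnectableSketch and the function produce_random are hypothetical names introduced only for illustration.

```python
import random


class ConnectableSketch:
    """Toy stand-in for a connectable observable (hypothetical, not RxPY code)."""

    def __init__(self, producer):
        self._producer = producer   # function that receives an emit(value) callback
        self._callbacks = []        # subscribers registered before connect()

    def subscribe(self, on_next):
        # Subscribing only registers the callback; nothing is produced yet.
        self._callbacks.append(on_next)

    def connect(self):
        # Run the producer once and fan each value out to every subscriber.
        def emit(value):
            for callback in list(self._callbacks):
                callback(value)

        self._producer(emit)


def produce_random(emit):
    # Assumed source: emits one random number, like test_observable above.
    emit(random.random())


received_1, received_2 = [], []
source = ConnectableSketch(produce_random)
source.subscribe(received_1.append)
source.subscribe(received_2.append)

print(received_1, received_2)      # prints: [] []  (nothing emitted before connect)
source.connect()
print(received_1 == received_2)    # prints: True   (one value, shared by both)
```

The producer runs exactly once, which is why both subscribers see the same random number instead of two different ones.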
-
ref_count
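This operator makes a connectable observable behave like a normal observable: it connects automatically when the first subscriber subscribes and disconnects when the last subscriber unsubscribes.
Syntax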
ref_count()
Example
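from rx import create, operators as op
import random

def test_observable(observer, scheduler):
    # Emit a single random value; the source never completes.
    observer.on_next(random.random())

# ref_count() connects on the first subscription, so no connect() call is needed.
source = create(test_observable).pipe(op.publish(), op.ref_count())

# The value is emitted as soon as test1 subscribes, before test2 exists.
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))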
Output
E:\pyrx>python testrx.py
From subscriber 1 - 0.8230640432381131
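Only subscriber 1 prints a value in the output above. One way to see why is the following plain-Python sketch of reference-counted connection. It is a simplified model under the assumption that the source emits synchronously; RefCountSketch and produce_random are hypothetical names, not part of RxPY.

```python
import random


class RefCountSketch:
    """Toy model of publish() + ref_count() (hypothetical, not RxPY code)."""

    def __init__(self, producer):
        self._producer = producer   # function that receives an emit(value) callback
        self._callbacks = []        # currently subscribed callbacks
        self._connected = False

    def _emit(self, value):
        for callback in list(self._callbacks):
            callback(value)

    def subscribe(self, on_next):
        self._callbacks.append(on_next)
        if not self._connected:
            # First subscriber: connect automatically and run the producer.
            self._connected = True
            self._producer(self._emit)

        def dispose():
            # Last subscriber leaving resets the connection.
            self._callbacks.remove(on_next)
            if not self._callbacks:
                self._connected = False

        return dispose


def produce_random(emit):
    # Assumed source: emits one random number immediately.
    emit(random.random())


first, second = [], []
source = RefCountSketch(produce_random)
source.subscribe(first.append)     # connects now; the value is emitted here
source.subscribe(second.append)    # already connected; the value has passed

print(len(first), len(second))     # prints: 1 0
```

Because the connection happens during the first subscribe call, a source that emits immediately has already finished emitting by the time the second subscriber registers.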
-
replay
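This method works similarly to a ReplaySubject. It returns the same values to every subscriber, even if the observable has already emitted and some subscribers subscribe late.
Syntax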
replay()
Example
from rx import create, operators as op
import random
from threading import Timer

def test_observable(observer, scheduler):
    # Emit a single random value, then complete.
    observer.on_next(random.random())
    observer.on_completed()

# replay() returns a connectable observable that buffers emitted values.
source = create(test_observable).pipe(op.replay())

test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
source.connect()

print("subscriber called after delay ")

def last_subscriber():
    # Subscribes after the source has completed; the buffered value is replayed.
    test3 = source.subscribe(on_next = lambda i: print("From subscriber 3 - {0}".format(i)))

# Subscribe a third observer 5 seconds later.
t = Timer(5.0, last_subscriber)
t.start()
Output
E:\pyrx>python testrx.py
From subscriber 1 - 0.8340998157725388
From subscriber 2 - 0.8340998157725388
subscriber called after delay
From subscriber 3 - 0.8340998157725388
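The replay behavior can be pictured as a buffer that is played back to anyone who subscribes late. The following plain-Python sketch illustrates that idea only; it is not how RxPY implements replay, and ReplaySketch and produce_random are hypothetical names.

```python
import random


class ReplaySketch:
    """Toy model of a replaying connectable observable (hypothetical, not RxPY code)."""

    def __init__(self, producer):
        self._producer = producer   # function that receives an emit(value) callback
        self._callbacks = []        # live subscribers
        self._buffer = []           # every value emitted so far

    def subscribe(self, on_next):
        # Late subscribers first receive everything that was already emitted.
        for value in self._buffer:
            on_next(value)
        self._callbacks.append(on_next)

    def connect(self):
        def emit(value):
            self._buffer.append(value)              # remember for late subscribers
            for callback in list(self._callbacks):
                callback(value)

        self._producer(emit)


def produce_random(emit):
    # Assumed source: emits one random number.
    emit(random.random())


early, late = [], []
source = ReplaySketch(produce_random)
source.subscribe(early.append)
source.connect()                   # value is emitted and buffered
source.subscribe(late.append)      # subscribes afterwards, gets the buffered value

print(early == late)               # prints: True
```

This sketch keeps an unbounded buffer for simplicity; a real replay mechanism may limit how many values or how much time it retains.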