|
來源:richiezhang
鏈接:http://www.cnblogs.com/richieyang/p/4974630.html#top
一、反應式編程(Reactive Programming)
1、什么是反應式編程:反應式編程(Reactive programming)簡稱Rx,他是一個使用LINQ風格編寫基于觀察者模式的異步編程模型。簡單點說Rx = Observables + LINQ + Schedulers。
2、為什么會產(chǎn)生這種風格的編程模型?我在本系列文章開始的時候說過一個使用事件的例子:
var watch = new FileSystemWatcher(); watch.Created += (s, e) => { var fileType = Path.GetExtension(e.FullPath); if (fileType.ToLower() == 'jpg') { //do some thing } };
這個代碼定義了一個FileSystemWatcher,然后在Watcher事件上注冊了一個匿名函數(shù)。事件的使用是一種命令式代碼風格,有沒有辦法寫出聲明性更強的代碼風格?我們知道使用高階函數(shù)可以讓代碼更具聲明性,整個LINQ擴展就是一個高階函數(shù)庫,常見的LINQ風格代碼如下:
var list = Enumerable.Range(1, 10) .Where(x => x > 8) .Select(x => x.ToString()) .First();
能否使用這樣的風格來編寫事件呢?
3、事件流 LINQ是對IEnumerable的一系列擴展方法,我們可以簡單的將IEnumerable認為是一個集合。當我們將事件放在一個時間范圍內(nèi),事件也變成了集合。我們可以將這個事件集合理解為事件流。

事件流的出現(xiàn)給了我們一個能夠對事件進行LINQ操作的靈感。
二、反應式編程中的兩個重要類型
事件模型從本質(zhì)上來說是觀察者模式,所以IObservable和IObserver也是該模型的重頭戲。讓我們來看看這兩個接口的定義:
public interface IObservable { //Notifies the provider that an observer is to receive notifications. IDisposable Subscribe(IObserver observer); }
public interface IObserver { //Notifies the observer that the provider has finished sending push-based notifications. void OnCompleted(); //Notifies the observer that the provider has experienced an error condition. void OnError(Exception error); //Provides the observer with new data. void OnNext(T value); }
這兩個名稱準確的反應出了它兩的職責:IObservable-可觀察的事物,IObserver-觀察者。
IObservable只有一個方法Subscribe(IObserver observer),此方法用來對事件流注冊一個觀察者。
IObserver有三個回調(diào)方法。當事件流中有新的事件產(chǎn)生的時候會回調(diào)OnNext(T value),觀察者會得到事件中的數(shù)據(jù)。OnCompleted()和OnError(Exception error)則分別用來通知觀察者事件流已結束,事件流發(fā)生錯誤。
顯然事件流是可觀察的事物,我們用Rx改寫上面的例子:
Observable.FromEventPattern(watch, 'Created') .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == 'jpg') .Subscribe(e => { //do some thing });
注:在.net下使用Rx編程需要安裝以下Nuget組件:
Install-Package Rx-main
三、UI編程中使用Rx
Rx模型不但使得代碼更加具有聲明性,Rx還可以用在UI編程中。
1、UI編程中的第一段Rx代碼
為了簡單的展示如何在UI編程中使用Rx,我們以Winform中的Button為例,看看事件模型和Rx有何不同。
private void BindFirstGroupButtons() { btnFirstEventMode.Click += btnFirstEventMode_Click; } void btnFirstEventMode_Click(object sender, EventArgs e) { MessageBox.Show('hello world'); }
添加了一個Button,點擊Button的時候彈出一個對話框。使用Rx做同樣的實現(xiàn):
//得到了Button的Click事件流。 var clickedStream = Observable.FromEventPattern(btnFirstReactiveMode, 'Click'); //在事件流上注冊了一個觀察者。 clickedStream.Subscribe(e => MessageBox.Show('Hello world'));
有朋友指出字符串“Click”非常讓人不爽,這確實是個問題。由于Click是一個event類型,無法用表達式樹獲取其名稱,最終我想到使用擴展方法來實現(xiàn):
public static IObservable> FromClickEventPattern(this Button button) { return Observable.FromEventPattern(button, 'Click'); } public static IObservable> FromDoubleClickEventPattern(this Button button) { return Observable.FromEventPattern(button, 'DoubleClick'); }
我們平時常用的事件類型也就那么幾個,可以暫時通過這種方案來實現(xiàn),該方案算不上完美,但是比起直接使用字符串又能優(yōu)雅不少。
btnFirstReactiveMode.FromClickEventPattern() .Subscribe(e => MessageBox.Show('hello world'));
2、UI編程中存在一個很常見的場景:當一個事件的注冊者阻塞了線程時,整個界面都處于假死狀態(tài)。.net中的異步模型也從APM,EAP,TPL不斷演化直至async/await模型的出現(xiàn)才使得異步編程更加簡單易用。我們來看看界面假死的代碼:
void btnSecondEventMode_Click(object sender, EventArgs e) { btnSecondEventMode.BackColor = Color.Coral; Thread.Sleep(2000); lblMessage.Text = 'event mode'; }
Thread.Sleep(2000);模擬了一個長時間的操作,當你點下Button時整個界面處于假死狀態(tài)并且此時的程序無法響應其他的界面事件。傳統(tǒng)的解決方案是使用多線程來解決假死:
BtnSecondEventAsyncModel.BackColor = Color.Coral; Task.Run(() => { Thread.Sleep(2000); Action showMessage = () => lblMessage.Text = 'async event mode'; lblMessage.Invoke(showMessage); });
這個代碼的復雜點在于:普通的多線程無法對UI進行操作,在Winform中需要用Control.BeginInvoke(Action action)經(jīng)過包裝后,多線程中的UI操作才能正確執(zhí)行,WPF則要使用Dispatcher.BeginInvoke(Action action)包裝。
Rx方案:
btnSecondReactiveMode.FromClickEventPattern() .Subscribe(e => { Observable.Start(() => { btnSecondReactiveMode.BackColor = Color.Coral; Thread.Sleep(2000); return 'reactive mode'; }) .SubscribeOn(ThreadPoolScheduler.Instance) .ObserveOn(this) .Subscribe(x => { lblMessage.Text = x; }); });
一句SubscribeOn(ThreadPoolScheduler.Instance)將費時的操作跑在了新線程中,ObserveOn(this)讓后面的觀察者跑在了UI線程中。
注:使用ObserveOn(this)需要使用Rx-WinForms
Install-Package Rx-WinForms
這個例子雖然成功了,但是并沒有比BeginInvoke(Action action)的方案有明顯的進步之處。在一個事件流中再次使用Ovservable.Start()開啟新的觀察者讓人更加摸不著頭腦。這并不是Rx的問題,而是事件模型在UI編程中存在局限性:不方便使用異步,不具備可測試性等。以XMAL和MVVM為核心的UI編程模型將在未來處于主導地位,由于在MVVM中可以將UI綁定到一個Command,從而解耦了事件模型。
開源項目ReactiveUI提供了一個以Rx基礎的UI編程方案,可以使用在XMAL和MVVM為核心的UI編程中,例如:Xamarin,WFP,Windows Phone8等開發(fā)中。
注:在WPF中使用ObserveOn()需要安裝Rx-WPF
Install-Package Rx-WPF
3、再來一個例子,讓我們感受一下Rx的魅力

界面上有兩個Button分別為+和-操作,點擊+按鈕則+1,點擊-按鈕則-1,最終的結果顯示在一個Label中。 這樣的一個需求使用經(jīng)典事件模型只需要維護一個內(nèi)部變量,兩個按鈕的Click事件分別對變量做加1或減1的操作即可。
Rx作為一種函數(shù)式編程模型講求immutable-不可變性,即不使用變量來維護內(nèi)部狀態(tài)。
var increasedEventStream = btnIncreasement.FromClickEventPattern() .Select(_ => 1); var decreasedEventStream = btnDecrement.FromClickEventPattern() .Select(_ => -1); increasedEventStream.Merge(decreasedEventStream) .Scan(0, (result, s) => result + s) .Subscribe(x => lblResult.Text = x.ToString());
這個例子使用了IObservable的”謂詞”來對事件流做了一些操作。
下面就讓我們來看看IObservable中常用的“謂詞”
四、IObservable中的謂詞
IObservable的靈感來源于LINQ,所以很多操作也跟LINQ中的操作差不多,例如Where、First、Last、Single、Max、Any。 還有一些“謂詞”則是新出現(xiàn)的,例如上面提到的”Merge”、“Scan”等,為了理解這些“謂詞”的含義,我們請出一個神器RxSandbox。
1、Merge操作,從下面的圖中我們可以清晰的看出Merge操作將三個事件流中的事件合并在了同一個時間軸上。

2、Where操作則是根據(jù)指定的條件篩選出事件。

有了這個工具我們可以更加方便的了解這些“謂詞”的用途。
五、IObservable的創(chuàng)建
Observable類提供了很多靜態(tài)方法用來創(chuàng)建IObservable,之前的例子我們都使用FromEventPattern方法來將事件轉化為IObservable,接下來再看看別的方法。
Return可以創(chuàng)建一個具體的IObservable:
public static void UsingReturn() { var greeting = Observable.Return('Hello world'); greeting.Subscribe(Console.WriteLine); }
Create也可以創(chuàng)建一個IObservable,并且擁有更加豐富的重載:
public static void UsingCreate() { var greeting = Observable.Create(observer => { observer.OnNext('Hello world'); return Disposable.Create(() => Console.WriteLine('Observer has unsubscribed')); }); greeting.Subscribe(Console.WriteLine); }
Range方法可以產(chǎn)生一個指定范圍內(nèi)的IObservable
Observable.Range(1, 10) .Subscribe(x => Console.WriteLine(x.ToString()));
Generate方法是一個折疊操作的逆向操作,又稱Unfold方法:
public static void UsingGenerate() { var range = Observable.Generate(0, x => x x + 1, x => x); range.Subscribe(Console.WriteLine); }
Interval方法可以每隔一定時間產(chǎn)生一個IObservable:
Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => Console.WriteLine(x.ToString()));
Subscribe方法有一個重載,可以分別對Observable發(fā)生異常和Observable完成定義一個回調(diào)函數(shù)。
Observable.Range(1, 10) .Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine('Error' + e.Message), () => Console.WriteLine('Completed'));
還可以將IEnumerable轉化為IObservable類型:
Enumerable.Range(1, 10).ToObservable() .Subscribe(x => Console.WriteLine(x.ToString()));
也可以將IObservable轉化為IEnumerable
var list= Observable.Range(1, 10).ToEnumerable();

六、Scheduler
Rx的核心是觀察者模式和異步,Scheduler正是為異步而生。我們在之前的例子中已經(jīng)接觸過一些具體的Scheduler了,那么他們都具體是做什么的呢?
1、先看下面的代碼:
public static void UsingScheduler() { Console.WriteLine('Starting on threadId:{0}', Thread.CurrentThread.ManagedThreadId); var source = Observable.Create( o => { Console.WriteLine('Invoked on threadId:{0}', Thread.CurrentThread.ManagedThreadId); o.OnNext(1); o.OnNext(2); o.OnNext(3); o.OnCompleted(); Console.WriteLine('Finished on threadId:{0}',Thread.CurrentThread.ManagedThreadId); return Disposable.Empty; }); source //.SubscribeOn(NewThreadScheduler.Default) //.SubscribeOn(ThreadPoolScheduler.Instance) .Subscribe( o => Console.WriteLine('Received {1} on threadId:{0}',Thread.CurrentThread.ManagedThreadId,o), () => Console.WriteLine('OnCompleted on threadId:{0}',Thread.CurrentThread.ManagedThreadId)); Console.WriteLine('Subscribed on threadId:{0}', Thread.CurrentThread.ManagedThreadId); }
當我們不使用任何Scheduler的時候,整個Rx的觀察者和主題都跑在主線程中,也就是說并沒有異步執(zhí)行。正如下面的截圖,所有的操作都跑在threadId=1的線程中。

當我們使用SubscribeOn(NewThreadScheduler.Default)或者SubscribeOn(ThreadPoolScheduler.Instance)的時候,觀察者和主題都跑在了theadId=3的線程中。

這兩個Scheduler的區(qū)別在于:NewThreadScheduler用于執(zhí)行一個長時間的操作,ThreadPoolScheduler用來執(zhí)行短時間的操作。
2、SubscribeOn和ObserveOn的區(qū)別
上面的例子僅僅展示了SubscribeOn()方法,Rx中還有一個ObserveOn()方法。stackoverflow上有一個這樣的問題:What’s the difference between SubscribeOn and ObserveOn,其中一個簡單的例子很好的詮釋了這個區(qū)別。



至此結論應該非常清晰了:SubscribeOn()和ObserveOn()分別控制著主題和觀察者的異步。
七、其他Rx資源
除了.net中的Rx.net,其他語言也紛紛推出了自己的Rx框架。
參考資源:
http://rxwiki./101samples
http:///Content/v1.0.10621.0/01_WhyRx.html#WhyRx
http://www./Articles/646361/Reactive-Programming-For-NET-And-Csharp-Developers
|