XAML Playground
about XAML and other Amenities

Handling errors when calling network with Observable.FromEvent

2011-03-30T16:11:10+01:00 by Andrea Boschin

In my previous post I showed a new way to write methods that call network resources such as feeds, WCF methods and so on. I proposed two solutions that take advantage of the Reactive Extensions: one for operations that follow the Begin-End pattern and one for operations that use an event-driven pattern. Today I would like to take another step forward and make the event-driven side of the problem more reliable. Most of the time when you call the network you deal with this pattern rather than Begin-End. WCF proxies and WebClient both follow it, so we need to handle accurately the errors that come back with the result of a call.

The problem is that when you call a network resource this way, exceptions arrive as the value of a property of the result instead of being thrown as you might expect. This is understandable given the asynchronous nature of the calls. Even if the runtime threw the exception you would not be able to catch it, because it is generated on another thread; the only option is for the runtime to catch every exception and forward it through the Error property of the result. Here is how you commonly handle errors in network calls:

    public void client_DownloadStringCompleted(object sender, DownloadStringCompletedEventArgs e)
    {
        if (e.Error != null)
        {
            // there is an exception
            MessageBox.Show(e.Error.Message);
        }
        else
        {
            // no error occurred: do what you want with the result
        }
    }

With my Reactive Extensions approach to asynchronicity this can be a big problem: although the Subscribe method can catch exceptions thrown while the event is being processed, it knows nothing about the Error carried by the result of an asynchronous network call. In my last example I showed this code:

    IObservable<SyndicationFeed> result = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
        ev => new DownloadStringCompletedEventHandler(ev),
        ev => client.DownloadStringCompleted += ev,
        ev => client.DownloadStringCompleted -= ev)
        .Select(o => o.EventArgs.Result)
        .ConvertToFeed();

Unfortunately this works perfectly when the network call returns a valid response, but it does not report the error properly if something goes wrong. To solve this problem I wrote an extension method that collects possible exceptions and rethrows them. The trick relies on a base class named "AsyncCompletedEventArgs", from which the EventArgs of these event-based network calls derive. Having this common base class is the key to writing a single method that handles a lot of cases. Here is the code of the ThrowIfError() method:

    public static IObservable<IEvent<T>> ThrowIfError<T>(this IObservable<IEvent<T>> observable)
        where T : AsyncCompletedEventArgs
    {
        return observable.SelectMany(
            o =>
            {
                if (o.EventArgs.Error != null)
                    throw o.EventArgs.Error;

                return Observable.Return(o);
            });
    }

The method accepts an IObservable<IEvent<T>>, which is the type returned by the Observable.FromEvent() method. For each event it receives, it examines the content and, if it detects an error, simply throws it. This makes the runtime catch the exception and forward it to the OnError handler of the observer, which is the second parameter of the Subscribe method. If there is no error, the method wraps the event in a new observable with Observable.Return so the chain of methods can continue seamlessly. I use the SelectMany method to flatten the result from IObservable<IObservable<IEvent<T>>> (it's not a joke) to IObservable<IEvent<T>>. So now we can use the method wherever we have to place a network call:

    public static IObservable<SyndicationFeed> DownloadFeedWebClient(Uri uri)
    {
        WebClient client = new WebClient();

        IObservable<SyndicationFeed> result = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
            ev => new DownloadStringCompletedEventHandler(ev),
            ev => client.DownloadStringCompleted += ev,
            ev => client.DownloadStringCompleted -= ev)
            .ThrowIfError()
            .Select(o => o.EventArgs.Result)
            .ConvertToFeed();

        client.DownloadStringAsync(uri);

        return result;
    }

This does not change the way you call the DownloadFeedWebClient method, because it still returns an IObservable<T>: you simply Subscribe to it and specify a handler for the OnError part of the observer. To view an example please visit my .
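As a rough illustration of how the error reaches the OnError handler, here is a minimal sketch written against the current System.Reactive package rather than the build used in this post. It assumes that package, where EventPattern<T> plays the role of IEvent<T>, and it uses Observable.Throw instead of a throw statement. The Demo class and its Run method are hypothetical names that feed one synthetic completion event through the chain, with no network involved.

```csharp
using System;
using System.ComponentModel;
using System.Reactive;
using System.Reactive.Linq;

public static class AsyncEventExtensions
{
    // Same idea as ThrowIfError in the post, written against EventPattern<T>
    // (assumption: the current System.Reactive package).
    public static IObservable<EventPattern<T>> ThrowIfError<T>(
        this IObservable<EventPattern<T>> source)
        where T : AsyncCompletedEventArgs
    {
        return source.SelectMany(e =>
            e.EventArgs.Error != null
                ? Observable.Throw<EventPattern<T>>(e.EventArgs.Error)
                : Observable.Return(e));
    }
}

public static class Demo
{
    // Hypothetical helper: pushes one synthetic completion event through
    // ThrowIfError and reports which Subscribe callback was reached.
    public static string Run(Exception error)
    {
        string outcome = "nothing";
        var args = new AsyncCompletedEventArgs(error, false, null);
        var evt = new EventPattern<AsyncCompletedEventArgs>(null, args);

        Observable.Return(evt)
            .ThrowIfError()
            .Subscribe(
                e => outcome = "OnNext",
                ex => outcome = "OnError: " + ex.Message);

        return outcome;
    }
}
```

Calling Demo.Run(null) goes through the first callback, while Demo.Run(new InvalidOperationException("boom")) goes through the second one, which is where a real application would show the message to the user.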

Using a new pattern to call the network with Reactive Extensions

2011-03-30T01:17:20+01:00 by codeblock

Recently I have been trying the Reactive Extensions, an interesting set of extensions that aim to improve asynchronous programming with an innovative implementation of the observer pattern. These extensions provide some useful methods that elegantly solve recurring problems that everyone has surely met when calling the network. So, after a lot of attempts, I found a pattern I will start to use when implementing the Data Access facade in my Silverlight applications.

Before discovering the Reactive Extensions (from here on I will refer to them with the abbreviation RX), I used to rely on a widely adopted pattern that adds two extra parameters to every method of the Data Access class. For instance, a method made to download a feed from the network may have the following signature and body:

    public static void DownloadFeed(Uri uri, Action<SyndicationFeed> success, Action<Exception> fail)
    {
        try
        {
            HttpWebRequest request = HttpWebRequest.CreateHttp(uri);

            request.BeginGetResponse(
                asyncResult =>
                {
                    try
                    {
                        WebResponse response = request.EndGetResponse(asyncResult);

                        using (StreamReader sReader = new StreamReader(response.GetResponseStream()))
                        {
                            using (StringReader reader = new StringReader(sReader.ReadToEnd()))
                            {
                                using (XmlReader xReader = XmlReader.Create(reader))
                                {
                                    // load the feed here, before the readers are disposed;
                                    // the dispatcher runs the callback later
                                    SyndicationFeed feed = SyndicationFeed.Load(xReader);
                                    Deployment.Current.Dispatcher.BeginInvoke(
                                        () => success(feed));
                                }
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        fail(ex);
                    }
                }, null);
        }
        catch (Exception ex)
        {
            fail(ex);
        }
    }

Using the HttpWebRequest makes the code more complicated, but switching to the WebClient does not significantly change the way the network call works. This pattern is interesting because, with a few lambda expressions, the resulting code remains compact. The drawback is that every method has two more parameters, so the code is harder to understand.

The RX framework contains two methods that are useful for handling a network connection. Observable.FromAsyncPattern is really useful if you want to use an HttpWebRequest, because it automatically handles the Begin and End parts of the asynchronous pattern. On the other hand, Observable.FromEvent attaches to an event and gracefully detaches when it is no longer required; this is the case for the WebClient class, which exposes a DownloadStringCompleted event. So we start by handling the async pattern of an HttpWebRequest:

    public static IObservable<SyndicationFeed> DownloadFeedWebRequest(Uri uri)
    {
        HttpWebRequest request = HttpWebRequest.CreateHttp(uri);

        return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)
            .Invoke()
            .ReadToEnd()
            .ConvertToFeed()
            .ObserveOnDispatcher();
    }

These few lines do the trick. The Observable.FromAsyncPattern method encapsulates BeginGetResponse and EndGetResponse and returns a function that, once invoked, gives back an observable representing the pending call. The Invoke method calls the service, then two methods I wrote read the resulting stream and convert the string to a SyndicationFeed. Finally, with the ObserveOnDispatcher method I marshal the notifications to the user interface thread using the dispatcher.
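The post does not list the two helper methods, so the following is only a guess at how ReadToEnd and ConvertToFeed could look. It is a minimal sketch that assumes the current System.Reactive package and, outside Silverlight or the full .NET Framework, the System.ServiceModel.Syndication package. The class name FeedExtensions is hypothetical.

```csharp
using System;
using System.IO;
using System.Net;
using System.Reactive.Linq;
using System.ServiceModel.Syndication;
using System.Xml;

// Hypothetical reconstruction of the two helpers used in the chain above.
public static class FeedExtensions
{
    // Reads the whole body of each response into a string.
    public static IObservable<string> ReadToEnd(this IObservable<WebResponse> source)
    {
        return source.Select(response =>
        {
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                return reader.ReadToEnd();
            }
        });
    }

    // Parses each XML string as an RSS or Atom feed.
    public static IObservable<SyndicationFeed> ConvertToFeed(this IObservable<string> source)
    {
        return source.Select(xml =>
        {
            using (var text = new StringReader(xml))
            using (var xmlReader = XmlReader.Create(text))
            {
                return SyndicationFeed.Load(xmlReader);
            }
        });
    }
}
```

Because both helpers are plain Select projections, an exception thrown while reading or parsing is delivered to the OnError handler of the subscriber instead of escaping on a background thread.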

The return value is an IObservable<SyndicationFeed> instance that enables the caller to listen for the completion of the operation. The way the method is called is very similar to my previous pattern, but the method now has only the parameters it needs:

    DataSource.DownloadFeedWebRequest(new Uri("http://api.twitter.com/1/statuses/public_timeline.atom"))
        .Subscribe(
            r =>
            {
                feed.ItemsSource = r.Items;
            }, HandleExceptions);

The Subscribe method I use here takes two callbacks: one receives the result of the method and the other is called when an exception occurs. If you prefer the WebClient you can use the Observable.FromEvent method to complete the same operation:

    public static IObservable<SyndicationFeed> DownloadFeedWebClient(Uri uri)
    {
        WebClient client = new WebClient();

        IObservable<SyndicationFeed> result = Observable.FromEvent<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
            ev => new DownloadStringCompletedEventHandler(ev),
            ev => client.DownloadStringCompleted += ev,
            ev => client.DownloadStringCompleted -= ev)
            .Select(o => o.EventArgs.Result)
            .ConvertToFeed();

        client.DownloadStringAsync(uri);

        return result;
    }

In the method we have to specify the attach and detach actions. RX calls the attach action when an observer subscribes and the detach action when the subscription is disposed. Here we do not need to call the ReadToEnd method because the WebClient directly returns a string, and we do not have to marshal to the UI thread because the WebClient already does the trick. The Select method converts the resulting EventArgs to the value expected by ConvertToFeed.
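To make the timing of the attach and detach actions visible, here is a minimal sketch that assumes the current System.Reactive package, where FromEventPattern is the counterpart of the FromEvent overload used in this post. FakeClient is a hypothetical stand-in for WebClient whose only job is to count the handlers attached to its event, and AttachDetachDemo is a hypothetical name as well.

```csharp
using System;
using System.ComponentModel;
using System.Reactive.Linq;

// Hypothetical stand-in for WebClient that counts attached handlers.
public sealed class FakeClient
{
    private EventHandler<AsyncCompletedEventArgs> handlers;

    public int HandlerCount { get; private set; }

    public event EventHandler<AsyncCompletedEventArgs> Completed
    {
        add { handlers += value; HandlerCount++; }
        remove { handlers -= value; HandlerCount--; }
    }
}

public static class AttachDetachDemo
{
    // Returns the handler count before subscribing, while subscribed,
    // and after the subscription has been disposed.
    public static int[] Run()
    {
        var client = new FakeClient();

        var events = Observable.FromEventPattern<AsyncCompletedEventArgs>(
            h => client.Completed += h,   // attach action
            h => client.Completed -= h);  // detach action

        int before = client.HandlerCount;
        IDisposable subscription = events.Subscribe(_ => { });
        int during = client.HandlerCount;
        subscription.Dispose();
        int after = client.HandlerCount;

        return new[] { before, during, after };
    }
}
```

In a plain console program with no SynchronizationContext, the handler should be attached only between Subscribe and Dispose. Creating the observable alone does not touch the event, so a call started before anyone subscribes could in principle complete unobserved.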

I’m very interested in your opinion on this example. My feeling is that the code is more reusable and compact, but if someone has a suggestion to improve the method, please