Archive for the ‘Reactive Extensions’ Category
I was pleased to learn that Netflix has an OData API to query. The practical reason, obviously, was to use the API to find the movies I want to watch. As I mentioned in my previous post, I will be using LINQPad 4 for querying, because of its built-in support for OData as well as Rx.
One thing I discovered after playing around with OData is that not every LINQ query operator is available in OData. For example, the Netflix API supports only 4 operators.
The query also returns only 20 rows per request. So if I need 40 rows, the server returns 20 rows on my first request, and on my next request I have to skip the first 20 and take the next 20. These are some of the limitations.
Here is what I wanted from Netflix: movie listings that have an average rating greater than 3.5, ordered by release year descending and grouped by whether they are available for instant watch. That way I can have one queue for movies I want to watch online and another for movies I can request by mail (the ones that are not available for instant watch). And here is the query to do that:
var movies =
    from counter in (from e in Enumerable.Range(0, 400) where e % 20 == 0 select e).ToObservable()
    from movieTitle in Titles
        .Where(t => t.AverageRating > 3.5)
        .OrderByDescending(t => t.ReleaseYear)
        .Skip(counter)
        .Take(20)
        .ToObservable()
    select movieTitle;

var moviesILikeToWatch =
    from counter in movies
    group counter by counter.Instant.Available into g
    select g;

moviesILikeToWatch.Dump();
The first “from counter” query builds the skip part. As I mentioned, each request returns only 20 rows by default, and I wanted 400 rows. To achieve that, I used Enumerable.Range to generate a sequence of offsets (0, 20, 40 and so on) that I can use for skipping in the next query. I could very well have used a for loop to build this, but that is not what I wanted; I wanted to try and write terse code. These are the actual calls to the Netflix OData API:
http://odata.netflix.com/Catalog/Titles()?$filter=AverageRating gt 3.5&$orderby=ReleaseYear desc&$skip=0&$top=20
http://odata.netflix.com/Catalog/Titles()?$filter=AverageRating gt 3.5&$orderby=ReleaseYear desc&$skip=20&$top=20
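The two requests above differ only in their $skip value, so the paging can be pictured on its own, without Rx or LINQPad. The following is a minimal sketch rather than what LINQPad does internally: NetflixPaging, PageOffsets and PageUrl are hypothetical names made up for illustration, and the URL is simply assembled to match the requests shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper class, not part of LINQPad or the Netflix API.
public static class NetflixPaging
{
    // Start offset of each page: 0, pageSize, 2 * pageSize, ...
    public static IEnumerable<int> PageOffsets(int totalRows, int pageSize)
    {
        // Round up so a partial last page still gets its own request.
        int pageCount = (totalRows + pageSize - 1) / pageSize;
        return Enumerable.Range(0, pageCount).Select(page => page * pageSize);
    }

    // Request URL for one page, mirroring the calls listed above.
    public static string PageUrl(int skip, int pageSize)
    {
        return "http://odata.netflix.com/Catalog/Titles()"
             + "?$filter=AverageRating gt 3.5&$orderby=ReleaseYear desc"
             + "&$skip=" + skip + "&$top=" + pageSize;
    }
}
```

Calling NetflixPaging.PageOffsets(400, 20) yields the same offsets as the Enumerable.Range(0, 400) filter in the query, one per request.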
In the picture below, LINQPad makes 20 calls to Netflix to get 400 movie listings.
The next line in the first query, “from movieTitle”, is a simple LINQ query to get movies based on the filter criteria, along with skip and take. The reason for the second query is that the OData API doesn’t provide a groupby operator; if I included it in my first query, LINQPad would try to convert it to an OData-specific request, which would fail. So essentially I am getting all the data from the server and then grouping it locally.
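Grouping locally just means running the group-by over objects already in memory. Here is a minimal sketch of that step using plain LINQ to Objects; the Movie class is a hypothetical stand-in for the Netflix title entity, with InstantAvailable playing the role of Instant.Available, and LocalGrouping is a name made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Netflix title entity; only the fields used here.
public sealed class Movie
{
    public string Name { get; set; }
    public bool InstantAvailable { get; set; }
}

public static class LocalGrouping
{
    // Splits already-downloaded titles into queues keyed by instant availability.
    // This runs entirely on the client, so no groupby is ever sent to the server.
    public static Dictionary<bool, List<string>> ByInstantAvailability(IEnumerable<Movie> movies)
    {
        return movies
            .GroupBy(m => m.InstantAvailable)
            .ToDictionary(g => g.Key, g => g.Select(m => m.Name).ToList());
    }
}
```

The key true would hold the watch-online queue and false the request-by-mail queue; a key is present only if at least one title falls into that group.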
This wouldn’t have been possible without OData.
I saw this cool post from Scott Hanselman on creating an OData API for Stack Overflow. I use LINQPad more often than anything else. And sometimes, when I am not very busy, I also look for unanswered questions on Stack Overflow. I have been playing around with Reactive Extensions, and FYI, LINQPad 4.0 supports Rx. So I thought how cool it would be if I could look for unanswered “windbg” questions on Stack Overflow, so that I could answer them. And here is the query:
var windbgQuestions =
    from time in Observable.Interval(TimeSpan.FromMinutes(1))
    from post in Posts.ToObservable()
    where post.AnswerCount == 0 && post.Tags.Contains("windbg")
    select post.Body;

windbgQuestions.Dump();
I know this will not work now. But how cool is it to combine these frameworks and write very succinct code to get what we want, without having to jump through hoops?
One of the key things in software is to write succinct, declarative and asynchronous code. The answer to that is Reactive Extensions for .NET.
I have been digging into Rx for some time now. Though there isn’t much documentation, I have been fairly successful in getting certain things done with it.
One requirement that came up from our Operations team was to monitor some websites and notify someone if a site was not accessible. The added constraints were to check the site status only at a certain interval, and to report a failure only if failures exceeded a certain threshold within a certain duration.
So it was like this: check the site status every 5 seconds, buffer the results for a minute, and if the buffered responses contain more than 3 failures, tweet someone about it.
And here is the code to solve the problem
(from time in Observable.Interval(TimeSpan.FromSeconds(5))
 let req = WebRequest.Create("http://www.nonexisting.com")
 from res in Observable
     .FromAsyncPattern<WebResponse>(req.BeginGetResponse, req.EndGetResponse)()
     .Materialize()
 select res)
    .Buffer(new TimeSpan(0, 0, 1, 0))
    .Select(failed => failed.Where(n => n.Kind == NotificationKind.OnError))
    .Where(failed => failed.Count() > 3)
    .Subscribe(x => Tweet("Nonexisting.com Failed thrice"));

Console.Read();
The “from time in Observable.Interval(TimeSpan.FromSeconds(5))” part ensures that the observable produces a value every 5 seconds, which triggers a check of the website's status. In the next line the code creates a web request.
Using FromAsyncPattern, I was able to eliminate all the plumbing code needed to handle async I/O calls for the web request. Materialize turns each outcome, including an exception, into a notification value, so the sequence continues even if a request fails. Here is a good write-up on Materialize. The rest is just the usual LINQ, where the code keeps only notifications of kind OnError.
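The alerting rule itself can be looked at separately from Rx: given the outcomes collected in one buffered minute, decide whether there were more than 3 failures. The following is a minimal sketch under that framing; SiteMonitor and ShouldAlert are hypothetical names, and each check is reduced to a bool (true for success) instead of an Rx notification.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, made up to illustrate the threshold rule.
public static class SiteMonitor
{
    // True when the window holds more than maxFailures failed checks.
    // checkSucceeded has one entry per check in the window (true = site responded).
    public static bool ShouldAlert(IEnumerable<bool> checkSucceeded, int maxFailures)
    {
        return checkSucceeded.Count(ok => !ok) > maxFailures;
    }
}
```

With a check every 5 seconds, a one-minute window holds about 12 outcomes, and ShouldAlert(window, 3) corresponds to the failed.Count() > 3 filter in the query.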
This was a fun exercise. I will continue to explore Rx and blog about it.