13 August, 2013

Making a streaming API from scraped data using Clojure


I recently found myself having to play around with some stock exchange data. The stock exchange in Nepal, unsurprisingly, doesn't provide a data API, so I had to scrape its website. The non-realtime data isn't very interesting: it's just regular old scraping, made a little more tedious by the fact that whoever designed the website didn't know how to use HTML id attributes.

Now, on to the live trading data. For live data, the website shows a ticker of stock prices, which I think is a really bad representation of the data. If you want to know the price at which ZXY was traded, you may have to wait until the end of the ticker. If ZXY was the only stock you were interested in, you'd still have to sit through the rest of the ticker. And to get fresh data, you have to hit refresh. This is kind of okay on TV, but having to do it on a computer is terrible. Computers are more interactive than TV sets and should be treated as such. Bret Victor has given a great talk titled "Stop Drawing Dead Fish" that makes this point much more articulately. The talk is about art, but I think representing data as a ticker is like drawing dead fish.

So, I got around to thinking about how to build a better interface for the live trading data. To do that, I first had to build a streaming API which pushes stock prices as the trades happen. And doing that wasn't all that complicated, thanks to clojure.data/diff, watches and http-kit.

The first step is to pull in the page and scrape out the ticker data to get a map of the latest trades for each company like this:

{"ABC" {:price 100
        :volume 12} 
 "FOO" {:price 432
        :volume 22}
 "BAR" {:price 94
        :volume 34}}

I used laser for the scraping, but you could use that or Enlive, which also seems to be great. I won't go into the details of the scraping.
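The scraping itself is site-specific, but the last step, turning ticker entries into the map above, is generic. Here is a minimal sketch, assuming (hypothetically) that the scraper has already pulled each ticker entry out as a vector of strings `[ticker price volume]`, that rows arrive oldest first, and that prices are whole numbers as in the example maps. `rows->prices` is an illustrative name, not part of laser or Enlive.

```clojure
(defn rows->prices
  "Builds the {ticker {:price p :volume v}} map from scraped rows.
  Each row is assumed to be a vector of strings: [ticker price volume]."
  [rows]
  (reduce (fn [prices [ticker price volume]]
            ;; Later rows overwrite earlier ones, so if rows are in
            ;; chronological order the map keeps the latest trade per company.
            (assoc prices ticker {:price  (Long/parseLong price)
                                  :volume (Long/parseLong volume)}))
          {}
          rows))

(rows->prices [["ABC" "100" "12"] ["FOO" "432" "22"]])
;; => {"ABC" {:price 100, :volume 12}, "FOO" {:price 432, :volume 22}}
```

If the site prints decimal prices, `bigdec` would be a safer parser than `Long/parseLong`.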

Store this map in an atom; let's call the atom current-prices. After, say, 5 seconds, when we scrape again, new trades will have happened and the map we get will be different from the one above:

{"ABC" {:price 100
        :volume 12}
 "FOO" {:price 434 ;; this has changed
        :volume 300}
 "BAR" {:price 90 ;; this too
        :volume 25}}

Since we called our atom current-prices, it makes sense to reset! it now so that it holds the second, more recent map of trading data. It's nice that we now have the trading data in a Clojure data structure, but note that reset!-ing our atom is really just the equivalent of refreshing our browser; we aren't done yet.
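A minimal sketch of the scrape-and-reset! loop, assuming a hypothetical zero-argument `fetch-fn` that scrapes the page and returns a map like the ones above. `start-polling!` is an illustrative name, not a library function; it runs the loop on a background daemon thread and calls `reset!` every `interval-ms` milliseconds.

```clojure
(def current-prices (atom {}))

(defn start-polling!
  "Calls fetch-fn (a zero-argument function returning a prices map) every
  interval-ms milliseconds on a background thread and reset!s prices-atom
  with the result. Returns a zero-argument function that stops the loop
  once the iteration in progress finishes."
  [prices-atom fetch-fn interval-ms]
  (let [running? (atom true)
        poll (fn []
               (while @running?
                 (try
                   (reset! prices-atom (fetch-fn))
                   ;; A failed scrape (network error, changed markup) is
                   ;; logged and skipped; the atom keeps its previous value.
                   (catch Exception e
                     (println "Scrape failed:" (.getMessage e))))
                 (Thread/sleep (long interval-ms))))]
    ;; A daemon thread, so the poller alone does not keep the JVM running.
    (doto (Thread. ^Runnable poll)
      (.setDaemon true)
      (.start))
    (fn stop-polling [] (reset! running? false))))

;; With a hypothetical scrape-ticker function, polling every 5 seconds:
;; (def stop-polling (start-polling! current-prices scrape-ticker 5000))
```

A `java.util.concurrent.ScheduledExecutorService` would work just as well; the plain loop keeps the example short.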

Now, Clojure comes with a handy function called diff which is in the clojure.data namespace. Here's how it works:

(require 'clojure.data)

(clojure.data/diff {:a 42 :b "foo"}
                   {:a 43 :b "foo"})
;; ({:a 42} {:a 43} {:b "foo"})

The diff function tells you how one data structure differs from another. For two maps, it returns three items: the first shows the key-value pairs that exist in the first map but not in the second, the second shows the pairs that exist only in the second map, and the third shows the pairs that exist in both maps.
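Two more cases matter for ticker data: a key present in only one map (say, a company appearing in the ticker for the first time) shows up only in the first or second item, and for identical inputs the first two items are both `nil`.

```clojure
(require 'clojure.data)

;; :b exists only in the first map, :c only in the second, :a in both.
(clojure.data/diff {:a 1 :b 2} {:a 1 :c 3})
;; => ({:b 2} {:c 3} {:a 1})

;; Identical inputs: nothing is unique to either side.
(clojure.data/diff {:a 1} {:a 1})
;; => [nil nil {:a 1}]
```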

diff works on seqs too, but we won't bother with that right now.

Let's see what we get when we diff the older and newer versions of our current-prices atom:

(clojure.data/diff {"ABC" {:price 100
                           :volume 12} 
                    "FOO" {:price 432
                           :volume 22}
                    "BAR" {:price 94
                           :volume 34}}

                   {"ABC" {:price 100
                           :volume 12}
                    "FOO" {:price 434 ;; this has changed
                           :volume 300}
                    "BAR" {:price 90 ;; this too
                           :volume 25}})
;; ({"FOO" {:price 432, :volume 22},  "BAR" {:price 94, :volume 34}}
;;  {"FOO" {:price 434, :volume 300}, "BAR" {:price 90, :volume 25}}
;;  {"ABC" {:price 100, :volume 12}})

Great. This tells us that no trade happened for ABC, and for FOO and BAR it shows the older and newer trading data.
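One subtlety: diff recurses into nested maps. In the example above both fields of FOO and BAR changed, so their inner maps come out whole, but if only FOO's volume changes, the second item holds just the new `:volume` and the unchanged `:price` moves into the third item. A minimal sketch of a safer lookup, using a hypothetical helper `changed-trades` that takes the changed company names from the diff and reads their full entries from the new map:

```clojure
(require 'clojure.data)

;; Only the volume changed: the second item has no :price at all.
(clojure.data/diff {"FOO" {:price 432 :volume 22}}
                   {"FOO" {:price 432 :volume 300}})
;; => ({"FOO" {:volume 22}} {"FOO" {:volume 300}} {"FOO" {:price 432}})

(defn changed-trades
  "Returns the full, current entries of new-prices for every company whose
  data differs from old-prices, including companies that are new."
  [old-prices new-prices]
  (let [[_ changed _] (clojure.data/diff old-prices new-prices)]
    ;; Use diff only to find *which* companies changed; take their complete
    ;; entries from new-prices.
    (select-keys new-prices (keys changed))))

(changed-trades {"FOO" {:price 432 :volume 22}}
                {"FOO" {:price 432 :volume 300}})
;; => {"FOO" {:price 432, :volume 300}}
```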

Now, let's add a watch to our current-prices atom, so that whenever we pull in new data, the watch function works out which stocks have new trades and pushes their prices to the appropriate clients.

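(require 'clojure.data
         '[org.httpkit.server :refer [send!]])

;; clients is assumed to be an atom holding the connected http-kit channels.
(add-watch current-prices :send
           (fn [_key _ref old-prices new-prices]
             ;; The second item of the diff holds what exists only in the new
             ;; map: companies that are new or whose price or volume changed.
             (let [[_ changed _] (clojure.data/diff old-prices new-prices)]
               (doseq [client @clients
                       company (keys changed)]
                 ;; diff recurses into nested maps, so read the full trade
                 ;; from new-prices instead of the possibly partial diff.
                 (send! client (str company
                                    " traded for "
                                    (:price (get new-prices company))))))))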

Every time the current-prices atom is reset! or swap!-ed, the function above gets called.
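To see the watch contract in isolation: the function receives the watch key, the reference, and the old and new values, and for an atom it runs on the calling thread after every successful reset! or swap!, even when the new value equals the old one. That is why the diff step matters: an unchanged scrape still fires the watch, but produces no changed companies. A small sketch with throwaway atoms (`counter` and `seen` are illustrative names):

```clojure
(def counter (atom 0))
(def seen (atom []))

;; The watch fn gets [key reference old-state new-state].
(add-watch counter :log
           (fn [_key _ref old-state new-state]
             (swap! seen conj [old-state new-state])))

(swap! counter inc)   ; watch records [0 1]
(reset! counter 1)    ; fires again even though the value is unchanged: [1 1]

@seen
;; => [[0 1] [1 1]]

(remove-watch counter :log) ; detach the watch when it is no longer needed
```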

Here we're simply sending all our clients a string. In practice, you'd probably send JSON or EDN, and only to the clients interested in a specific company. The send! function comes from http-kit, which has a unified API for WebSockets, HTTP long polling and streaming. I wrote about using WebSockets with http-kit in a previous post.
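A sketch of that refinement, assuming a hypothetical `subscriptions` map from each client channel to the set of companies it follows. It encodes messages as EDN with `pr-str`; delivery would still go through http-kit's `send!`. Keeping the routing in a pure function (`messages-for` is an illustrative name) makes it testable without a running server.

```clojure
(defn messages-for
  "Given subscriptions {client #{company ...}}, the companies that just
  traded, and the latest prices map, returns {client [edn-string ...]}.
  Clients with no matching subscription are left out."
  [subscriptions traded-companies prices]
  (into {}
        (for [[client companies] subscriptions
              ;; A set works as a predicate: keep only followed companies.
              :let [hits (filter companies traded-companies)]
              :when (seq hits)]
          [client (mapv (fn [company]
                          (pr-str {:company company
                                   :price   (get-in prices [company :price])
                                   :volume  (get-in prices [company :volume])}))
                        hits)])))

;; Inside a watch on current-prices, assuming subscriptions is an atom holding
;; that map, delivery could look like:
;; (let [[_ changed _] (clojure.data/diff old-prices new-prices)]
;;   (doseq [[client msgs] (messages-for @subscriptions (keys changed) new-prices)
;;           msg msgs]
;;     (send! client msg)))
```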

And that's it. We have now built a streaming API using just a watch function and clojure.data/diff. I think that's pretty cool.