I created a simulated stock ticker in Clojure using core.async.
(ns ticker.core
  (:require [clojure.core.async :refer [chan <! >! timeout go] :as async]))
In core.async, functions in your program communicate over channels. There are several ways to put items onto a channel and take them off, but this project only uses >! and <!. These functions must be called within a 'go block'.
<! takes a value off of a channel when one is available. If no value is available, execution within the go block is suspended until a value becomes available, at which time execution resumes.
>! does just the opposite. It puts a value onto a channel when a channel can accept a value, and suspends if the channel is not ready to accept a value. An unbuffered channel can only accept a new value when another block is trying to take a value off of the channel. Channels can use buffers to accept values before readers are ready for them, but this project doesn't use them.
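A minimal sketch of one put and one take on an unbuffered channel. The round-trip function is a hypothetical helper, not part of the ticker, and it uses <!! (the blocking take for ordinary threads) only so the result can be read outside of a go block. Note that nil cannot be put on a channel, so v must be non-nil.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!!]])

(defn round-trip
  "Hypothetical helper: puts v on an unbuffered channel in one go block
  and takes it off in another."
  [v]
  (let [c (chan)]
    ;; The put parks until some other block is ready to take from c.
    (go (>! c v))
    ;; A go block returns a channel that receives the value of its body.
    ;; <!! blocks the calling thread until that value arrives.
    (<!! (go (<! c)))))

(round-trip :hello) ; => :hello
```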
The other core.async function that I refer to is timeout. timeout creates a channel that closes after a specified number of milliseconds. Reading from a closed channel immediately returns nil. As the name suggests, this can be used to implement timeouts, but it can also be used to create delays.
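A small sketch of timeout used as a delay. The delayed-value function and the keys in its result map are made up for illustration, and <!! is used only to read the result from outside a go block.

```clojure
(require '[clojure.core.async :refer [go <! <!! timeout]])

(defn delayed-value
  "Hypothetical helper: parks for roughly ms milliseconds, then yields v
  along with whatever the take from the timeout channel returned."
  [ms v]
  (go
    ;; The take parks until the timeout channel closes, then returns nil.
    (let [result (<! (timeout ms))]
      {:timeout-result result :value v})))

(<!! (delayed-value 50 :done)) ; => {:timeout-result nil, :value :done}
```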
I have a couple of utility functions for adding some randomness to the price and timing of transactions. I also have a function that creates a map to represent each transaction.
(defn adjust-price [old-price]
  (let [numerator (- (rand-int 30) 15)
        adjustment (* numerator 0.01M)]
    (+ old-price adjustment)))

(defn random-time [t]
  (* t (+ 1 (rand-int 5))))

(defn new-transaction [symbol price]
  {:symbol symbol
   :time (java.util.Date.)
   :price price})
The make-ticker function takes a stock symbol, a minimum number of milliseconds between transactions and a starting price. It returns a channel that will have a new transaction placed on it after a random interval as long as there is a listener to take the transactions off of the channel.
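To see what ranges these helpers actually produce, here is a quick sampling sketch. The two functions are copied from above so the block stands alone; price-changes and delays are names introduced only for this check. Since rand-int 30 returns 0 through 29, each price change falls between -0.15 and +0.14, and random-time returns t multiplied by a whole number from 1 to 5.

```clojure
;; Copies of the post's helpers so this block runs on its own.
(defn adjust-price [old-price]
  (let [numerator (- (rand-int 30) 15)
        adjustment (* numerator 0.01M)]
    (+ old-price adjustment)))

(defn random-time [t]
  (* t (+ 1 (rand-int 5))))

;; Sample each helper many times and look only at the bounds.
(def price-changes (repeatedly 1000 #(- (adjust-price 100M) 100M)))
(def delays (repeatedly 1000 #(random-time 400)))

(every? #(<= -0.15M % 0.14M) price-changes)  ; => true
(every? #{400 800 1200 1600 2000} delays)    ; => true
```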
(defn make-ticker [symbol t start-price]
  (let [c (chan)]
    (go
      (loop [price start-price]
        (let [new-price (adjust-price price)]
          (<! (timeout (random-time t)))
          (>! c (new-transaction symbol new-price))
          (recur new-price))))
    c))
This function creates a channel, and then sets up an infinite loop that puts messages onto that channel. In terms of execution order, what really happens is the channel is created and returned, a new price is calculated and the timeout is encountered. After the timeout has expired, a new value is put on the channel and we repeat the loop.
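A sketch of reading a few transactions from a single ticker, assuming the functions above (copied here so the block stands alone). take-transactions is a hypothetical helper, and async/take, async/into and <!! are core.async functions that the project itself does not use. Once the three values have been taken nothing reads from the ticker any more, so its go block simply stays parked on the next put.

```clojure
(require '[clojure.core.async :as async :refer [chan go <! >! <!! timeout]])

;; Copies of the post's functions so this block stands alone.
(defn adjust-price [old-price]
  (+ old-price (* (- (rand-int 30) 15) 0.01M)))

(defn random-time [t]
  (* t (+ 1 (rand-int 5))))

(defn new-transaction [symbol price]
  {:symbol symbol :time (java.util.Date.) :price price})

(defn make-ticker [symbol t start-price]
  (let [c (chan)]
    (go
      (loop [price start-price]
        (let [new-price (adjust-price price)]
          (<! (timeout (random-time t)))
          (>! c (new-transaction symbol new-price))
          (recur new-price))))
    c))

(defn take-transactions
  "Hypothetical helper: blocks until n transactions have been read from ticker."
  [ticker n]
  ;; async/take closes its output after n items; async/into collects them.
  (<!! (async/into [] (async/take n ticker))))

;; A 1 ms base interval keeps the demo fast.
(def sample (take-transactions (make-ticker "TEST" 1 100) 3))

(map :symbol sample) ; => ("TEST" "TEST" "TEST")
```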
One thing that can get tricky with core.async is the lifetime of the channels. It is important to create the main channel 'c' outside of the loop because it needs to exist for the entire lifetime of the ticker. If it is created inside of the loop, each message will be on a new channel.
The timeout channel must be created inside of the loop. Each timeout is for a single use. If the timeout were created outside of the go loop, we would wait for the timeout channel to close during the first iteration. Later attempts to read the same channel would return immediately.
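The reuse problem can be seen directly by timing two reads of the same timeout channel. This is only a sketch: timed-take, shared, first-wait and second-wait are names made up for the demonstration, and <!! is used so it can run outside of a go block.

```clojure
(require '[clojure.core.async :refer [<!! timeout]])

(defn timed-take
  "Hypothetical helper: blocks on a take from c and returns the elapsed milliseconds."
  [c]
  (let [start (System/nanoTime)]
    (<!! c)
    (/ (- (System/nanoTime) start) 1e6)))

(def shared (timeout 200))

;; The first read waits for the channel to close, roughly 200 ms.
(def first-wait (timed-take shared))

;; The channel is already closed, so the second read returns almost at once.
(def second-wait (timed-take shared))

(> first-wait second-wait)
```

Creating a fresh timeout on every pass through the loop, as make-ticker does, avoids this.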
I created a collection of stock symbols along with arbitrary values to use for the time interval and starting price.
(def stocks
  ;; symbol min-interval starting-price
  [["AAPL"  1400  537]
   ["AMZN"  4200  345]
   ["CSCO"   400   22]
   ["EBAY"  1200   55]
   ["GOOG"  8200 1127]
   ["IBM"   2200  192]
   ["MSFT"   500   40]
   ["ORCL"  1000   39]
   ["RHT"  10200   53]
   ["T"      600   35]])
Each stock symbol will have its transactions created on its own channel. For the ticker, we want to create a single channel that combines the outputs of each stock's channel. The merge function does exactly that: it takes several channels as inputs and combines their outputs into a single channel.
(defn run-sim []
  (let [ticker (async/merge (map #(apply make-ticker %) stocks))]
    (go
      (loop [x 0]
        (when (< x 1000)
          (println (str x "-" (<! ticker)))
          (recur (inc x)))))))
This function creates channels for each of the stock symbols and combines their outputs into a channel called ticker. It then creates a loop within a go block that will run until 1000 transactions have been printed out.
I chose to call the merge function with async/merge instead of bringing merge in with :refer. The other core.async functions have names that make it clear that they pertain to core.async. Merge is a name that could apply to lots of different things, so I wanted to be explicit.
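Here is a small sketch of async/merge in isolation. merged-values is a hypothetical helper, and async/close!, async/into and <!! are core.async functions that the project itself does not use. Unlike the tickers, each source channel here is closed after a single value, which lets the merged channel close so the results can be collected. The results go into a set because the order in which values arrive from the merged channel is not guaranteed.

```clojure
(require '[clojure.core.async :as async :refer [chan go >! <!!]])

(defn merged-values
  "Hypothetical helper: creates one single-value channel per item in vs,
  merges them, and returns everything that came out as a set."
  [vs]
  (let [sources (mapv (fn [v]
                        (let [c (chan)]
                          (go (>! c v)
                              (async/close! c))
                          c))
                      vs)
        merged (async/merge sources)]
    ;; merge closes its output once every source channel has closed,
    ;; at which point async/into delivers the collected values.
    (<!! (async/into #{} merged))))

(merged-values [:a :b :c]) ; => #{:a :b :c}
```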