Maintaining a non-aggregated Limit Order Book for Bitcoin with the Coinbase Pro API in Clojure

Let's make a LOB!

Let's make a limit order book from the Coinbase Pro API.

This is a walkthrough of the code here.

A limit order book is a data structure containing all resting orders for a market. For each price level it contains information on the liquidity available for that price.

A LOB provided by a data source is often aggregated: for each price level, we can see only the total size available at that price. Coinbase's websocket API has a convenient channel for this. If you subscribe to the level2 channel, you will receive a huge message with "type" "snapshot" containing the current aggregated Coinbase LOB, followed by messages containing batched changes to apply to this snapshot.
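For comparison, keeping an aggregated book current is mostly a matter of applying those batched changes. Here is a minimal sketch, assuming each change arrives as a [side price size] triple of strings with side "buy" or "sell", and that a size of zero means the level is gone. The book shape and the name apply-l2-change are made up for illustration and are not part of the walkthrough's code.

```clojure
;; Hypothetical sketch: an aggregated book is just price -> total size per side.
(defn apply-l2-change
  "Applies one [side price size] change. A zero size removes the price level."
  [book [side px sz]]
  (let [k  (case side "buy" :bids "sell" :asks)
        px (bigdec px)
        sz (bigdec sz)]
    (if (zero? sz)
      (update book k dissoc px)
      (assoc-in book [k px] sz))))

(def book
  {:bids (sorted-map-by > 100.00M 1.5M)  ; highest bid first
   :asks (sorted-map 100.01M 2M)})       ; lowest ask first

(def updated
  (reduce apply-l2-change
          book
          [["buy" "100.00" "0"]          ; bid level removed
           ["sell" "100.02" "0.25"]]))   ; new ask level added
```

Note that nothing in this structure can tell us which order at a price level changed, which is exactly the information the non-aggregated book keeps.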

We want a challenge though, so we will make a non-aggregated LOB where every single individual order is kept. So we need to store them in a way that lets us

  1. Add an order in such a way that its price/time priority position relative to all other orders can be examined.
  2. Cancel an individual order.
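The two requirements above can be sketched with nested maps: side, then price, then order id. This is only an illustration under assumed names (empty-book, add-order, cancel-order, queue); the dev.jt/lob library used later may be implemented differently.

```clojure
;; Hypothetical sketch, not the dev.jt/lob implementation.
(defn empty-book []
  {:bids (sorted-map-by >)  ; best (highest) bid first
   :asks (sorted-map)})     ; best (lowest) ask first

(defn add-order [book side px id time sz]
  (assoc-in book [side px id] [time sz]))

(defn cancel-order [book side px id]
  (let [level (dissoc (get-in book [side px]) id)]
    (if (empty? level)
      (update book side dissoc px)        ; drop empty price levels
      (assoc-in book [side px] level))))

(defn queue
  "Order ids at one price level, earliest first (time priority)."
  [book side px]
  (->> (get-in book [side px])
       (sort-by (fn [[_id [time _sz]]] time))
       (mapv first)))

(def book
  (-> (empty-book)
      (add-order :bids 100.00M :b 2 1.0M)
      (add-order :bids 100.00M :a 1 0.5M)
      (add-order :bids 99.99M  :c 3 2.0M)))

(queue book :bids 100.00M)
(queue (cancel-order book :bids 100.00M :a) :bids 100.00M)
```

Keying each level by order id makes cancellation a single dissoc, at the cost of sorting by time whenever queue position is examined.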

A non-aggregated LOB lets us analyze information that is lost in an aggregated LOB, such as the rate of cancellations, the exact queue position of a particular order, patterns in order placement and cancellation which may hint at the origin or possible intent of these orders, and so on.

Our goal is to turn the Coinbase Pro Full Channel into a non-aggregated LOB that's kept up to date in real time.

For this we'll need a websocket connection to consume the feed, an HTTP client to request the initial LOB data to build on, and a JSON parser to parse the data Coinbase gives us.

Dependencies

{
  org.clojure/clojure         {:mvn/version "1.10.2-rc1"}
  org.clojure/data.json       {:mvn/version "1.0.0"}
  java-http-clj/java-http-clj {:mvn/version "0.4.1"}
  org.clojure/core.async      {:mvn/version "1.2.603"}
  dev.jt/lob                  {:git/url "https://github.com/jjttjj/lob.git"
                               :sha     "f616f3c0728f0a9cd016093d552e1c9aa0fb72a8"}
}

The most notable dependency is probably java-http-clj which offers a minimal wrapper over the built in java.net.http HTTP and websocket clients. I've been using this lately as a no-frills http/websocket client and it has served me well so far. It's an extremely small wrapper on top of the built in java http libraries and uses no dependencies itself.

Requires

(ns lmalob
  (:require [java-http-clj.core :as http]
            [java-http-clj.websocket :as ws]
            [clojure.data.json :as json]
            [clojure.core.async :as a :refer
             [<! <!! >! >!! alt! alts! chan go go-loop poll!]]
            [dev.jt.lob :as lob]))

Util

(defn info [& xs] (apply println xs))

(defn uuid [s] (java.util.UUID/fromString s))

(defn cf->ch [^java.util.concurrent.CompletableFuture cf ch]
  (.whenCompleteAsync cf
    (reify
      java.util.function.BiConsumer
      (accept [_ result exception]
        (a/put! ch (or result exception)))))
  ch)

I'll use info as a placeholder for some real logging I may eventually want to do. uuid parses UUID strings. cf->ch puts the result of a Java CompletableFuture (or the exception, if it failed) onto a core.async chan. The java-http-clj library gives us the option to have results returned on a CompletableFuture so we can use them asynchronously. This helper function lets me use core.async instead.

First we will need a way to interact with both the REST and websocket APIs Coinbase offers. There are many options for this in Clojure land today; java-http-clj, described above, covers both.

API notes

Coinbase is kind enough to offer their real data publicly. We don't technically need to use the sandbox URLs, since we're not using any endpoints that are potentially dangerous (such as placing orders). However, the full LOB data we're requesting is huge (~20MB per request), and the sandbox versions are significantly smaller (1-2MB), so it is polite of us (and prevents our IP from being potentially banned for abuse) to use the sandbox endpoints until we are ready to try things out with the real feed.

(def rest-url "https://api.pro.coinbase.com")
(def sandbox-rest-url "https://api-public.sandbox.pro.coinbase.com")

For this demo we use Bitcoin, but any of the products Coinbase offers works just as well: just replace "BTC-USD" with the appropriate product id. You can see all products with the following code. Note that the sandbox API provides far fewer products than the "real" one.


(-> (http/get (str sandbox-rest-url "/products"))
    :body
    (json/read-str :key-fn keyword)
    (->> (map :id)))

The LOB snapshot

We'll need a snapshot to use as a starting point to build the current LOB from.

The Order Book endpoint, with the level parameter set to 3, will give us a good starting point.

(http/get (str sandbox-rest-url "/products/BTC-USD/book?level=3"))

The returned JSON looks like:

{
    "sequence": "3",
    "bids": [
        [ "295.96","0.05088265","3b0f1225-7f84-490b-a29f-0faef9de823a" ],
        ...
    ],
    "asks": [
        [ "295.97","5.72036512","da863862-25f4-4868-ac41-005d11ab0a5f" ],
        ...
    ]
}

It is important that we parse the prices as bigdec because they will serve as keys in our LOB, and doubles make for bad map keys because of their rounding errors. We'll make size a bigdec as well.
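A quick REPL illustration of why doubles are risky as price keys. The map names here are made up for the example.

```clojure
;; Doubles accumulate binary rounding error; BigDecimals do not.
(+ 0.1 0.2)     ; a double slightly above 0.3
(+ 0.1M 0.2M)   ; exactly 0.3M

(def double-levels {0.3 :level})
(def bigdec-levels {0.3M :level})

;; The computed double misses the key; the computed bigdec finds it.
(get double-levels (+ 0.1 0.2))    ;=> nil
(get bigdec-levels (+ 0.1M 0.2M))  ;=> :level

;; Parsing the API's price strings directly avoids doubles entirely.
(bigdec "295.96")                  ;=> 295.96M
```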

The :sequence value represents a number that is guaranteed to be increasing with time. When comparing two messages, a higher sequence number will mean that message definitely occurred later. This will be important for us later.
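One plausible use of this property: if feed messages are buffered while the snapshot request is in flight, any message whose sequence is at or below the snapshot's is already reflected in the snapshot and can be dropped. A minimal sketch, assuming messages are maps with a numeric :sequence key; drop-stale and the sample messages are hypothetical.

```clojure
;; Hypothetical helper: keep only messages newer than the snapshot.
(defn drop-stale [snapshot-seq msgs]
  (remove (fn [msg] (<= (:sequence msg) snapshot-seq)) msgs))

(def buffered
  [{:sequence 9  :type "open"}
   {:sequence 10 :type "done"}
   {:sequence 11 :type "open"}
   {:sequence 12 :type "done"}])

;; With a snapshot taken at sequence 10, only 11 and 12 still need applying.
(map :sequence (drop-stale 10 buffered))  ;=> (11 12)
```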

We also want to make our requests asynchronously because these are big, slow requests and we'll have other things to do while waiting for the response.

(defn request-ch [req-map ch]
  (-> (http/send-async req-map)
      (cf->ch ch)))

(defn parse-level3-val [k v]
  (case k
    (:asks :bids) (mapv (fn [[px sz id]] [(bigdec px) (bigdec sz) (uuid id)]) v)
    ;; else return the value unchanged
    v))

(defn parse-level3 [s]
  (json/read-str s
    :key-fn keyword
    :value-fn parse-level3-val))

(defn req-level3 [sandbox? ch]
  ;; The order book is a REST endpoint, so pick between the two REST base URLs.
  (let [base   (if sandbox? sandbox-rest-url rest-url)
        result (chan 1 (map (fn [response] (parse-level3 (:body response)))))]
    (request-ch
      {:method :get
       :uri    (str base "/products/BTC-USD/book?level=3")}
      result)
    (a/pipe result ch)))

Now we can conveniently request the level3 data be parsed into nice clojure types and put onto a channel we provide:

(<!! (req-level3 true (a/chan)))
{:bids [[30997.47M 3009.9993548M #uuid "38bdbefd-bdf7-4b91-abf4-d995fe12682f"]
        [30997.46M 4771M #uuid "8b4e1e8f-158b-4a6c-bf9f-d99ecb318e39"]
        [30997.45M 6020M #uuid "555e571b-3198-498d-bfea-09bf0a4f11ef"]
        ...],
 :asks [[30997.49M 3009.99292034M #uuid "a84b9fe5-1b1a-4dbd-9fa8-3829cb279f6d"]
        [30997.5M 4771M #uuid "1fdb4052-5c9e-4f16-b168-ea44ea0a5f40"]
        [30997.51M 6020M #uuid "def3dfc6-7d9b-44b9-8ee8-80e23181d83c"]
        ...],
 :sequence 243571813}

Now we need to get that data into a structure that will let us efficiently add and remove orders. I released the hilariously small https://github.com/jjttjj/lob "library" for this purpose.

(defn cb-level3->lob [{:keys [asks bids sequence]}]
  (as-> (lob/empty-lob) lob
    ;; Snapshot entries are [price size id] with no timestamp, so the time is nil.
    (reduce (fn [lob [px sz id]] (lob/insert lob ::lob/asks px id nil sz)) lob asks)
    (reduce (fn [lob [px sz id]] (lob/insert lob ::lob/bids px id nil sz)) lob bids)
    (assoc lob :sequence sequence)))

One thing to note is that the Coinbase level 3 lob gives us order ids for each order but not the time they were placed. So we lob/insert them with a time of nil. In effect this will cause all the orders initialized in this lob to behave as if they were added before any orders which ARE lob/inserted with a time value. This is good enough for our usage.
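The "nil sorts first" behavior matches how Clojure's built-in compare treats nil. Whether the lob library relies on exactly this mechanism is an assumption on my part, but the effect can be seen at the REPL with made-up [time label] pairs:

```clojure
;; nil compares as less than any other value.
(compare nil 1)  ;=> -1

(def orders
  [[3   :late]
   [nil :from-snapshot]
   [1   :early]])

;; Sorting by time puts the nil-timed snapshot order at the front of the queue.
(map second (sort-by first orders))  ;=> (:from-snapshot :early :late)
```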

(cb-level3->lob (<!! (req-level3 true (a/chan))))

;;=>
{:dev.jt.lob/asks {31089M {#uuid "0d4982a1..." [nil 3009.99903649M]},
                   31089.01M {#uuid "7c517469..." [nil 4771M]},
                   31089.02M {#uuid "e9dba70f..." [nil 6020M]},
                   ...},
 :dev.jt.lob/bids {31088.98M {#uuid "18a0459a..." [nil 3009.9993567M]},
                   31088.97M {#uuid "fd7aa4f0..." [nil 4771M]},
                   31088.96M {#uuid "b865e258..." [nil 6020M]},
                   ...},
 :sequence 243573659}
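Given the shape printed above (price, then order id, then [time size]), the aggregated view can always be recovered by summing sizes per price level. A small sketch on hand-written data; aggregate-side is a hypothetical helper, the string ids stand in for real UUIDs, and the use of a sorted map for each side is an assumption about the library's internals.

```clojure
;; Hypothetical helper: collapse {price {id [time size]}} into {price total-size}.
(defn aggregate-side [side]
  (reduce-kv
    (fn [acc px orders]
      (assoc acc px (transduce (map (fn [[_time sz]] sz)) + (vals orders))))
    (empty side)   ; keeps the ordering of a sorted map
    side))

(def asks
  (sorted-map
    31089M    {"order-1" [nil 1.5M]
               "order-2" [nil 0.25M]}
    31089.01M {"order-3" [nil 2M]}))

(aggregate-side asks)
;;=> {31089M 1.75M, 31089.01M 2M}
```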

The Websocket Feed

Now that we have a base LOB to work from, it's time to keep it updated with a websocket connection.

(def websocket-url "wss://ws-feed.pro.coinbase.com")
(def sandbox-websocket-url "wss://ws-feed-public.sandbox.pro.coinbase.com")

ws->ch is a function that establishes a websocket connection, puts the resulting websocket object on a core.async channel, and puts every text message received on the socket onto a separate channel. We'll also report status changes with our info function.

(defn ws->ch [url open-ch recv-ch]
  (ws/build-websocket url
    {:on-text  (let [sa (atom "")]
                 (fn [ws s last?]
                   (let [s (swap! sa str s)]
                     (when last?
                       (a/put! recv-ch s)
                       (reset! sa "")))))
     :on-open  (fn [ws]
                 (info "ws opened")
            