Load Balancing Stock Market Trades From A WebSocket Feed
Did you know you can get a live feed of every trade in the stock market?
Unlike other market APIs which tell you “don’t request too many symbols” or set hard caps on what you can request, here you can just ask for the entire market and start receiving every trade as it posts to a tape. You don’t have to limit yourself to your favorite 5 or 30 symbols—you can receive live trade details for all 6,000+ symbols in real time over a text websocket feed. cool right?
How would you make a simple system to consume the entire US market median transaction rate of 2,000 trades per second while also having enough processing headroom to accommodate the daily 600k to 1M trade per second bursts around market open (0930 ET) and close (~1550 ET)?1
Consuming the median trade rate for the entire market (or the trades for a small number of symbols) is easy enough. A regular python script can read in a loop to process all the data, but the problem is at the start and end of the day when trade rates spike from 2,000 per second up to 600k to 1M+ per second in rapid bursts. My first attempt at just having a python websocket client running during those bursts resulted in a processing backlog of 20+ minutes as everything to longer and longer to flush out. You really don’t want 20+ minute delayed decisions when you’re trying to read live data for real-time decisions.
I ended up with this load balancing architecture capable of routing 2 million trades per second without noticeable delays:
Why
First, why do all this yourself? Quote APIs typically provide bars on their own (and polygon even provides 1-second bars for streaming), so why bother reading 50 million trades yourself every day?
Maybe you want to ignore the regulation trade bar complex rules and specify your own conditions for “bar worthy” trades to cover different scenarios?
Maybe you want to collect your own real time dark pool trade statistics, which are never counted in official bars, but are still always reported in the tape?
Maybe you want to create a scanner to see when old off-market trades post?
Maybe just for fun?
Performance Tricks
To hash and forward up to 2 million trades per second, my trade load balancer is optimized to:
- use kernel TLS so the kernel decrypts data directly into our client buffer with no additional user-required copying
- give the kernel a large write buffer so we hopefully minimize the number of
read()
system calls required - manage websocket frame offsets manually inside our buffer instead of only requesting one-frame-per-read syscall.
- manage a static continuation buffer for processing partial websocket frames
- treat JSON objects as just text without any validation or conversion
- hash the trade symbol in each trade JSON to an outbound sending pipe for a worker to read
Overhead of receiving websockets
WebSockets aren’t the most efficient framing protocol because the layers-upon-layers-upon-layers design can fragment at every level and require extra buffer space to remember previous data for re-combining.
A websocket connection is formed by:
- a TCP stream
- containing a TLS session (if in production)
- containing one of 6 kinds of WebSocket frames (text, binary, close, ping, pong, or continue which is a large text or binary frame split across multiple sub-frames)
- containing user data (in our case, containing a JSON array of JSON objects describing trades)
The Data Flow
I wrote the data parser as a rampant layering violation. The websocket frame decode logic also reads the JSON contents and forwards the per-trade JSON to worker pipes.
With a normal websocket library, the websocket-frame-as-data abstraction requires:
- accumulate data from network until one websocket frame is complete
- allocate new data buffer
- copy websocket frame to data buffer
- give websocket frame to client
- client converts websocket frame content from JSON array to native types
- client iterates over native array to hash each symbol to a worker
- client sends each native object to the hashed worker pipe as JSON again (which requires re-encoding individual objects as JSON again, re-allocating, etc)
- client frees websocket frame
- repeat
When trying to process a million items per second, you have to process each data point in, at most, 1 microsecond. 1 us doesn’t leave any room for per-frame allocation, freeing, or redundant copying.
But with a custom data-aware websocket parser we can:
- accumulate data from network into a pre-allocated static buffer until we have one or more complete websocket frames (which we can now fully process in a fast loop instead of needing to read from the kernel again for a while)
- read JSON objects using simple byte offsets and text searching for matching quotes all the way to end-of-object braces
- hash symbol inside objects to worker pipe IDs
- write JSON text to worker pipe using byte offsets inside our static websocket frame buffer (no copying or converting JSON to native types!)
- periodically (every 8192 trades) report stats about:
- number of
read()
syscalls (reads) - number of websocket frames inside each
read()
(frames per read) - number of JSON objects/trades inside each websocket frame (trades per frame)
- total trades parsed
- total bytes received
- current throughput reported as trades per second
- number of
- repeat
By writing a dirty websocket client with data processing embedded in the frame decoding, we avoid anywhere from 3 to 7 allocation and free calls per frame which would be required by a “proper” step-by-step pass-it-forward architecture.
For any high speed and low latency data processing it’s always best to find a way to iterate your data in a loop as fast as possible, then pause/block/idle while waiting for more data again. CPUs love operating in loops, so help them help you as much as possible.
Under normal conditions (meaning: no websocket continuation frames), we don’t need any allocations or memory copies for trade load balancing because we are just reading websocket frames into, essentially, a large circular buffer, then we operate using single-pass byte offsets inside the buffer directly.
The Interface
The trade balancer doesn’t have an interactive interface, but it does print stats output every 8192 trades processed.
Start of Day
Here’s the trade load balancer stats output for start of a trading day (specifically, this was Friday, November 27 2020 around 0930 ET). Note how the processed trade rate goes from 572 trades per second to 1500/sec to 4500/sec to 33k to 40k to 53k then it hits 800k/sec less than one second later! (also, these are instantaneous rates and not total trades processed).
First Batch
Stats Fields
The fields per row are:
- [timestamp] — epoch seconds to a 5 decimal resolution (“right-truncated microseconds” basically)
- r — number of
read()
system calls it took to process this trade batch (each report batch is 8192 trades)- notice how sometimes r is 1 or even 0 — this means we are still processing our previously saved buffer and aren’t reading much (if any) new data for this batch of 8192 trades. Being able to consume our previously-saved buffer in a loop without further
read()
syscalls is how we hit such high performance numbers (up to 2M trades/second below).
- notice how sometimes r is 1 or even 0 — this means we are still processing our previously saved buffer and aren’t reading much (if any) new data for this batch of 8192 trades. Being able to consume our previously-saved buffer in a loop without further
- f — number of WebSocket frames received during this batch
- each
read()
call usually contains multiple WebSocket frames, so f is higher than r - also, if we are processing from saved buffer only, r will be low (or 0) and we just process previously-saved frames in a loop
- each
- f/r — frames per read (so just f/r)
- t/f — trades per frame (each WebSocket frame holds a JavaScript array of trades, so this is an average of how long the trade array was inside each WebSocket frame for this stats report)
- tt - total trades processed during this connection
- ttt - total trades processed during this server lifetime (if the upstream WebSocket server disconnects, we aggressively reconnect and keep our running total trade number ttt increasing while resetting tt to zero for the new connection)
- br — total bytes received by
read()
system calls for this stats batch- when using kernel TLS, we are only counting decrypted bytes; we have no insight to the encrypted byte count
- also note, br can be zero if we are still processing a large previously
read()
buffer without needingread()
again
- bt — total
read()
bytes received over the lifetime of the load balancer uptime - {trades per second} — instantaneous trade per second rate
- though, since a stats line is only reported every 8192 trades, the trade rate is also the rate of stats printing.
- if we are logging data at early trade time like 0500 ET, the logger may have 5 to 10 minutes between stats output lines because it takes so long for 8192 trades to hit the market at such low volume.
Also, the Sending empty pong… reply is just debug output showing we are still replying to WebSocket ping
frames sent by the upstream server (this upstream server isn’t including any ping
body to duplicate for the reply pong
, as the WebSocket protocol allows some servers to do, so we can send a statically allocated empty pong
response every time without needing to format it for each reply).
Second Batch
The next 14 seconds of the day after the previous screen.
Third Batch
The next 20 seconds after the previous screen.
Random Burst
Here’s a random trade burst in the middle of the day at 1220 ET
Trade goes from a median level of around 3,500 per second to a burst of almost 80k trades per second then goes back down.
Here’s a ramp from steady state of less than 2,500 trades per second to a sudden 50k/second trade burst.
Also notice how during larger bursts, each WebSocket frame holds more trades. At the 2,300 per second rate, each frame has less than 3 trades. At the 50k per second rate, each frame is now up to 15 trades because the data provider could bundle more trades together before reaching their WebSocket frame completion time cutoff.
Maximum Performance Testing
A live stock market is nice and all, but what about seeing how fast we can actually chew through trades?
In the stats below, I was running a saved stock market snapshot as fast as possible (which is also why all the frame and trade line up nicely: every transfer is 128 trades in one WebSocket frame). Instead of using production trade connection, I’m using a localhost connection where I feed it millions of trades and have the trade load balancer route them back out again.
You can see we reach a sustained rate of almost 2 million trades forwarded per second using a single core on my KVM instance (via Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz):
Here’s a continuation of the test with the speed still increasing. This includes full hashing of each trade entry including writing each trade out to the sharded pipes for workers to read, all happening during the WebSocket frame parsing.
Reasons
I decided on oversized Linux pipes communicating to forked worker processes as the IPC mechanism because pipes are the fastest IPC (with free blocking or async notification built-in!) and Linux allows you to increase pipe buffers up to 2 GB max (other systems limit you to 64 KB pipes). Using pipes as IPC does limit you to one-system scalability (and they have ugly blocking behavior if they do reach capacity), but in practice modern servers have 32+ cores which is more than enough for spawning concurrent workers.
The kernel TLS is a big help because it simplifies network processing paths and error conditions. The only difference between the encrypted and unencrypted code paths is how we connect to the server. All after-connection receive logic is agnostic to whether we are running unencrypted for localhost testing or fully encrypted for production use.
Parsing the JSON as text with fixed field offsets is also a huge help because it removes any validation or conversion overhead which was a big performance hit in the Python attempt. Also combining the symbol hashing and direct trade forwarding writes into the frame parser loop makes this the fastest trade-json-over-websocket forwarding system possible.
The Code
You can evaluate the code at https://github.com/mattsta/trade-balancer
The trade load balancer code is only the load balancer portion. It doesn’t include the actual trade evaluation and processing code yet, but you can easily tell the code where to fork your own worker process to receive the inbound pipes then process from there (good luck though, I’ve spent six months refining my trade processing logic and I still don’t quite trust it).
My concurrent trade processing/reporting/graphing code will be released eventually after it can be packaged more cleanly than its current state of being spread across 8 different directories with custom PYTHONPATH
notes everywhere.
Next Steps
So what about actually processing the trade data? We’ll save those fancy details for another post.
For processing trade data, I collect trades into temporal buckets every 30, 90, and 180 seconds, then each time bar maintains its own rolling stats infrastructure for simple things like VWAP and RSI or more complicated things like H-A bars and custom volatility-sensitive SAR buy/sell points.
Releasing the trade processing infrastructure will be next, but it’s more important to release the trade load balancer implementation because without being able to read all trades from the network in a scalable way, processing trades reliably can’t happen, but now it can happen and it do happen and it will have had happen happening now for next time.
Don’t be not un-data-processing, be good do data processing.
matt is available for consulting, holiday parties, birthday gatherings, and delivering unsolicited rational judgments
matt loves you
Also, the only reason this load balancing is required is because stock data providers aggressively charge per data feed, so your account can only open one network connection at a time. If the upstream providers allowed multiple connections, we would probably just have our worker processes each subscribe to their own individual symbols for direct reading and we could skip this entire “single network feed needing to reach 50+ worker processes” load balancing hack.
Basically every time you have a live feed of stock data from any provider, you end up writing your own internal load balancer or pub/sub proxy service to split the feed out into multiple consumers on your backend side (since the upstream providers always limit you to one network connection at a time from their viewpoint).↩