In the initial design of FMZ strategy, if asynchronous concurrent operations are required, the ```exchange.Go()``` function can only be used to achieve concurrent execution of FMZ encapsulated interface, and it is not possible to concurrently execute some custom operations (functions). Although this design greatly improves the efficiency of the strategy program, students who have experience in concurrent design in native programming languages often feel very uncomfortable.
Even new students who use FMZ for introductory quantitative trading may not understand the use of the ```exchange.Go()``` function. Using ```exchange.Go()``` still appears to be executing statements one by one in sequentially executed code. In this article, we will explore the use of the newly added concurrent thread functionality in the FMZ platform: the ```__Thread()``` and other related functions, as well as the asynchronous design of strategy programs.
1. Simple concurrent designIf we want the main thread of the strategy to run concurrently with a sub-thread executing a custom function we have written, we can use a design similar to the following code. In the strategy code, define a custom function ```GetTickerAsync()``` and write the specific functionality of this function. This function executes an infinite loop and continuously calls the FMZ API interface ```GetTicker()``` to retrieve market data.
Then, use the statement ```__threadSetData(0, "ticker", t)``` to write data to the main thread. The data name is ```ticker``` and the data value is ```t```, which is the return value of ```GetTicker()```.
```
__threadSetData(0, "ticker", t)
```
After designing the custom function for concurrent thread execution, we can write the code in the ```main()``` function. At the beginning of the ```main()``` function, we use:
```
__Thread(GetTickerAsync, 0) // GetTickerAsync is a custom function that needs to be executed concurrently, and 0 is the parameter that is passed to the GetTickerAsync function.
```
Create a concurrent thread that starts executing the ```GetTickerAsync()``` function. Then, the ```main()``` function starts executing its own ```while``` loop, in which it receives the data updated by the ```GetTickerAsync()``` function and prints it:
```
var t = __threadGetData(0, "ticker")
Log(t)
```
Complete code example:
```
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
```
Live trading test:
https://stocksharp.com/file/149772
This is one of the simplest application designs, so let's look at some other requirement designs.
2. Concurrent order placement designWe can design a function to create 10 threads simultaneously, each executing an order placement function. In the ```main()``` function, we can design a ```while``` loop to detect strategy interaction commands. When we receive the interaction command ```placeMultipleOrders```, we call the concurrent order placement function ```testPlaceMultipleOrders()```.
```
if (cmd == "placeMultipleOrders") {
// ...
}
```
Add strategy interaction design on the strategy editing page by adding a button with the command: placeMultipleOrders.
https://stocksharp.com/file/149771
Complete code example:
```
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}
```
- The test adopts pending orders, incrementing from 80% to 90% of the current price, in a simulated trading environment. Click the interaction button to trigger the test order placement.
After clicking the "placeMultipleOrders" button, a message is prompted: The placeMultipleOrders command was sent successfully, please wait for a response from the live trading!
- Strategy log displays concurrent order placement operations:
https://stocksharp.com/file/149770
3. Create a WebSocket connection in a concurrent thread execution functionThis requirement was raised by an FMZ user who wants a simple example demonstrating how to use a **WebSocket** connection in concurrent threads and how to pass data to the ```main()``` function in the main thread.
Actually, it's quite simple and similar to creating concurrent threads in the previous examples. The only difference is that we use the ```__threadPeekMessage()``` and ```__threadPostMessage()``` functions for inter-thread communication. Taking the WebSocket API call for the Binance exchange as an example, we also need to handle the closing operation of the WebSocket connection. The following example demonstrates how to notify a concurrent thread to stop.
Complete code example:
```
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("Create a WS connection:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("receiving data pushed by the WS link, data:", data)
// __threadPeekMessage timeout parameter set to -1, no blocking
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("Concurrent Thread Id:", __threadId(), "Received stop command")
break
}
}
}
Log("Concurrent threads finish execution, close ws connection")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("Create concurrent threads, thread Id:", tid)
while(true) {
// __threadPeekMessage's timeout parameter is set to 0, blocking for data
var data = __threadPeekMessage(0)
Log("Received from concurrent thread", ", Id:", tid, ", the data sent, data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker channel push message",
cols : ["Event Type", "Event Time", "Trading Pairs", "24 Hour Price Change", "24 Hour Price Change %", "Average Price", "Last Traded Price", "Volume in 24 Hours", "Turnover in 24 Hours"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("Finalize function, send a stop command to the concurrent thread with ID ", tid,"")
__threadPostMessage(tid, "stop")
Log("Wait for the concurrent thread with ID ", tid, " to stop")
__threadJoin(tid)
Log("Finalize function execution completed")
}
```
During live trading testing, we can see that the ```main()``` function continuously receives market data from WebSocket connections created by concurrent threads.
When stopping the live trading strategy, the finalize function will start working.
From:
https://blog.mathquant.c...ipt-strategy-design.html