In Parts 1 and 2, we built the infrastructure for monitoring real-time prices and storing them in the database. In Part 3, we will add a Trading Engine that analyzes each datapoint and then decides whether or not to execute a trade. For now, we will just print output to the console and return model_id values to the calling function.
To accomplish this, we need to do the following:
1. Define the “pools” of money we will use to trade
2. Define the models, or criteria, that we will use to make trades.
3. Create a stored procedure to evaluate inbound prices and models
4. Modify our getPrices function to pass incoming prices to the evaluation function
Defining Trading Pools
The first thing we need to do is to be able to define how much money we want to use to trade each symbol. We will call this a “pool”.
Each pool will have its own unique ID, a specific starting dollar amount, and an assigned exchange/symbol combination. I have also added a backtesting flag so that we can create pools that are used for backtesting only. Note that I intentionally did not make pool_id auto-increment: pools should be relatively static, and explicit IDs make it easier to port values between my development environment and production. Here is the SQL for creating the trading_pools table:
use aither_crypto;
drop table if exists trading_pools;
create table trading_pools
(
pool_id bigint unsigned not null unique,
exchange varchar(20),
symbol varchar(10),
total_amount decimal(10,2),
is_backtest tinyint(1) default 0,
primary key (pool_id)
);
Now let’s insert our first trading pool: $10,000 to trade Bitcoin! I chose 0 as an arbitrary pool_id:
insert into trading_pools (pool_id, exchange, symbol, total_amount, is_backtest)
values (0, 'coinbase', 'BTC-USD', 10000, 0);
Defining Trading Models
The second thing we need to do is define a structure that will let us describe the different trading models we want to use to spend our trading pools (for example, “buy when there has been a 2% loss in BTC during the last 24 hours”). We will call this a trading “model”. Each model will have a stored procedure associated with it that will return either a BUY recommendation or no action.
We are going to create a simple table structure to hold simple models that trade purely on percent gain/loss. Feel free to modify this structure to make it work for your needs.
Our trading_models table is defined as:
use aither_crypto;
drop table if exists trading_models;
create table trading_models
(
model_id varchar(50) not null unique,
exchange varchar(20),
symbol varchar(10),
proc_name_buy varchar(255),
target_level_buy decimal(10,2) default null,
enabled tinyint default 1,
primary key (model_id)
);
Again, the model_id is an arbitrary, unique identifier that you define. proc_name_buy is the name of the stored procedure that will contain the guts of the trading model. Each stored procedure will have a standard definition that takes in a date/time and a model_id and decides if a trade should be made. Finally, target_level_buy defines the price level at which our model will enter the market. (If we set target_level_buy to null, the intent is that a buy will be initiated at the current market price; the simple procedure in this post does not handle that case, since a comparison against null never returns a row.) Exit parameters such as take_profit_percentage and stop_loss_percent are not part of the table above.
Let’s go ahead and create a model in our table:
insert into trading_models (model_id, exchange, symbol, proc_name_buy, target_level_buy)
values ('CB_BTC_AITHERCRYPTO', 'coinbase', 'BTC-USD', 'sp_evaluate_buy_CB_BTC_AITHERCRYPTO', 27525.00);
Notice that as part of this insert, I added the stored procedure name sp_evaluate_buy_CB_BTC_AITHERCRYPTO. We will build this stored procedure shortly.
Relating Models to Pools
Now that we have created our models and pools, we need to assign trading models to each pool. The data structure allows you to assign multiple models to a single pool, but for our purposes we will keep a 1:1 ratio for models to pools (again, feel free to add complexity on your own).
We will use a simple join table to assign trading models to a pool:
drop table if exists trading_pool_models;
create table trading_pool_models
(
pool_id bigint unsigned,
model_id varchar(50),
primary key (pool_id, model_id)
);
And, inserting a relationship between the pool and model we created above:
insert into trading_pool_models(pool_id, model_id)
values(0, 'CB_BTC_AITHERCRYPTO');
The above relationship means that the trading model CB_BTC_AITHERCRYPTO will use pool 0 for executing trades.
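To see how the three tables fit together, here is a minimal sketch that recreates simplified copies of them and looks up the pool/model pairs for a symbol. It uses an in-memory SQLite database purely so the example runs anywhere (an assumption on my part; the tables in this series live in MySQL), and the helper names build_demo_db and pools_for_symbol are hypothetical.

```python
import sqlite3

def build_demo_db():
    """Create simplified copies of the three tables in an in-memory SQLite DB."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table trading_pools (
            pool_id integer primary key,
            exchange text, symbol text,
            total_amount numeric, is_backtest integer default 0);
        create table trading_models (
            model_id text primary key,
            exchange text, symbol text,
            proc_name_buy text, target_level_buy numeric,
            enabled integer default 1);
        create table trading_pool_models (
            pool_id integer, model_id text,
            primary key (pool_id, model_id));
        insert into trading_pools values (0, 'coinbase', 'BTC-USD', 10000, 0);
        insert into trading_models values
            ('CB_BTC_AITHERCRYPTO', 'coinbase', 'BTC-USD',
             'sp_evaluate_buy_CB_BTC_AITHERCRYPTO', 27525.00, 1);
        insert into trading_pool_models values (0, 'CB_BTC_AITHERCRYPTO');
    """)
    return conn

def pools_for_symbol(conn, symbol):
    """Return (pool_id, model_id, total_amount) for each enabled model on a symbol."""
    sql = """
        select p.pool_id, m.model_id, p.total_amount
        from trading_pool_models pm
        join trading_pools p on p.pool_id = pm.pool_id
        join trading_models m on m.model_id = pm.model_id
        where m.enabled = 1
          and p.symbol = ?
          and p.symbol = m.symbol
          and p.exchange = m.exchange
    """
    # The symbol is bound as a parameter rather than formatted into the SQL.
    return [tuple(row) for row in conn.execute(sql, (symbol,))]

conn = build_demo_db()
print(pools_for_symbol(conn, "BTC-USD"))
```

Because the join table has a composite primary key, adding a second row for pool 0 with a different model_id would make the same lookup return two pairs, which is how multiple models could share one pool.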
Creating the Stored Procedures for Model Evaluation
As we see in the trading_models table above, each model will have an evaluation stored procedure defined to determine if a BUY should be executed. The stored proc will have a standard input definition of date time and model_id. From that information, the proc will look up details of the model and pool to decide if a trade should be made.
The buy stored procedure we need to create for our lone model is sp_evaluate_buy_CB_BTC_AITHERCRYPTO (the one we added in the INSERT statement above).
This procedure will check the conditions we have set in the models table against the current price (see blog posts 1 and 2) and make a recommendation on whether to buy or not. In this case, we just want to check if the last price is lower than the target level at which the model says to buy.
use aither_crypto;
drop procedure if exists sp_evaluate_buy_CB_BTC_AITHERCRYPTO;
delimiter $$
create procedure sp_evaluate_buy_CB_BTC_AITHERCRYPTO
(
locDate datetime,
locModelID varchar(50)
)
this_proc:begin
    declare dLastPrice decimal(10,2);
    declare dTargetLevelBuy decimal(10,2);
    declare sExchange varchar(20);
    declare sSymbol varchar(10);
    -- Get model details into the local variables declared above.
    -- Local variables start as NULL on every call, so a stale value from a
    -- previous call in the same session cannot leak into this one.
    select
        m.exchange, m.symbol, m.target_level_buy
    into sExchange, sSymbol, dTargetLevelBuy
    from trading_models m
    where
        m.model_id = locModelID;
    -- Get the last price within the hour before the passed in date
    select
        price into dLastPrice
    from
        price_history
    where
        exchange = sExchange
        and symbol = sSymbol
        and created <= locDate
        and created >= date_add(locDate, interval -(1) hour)
    order by created desc
    limit 1;
    -- If the last/latest price is less than the model target level buy, then
    -- return a BUY recommendation. No row is returned otherwise.
    select
        'BUY' as 'action',
        dLastPrice as 'last_price',
        dTargetLevelBuy as 'target_price'
    where
        dLastPrice < dTargetLevelBuy;
end$$
delimiter ;
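If the SQL is hard to follow, the rule the procedure applies can be sketched in plain Python: take the most recent price in the hour before the given time and compare it to the target level. This is only an illustration of the logic, not code used by the engine; the function name evaluate_buy and the 18:00 price row are made up for the example.

```python
from datetime import datetime, timedelta
from decimal import Decimal

def evaluate_buy(price_history, as_of, target_level_buy, lookback=timedelta(hours=1)):
    """Return a BUY dict if the latest price in the lookback window is below target.

    price_history: iterable of (created: datetime, price: Decimal) tuples.
    Returns None when there is no recent price, no target, or no signal.
    """
    window_start = as_of - lookback
    recent = [(created, price) for created, price in price_history
              if window_start <= created <= as_of]
    if not recent or target_level_buy is None:
        return None
    # Equivalent of "order by created desc limit 1"
    _, last_price = max(recent, key=lambda item: item[0])
    if last_price < target_level_buy:
        return {"action": "BUY", "last_price": last_price,
                "target_price": target_level_buy}
    return None

# Hypothetical price rows; only the 27393.56 value comes from the article.
history = [
    (datetime(2023, 5, 15, 18, 0, 0), Decimal("27601.10")),
    (datetime(2023, 5, 15, 18, 32, 0), Decimal("27393.56")),
]
as_of = datetime(2023, 5, 15, 18, 32, 30)
result = evaluate_buy(history, as_of, Decimal("27525.00"))
print(result)
```

Note that the one-hour window means a model produces no recommendation at all when prices have stopped arriving, which is a reasonable safety property for a trading rule.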
We can test this procedure by running a quick test against known data. The image below shows that the last price of BTC-USD was 27393.56.
Our stored procedure checks to see if the price is below 27525.00 (from the target_level_buy of the model) and will return a buy recommendation if the condition is met. We can test this by using a date after the last price:
call sp_evaluate_buy_CB_BTC_AITHERCRYPTO("2023-05-15 18:32:30", "CB_BTC_AITHERCRYPTO");
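And the stored procedure returns a single row with a BUY recommendation: action = BUY, last_price = 27393.56, target_price = 27525.00.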
Adding a Lambda Function to Evaluate BUYs
The final step is to connect our chain of Lambda functions to run BUY evaluations for each model and pool.
In Part 1 and Part 2 of this series, we built an engine that pulled and stored prices. In this section we are going to add a second Lambda function that is run after every price pull to determine if a purchase should be made. We will create a generic function called evaluateModels for this purpose. In a later post, we will also use this function to evaluate SELL actions.
NOTE: I am not going to cover the details of creating a Lambda function as we already did this in Part 1.
To send information to our new Lambda function, we need to define the event message format we are going to pass from getPrices. We will use a simple format with the token symbol, the price, and the timestamp of the price:
{
"Symbol": "BTC-USD",
"Price": "27393.56",
"Timestamp": "2023-05-15 18:32:30"
}
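Since both the price and the timestamp travel as strings, it can help to validate and convert them as soon as the message arrives. The following is a small sketch of that idea; the helper name parse_price_event is hypothetical and is not part of the evaluateModels code in this post.

```python
import json
from datetime import datetime
from decimal import Decimal

REQUIRED_KEYS = ("Symbol", "Price", "Timestamp")

def parse_price_event(event):
    """Validate the event dict and convert Price/Timestamp to typed values."""
    missing = [key for key in REQUIRED_KEYS if key not in event]
    if missing:
        raise ValueError("Event is missing keys: %s" % ", ".join(missing))
    return {
        "Symbol": event["Symbol"],
        # Decimal avoids float rounding when comparing against price levels
        "Price": Decimal(event["Price"]),
        "Timestamp": datetime.strptime(event["Timestamp"], "%Y-%m-%d %H:%M:%S"),
    }

raw = '{"Symbol": "BTC-USD", "Price": "27393.56", "Timestamp": "2023-05-15 18:32:30"}'
parsed = parse_price_event(json.loads(raw))
print(parsed["Symbol"], parsed["Price"], parsed["Timestamp"].isoformat())
```

Failing fast on a malformed message keeps a bad payload from turning into a confusing SQL error further down the chain.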
In the same AWS region where the getPrices Lambda function was created in Part 1, create a new Python 3.8 function and call it evaluateModels. Make sure to use the Aither-Crypto-Lambda-Role we created in Part 1.
After the function is created, add the pymysql module the same way that we did in Part 2. We will use this to pull and evaluate our models from the DB.
Here is the full code for the Lambda function:
import boto3
from datetime import datetime
from datetime import timedelta
import time
import json
import pymysql
import urllib.request
#NOTE: This is a helper function that will parse and return JSON
#from an event message body. I have found this addresses various
#scenarios where messages have slightly different formats between
#production SQS and using the Test functionality.
def parse_message_body_json(message):
    message_body_json=None
    try:
        #Use the "body" key when present (SQS-style events). Otherwise treat
        #the event itself as the body, which is what a direct call from
        #getPrices sends.
        message_body=message.get("body", message)
        if isinstance(message_body, list):
            #NOTE: convert_list_to_dict is a helper that is not shown in this post.
            message_body_json = convert_list_to_dict(message_body)[0]
        elif isinstance(message_body, dict):
            message_body_json=message_body
        elif isinstance(message_body, str):
            #Adjust our message body so that it parses correctly
            message_body=message_body.replace("'", "\"")
            message_body=message_body.strip('"')
            #Wrap a bare JSON object in a list so we can always take element 0
            if not (message_body.startswith('[') and message_body.endswith(']')):
                message_body="[%s]" % message_body
            message_body=json.loads(message_body)
            message_body_json=message_body[0]
    except Exception as error:
        print(error)
        print("Unable to parse JSON.")
    return message_body_json
def lambda_handler(event, context):
    #Store recommendations here to return from the function.
    arr_buy_recommendations=[]
    print("Connecting to DB...")
    #Setup our database connection
    db_host="<Your DB hostname here>"
    db_username="crypto" #OR whatever DB username you created
    db_password="<Your DB password here>"
    conn = pymysql.connect(host=db_host, user=db_username, password=db_password, \
        charset='utf8', db='aither_crypto', cursorclass=pymysql.cursors.DictCursor)
    cur = conn.cursor()
    print("Parsing symbol/price information from the event object...")
    message_body_json=parse_message_body_json(event)
    print(message_body_json)
    print("Getting pool and model combinations for the symbol...")
    sql = """select
        p.pool_id,
        m.model_id,
        m.exchange,
        m.symbol,
        p.total_amount as 'total_pool_amount',
        m.proc_name_buy
    from
        trading_models m,
        trading_pools p,
        trading_pool_models pm
    where
        m.enabled = 1
        and p.symbol='%s'
        and p.exchange = m.exchange
        and p.symbol = m.symbol
        and pm.pool_id=p.pool_id
        and pm.model_id=m.model_id;""" % message_body_json["Symbol"]
    #print(sql)
    cur.execute(sql)
    result_models = cur.fetchall()
    #Evaluate BUY opportunities for each pool/model configuration
    print("Evaluating BUY opportunities...")
    for row in result_models:
        try:
            timestamp=message_body_json["Timestamp"]
            print("Evaluating symbol %s at %s against model %s..." % (message_body_json["Symbol"], timestamp, row["model_id"]))
            sql = "call %s('%s', '%s');" % (row["proc_name_buy"], timestamp, row["model_id"])
            print(sql)
            cur.execute(sql)
            eval_result = cur.fetchall()
            #If a row is returned from the proc, then that means we want to BUY.
            for eval_row in eval_result:
                print("TODO: Placing a BUY order for model=%s..." % (row["model_id"]))
                arr_buy_recommendations.append({"model_id":row["model_id"]})
                #TODO: Execute Trade Here
        except Exception as error:
            print(error)
            continue
    #Clean up
    cur.close()
    conn.close()
    #Return result
    return {
        'data': message_body_json,
        'buy_recommendations': arr_buy_recommendations
    }
This code returns the following JSON when it detects a BUY:
{
"data": {
"Symbol": "BTC-USD",
"Price": "27393.56",
"Timestamp": "2023-05-15 18:32:30"
},
"buy_recommendations": [
{
"model_id": "CB_BTC_AITHERCRYPTO"
}
]
}
The first thing the evaluateModels function does is generate the SQL to pull all of the model_ids we have in the database that match the symbol that we want to check – in this case, “BTC-USD”. We also pull the proc_name_buy column, so we know what stored procedure to call to evaluate the current price data point.
NOTE: We also pull the total_amount we have allocated to the pool, but we do not yet calculate how much of the pool is available. We will add this later when we start making and tracking actual purchases and sales.
print("Getting pool and model combinations for the symbol...")
sql = """select
p.pool_id,
m.model_id,
m.exchange,
m.symbol,
p.total_amount as 'total_pool_amount',
m.proc_name_buy
from
trading_models m,
trading_pools p,
trading_pool_models pm
where
m.enabled = 1
and p.symbol='%s'
and p.exchange = m.exchange
and p.symbol = m.symbol
and pm.pool_id=p.pool_id
and pm.model_id=m.model_id;""" % message_body_json["Symbol"]
After we retrieve the defined models, we use proc_name_buy to generate the SQL that calls the evaluation proc for each model (the line that builds the call statement). Every evaluation proc takes the same two inputs, a timestamp and a model_id, so we simply pass those through. The evaluation proc then looks up the price details based on the symbol and the model details based on the model_id.
This may be a little confusing since we pull model information and pass it into a proc which then pulls model information – but the other option is to pull them in the first query above from the trading_models table and then pass all the details around. I find it more flexible and less prone to error to pass around only the basic info needed. For instance, you could modify the trading_models table to have additional parameter columns and you would only need to modify the guts of the stored proc, instead of pulling more variables into Python and modifying the evaluation proc parameters.
print("Evaluating BUY opportunities...")
for row in result_models:
    try:
        timestamp=message_body_json["Timestamp"]
        print("Evaluating symbol %s at %s against model %s..." % (message_body_json["Symbol"], timestamp, row["model_id"]))
        sql = "call %s('%s', '%s');" % (row["proc_name_buy"], timestamp, row["model_id"])
        print(sql)
        cur.execute(sql)
        eval_result = cur.fetchall()
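One thing worth considering before this goes anywhere near real money: the call statement is built with string formatting, so a malformed timestamp or model_id ends up directly in the SQL. A possible hardening, sketched below, is to validate the procedure name (identifiers cannot be bound as parameters) and let the driver bind the two values. The helper build_buy_call and the naming pattern it enforces are my own assumptions for illustration, not part of the code above.

```python
import re

# Assumption: procedure names follow this post's "sp_evaluate_buy_<MODEL>" pattern.
PROC_NAME_PATTERN = re.compile(r"sp_evaluate_buy_[A-Za-z0-9_]+")

def build_buy_call(proc_name, timestamp, model_id):
    """Return (sql, params) for calling an evaluation proc with bound parameters."""
    # Identifiers cannot be bound as parameters, so validate the name first.
    if not PROC_NAME_PATTERN.fullmatch(proc_name):
        raise ValueError("Unexpected procedure name: %r" % proc_name)
    # %%s survives the formatting step as a literal %s placeholder.
    sql = "call %s(%%s, %%s);" % proc_name
    return sql, (timestamp, model_id)

sql, params = build_buy_call(
    "sp_evaluate_buy_CB_BTC_AITHERCRYPTO",
    "2023-05-15 18:32:30",
    "CB_BTC_AITHERCRYPTO",
)
print(sql)
print(params)
# With a pymysql cursor, this pair would be passed as: cur.execute(sql, params)
```

The same placeholder approach applies to the symbol lookup query, where the symbol from the event message could be passed as a parameter instead of being formatted into the string.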
Lastly, we check the result from the evaluation stored procedure and get ready to place an actual BUY. Here we will just print to the console.
#If a row is returned from the proc, then that means we want to BUY.
for eval_row in eval_result:
    print("TODO: Placing a BUY order for model=%s..." % (row["model_id"]))
    arr_buy_recommendations.append({"model_id":row["model_id"]})
    #TODO: Execute Trade Here
Calling evaluateModels from getPrices
The last step is to add a synchronous call to our evaluateModels function directly from the getPrices function. The code below executes this call and waits for the function to return a response.
#Store the result into JSON. We will return this for now, but use it later.
arr_single_coin=[]
dict_single_coin={"Symbol": coin_pair, "Price": spot_details['data']['amount'], "Timestamp":now}
arr_single_coin.append(dict_single_coin)
lambda_client = boto3.client('lambda')
evaluateModels_response = lambda_client.invoke(FunctionName="evaluateModels",
InvocationType='RequestResponse',
Payload=json.dumps(dict_single_coin))
payload=evaluateModels_response['Payload']
payload_json=json.loads(payload.read().decode("utf-8"))
print(payload_json)
json_return.append(arr_single_coin)
When you run the getPrices function, the print(payload_json) line will show you the result of the call to the evaluateModels function. In this case, it shows a buy recommendation for our model_id.
{
"data":{
"Symbol":"BTC-USD",
"Price":"27393.56",
"Timestamp":"2023-05-15 18:32:30"
},
"buy_recommendations":[
{
"model_id":"CB_BTC_AITHERCRYPTO"
}
]
}
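When a Lambda function invoked with RequestResponse raises an exception, the response from invoke still comes back normally but carries a FunctionError key, so it is worth checking for that before trusting the payload. Here is a minimal sketch of how the response could be decoded; the helper names are hypothetical, and an io.BytesIO object stands in for the streaming Payload that boto3 returns so that the example runs without AWS access.

```python
import io
import json

def decode_invoke_response(response):
    """Decode a Lambda invoke() response dict into the callee's JSON result.

    Raises RuntimeError if the response carries a FunctionError marker.
    """
    body = json.loads(response["Payload"].read().decode("utf-8"))
    if response.get("FunctionError"):
        raise RuntimeError("evaluateModels failed: %s" % body)
    return body

def recommended_model_ids(result):
    """Pull the model_id values out of the buy_recommendations list."""
    return [rec["model_id"] for rec in result.get("buy_recommendations", [])]

# Stand-in for a real boto3 response; io.BytesIO mimics the streaming Payload.
fake_response = {
    "StatusCode": 200,
    "Payload": io.BytesIO(json.dumps({
        "data": {"Symbol": "BTC-USD", "Price": "27393.56",
                 "Timestamp": "2023-05-15 18:32:30"},
        "buy_recommendations": [{"model_id": "CB_BTC_AITHERCRYPTO"}],
    }).encode("utf-8")),
}
result = decode_invoke_response(fake_response)
print(recommended_model_ids(result))
```

In getPrices, the dictionary returned by lambda_client.invoke could be passed to decode_invoke_response in place of the manual read and decode steps.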
And that is it! Now, every time your getPrices function runs, you will also be checking all of your models to see if they recommend you buy.
You can find all the code for this in our GitHub repository.
In Part 4, we will add in trade logging and build out the functionality to evaluate positions for a SELL action.