-
Notifications
You must be signed in to change notification settings - Fork 0
/
api_calls.py
141 lines (125 loc) · 5.68 KB
/
api_calls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import hashlib
import hmac
import requests
import time
# Root URL of the Binance REST API; every endpoint path used in this module is
# appended to it.
base_url = "https://api.binance.com" #The URL of the Binance API
# Value of the trading-bot holdings. It is added as-is to the portfolio total
# because the API does not expose trading-bot balances.
trading_bot_assets = 493 #Edit this value manually
def timer(func):
    """Decorate ``func`` so that every call prints how long it took.

    Args:
        func: The callable to measure.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints the elapsed time in seconds and returns the result of
        ``func`` unchanged. Nothing is printed when ``func`` raises.
    """
    import functools

    # functools.wraps keeps the wrapped function's __name__ and __doc__, so
    # decorated functions still identify themselves correctly.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement is not distorted by
        # system clock adjustments the way time.time() can be.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"{func.__name__} took {end_time - start_time} seconds to run.")
        return result

    return wrapper
def get_api_keys(file_path="api_keys.txt"):
    """Load the Binance API credentials from a ``key=value`` text file.

    The file must contain the following lines (no spaces around ``=``)::

        api_key=your_api_key
        secret_key=your_api_secret

    Blank lines are ignored, and only the first ``=`` on a line separates the
    key from the value, so values may themselves contain ``=``.

    Args:
        file_path: Path of the credentials file. Defaults to ``api_keys.txt``
            in the current working directory; pass another path if your file
            is named differently.

    Returns:
        A tuple ``(api_key, secret_key, headers)`` where ``headers`` is the
        dict of HTTP headers to send with authenticated requests.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-blank line does not contain ``=``.
        KeyError: If ``api_key`` or ``secret_key`` is missing from the file.
    """
    api_keys = {}
    with open(file_path, "r", encoding="utf-8") as file:
        for line_number, raw_line in enumerate(file, start=1):
            line = raw_line.strip()
            if not line:
                continue
            key, separator, value = line.partition("=")
            if not separator:
                raise ValueError(
                    f"{file_path}:{line_number}: expected 'key=value', got {line!r}"
                )
            api_keys[key] = value

    # Only the API key is sent with requests. The secret key is used solely to
    # compute HMAC signatures locally and must never be transmitted, so it is
    # deliberately not placed in the headers.
    headers = {"X-MBX-APIKEY": api_keys["api_key"]}
    return api_keys["api_key"], api_keys["secret_key"], headers
def get_server_time():
    """Fetch the current server time from the Binance API.

    Returns:
        The decoded JSON body of ``GET /api/v3/time``. Callers in this module
        read its ``"serverTime"`` entry.

    Raises:
        requests.exceptions.RequestException: If the request fails, times out
            or the server answers with an HTTP error status.
    """
    url = base_url + "/api/v3/time"
    # Without a timeout, requests may block forever on an unresponsive server.
    response = requests.get(url, timeout=10)
    # Fail loudly on an HTTP error instead of returning an error payload that
    # callers would misread as a server-time document.
    response.raise_for_status()
    return response.json()
def create_signature(api_secret, params=None, timestamp=None):
    """Build the ``?timestamp=...&signature=...`` suffix for a signed request.

    The signed payload is ``timestamp=<timestamp>`` followed by every entry of
    ``params`` as ``key=value``, sorted by key and joined with ``&``. The
    payload is signed with HMAC-SHA256 using ``api_secret``.

    Note that the returned string contains only the timestamp and the
    signature; the entries of ``params`` are signed but not included, so the
    caller has to send them with the request separately.

    Args:
        api_secret: The API secret key used as the HMAC key.
        params: Optional mapping of extra request parameters to sign.
        timestamp: Optional timestamp to sign. When omitted, the current
            server time is fetched with ``get_server_time()``. Supplying it
            avoids one network round trip per signature.

    Returns:
        The query-string suffix, starting with ``?``, to append to the
        endpoint path.
    """
    if timestamp is None:
        timestamp = get_server_time()["serverTime"]

    query_string = f"timestamp={timestamp}"
    if params:
        # Sort by key so the signed payload is deterministic.
        query_params = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
        query_string += f"&{query_params}"

    digest = hmac.new(
        api_secret.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return f"?timestamp={timestamp}&signature={digest}"
def get_btc_price():
    """Fetch the current BTC/USDT price from the Binance API.

    Returns:
        The price truncated to an ``int`` (the fractional part is dropped).

    Raises:
        requests.exceptions.RequestException: If the request fails, times out
            or the server answers with an HTTP error status.
    """
    url = base_url + "/api/v3/ticker/price?symbol=BTCUSDT"
    # Without a timeout, requests may block forever on an unresponsive server.
    response = requests.get(url, timeout=10)
    # Surface HTTP errors directly rather than as a KeyError on "price".
    response.raise_for_status()
    # The price arrives as a decimal string; convert via float, then truncate.
    return int(float(response.json()["price"]))
def get_spot_assets():
    """Fetch the user's spot-wallet assets that have a positive BTC valuation.

    Returns:
        A dict mapping asset symbol to
        ``{"btcValuation": <value as returned by the API>,
        "USDValuation": <btcValuation * current BTC/USDT price>}``.
        Assets whose BTC valuation is not greater than zero are omitted.

    Raises:
        requests.exceptions.RequestException: If a request fails, times out or
            the server answers with an HTTP error status.
    """
    _, api_secret, headers = get_api_keys()
    params = {"needBtcValuation": "true"}
    user_assets_path = f"/sapi/v3/asset/getUserAsset{create_signature(api_secret, params)}"
    url = base_url + user_assets_path

    # Without a timeout, requests may block forever on an unresponsive server.
    response = requests.post(url, headers=headers, params=params, timeout=10)
    # An error payload is a dict, not a list of assets; raise here instead of
    # failing with an obscure TypeError inside the loop below.
    response.raise_for_status()

    btc_price = get_btc_price()
    spot_assets = {}
    for entry in response.json():
        btc_valuation = float(entry["btcValuation"])
        if btc_valuation > 0:
            spot_assets[entry["asset"]] = {
                "btcValuation": entry["btcValuation"],
                "USDValuation": btc_valuation * btc_price,
            }
    return spot_assets
def _fetch_earn_positions(path, api_secret, headers):
    """Return ``{"asset", "amount"}`` dicts for the positive positions at ``path``."""
    url = base_url + f"{path}{create_signature(api_secret)}"
    # Without a timeout, requests may block forever on an unresponsive server.
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()

    positions = []
    for row in response.json()["rows"]:
        # NOTE(review): flexible rows are read via "totalAmount"; locked rows
        # appear to expose the figure as "amount" in Binance's documentation.
        # The original key is tried first - confirm against the live API.
        amount = row["totalAmount"] if "totalAmount" in row else row["amount"]
        if float(amount) > 0:
            positions.append({"asset": row["asset"], "amount": amount})
    return positions


def get_earn_assets():
    """Fetch the user's Simple Earn positions (flexible and locked).

    Returns:
        A tuple ``(earn_assets, total)``. ``earn_assets`` maps ``"Flexible"``
        and/or ``"Locked"`` to a list of ``{"asset": ..., "amount": ...}``
        dicts, one per position with a positive amount; a category is present
        only when the account summary reports a positive total for it.
        ``total`` is the sum of all listed amounts, rounded to two decimals.

        NOTE(review): ``total`` adds the raw amounts of different assets
        together without converting them to a common currency - confirm this
        is what callers expect.

    Raises:
        requests.exceptions.RequestException: If a request fails, times out or
            the server answers with an HTTP error status.
    """
    _, api_secret, headers = get_api_keys()
    account_url = base_url + f"/sapi/v1/simple-earn/account{create_signature(api_secret)}"
    account_response = requests.get(account_url, headers=headers, timeout=10)
    account_response.raise_for_status()
    account = account_response.json()

    earn_assets = {}
    if float(account["totalFlexibleAmountInUSDT"]) > 0:
        earn_assets["Flexible"] = _fetch_earn_positions(
            "/sapi/v1/simple-earn/flexible/position", api_secret, headers
        )

    # This used to be an ``elif``, which skipped locked positions whenever the
    # account also had flexible holdings. The two categories are independent.
    # NOTE(review): Binance's documentation appears to name this field
    # "totalLockedInUSDT"; the original key is tried first - confirm.
    locked_total = account.get(
        "totalLockedAmountInUSDT", account.get("totalLockedInUSDT", 0)
    )
    if float(locked_total) > 0:
        earn_assets["Locked"] = _fetch_earn_positions(
            "/sapi/v1/simple-earn/locked/position", api_secret, headers
        )

    total_assets_value = 0
    for positions in earn_assets.values():
        for position in positions:
            total_assets_value += float(position["amount"])
    return earn_assets, round(total_assets_value, 2)
def get_total_assets_in_USDT():
    """Summarise the portfolio across spot, earn and trading-bot holdings.

    Only BTC, USDT and BNB are taken into account for the spot total and for
    the per-coin values.

    Returns:
        A tuple ``(total_assets, coin_values)``. ``total_assets`` has the keys
        ``"Spot"``, ``"Earn"``, ``"TradingBots"`` and ``"Total"`` (the sum of
        the other three, rounded to two decimals). ``coin_values`` maps each
        tracked coin to a float.
    """
    tracked_coins = ("BTC", "USDT", "BNB")
    spot_holdings = get_spot_assets()
    earn_holdings, earn_assets_value = get_earn_assets()

    # Per-coin USD valuation of the tracked coins held in the spot wallet.
    coin_values = {
        coin: float(details["USDValuation"])
        for coin, details in spot_holdings.items()
        if coin in tracked_coins
    }
    # Must be computed before the earn positions are merged in below.
    spot_value = sum(coin_values.values())

    # NOTE(review): a tracked coin held in earn replaces the spot USD value
    # with the raw earn amount instead of adding to it - confirm intent.
    for positions in earn_holdings.values():
        for position in positions:
            coin = position["asset"]
            if coin in tracked_coins:
                coin_values[coin] = float(position["amount"])

    # API does not support trading bot assets, so that figure comes from the
    # manually edited module-level value.
    total_assets = {
        "Spot": spot_value,
        "Earn": earn_assets_value,
        "TradingBots": trading_bot_assets,
    }
    total_assets["Total"] = round(
        float(spot_value) + earn_assets_value + total_assets["TradingBots"], 2
    )
    return total_assets, coin_values
import urllib.parse
def get_all_trades(symbol="BTCUSDT", start_time=1704133438000):
    """Fetch the account's trades for one symbol from ``/api/v3/myTrades``.

    Args:
        symbol: Trading pair to query. Defaults to ``"BTCUSDT"``.
        start_time: Value sent as the ``startTime`` request parameter.
            Defaults to ``1704133438000``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails, times out
            or the server answers with an HTTP error status.
    """
    _, api_secret, headers = get_api_keys()
    params = {
        "symbol": symbol,
        "startTime": start_time,
        "timestamp": int(time.time() * 1000),  # Current time in milliseconds
    }
    # Sort by key so the signed query string is deterministic. requests
    # serialises the dict in this same order when building the URL.
    params = dict(sorted(params.items()))
    query_string = urllib.parse.urlencode(params)
    signature = hmac.new(
        bytes(api_secret, "latin-1"),
        msg=bytes(query_string, "latin-1"),
        digestmod=hashlib.sha256,
    ).hexdigest()
    # Added after signing, so the signature is the last query parameter.
    params["signature"] = signature

    url = base_url + "/api/v3/myTrades"
    # Without a timeout, requests may block forever on an unresponsive server.
    response = requests.get(url, headers=headers, params=params, timeout=10)
    response.raise_for_status()
    return response.json()