# SC-Discord-Bot/llm_tools/get_commodities.py
#
# 169 lines
# 5.2 KiB
# Python

import requests
import sqlite3
import time
import schedule
from datetime import datetime
# --- Configuration ---
# UEX Corp API endpoint queried for commodity prices.
API_URL = "https://api.uexcorp.space/2.0/commodities_prices_all"

# SQLite database file and the table the price rows are written to.
DB_NAME = "commodities.db"
TABLE_NAME = "commodity_prices"

# The bearer token is read once, at import time, from the "uex_api_key" file
# (a relative path, so it is resolved against the current working directory).
# Surrounding whitespace such as a trailing newline is stripped.
with open("uex_api_key") as f:
    BEARER_TOKEN = f.read().strip()
def setup_database(db_name=None, table_name=None):
    """Create the commodity price table if it does not already exist.

    The table uses the composite primary key ``(id_commodity, id_terminal)``
    so that an ``INSERT OR REPLACE`` keeps exactly one (the latest) row per
    commodity/terminal pair.

    Args:
        db_name: Path of the SQLite database file. Defaults to the
            module-level ``DB_NAME``.
        table_name: Name of the table to create. Defaults to the
            module-level ``TABLE_NAME``. The name is interpolated into the
            SQL text, so it must be a trusted identifier.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table cannot
            be created.
    """
    db_path = DB_NAME if db_name is None else db_name
    table = TABLE_NAME if table_name is None else table_name

    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; the ``finally`` below does that so
        # the handle is released even when the statement fails.
        with conn:
            # "IF NOT EXISTS" makes repeated calls a no-op.
            conn.execute(f'''
                CREATE TABLE IF NOT EXISTS {table} (
                    id INTEGER,
                    id_commodity INTEGER,
                    id_terminal INTEGER,
                    price_buy REAL,
                    price_buy_avg REAL,
                    price_sell REAL,
                    price_sell_avg REAL,
                    scu_buy REAL,
                    scu_buy_avg REAL,
                    scu_sell_stock REAL,
                    scu_sell_stock_avg REAL,
                    scu_sell REAL,
                    scu_sell_avg REAL,
                    status_buy INTEGER,
                    status_sell INTEGER,
                    date_added INTEGER,
                    date_modified INTEGER,
                    commodity_name TEXT,
                    commodity_code TEXT,
                    commodity_slug TEXT,
                    terminal_name TEXT,
                    terminal_code TEXT,
                    terminal_slug TEXT,
                    PRIMARY KEY (id_commodity, id_terminal)
                )
            ''')
    finally:
        conn.close()
    print(f"Database setup complete. Table '{table}' is ready.")
def fetch_data_from_api(timeout=30):
    """Fetch the latest commodity price data from the UEX Corp API.

    Sends a GET request to ``API_URL`` with ``BEARER_TOKEN`` as a bearer
    token. If the decoded JSON is an object with a ``data`` key, that value
    is used; otherwise the decoded JSON itself is used.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The records as a list, or ``None`` if the request failed, the body
        was not valid JSON, or the decoded result was not a list.
    """
    headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
    try:
        # requests waits indefinitely when no timeout is given, which would
        # stall the calling scheduler loop on a hung connection.
        response = requests.get(API_URL, headers=headers, timeout=timeout)
        # Raise an exception for bad status codes (4xx or 5xx).
        response.raise_for_status()
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError is listed as well because a JSON decoding failure is
        # not guaranteed to be a RequestException on every requests version.
        print(f"Error fetching data from API: {e}")
        return None

    # Unwrap the {"data": [...]} envelope; fall back to a flat payload.
    if isinstance(payload, dict) and 'data' in payload:
        records = payload['data']
    else:
        records = payload

    # Anything other than a list (e.g. an error object without "data")
    # cannot be iterated as records by the caller, so report it here.
    if not isinstance(records, list):
        print(
            "Error fetching data from API: unexpected response structure "
            f"({type(records).__name__})"
        )
        return None
    return records
# Column order shared by the INSERT statement and the value tuples built from
# each record; keeping it in one place prevents the two from drifting apart.
_PRICE_COLUMNS = (
    'id',
    'id_commodity',
    'id_terminal',
    'price_buy',
    'price_buy_avg',
    'price_sell',
    'price_sell_avg',
    'scu_buy',
    'scu_buy_avg',
    'scu_sell_stock',
    'scu_sell_stock_avg',
    'scu_sell',
    'scu_sell_avg',
    'status_buy',
    'status_sell',
    'date_added',
    'date_modified',
    'commodity_name',
    'commodity_code',
    'commodity_slug',
    'terminal_name',
    'terminal_code',
    'terminal_slug',
)


def save_data_to_db(data, *, db_name=None, table_name=None):
    """Insert or replace the fetched price records in the SQLite table.

    Uses ``INSERT OR REPLACE`` so that a record whose primary key already
    exists overwrites the stored row, while new keys are inserted. Keys
    missing from a record are stored as ``NULL``.

    Args:
        data: Iterable of dict-like records (each must support ``.get``).
            When empty or ``None`` nothing is written.
        db_name: Path of the SQLite database file. Defaults to the
            module-level ``DB_NAME``.
        table_name: Target table. Defaults to the module-level
            ``TABLE_NAME``. The name is interpolated into the SQL text, so
            it must be a trusted identifier.

    Raises:
        sqlite3.Error: If the database cannot be opened or the write fails;
            the transaction is rolled back in that case.
    """
    if not data:
        print("No data to save.")
        return

    db_path = DB_NAME if db_name is None else db_name
    table = TABLE_NAME if table_name is None else table_name

    # Build the rows before opening the connection so that a malformed
    # record cannot leave an open database handle behind.
    records_to_insert = [
        tuple(item.get(column) for column in _PRICE_COLUMNS)
        for item in data
    ]
    column_list = ", ".join(_PRICE_COLUMNS)
    placeholders = ", ".join("?" * len(_PRICE_COLUMNS))
    sql_statement = (
        f"INSERT OR REPLACE INTO {table} ({column_list}) "
        f"VALUES ({placeholders})"
    )

    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` commits on success and rolls back on error; closing
        # is done separately in ``finally``.
        with conn:
            # executemany sends the whole batch in a single call.
            conn.executemany(sql_statement, records_to_insert)
    finally:
        conn.close()
    print(f"Successfully saved/updated {len(records_to_insert)} records to the database.")
def job():
    """Run one fetch-and-store cycle; this is the function the scheduler calls.

    Prints a timestamped start banner, fetches the API data, saves it when
    the fetch returned something truthy, and prints a finish banner.
    """
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"--- Running job at {started_at} ---")

    payload = fetch_data_from_api()
    # A failed fetch (None) or an empty result skips the database write.
    if payload:
        save_data_to_db(payload)

    print("--- Job finished ---")
if __name__ == "__main__":
    # 1. Make sure the database table exists before the first write.
    setup_database()
    # 2. Run the job once immediately instead of waiting for the first tick.
    job()
    # 3. Schedule the job to run every hour.
    print("Scheduling job to run every hour. Press Ctrl+C to exit.")
    schedule.every().hour.do(job)
    # 4. Run the scheduler loop, polling once per second for due jobs.
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C is the advertised way to stop the script, so exit cleanly
        # rather than with a traceback.
        print("Scheduler stopped.")