"""Poll the UEX Corp commodity price API and mirror the latest prices into SQLite.

Run as a script, this sets up the database, performs one sync immediately and
then repeats the sync every hour until interrupted.
"""

import sqlite3
import time
from contextlib import closing
from datetime import datetime

import requests
import schedule

# --- Configuration ---
API_URL = "https://api.uexcorp.space/2.0/commodities_prices_all"
with open("uex_api_key", "r") as f:
    BEARER_TOKEN = f.read().strip()
DB_NAME = "commodities.db"
TABLE_NAME = "commodity_prices"

# Upper bound (seconds) on how long one HTTP request may block. Without a
# timeout a stalled connection would hang the scheduler loop indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# Column order shared by the INSERT statement and the per-record tuples, kept
# in one place so the two cannot drift apart.
COLUMNS = (
    "id",
    "id_commodity",
    "id_terminal",
    "price_buy",
    "price_buy_avg",
    "price_sell",
    "price_sell_avg",
    "scu_buy",
    "scu_buy_avg",
    "scu_sell_stock",
    "scu_sell_stock_avg",
    "scu_sell",
    "scu_sell_avg",
    "status_buy",
    "status_sell",
    "date_added",
    "date_modified",
    "commodity_name",
    "commodity_code",
    "commodity_slug",
    "terminal_name",
    "terminal_code",
    "terminal_slug",
)


def setup_database():
    """Create the price table in the SQLite database if it does not exist.

    The table uses a composite primary key (id_commodity, id_terminal) so that
    each commodity at each terminal has only one (the latest) entry.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table created.
    """
    # closing() guarantees the connection is released even if execute() fails;
    # the inner "with conn" commits on success and rolls back on error.
    with closing(sqlite3.connect(DB_NAME)) as conn:
        with conn:
            # "IF NOT EXISTS" makes this safe to run on every start-up.
            # save_data_to_db() relies on INSERT OR REPLACE, which needs the
            # primary key below to detect an existing row.
            conn.execute(f'''
                CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                    id INTEGER,
                    id_commodity INTEGER,
                    id_terminal INTEGER,
                    price_buy REAL,
                    price_buy_avg REAL,
                    price_sell REAL,
                    price_sell_avg REAL,
                    scu_buy REAL,
                    scu_buy_avg REAL,
                    scu_sell_stock REAL,
                    scu_sell_stock_avg REAL,
                    scu_sell REAL,
                    scu_sell_avg REAL,
                    status_buy INTEGER,
                    status_sell INTEGER,
                    date_added INTEGER,
                    date_modified INTEGER,
                    commodity_name TEXT,
                    commodity_code TEXT,
                    commodity_slug TEXT,
                    terminal_name TEXT,
                    terminal_code TEXT,
                    terminal_slug TEXT,
                    PRIMARY KEY (id_commodity, id_terminal)
                )
            ''')
    print(f"Database setup complete. Table '{TABLE_NAME}' is ready.")


def fetch_data_from_api():
    """Fetch the latest commodity data from the UEX Corp API.

    Returns:
        The records as a list (expected to contain dictionaries), or None if
        the request failed, the body was not valid JSON, or the payload did
        not contain a list of records.
    """
    headers = {
        "Authorization": f"Bearer {BEARER_TOKEN}"
    }
    try:
        response = requests.get(
            API_URL, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
        )
        # Raise an exception for bad status codes (4xx or 5xx).
        response.raise_for_status()
        payload = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from API: {e}")
        return None
    except ValueError as e:
        # Older requests versions raise a plain ValueError for a non-JSON
        # body, which is not a RequestException.
        print(f"Error decoding API response as JSON: {e}")
        return None

    # The records are normally wrapped in a "data" envelope; a bare top-level
    # list is accepted as well.
    if isinstance(payload, dict) and 'data' in payload:
        records = payload['data']
    else:
        records = payload

    if not isinstance(records, list):
        # Anything else (e.g. an error object) cannot be stored row by row.
        print(
            "Unexpected API response format: expected a list of records, "
            f"got {type(records).__name__}."
        )
        return None
    return records


def save_data_to_db(data):
    """Save the fetched records into the SQLite database.

    Uses 'INSERT OR REPLACE' to update the existing row for a
    commodity/terminal pair or insert a new one. Fields missing from a record
    are stored as NULL.

    Args:
        data: Iterable of dictionaries keyed by the names in COLUMNS. A falsy
            value (None or an empty list) is a no-op.

    Raises:
        sqlite3.Error: If the database write fails; the transaction is rolled
            back and the connection closed before the error propagates.
    """
    if not data:
        print("No data to save.")
        return

    # One tuple per record, in exactly the column order of the INSERT below.
    records_to_insert = [
        tuple(item.get(column) for column in COLUMNS) for item in data
    ]

    placeholders = ", ".join("?" for _ in COLUMNS)
    sql_statement = (
        f"INSERT OR REPLACE INTO {TABLE_NAME} ({', '.join(COLUMNS)}) "
        f"VALUES ({placeholders})"
    )

    with closing(sqlite3.connect(DB_NAME)) as conn:
        with conn:
            # executemany is much more efficient than inserting one by one.
            conn.executemany(sql_statement, records_to_insert)
    print(f"Successfully saved/updated {len(records_to_insert)} records to the database.")


def job():
    """Run one fetch-and-store cycle; this is the function that gets scheduled.

    Database errors are reported and swallowed so that a single failed cycle
    does not terminate the scheduler loop.
    """
    print(f"--- Running job at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---")
    api_data = fetch_data_from_api()
    if api_data:
        try:
            save_data_to_db(api_data)
        except sqlite3.Error as e:
            print(f"Error saving data to database: {e}")
    print("--- Job finished ---")


if __name__ == "__main__":
    # 1. Set up the database and table on the first run.
    setup_database()

    # 2. Run the job immediately once when the script starts.
    job()

    # 3. Schedule the job to run every hour.
    print("Scheduling job to run every hour. Press Ctrl+C to exit.")
    schedule.every().hour.do(job)

    # 4. Run the scheduler loop.
    while True:
        schedule.run_pending()
        time.sleep(1)