"""Periodically mirror UEX Corp item prices into a local SQLite database.

The script fetches the full item price list from the UEX Corp API,
de-duplicates it on (id_item, id_terminal), and replaces the contents of
the local table in a single transaction. It runs once at start-up and
then once every hour until interrupted.
"""

import sqlite3
import time
from contextlib import closing
from datetime import datetime

import requests
import schedule

# --- Configuration for Item Prices ---
API_URL = "https://api.uexcorp.space/2.0/items_prices_all"
with open("uex_api_key", "r") as f:
    BEARER_TOKEN = f.read().strip()
DB_NAME = "items.db"  # Using a dedicated DB file
TABLE_NAME = "item_prices"

# Upper bound (seconds) for the HTTP request; without it a stalled
# connection would block the scheduler loop indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# Column order shared by the record tuples and the INSERT statement.
_COLUMNS = (
    "id",
    "id_item",
    "id_terminal",
    "id_category",
    "price_buy",
    "price_sell",
    "date_added",
    "date_modified",
    "item_name",
    "item_uuid",
    "terminal_name",
)


def setup_item_database():
    """Create the item price table in the SQLite database if it is missing."""
    # closing() guarantees the connection is released even if execute() fails.
    with closing(sqlite3.connect(DB_NAME)) as conn:
        # Schema is derived from the new API documentation.
        conn.execute(f'''
            CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                id INTEGER,
                id_item INTEGER,
                id_terminal INTEGER,
                id_category INTEGER,
                price_buy REAL,
                price_sell REAL,
                date_added INTEGER,
                date_modified INTEGER,
                item_name TEXT,
                item_uuid TEXT,
                terminal_name TEXT,
                PRIMARY KEY (id_item, id_terminal)
            )
        ''')
        conn.commit()
    print(f"Database setup complete. Table '{TABLE_NAME}' is ready.")


def fetch_item_data_from_api():
    """Fetch the latest item price data from the UEX Corp API.

    Returns:
        The value under the payload's 'data' key when the payload is a
        dict containing it, otherwise the decoded payload itself. Returns
        None if the request fails, returns an HTTP error status, or the
        body is not valid JSON.
    """
    headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}
    try:
        response = requests.get(
            API_URL, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()  # Check for HTTP errors
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions
        # whose JSONDecodeError is not a RequestException subclass.
        print(f"Error fetching item data from API: {e}")
        return None

    if isinstance(payload, dict) and 'data' in payload:
        return payload['data']
    return payload


def _modified_timestamp(item):
    """Return the item's 'date_modified', treating a missing/None value as 0."""
    value = item.get('date_modified')
    return 0 if value is None else value


def _deduplicate_items(data):
    """Keep one record per (id_item, id_terminal): the latest 'date_modified'.

    On a tie the record seen first is kept.
    """
    unique_items = {}
    for item in data:
        key = (item.get('id_item'), item.get('id_terminal'))
        current = unique_items.get(key)
        if current is None or _modified_timestamp(item) > _modified_timestamp(current):
            unique_items[key] = item
    return list(unique_items.values())


def sync_data_with_db(data):
    """Replace the table contents with the fetched API data.

    The source data is de-duplicated on (id_item, id_terminal), then all
    old rows are deleted and the new rows inserted in a single
    transaction. On a database error the transaction is rolled back.

    Args:
        data: A list of dict records as returned by the API. A falsy
            value, or a dict (a payload without a record list), leaves
            the database untouched.
    """
    if not data:
        print("No data received from API. Database will not be changed.")
        return
    if isinstance(data, dict):
        # A bare JSON object here means the API answered without a record
        # list; wiping the table based on it would lose data.
        print("API payload is not a list of records. Database will not be changed.")
        return

    # --- De-duplication Step ---
    # The API is returning duplicates for (id_item, id_terminal).
    # Keep only the one with the latest 'date_modified'.
    clean_data = _deduplicate_items(data)
    print(f"Received {len(data)} records from API. After de-duplication, {len(clean_data)} unique records will be processed.")

    # Prepare the new, clean records for insertion.
    records_to_insert = [
        tuple(item.get(column) for column in _COLUMNS) for item in clean_data
    ]
    placeholders = ", ".join("?" for _ in _COLUMNS)
    sql_statement = (
        f"INSERT INTO {TABLE_NAME} ({', '.join(_COLUMNS)}) "
        f"VALUES ({placeholders})"
    )

    conn = None
    try:
        conn = sqlite3.connect(DB_NAME)
        cursor = conn.cursor()

        # --- Start Transaction ---
        # 1. Delete all existing records from the table.
        cursor.execute(f"DELETE FROM {TABLE_NAME}")
        print(f"Cleared all old records from '{TABLE_NAME}'.")

        # 2. Insert all new records.
        cursor.executemany(sql_statement, records_to_insert)

        # --- Commit Transaction ---
        conn.commit()
        print(f"Successfully synchronized {len(records_to_insert)} records into the database.")
    except sqlite3.Error as e:
        print(f"Database error: {e}")
        if conn:
            print("Rolling back changes.")
            conn.rollback()
    finally:
        if conn:
            conn.close()


def item_sync_job():
    """The main job function to be scheduled for syncing item prices."""
    print(f"--- Running item sync job at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---")
    api_data = fetch_item_data_from_api()
    sync_data_with_db(api_data)
    print("--- Item sync job finished ---")


def main():
    """Set up the database, sync once, then sync every hour until Ctrl+C."""
    # 1. Set up the database and table on the first run.
    setup_item_database()

    # 2. Run the job immediately once when the script starts.
    item_sync_job()

    # 3. Schedule the job to run every hour.
    print("Scheduling item sync job to run every hour. Press Ctrl+C to exit.")
    schedule.every().hour.do(item_sync_job)

    # 4. Run the scheduler loop.
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            print("\nExiting scheduler.")
            break


if __name__ == "__main__":
    main()