SC-Discord-Bot/llm_tools/get_items.py

163 lines
5.2 KiB
Python

import requests
import sqlite3
import time
import schedule
from datetime import datetime
# --- Configuration for Item Prices ---
# Endpoint that is queried for the item price listing.
API_URL = "https://api.uexcorp.space/2.0/items_prices_all"
# SQLite file and table that hold the synchronized prices.
DB_NAME = "items.db"  # Using a dedicated DB file
TABLE_NAME = "item_prices"
# The API bearer token is read once, at import time, from a file named
# "uex_api_key" resolved against the current working directory.
with open("uex_api_key", "r") as key_file:
    BEARER_TOKEN = key_file.read().strip()
def setup_item_database():
    """
    Set up the SQLite database and create the item price table if missing.

    Opens the database file named by DB_NAME, issues a
    CREATE TABLE IF NOT EXISTS for TABLE_NAME (composite primary key on
    id_item and id_terminal), commits, and prints a confirmation message.
    The connection is always closed, even when a statement fails.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement fails.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        cursor = conn.cursor()
        # Schema is derived from the new API documentation.
        cursor.execute(f'''
            CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                id INTEGER,
                id_item INTEGER,
                id_terminal INTEGER,
                id_category INTEGER,
                price_buy REAL,
                price_sell REAL,
                date_added INTEGER,
                date_modified INTEGER,
                item_name TEXT,
                item_uuid TEXT,
                terminal_name TEXT,
                PRIMARY KEY (id_item, id_terminal)
            )
        ''')
        conn.commit()
    finally:
        # Release the connection on the error path too, not just on success.
        conn.close()
    print(f"Database setup complete. Table '{TABLE_NAME}' is ready.")
def fetch_item_data_from_api(timeout=30):
    """
    Fetch the latest item price data from the UEX Corp API.

    Sends a GET request to API_URL with BEARER_TOKEN as a bearer
    Authorization header.

    Args:
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.get``; without it a stalled connection
            would block the caller indefinitely.

    Returns:
        The value under the 'data' key when the decoded JSON is a dict that
        has one, otherwise the decoded JSON itself. Returns None if the
        request fails, times out, returns an HTTP error status, or the body
        is not valid JSON.
    """
    headers = {
        "Authorization": f"Bearer {BEARER_TOKEN}"
    }
    try:
        response = requests.get(API_URL, headers=headers, timeout=timeout)
        response.raise_for_status()  # Check for HTTP errors
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error is not a RequestException subclass.
        print(f"Error fetching item data from API: {e}")
        return None
    # Only unwrap when the payload really is a mapping; a list or null body
    # is returned unchanged instead of raising on the membership test.
    if isinstance(payload, dict) and 'data' in payload:
        return payload['data']
    return payload
def _latest_per_item_terminal(data):
    """Return one record per (id_item, id_terminal), keeping the newest.

    A missing or None 'date_modified' is treated as 0 so the comparison
    never raises TypeError. On equal timestamps the first record seen wins.
    """
    latest = {}
    for item in data:
        key = (item.get('id_item'), item.get('id_terminal'))
        current = latest.get(key)
        if current is None or (item.get('date_modified') or 0) > (current.get('date_modified') or 0):
            latest[key] = item
    return list(latest.values())


def sync_data_with_db(data):
    """
    Synchronize the database table with the fetched API data.

    The source records are first de-duplicated on (id_item, id_terminal),
    keeping the one with the latest 'date_modified'. Then every existing row
    in TABLE_NAME is deleted and the de-duplicated records are inserted,
    followed by a single commit.

    Args:
        data: Iterable of dict-like records from the API. If it is empty or
            None the database is left untouched.

    Returns:
        None. sqlite3 errors are caught, reported on stdout and rolled back
        rather than propagated.
    """
    if not data:
        print("No data received from API. Database will not be changed.")
        return

    # --- De-duplication Step ---
    # The API is returning duplicates for (id_item, id_terminal).
    clean_data = _latest_per_item_terminal(data)
    print(f"Received {len(data)} records from API. After de-duplication, {len(clean_data)} unique records will be processed.")

    # Column order is written once so the SQL column list and the value
    # tuples cannot drift apart.
    columns = (
        'id', 'id_item', 'id_terminal', 'id_category', 'price_buy',
        'price_sell', 'date_added', 'date_modified', 'item_name',
        'item_uuid', 'terminal_name',
    )
    records_to_insert = [
        tuple(item.get(column) for column in columns) for item in clean_data
    ]
    placeholders = ', '.join('?' for _ in columns)
    sql_statement = (
        f"INSERT INTO {TABLE_NAME} ({', '.join(columns)}) "
        f"VALUES ({placeholders})"
    )

    conn = None
    try:
        conn = sqlite3.connect(DB_NAME)
        cursor = conn.cursor()
        # --- Start Transaction ---
        # 1. Delete all existing records from the table.
        cursor.execute(f"DELETE FROM {TABLE_NAME}")
        print(f"Cleared all old records from '{TABLE_NAME}'.")
        # 2. Insert all new records.
        cursor.executemany(sql_statement, records_to_insert)
        # --- Commit Transaction ---
        conn.commit()
        print(f"Successfully synchronized {len(records_to_insert)} records into the database.")
    except sqlite3.Error as e:
        print(f"Database error: {e}")
        if conn:
            print("Rolling back changes.")
            conn.rollback()
    finally:
        if conn:
            conn.close()
def item_sync_job():
    """Run one sync cycle: fetch item prices from the API and store them."""
    started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"--- Running item sync job at {started_at} ---")
    # The fetch result is handed straight to the sync step, which decides
    # what to do when nothing usable came back.
    sync_data_with_db(fetch_item_data_from_api())
    print("--- Item sync job finished ---")
if __name__ == "__main__":
    # Ensure the table exists, then sync once right away so the database is
    # populated before the first scheduled run.
    setup_item_database()
    item_sync_job()

    # Register the hourly job with the scheduler.
    print(f"Scheduling item sync job to run every hour. Press Ctrl+C to exit.")
    schedule.every().hour.do(item_sync_job)

    # Poll the scheduler once per second until the user interrupts.
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nExiting scheduler.")