163 lines
5.2 KiB
Python
163 lines
5.2 KiB
Python
import requests
|
|
import sqlite3
|
|
import time
|
|
import schedule
|
|
from datetime import datetime
|
|
|
|
# --- Configuration for Item Prices ---
|
|
API_URL = "https://api.uexcorp.space/2.0/items_prices_all"
|
|
with open("uex_api_key", "r") as f:
|
|
BEARER_TOKEN = f.read().strip()
|
|
DB_NAME = "items.db" # Using a dedicated DB file
|
|
TABLE_NAME = "item_prices"
|
|
|
|
def setup_item_database():
|
|
"""
|
|
Sets up the SQLite database and creates the item_prices table if it doesn't exist.
|
|
"""
|
|
conn = sqlite3.connect(DB_NAME)
|
|
cursor = conn.cursor()
|
|
|
|
# Schema is derived from the new API documentation.
|
|
cursor.execute(f'''
|
|
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
|
|
id INTEGER,
|
|
id_item INTEGER,
|
|
id_terminal INTEGER,
|
|
id_category INTEGER,
|
|
price_buy REAL,
|
|
price_sell REAL,
|
|
date_added INTEGER,
|
|
date_modified INTEGER,
|
|
item_name TEXT,
|
|
item_uuid TEXT,
|
|
terminal_name TEXT,
|
|
PRIMARY KEY (id_item, id_terminal)
|
|
)
|
|
''')
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print(f"Database setup complete. Table '{TABLE_NAME}' is ready.")
|
|
|
|
def fetch_item_data_from_api():
|
|
"""
|
|
Fetches the latest item price data from the UAX Corp API.
|
|
Returns the data as a list of dictionaries or None if an error occurs.
|
|
"""
|
|
headers = {
|
|
"Authorization": f"Bearer {BEARER_TOKEN}"
|
|
}
|
|
try:
|
|
response = requests.get(API_URL, headers=headers)
|
|
response.raise_for_status() # Check for HTTP errors
|
|
|
|
data = response.json()
|
|
if 'data' in data:
|
|
return data['data']
|
|
return data
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error fetching item data from API: {e}")
|
|
return None
|
|
|
|
def sync_data_with_db(data):
|
|
"""
|
|
Synchronizes the database with the fetched API data.
|
|
It de-duplicates the source data, then deletes all old entries and
|
|
inserts the new ones in a single transaction.
|
|
"""
|
|
if not data:
|
|
print("No data received from API. Database will not be changed.")
|
|
return
|
|
|
|
# --- De-duplication Step ---
|
|
# The API is returning duplicates for (id_item, id_terminal).
|
|
# We will process the list and keep only the one with the latest 'date_modified'.
|
|
unique_items = {}
|
|
for item in data:
|
|
key = (item.get('id_item'), item.get('id_terminal'))
|
|
if key not in unique_items or item.get('date_modified') > unique_items[key].get('date_modified'):
|
|
unique_items[key] = item
|
|
|
|
# The final list of records to insert is the values of our de-duplicated dictionary.
|
|
clean_data = list(unique_items.values())
|
|
print(f"Received {len(data)} records from API. After de-duplication, {len(clean_data)} unique records will be processed.")
|
|
|
|
conn = None
|
|
try:
|
|
conn = sqlite3.connect(DB_NAME)
|
|
cursor = conn.cursor()
|
|
|
|
# --- Start Transaction ---
|
|
# 1. Delete all existing records from the table.
|
|
cursor.execute(f"DELETE FROM {TABLE_NAME}")
|
|
print(f"Cleared all old records from '{TABLE_NAME}'.")
|
|
|
|
# 2. Prepare the new, clean records for insertion.
|
|
records_to_insert = []
|
|
for item in clean_data: # Use the clean_data list now
|
|
records_to_insert.append((
|
|
item.get('id'),
|
|
item.get('id_item'),
|
|
item.get('id_terminal'),
|
|
item.get('id_category'),
|
|
item.get('price_buy'),
|
|
item.get('price_sell'),
|
|
item.get('date_added'),
|
|
item.get('date_modified'),
|
|
item.get('item_name'),
|
|
item.get('item_uuid'),
|
|
item.get('terminal_name')
|
|
))
|
|
|
|
# 3. Insert all new records.
|
|
sql_statement = f'''
|
|
INSERT INTO {TABLE_NAME} (
|
|
id, id_item, id_terminal, id_category, price_buy, price_sell,
|
|
date_added, date_modified, item_name, item_uuid, terminal_name
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
'''
|
|
cursor.executemany(sql_statement, records_to_insert)
|
|
|
|
# --- Commit Transaction ---
|
|
conn.commit()
|
|
print(f"Successfully synchronized {len(records_to_insert)} records into the database.")
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"Database error: {e}")
|
|
if conn:
|
|
print("Rolling back changes.")
|
|
conn.rollback()
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
def item_sync_job():
|
|
"""The main job function to be scheduled for syncing item prices."""
|
|
print(f"--- Running item sync job at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---")
|
|
api_data = fetch_item_data_from_api()
|
|
sync_data_with_db(api_data)
|
|
print("--- Item sync job finished ---")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 1. Set up the database and table on the first run.
|
|
setup_item_database()
|
|
|
|
# 2. Run the job immediately once when the script starts.
|
|
item_sync_job()
|
|
|
|
# 3. Schedule the job to run every hour.
|
|
print(f"Scheduling item sync job to run every hour. Press Ctrl+C to exit.")
|
|
schedule.every().hour.do(item_sync_job)
|
|
|
|
# 4. Run the scheduler loop.
|
|
while True:
|
|
try:
|
|
schedule.run_pending()
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print("\nExiting scheduler.")
|
|
break
|