Initial Push

Oli Passey
2025-06-27 10:36:26 +01:00
parent cf1023c14a
commit 191184ba5e
31 changed files with 4531 additions and 68 deletions

src/__init__.py (Normal file, +7 lines)

@@ -0,0 +1,7 @@
"""
Price Tracker - Web scraper for monitoring product prices across multiple sites
"""
__version__ = "1.0.0"
__author__ = "Price Tracker Team"
__description__ = "A comprehensive price tracking system using Beautiful Soup"

src/config.py (Normal file, +86 lines)

@@ -0,0 +1,86 @@
"""
Configuration management for the price tracker
"""
import json
import os
from typing import Dict, Any, Optional
from pathlib import Path
class Config:
"""Configuration manager for the price tracker application."""
def __init__(self, config_path: Optional[str] = None):
self.config_path = config_path or "config.json"
self._config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from JSON file."""
config_file = Path(self.config_path)
if not config_file.exists():
raise FileNotFoundError(f"Config file not found: {self.config_path}")
with open(config_file, 'r') as f:
return json.load(f)
@property
def database_path(self) -> str:
"""Get database file path."""
return self._config.get('database', {}).get('path', 'price_tracker.db')
@property
def scraping_config(self) -> Dict[str, Any]:
"""Get scraping configuration."""
return self._config.get('scraping', {})
@property
def delay_between_requests(self) -> float:
"""Get delay between requests in seconds."""
return self.scraping_config.get('delay_between_requests', 2.0)
@property
def max_concurrent_requests(self) -> int:
"""Get maximum concurrent requests."""
return self.scraping_config.get('max_concurrent_requests', 5)
@property
def timeout(self) -> int:
"""Get request timeout in seconds."""
return self.scraping_config.get('timeout', 30)
@property
def retry_attempts(self) -> int:
"""Get number of retry attempts."""
return self.scraping_config.get('retry_attempts', 3)
@property
def user_agents(self) -> list:
"""Get list of user agents."""
return self.scraping_config.get('user_agents', [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
])
@property
def notification_config(self) -> Dict[str, Any]:
"""Get notification configuration."""
return self._config.get('notifications', {})
@property
def sites_config(self) -> Dict[str, Any]:
"""Get sites configuration."""
return self._config.get('sites', {})
def get_site_config(self, site_name: str) -> Optional[Dict[str, Any]]:
"""Get configuration for a specific site."""
return self.sites_config.get(site_name)
def is_site_enabled(self, site_name: str) -> bool:
"""Check if a site is enabled."""
site_config = self.get_site_config(site_name)
return site_config.get('enabled', False) if site_config else False
def get_enabled_sites(self) -> list:
"""Get list of enabled sites."""
return [site for site, config in self.sites_config.items()
if config.get('enabled', False)]
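
The accessors above imply a nested JSON layout for config.json. The actual config file is not part of this diff, so the following is only a sketch of a compatible structure, written as a small Python script (all values, selectors, and the site entry are illustrative placeholders):

import json

# Illustrative config.json matching the Config accessors above; values are placeholders
example_config = {
    "database": {"path": "price_tracker.db"},
    "scraping": {
        "delay_between_requests": 2.0,
        "max_concurrent_requests": 5,
        "timeout": 30,
        "retry_attempts": 3,
        "user_agents": ["Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"],
    },
    "notifications": {
        "email": {"enabled": False},
        "webhook": {"enabled": False},
    },
    "sites": {
        "jjfoodservice": {
            "enabled": True,
            "base_url": "https://www.jjfoodservice.com/",
            "selectors": {"price": [".price"], "title": ["h1"], "availability": []},
        },
    },
}

with open("config.json", "w") as f:
    json.dump(example_config, f, indent=2)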

src/database.py (Normal file, +228 lines)

@@ -0,0 +1,228 @@
"""
Database management for price tracking
"""
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import json
import logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Manages SQLite database operations for price tracking."""
def __init__(self, db_path: str):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""Initialize database tables."""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
target_price REAL,
urls TEXT NOT NULL, -- JSON string of site URLs
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
active BOOLEAN DEFAULT 1
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER NOT NULL,
site_name TEXT NOT NULL,
price REAL NOT NULL,
currency TEXT DEFAULT 'GBP',
availability BOOLEAN DEFAULT 1,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS price_alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER NOT NULL,
site_name TEXT NOT NULL,
alert_price REAL NOT NULL,
triggered_at TIMESTAMP,
notified BOOLEAN DEFAULT 0,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_price_history_product_id
ON price_history (product_id)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_price_history_timestamp
ON price_history (timestamp)
''')
def add_product(self, name: str, urls: Dict[str, str],
description: str = None, target_price: float = None) -> int:
"""Add a new product to track."""
urls_json = json.dumps(urls)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute('''
INSERT INTO products (name, description, target_price, urls)
VALUES (?, ?, ?, ?)
''', (name, description, target_price, urls_json))
product_id = cursor.lastrowid
logger.info(f"Added product: {name} (ID: {product_id})")
return product_id
def get_product(self, product_id: int) -> Optional[Dict[str, Any]]:
"""Get product by ID."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute('''
SELECT * FROM products WHERE id = ? AND active = 1
''', (product_id,))
row = cursor.fetchone()
if row:
product = dict(row)
product['urls'] = json.loads(product['urls'])
return product
return None
def get_all_products(self) -> List[Dict[str, Any]]:
"""Get all active products."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute('''
SELECT * FROM products WHERE active = 1 ORDER BY name
''')
products = []
for row in cursor.fetchall():
product = dict(row)
product['urls'] = json.loads(product['urls'])
products.append(product)
return products
def update_product(self, product_id: int, **kwargs):
"""Update product information."""
allowed_fields = ['name', 'description', 'target_price', 'urls']
updates = []
values = []
for field, value in kwargs.items():
if field in allowed_fields:
if field == 'urls':
value = json.dumps(value)
updates.append(f"{field} = ?")
values.append(value)
if not updates:
return
updates.append("updated_at = ?")
values.append(datetime.now())
values.append(product_id)
with sqlite3.connect(self.db_path) as conn:
conn.execute(f'''
UPDATE products SET {', '.join(updates)} WHERE id = ?
''', values)
def deactivate_product(self, product_id: int):
"""Deactivate a product (soft delete)."""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
UPDATE products SET active = 0, updated_at = ? WHERE id = ?
''', (datetime.now(), product_id))
def save_price_history(self, product_id: int, site_name: str, price: float,
currency: str = 'GBP', availability: bool = True,
timestamp: datetime = None):
"""Save price history entry."""
if timestamp is None:
timestamp = datetime.now()
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT INTO price_history
(product_id, site_name, price, currency, availability, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
''', (product_id, site_name, price, currency, availability, timestamp))
def get_price_history(self, product_id: int, days: int = 30) -> List[Dict[str, Any]]:
"""Get price history for a product."""
start_date = datetime.now() - timedelta(days=days)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute('''
SELECT * FROM price_history
WHERE product_id = ? AND timestamp >= ?
ORDER BY timestamp DESC
''', (product_id, start_date))
return [dict(row) for row in cursor.fetchall()]
def get_latest_prices(self, product_id: int) -> Dict[str, Dict[str, Any]]:
"""Get latest price for each site for a product."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute('''
SELECT DISTINCT site_name,
FIRST_VALUE(price) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as price,
FIRST_VALUE(currency) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as currency,
FIRST_VALUE(availability) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as availability,
FIRST_VALUE(timestamp) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as timestamp
FROM price_history
WHERE product_id = ?
''', (product_id,))
result = {}
for row in cursor.fetchall():
result[row['site_name']] = {
'price': row['price'],
'currency': row['currency'],
'availability': bool(row['availability']),
'timestamp': row['timestamp']
}
return result
def get_price_statistics(self, product_id: int, days: int = 30) -> Dict[str, Any]:
"""Get price statistics for a product."""
start_date = datetime.now() - timedelta(days=days)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute('''
SELECT site_name,
MIN(price) as min_price,
MAX(price) as max_price,
AVG(price) as avg_price,
COUNT(*) as data_points
FROM price_history
WHERE product_id = ? AND timestamp >= ?
GROUP BY site_name
''', (product_id, start_date))
stats = {}
for row in cursor.fetchall():
stats[row[0]] = {
'min_price': row[1],
'max_price': row[2],
'avg_price': round(row[3], 2),
'data_points': row[4]
}
return stats
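
A short usage sketch of DatabaseManager, assuming the package is importable as src; the product, URL, and prices are hypothetical:

from src.database import DatabaseManager

db = DatabaseManager("price_tracker.db")

# Hypothetical product tracked on one site (placeholder URL)
product_id = db.add_product(
    name="Vegetable Oil 20L",
    urls={"jjfoodservice": "https://www.jjfoodservice.com/example-product"},
    target_price=25.00,
)

# Record a scraped price, then read the latest prices and 30-day statistics back
db.save_price_history(product_id, "jjfoodservice", 23.49)
print(db.get_latest_prices(product_id))
print(db.get_price_statistics(product_id, days=30))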

src/notification.py (Normal file, +192 lines)

@@ -0,0 +1,192 @@
"""
Notification system for price alerts
"""
import smtplib
import logging
import aiohttp
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Any
from datetime import datetime
logger = logging.getLogger(__name__)
class NotificationManager:
"""Manages notifications for price alerts."""
def __init__(self, config):
self.config = config
self.notification_config = config.notification_config
async def send_price_alerts(self, alerts: List[Dict[str, Any]]):
"""Send notifications for price alerts."""
if not alerts:
return
# Send email notifications
if self.notification_config.get('email', {}).get('enabled', False):
await self._send_email_alerts(alerts)
# Send webhook notifications
if self.notification_config.get('webhook', {}).get('enabled', False):
await self._send_webhook_alerts(alerts)
async def _send_email_alerts(self, alerts: List[Dict[str, Any]]):
"""Send email notifications for price alerts."""
email_config = self.notification_config.get('email', {})
try:
# Create email content
subject = f"Price Alert: {len(alerts)} product(s) at target price!"
body = self._create_email_body(alerts)
# Create message
msg = MIMEMultipart()
msg['From'] = email_config.get('sender_email')
msg['To'] = email_config.get('recipient_email')
msg['Subject'] = subject
msg.attach(MIMEText(body, 'html'))
# Send email
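# Note: smtplib is synchronous, so this send blocks the event loop for the duration of the SMTP exchange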
server = smtplib.SMTP(email_config.get('smtp_server'), email_config.get('smtp_port'))
server.starttls()
server.login(email_config.get('sender_email'), email_config.get('sender_password'))
text = msg.as_string()
server.sendmail(email_config.get('sender_email'),
email_config.get('recipient_email'), text)
server.quit()
logger.info(f"Email alert sent for {len(alerts)} products")
except Exception as e:
logger.error(f"Failed to send email alert: {e}")
async def _send_webhook_alerts(self, alerts: List[Dict[str, Any]]):
"""Send webhook notifications for price alerts."""
webhook_config = self.notification_config.get('webhook', {})
webhook_url = webhook_config.get('url')
if not webhook_url:
return
try:
payload = {
'timestamp': datetime.now().isoformat(),
'alert_count': len(alerts),
'alerts': []
}
for alert in alerts:
payload['alerts'].append({
'product_name': alert['product']['name'],
'site': alert['site'],
'current_price': alert['current_price'],
'target_price': alert['target_price'],
'savings': alert['target_price'] - alert['current_price']
})
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
if response.status == 200:
logger.info(f"Webhook alert sent for {len(alerts)} products")
else:
logger.error(f"Webhook failed with status {response.status}")
except Exception as e:
logger.error(f"Failed to send webhook alert: {e}")
def _create_email_body(self, alerts: List[Dict[str, Any]]) -> str:
"""Create HTML email body for price alerts."""
html = """
<html>
<head>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #4CAF50; color: white; padding: 20px; text-align: center; }
.alert { border: 1px solid #ddd; margin: 10px 0; padding: 15px; background-color: #f9f9f9; }
.product-name { font-size: 18px; font-weight: bold; color: #333; }
.price-info { margin: 10px 0; }
.current-price { color: #4CAF50; font-weight: bold; font-size: 16px; }
.target-price { color: #666; }
.savings { color: #FF5722; font-weight: bold; }
.site { background-color: #2196F3; color: white; padding: 5px 10px; border-radius: 3px; font-size: 12px; }
.footer { margin-top: 30px; font-size: 12px; color: #666; }
</style>
</head>
<body>
<div class="header">
<h1>🎉 Price Alert!</h1>
<p>Great news! We found products at your target price!</p>
</div>
"""
for alert in alerts:
product = alert['product']
savings = alert['target_price'] - alert['current_price']
html += f"""
<div class="alert">
<div class="product-name">{product['name']}</div>
<div class="price-info">
<span class="site">{alert['site'].upper()}</span>
<br><br>
<span class="current-price">Current Price: £{alert['current_price']:.2f}</span><br>
<span class="target-price">Your Target: £{alert['target_price']:.2f}</span><br>
<span class="savings">You Save: £{savings:.2f}</span>
</div>
</div>
"""
html += """
<div class="footer">
<p>This is an automated price alert from your Price Tracker system.</p>
<p>Happy shopping! 🛒</p>
</div>
</body>
</html>
"""
return html
async def send_test_notification(self) -> Dict[str, Any]:
"""Send a test notification to verify configuration."""
test_result = {
'email': {'enabled': False, 'success': False, 'error': None},
'webhook': {'enabled': False, 'success': False, 'error': None}
}
# Test email
if self.notification_config.get('email', {}).get('enabled', False):
test_result['email']['enabled'] = True
try:
test_alerts = [{
'product': {'name': 'Test Product'},
'site': 'test-site',
'current_price': 19.99,
'target_price': 25.00
}]
await self._send_email_alerts(test_alerts)
test_result['email']['success'] = True
except Exception as e:
test_result['email']['error'] = str(e)
# Test webhook
if self.notification_config.get('webhook', {}).get('enabled', False):
test_result['webhook']['enabled'] = True
try:
test_alerts = [{
'product': {'name': 'Test Product'},
'site': 'test-site',
'current_price': 19.99,
'target_price': 25.00
}]
await self._send_webhook_alerts(test_alerts)
test_result['webhook']['success'] = True
except Exception as e:
test_result['webhook']['error'] = str(e)
return test_result
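
A minimal sketch of exercising the test-notification path, assuming a config.json whose notifications section enables email and/or webhook (key names inferred from the accessors used above):

import asyncio

from src.config import Config
from src.notification import NotificationManager

async def main():
    config = Config("config.json")
    manager = NotificationManager(config)
    result = await manager.send_test_notification()
    print(result)  # per-channel dict: {'email': {...}, 'webhook': {...}}

if __name__ == "__main__":
    asyncio.run(main())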

src/scraper.py (Normal file, +334 lines)

@@ -0,0 +1,334 @@
"""
Web scraping functionality for price tracking
"""
import asyncio
import aiohttp
import logging
import random
import re
from typing import Dict, List, Optional, Any, Tuple
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from .config import Config
logger = logging.getLogger(__name__)
class PriceScraper:
"""Base class for price scraping functionality."""
def __init__(self, config: Config):
self.config = config
self.ua = UserAgent()
self.session = None
async def __aenter__(self):
"""Async context manager entry."""
connector = aiohttp.TCPConnector(limit=self.config.max_concurrent_requests)
timeout = aiohttp.ClientTimeout(total=self.config.timeout)
self.session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={'User-Agent': self.ua.random}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
def _get_headers(self, url: str = None) -> Dict[str, str]:
"""Get request headers with random user agent and site-specific headers."""
user_agents = self.config.user_agents
if user_agents:
user_agent = random.choice(user_agents)
else:
user_agent = self.ua.random
headers = {
'User-Agent': user_agent,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'en-GB,en-US;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
}
# Add site-specific headers
if url:
if 'amazon.co.uk' in url:
headers.update({
'Referer': 'https://www.amazon.co.uk/',
})
elif 'jjfoodservice.com' in url:
headers.update({
'Referer': 'https://www.jjfoodservice.com/',
})
elif 'atoz-catering.co.uk' in url:
headers.update({
'Referer': 'https://www.atoz-catering.co.uk/',
})
return headers
async def _fetch_page(self, url: str) -> Optional[str]:
"""Fetch a web page with retry logic and anti-bot measures."""
base_delay = random.uniform(1, 3) # Random delay between 1-3 seconds
for attempt in range(self.config.retry_attempts):
try:
# Add delay before each request (except first)
if attempt > 0:
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(delay)
headers = self._get_headers(url)
async with self.session.get(url, headers=headers) as response:
if response.status == 200:
return await response.text()
elif response.status == 403:
logger.warning(f"Access denied (403) for {url} - may be blocked by anti-bot measures")
# For 403 errors, wait longer before retry
if attempt < self.config.retry_attempts - 1:
await asyncio.sleep(random.uniform(5, 10))
elif response.status == 429:
logger.warning(f"Rate limited (429) for {url}")
# For rate limiting, wait even longer
if attempt < self.config.retry_attempts - 1:
await asyncio.sleep(random.uniform(10, 20))
else:
logger.warning(f"HTTP {response.status} for {url}")
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
if attempt < self.config.retry_attempts - 1:
await asyncio.sleep(base_delay * (2 ** attempt))
logger.error(f"Failed to fetch {url} after {self.config.retry_attempts} attempts")
return None
def _extract_price(self, soup: BeautifulSoup, selectors: List[str]) -> Optional[float]:
"""Extract price from HTML using CSS selectors."""
for selector in selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_price(price_text)
if price is not None:
return price
except Exception as e:
logger.debug(f"Error with selector {selector}: {e}")
continue
return None
def _parse_price(self, price_text: str) -> Optional[float]:
"""Parse price from text string."""
if not price_text:
return None
# Remove common currency symbols and clean text
price_text = re.sub(r'[^\d.,]+', '', price_text)
price_text = price_text.replace(',', '')
# Try to extract price as float
try:
return float(price_text)
except (ValueError, TypeError):
# Try to find price pattern
price_match = re.search(r'(\d+\.?\d*)', price_text)
if price_match:
return float(price_match.group(1))
return None
def _extract_text(self, soup: BeautifulSoup, selectors: List[str]) -> Optional[str]:
"""Extract text from HTML using CSS selectors."""
for selector in selectors:
try:
element = soup.select_one(selector)
if element:
return element.get_text(strip=True)
except Exception as e:
logger.debug(f"Error with selector {selector}: {e}")
continue
return None
def _detect_site(self, url: str) -> Optional[str]:
"""Detect which site this URL belongs to."""
domain = urlparse(url).netloc.lower()
if 'amazon' in domain:
return 'amazon'
elif 'ebay' in domain:
return 'ebay'
elif 'walmart' in domain:
return 'walmart'
# Add more site detection logic here
return None
async def scrape_product_price(self, url: str, site_name: str = None) -> Dict[str, Any]:
"""Scrape price for a single product from a URL."""
result = {
'success': False,
'price': None,
'currency': 'GBP',
'title': None,
'availability': None,
'url': url,
'error': None
}
try:
# Auto-detect site if not provided
if not site_name:
site_name = self._detect_site(url)
if not site_name:
result['error'] = "Could not detect site from URL"
return result
# Get site configuration
site_config = self.config.get_site_config(site_name)
if not site_config:
result['error'] = f"No configuration found for site: {site_name}"
return result
if not self.config.is_site_enabled(site_name):
result['error'] = f"Site {site_name} is disabled"
return result
# Fetch page content
html_content = await self._fetch_page(url)
if not html_content:
result['error'] = "Failed to fetch page content"
return result
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Extract price
price_selectors = site_config.get('selectors', {}).get('price', [])
price = self._extract_price(soup, price_selectors)
if price is None:
result['error'] = "Could not extract price from page"
return result
# Extract additional information
title_selectors = site_config.get('selectors', {}).get('title', [])
title = self._extract_text(soup, title_selectors)
availability_selectors = site_config.get('selectors', {}).get('availability', [])
availability_text = self._extract_text(soup, availability_selectors)
availability = self._parse_availability(availability_text)
result.update({
'success': True,
'price': price,
'title': title,
'availability': availability
})
logger.info(f"Successfully scraped {site_name}: ${price}")
except Exception as e:
logger.error(f"Error scraping {url}: {e}")
result['error'] = str(e)
return result
def _parse_availability(self, availability_text: str) -> bool:
"""Parse availability from text."""
if not availability_text:
return True # Assume available if no info
availability_text = availability_text.lower()
# Common out of stock indicators
out_of_stock_indicators = [
'out of stock', 'unavailable', 'sold out', 'not available',
'temporarily out of stock', 'currently unavailable'
]
for indicator in out_of_stock_indicators:
if indicator in availability_text:
return False
return True
class ScraperManager:
"""Manages multiple price scrapers and coordinates scraping tasks."""
def __init__(self, config: Config):
self.config = config
self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
async def scrape_product(self, product: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Scrape prices for a single product across all configured sites."""
product_id = product['id']
urls = product['urls']
results = {}
async with PriceScraper(self.config) as scraper:
tasks = []
for site_name, url in urls.items():
if self.config.is_site_enabled(site_name):
task = self._scrape_with_semaphore(scraper, url, site_name)
tasks.append((site_name, task))
# Add delay between requests
await asyncio.sleep(self.config.delay_between_requests)
# Wait for all tasks to complete
for site_name, task in tasks:
try:
result = await task
results[site_name] = result
except Exception as e:
logger.error(f"Error scraping {site_name} for product {product_id}: {e}")
results[site_name] = {
'success': False,
'error': str(e)
}
return results
async def _scrape_with_semaphore(self, scraper: PriceScraper, url: str, site_name: str):
"""Scrape with semaphore to limit concurrent requests."""
async with self.semaphore:
return await scraper.scrape_product_price(url, site_name)
async def scrape_all_products(self, products: List[Dict[str, Any]]) -> Dict[int, Dict[str, Dict[str, Any]]]:
"""Scrape prices for all products."""
results = {}
for product in products:
try:
product_id = product['id']
logger.info(f"Scraping product: {product['name']} (ID: {product_id})")
product_results = await self.scrape_product(product)
results[product_id] = product_results
# Add delay between products
await asyncio.sleep(self.config.delay_between_requests)
except Exception as e:
logger.error(f"Error scraping product {product.get('id', 'unknown')}: {e}")
return results
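
A usage sketch of ScraperManager with a hand-built product dict; in the real flow these dicts come from DatabaseManager.get_all_products(). The URL is a placeholder and the site name must match an enabled entry in the config's sites section:

import asyncio

from src.config import Config
from src.scraper import ScraperManager

async def main():
    config = Config("config.json")
    manager = ScraperManager(config)
    product = {
        "id": 1,
        "name": "Example Product",
        "urls": {"jjfoodservice": "https://www.jjfoodservice.com/example-product"},
    }
    results = await manager.scrape_product(product)
    for site, result in results.items():
        print(site, result.get("price"), result.get("error"))

asyncio.run(main())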

src/scraper_manager.py (Normal file, +139 lines)

@@ -0,0 +1,139 @@
"""
Scraper manager for coordinating price scraping tasks
"""
import asyncio
import logging
from typing import Dict, List, Any
from .scraper import ScraperManager as BaseScraper
from .uk_scraper import UKCateringScraper
logger = logging.getLogger(__name__)
class ScraperManager(BaseScraper):
"""Enhanced scraper manager with additional coordination features."""
def __init__(self, config):
super().__init__(config)
self.active_tasks = {}
async def scrape_product_by_id(self, product_id: int, product_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Scrape a specific product by ID with task tracking."""
if product_id in self.active_tasks:
logger.info(f"Product {product_id} is already being scraped")
return await self.active_tasks[product_id]
# Create and track the scraping task
task = asyncio.create_task(self.scrape_product(product_data))
self.active_tasks[product_id] = task
try:
result = await task
return result
finally:
# Clean up completed task
if product_id in self.active_tasks:
del self.active_tasks[product_id]
async def cancel_product_scraping(self, product_id: int) -> bool:
"""Cancel scraping for a specific product."""
if product_id in self.active_tasks:
task = self.active_tasks[product_id]
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
del self.active_tasks[product_id]
logger.info(f"Cancelled scraping for product {product_id}")
return True
return False
def get_active_scraping_tasks(self) -> List[int]:
"""Get list of product IDs currently being scraped."""
return list(self.active_tasks.keys())
async def health_check(self) -> Dict[str, Any]:
"""Perform a health check on the scraping system."""
health_status = {
'status': 'healthy',
'active_tasks': len(self.active_tasks),
'enabled_sites': len(self.config.get_enabled_sites()),
'site_checks': {}
}
# Test each enabled site with a simple request
enabled_sites = self.config.get_enabled_sites()
for site_name in enabled_sites:
site_config = self.config.get_site_config(site_name)
base_url = site_config.get('base_url', '')
try:
from .scraper import PriceScraper
async with PriceScraper(self.config) as scraper:
html_content = await scraper._fetch_page(base_url)
if html_content:
health_status['site_checks'][site_name] = 'accessible'
else:
health_status['site_checks'][site_name] = 'inaccessible'
except Exception as e:
health_status['site_checks'][site_name] = f'error: {str(e)}'
# Determine overall health
failed_sites = [site for site, status in health_status['site_checks'].items()
if status != 'accessible']
if len(failed_sites) == len(enabled_sites):
health_status['status'] = 'unhealthy'
elif failed_sites:
health_status['status'] = 'degraded'
return health_status
async def scrape_product(self, product: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Scrape prices for a single product across all configured sites."""
product_id = product['id']
urls = product['urls']
results = {}
# Determine which scraper to use based on the sites
uk_catering_sites = {'jjfoodservice', 'atoz_catering', 'amazon_uk'}
has_uk_sites = any(site in uk_catering_sites for site in urls.keys())
if has_uk_sites:
# Use UK catering scraper
async with UKCateringScraper(self.config) as scraper:
tasks = []
for site_name, url in urls.items():
if self.config.is_site_enabled(site_name):
task = self._scrape_with_semaphore_uk(scraper, url, site_name)
tasks.append((site_name, task))
# Add delay between requests
await asyncio.sleep(self.config.delay_between_requests)
# Wait for all tasks to complete
for site_name, task in tasks:
try:
result = await task
results[site_name] = result
except Exception as e:
logger.error(f"Error scraping {site_name} for product {product_id}: {e}")
results[site_name] = {
'success': False,
'error': str(e)
}
else:
# Use standard scraper for other sites
results = await super().scrape_product(product)
return results
async def _scrape_with_semaphore_uk(self, scraper: UKCateringScraper, url: str, site_name: str):
"""Scrape with semaphore using UK scraper."""
async with self.semaphore:
return await scraper.scrape_product_price(url, site_name)
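
A sketch of running the health check from this subclass; whether each site reports as accessible depends on the base_url values in the config and on network access:

import asyncio

from src.config import Config
from src.scraper_manager import ScraperManager

async def main():
    manager = ScraperManager(Config("config.json"))
    status = await manager.health_check()
    print(status["status"], status["site_checks"])

asyncio.run(main())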

src/uk_scraper.py (Normal file, +332 lines)

@@ -0,0 +1,332 @@
"""
Specialized scrapers for UK catering supply sites
"""
import re
import logging
from typing import Dict, Any, Optional
from bs4 import BeautifulSoup
from .scraper import PriceScraper
logger = logging.getLogger(__name__)
class UKCateringScraper(PriceScraper):
"""Specialized scraper for UK catering supply websites."""
def _parse_uk_price(self, price_text: str) -> Optional[float]:
"""Parse UK price format with £ symbol."""
if not price_text:
return None
# Remove common text and normalize
price_text = price_text.lower()
price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text)
# Find price with £ symbol
price_match = re.search(r'£(\d+\.?\d*)', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
pass
# Try without £ symbol but with decimal
price_match = re.search(r'(\d+\.\d{2})', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
pass
return None
def _extract_jjfoodservice_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from JJ Food Service."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Try multiple selectors for price
price_selectors = [
'.price',
'.product-price',
'[data-testid="price"]',
'.price-value',
'.current-price',
'.product-card-price',
'span:contains("£")',
'.cost'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
logger.info(f"Successfully scraped jjfoodservice: £{price}")
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with JJ Food Service price selector {selector}: {e}")
# Try to extract title
title_selectors = [
'h1',
'.product-title',
'.product-name',
'[data-testid="product-title"]',
'.product-card-title',
'title'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with JJ Food Service title selector {selector}: {e}")
# Check availability
availability_indicators = [
'out of stock',
'unavailable',
'not available',
'temporarily unavailable'
]
page_text = soup.get_text().lower()
for indicator in availability_indicators:
if indicator in page_text:
result['availability'] = False
break
return result
def _extract_atoz_catering_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from A to Z Catering."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# A to Z Catering specific selectors
price_selectors = [
'.price',
'.product-price',
'.delivery-price',
'.collection-price',
'span:contains("£")',
'.price-value',
'.cost',
'.selling-price'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
# Skip if it contains "delivery" or "collection" but no price
if ('delivery' in price_text.lower() or 'collection' in price_text.lower()) and '£' not in price_text:
continue
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
logger.info(f"Successfully scraped atoz_catering: £{price}")
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with A to Z price selector {selector}: {e}")
# Extract title
title_selectors = [
'h1',
'.product-title',
'.product-name',
'a[href*="/products/product/"]',
'.product-link',
'title'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with A to Z title selector {selector}: {e}")
# Check availability - A to Z specific indicators
availability_indicators = [
'out of stock',
'unavailable',
'not available',
'temporarily unavailable',
'contact us for availability'
]
page_text = soup.get_text().lower()
for indicator in availability_indicators:
if indicator in page_text:
result['availability'] = False
break
# Check if "Add to Basket" button is present (indicates availability)
add_to_basket = soup.select_one('.add-to-basket, button:contains("Add To Basket")')
if not add_to_basket and result['availability']:
# If no add to basket button and no explicit availability info, assume unavailable
out_of_stock_indicators = soup.select('.out-of-stock, .unavailable')
if out_of_stock_indicators:
result['availability'] = False
return result
def _extract_amazon_uk_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Amazon UK."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Amazon UK price selectors
price_selectors = [
'.a-price-whole',
'.a-price .a-offscreen',
'#priceblock_dealprice',
'#priceblock_ourprice',
'.a-price-range',
'.a-price.a-text-price.a-size-medium.apexPriceToPay',
'.a-price-current',
'span.a-price.a-text-price.a-size-medium'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Amazon UK price selector {selector}: {e}")
# Extract title
title_selectors = [
'#productTitle',
'.product-title',
'h1.a-size-large',
'h1'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Amazon UK title selector {selector}: {e}")
# Check availability
availability_selectors = [
'#availability span',
'.a-size-medium.a-color-success',
'.a-size-medium.a-color-state',
'#availability .a-declarative'
]
for selector in availability_selectors:
try:
element = soup.select_one(selector)
if element:
availability_text = element.get_text().lower()
if any(phrase in availability_text for phrase in ['out of stock', 'unavailable', 'not available']):
result['availability'] = False
break
except Exception as e:
logger.debug(f"Error with Amazon UK availability selector {selector}: {e}")
return result
async def scrape_product(self, product_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Scrape prices for a product from all configured sites."""
results = {}
urls = product_data.get('urls', {})
for site_name, url in urls.items():
try:
# Only process sites we support
if site_name not in ['jjfoodservice', 'atoz_catering', 'amazon_uk']:
logger.warning(f"Skipping unsupported site: {site_name}")
continue
html_content = await self._fetch_page(url)
if not html_content:
results[site_name] = {
'success': False,
'error': 'Failed to fetch page',
'price': None,
'currency': 'GBP'
}
continue
soup = BeautifulSoup(html_content, 'html.parser')
# Route to appropriate extraction method
if site_name == 'jjfoodservice':
extracted_data = self._extract_jjfoodservice_data(soup)
elif site_name == 'atoz_catering':
extracted_data = self._extract_atoz_catering_data(soup)
elif site_name == 'amazon_uk':
extracted_data = self._extract_amazon_uk_data(soup)
else:
# Unreachable given the supported-site filter above; fail loudly if it is ever reached
raise ValueError(f"No extraction method for site: {site_name}")
if extracted_data['price'] is not None:
results[site_name] = {
'success': True,
'price': extracted_data['price'],
'currency': extracted_data['currency'],
'title': extracted_data.get('title'),
'availability': extracted_data.get('availability', True)
}
else:
results[site_name] = {
'success': False,
'error': 'Could not extract price',
'price': None,
'currency': 'GBP'
}
except Exception as e:
logger.error(f"Error scraping {site_name}: {e}")
results[site_name] = {
'success': False,
'error': str(e),
'price': None,
'currency': 'GBP'
}
return results
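
For reference, the UK price parser above accepts strings such as "Delivery:£12.49" or a bare "9.99". The snippet below restates that parsing logic as a standalone function for illustration; it mirrors _parse_uk_price rather than calling into the class:

import re
from typing import Optional

def parse_uk_price(price_text: str) -> Optional[float]:
    """Standalone mirror of UKCateringScraper._parse_uk_price (illustrative)."""
    if not price_text:
        return None
    price_text = price_text.lower()
    price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text)
    match = re.search(r'£(\d+\.?\d*)', price_text)
    if not match:
        match = re.search(r'(\d+\.\d{2})', price_text)
    return float(match.group(1)) if match else None

print(parse_uk_price("Delivery:£12.49"))  # 12.49
print(parse_uk_price("Now: 9.99"))        # 9.99
print(parse_uk_price("out of stock"))     # None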

src/uk_scraper_old.py (Normal file, +515 lines)

@@ -0,0 +1,515 @@
"""
Specialized scrapers for UK catering supply sites
"""
import re
import logging
from typing import Dict, Any, Optional
from bs4 import BeautifulSoup
from .scraper import PriceScraper
logger = logging.getLogger(__name__)
class UKCateringScraper(PriceScraper):
"""Specialized scraper for UK catering supply websites."""
def _parse_uk_price(self, price_text: str) -> Optional[float]:
"""Parse UK price format with £ symbol."""
if not price_text:
return None
# Remove common text and normalize
price_text = price_text.lower()
price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text)
# Find price with £ symbol
price_match = re.search(r'£(\d+\.?\d*)', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
pass
# Try without £ symbol but with decimal
price_match = re.search(r'(\d+\.\d{2})', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
pass
return None
def _extract_jjfoodservice_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from JJ Food Service."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Try multiple selectors for price
price_selectors = [
'.price',
'.product-price',
'[data-testid="price"]',
'.price-value',
'.current-price',
'.product-card-price',
'span:contains("£")',
'.cost'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with JJ Food Service price selector {selector}: {e}")
# Try to extract title
title_selectors = [
'h1',
'.product-title',
'.product-name',
'[data-testid="product-title"]',
'.product-card-title',
'title'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with JJ Food Service title selector {selector}: {e}")
# Check availability
availability_indicators = [
'out of stock',
'unavailable',
'not available',
'sold out'
]
page_text = soup.get_text().lower()
for indicator in availability_indicators:
if indicator in page_text:
result['availability'] = False
break
return result
def _extract_atoz_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from A to Z Catering."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# A to Z Catering shows prices like "Delivery:£X.XX Collection:£Y.YY"
# We'll prioritize the lower price (usually collection)
price_text = soup.get_text()
# Look for delivery and collection prices
delivery_match = re.search(r'delivery:?\s*£(\d+\.?\d*)', price_text, re.IGNORECASE)
collection_match = re.search(r'collection:?\s*£(\d+\.?\d*)', price_text, re.IGNORECASE)
prices = []
if delivery_match:
try:
prices.append(float(delivery_match.group(1)))
except ValueError:
pass
if collection_match:
try:
prices.append(float(collection_match.group(1)))
except ValueError:
pass
# If we found prices, use the lowest one
if prices:
result['price'] = min(prices)
else:
# Fallback to general price extraction
price_selectors = [
'.price',
'.product-price',
'span:contains("£")',
'.price-value'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with A to Z price selector {selector}: {e}")
# Extract title - A to Z often has product names in links
title_selectors = [
'h1',
'.product-title',
'.product-name',
'a[href*="/products/product/"]',
'.product-link',
'title'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
title = element.get_text(strip=True)
# Clean up the title
if len(title) > 5 and 'A to Z' not in title:
result['title'] = title
break
except Exception as e:
logger.debug(f"Error with A to Z title selector {selector}: {e}")
# Check availability - look for "Add To Basket" button
add_to_basket = soup.find(text=re.compile('Add To Basket', re.IGNORECASE))
if not add_to_basket:
# Also check for out of stock indicators
out_of_stock_indicators = [
'out of stock',
'unavailable',
'not available',
'sold out'
]
page_text = soup.get_text().lower()
for indicator in out_of_stock_indicators:
if indicator in page_text:
result['availability'] = False
break
return result
def _extract_amazon_uk_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Amazon UK."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Amazon UK price selectors
price_selectors = [
'.a-price-whole',
'.a-price .a-offscreen',
'.a-price-current .a-offscreen',
'#priceblock_dealprice',
'#priceblock_ourprice',
'.a-price-range',
'.a-price.a-text-price.a-size-medium.apexPriceToPay .a-offscreen'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Amazon UK price selector {selector}: {e}")
# Extract title
title_selectors = [
'#productTitle',
'.product-title',
'h1.a-size-large'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Amazon UK title selector {selector}: {e}")
# Check availability
availability_text = soup.get_text().lower()
if any(phrase in availability_text for phrase in ['out of stock', 'currently unavailable', 'not available']):
result['availability'] = False
return result
def _extract_tesco_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Tesco."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Tesco price selectors
price_selectors = [
'.price-control-wrapper .value',
'.price-per-sellable-unit .value',
'.price-per-quantity-weight .value',
'[data-testid="price-current-value"]',
'.price-current',
'.product-price .price'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Tesco price selector {selector}: {e}")
# Extract title
title_selectors = [
'h1[data-testid="product-title"]',
'.product-details-tile h1',
'.product-title',
'h1.product-name'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Tesco title selector {selector}: {e}")
return result
def _extract_sainsburys_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Sainsburys."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Sainsburys price selectors
price_selectors = [
'.pd__cost__current-price',
'.pd__cost .pd__cost__retail-price',
'.pricing__now-price',
'.product-price__current',
'[data-testid="pd-retail-price"]',
'.price-per-unit'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Sainsburys price selector {selector}: {e}")
# Extract title
title_selectors = [
'.pd__header h1',
'h1[data-testid="pd-product-name"]',
'.product-name',
'.pd__product-name'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Sainsburys title selector {selector}: {e}")
return result
def _extract_booker_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Booker."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Booker price selectors
price_selectors = [
'.price',
'.product-price',
'.price-current',
'.selling-price',
'[data-testid="price"]',
'.product-tile-price'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Booker price selector {selector}: {e}")
# Extract title
title_selectors = [
'h1',
'.product-title',
'.product-name',
'.product-description h1',
'[data-testid="product-title"]'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Booker title selector {selector}: {e}")
return result
async def scrape_product_price(self, url: str, site_name: str = None) -> Dict[str, Any]:
"""Enhanced scraping for UK catering sites."""
result = {
'success': False,
'price': None,
'currency': 'GBP',
'title': None,
'availability': None,
'url': url,
'error': None
}
try:
# Auto-detect site if not provided
if not site_name:
site_name = self._detect_site(url)
if not site_name:
result['error'] = "Could not detect site from URL"
return result
# Check if site is enabled
if not self.config.is_site_enabled(site_name):
result['error'] = f"Site {site_name} is disabled"
return result
# Fetch page content
html_content = await self._fetch_page(url)
if not html_content:
result['error'] = "Failed to fetch page content"
return result
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Use specialized extraction based on site
if site_name == 'jjfoodservice':
extracted_data = self._extract_jjfoodservice_data(soup)
elif site_name == 'atoz_catering':
extracted_data = self._extract_atoz_data(soup)
elif site_name == 'amazon_uk':
extracted_data = self._extract_amazon_uk_data(soup)
elif site_name == 'tesco':
extracted_data = self._extract_tesco_data(soup)
elif site_name == 'sainsburys':
extracted_data = self._extract_sainsburys_data(soup)
elif site_name == 'booker':
extracted_data = self._extract_booker_data(soup)
else:
# Fall back to general extraction
return await super().scrape_product_price(url, site_name)
if extracted_data['price'] is None:
result['error'] = "Could not extract price from page"
return result
result.update({
'success': True,
'price': extracted_data['price'],
'currency': extracted_data.get('currency', 'GBP'),
'title': extracted_data.get('title'),
'availability': extracted_data.get('availability', True)
})
logger.info(f"Successfully scraped {site_name}: £{extracted_data['price']}")
except Exception as e:
logger.error(f"Error scraping {url}: {e}")
result['error'] = str(e)
return result
def _detect_site(self, url: str) -> Optional[str]:
"""Detect which UK catering site this URL belongs to."""
url_lower = url.lower()
if 'jjfoodservice.com' in url_lower:
return 'jjfoodservice'
elif 'atoz-catering.co.uk' in url_lower:
return 'atoz_catering'
elif 'amazon.co.uk' in url_lower:
return 'amazon_uk'
elif 'tesco.com' in url_lower:
return 'tesco'
elif 'sainsburys.co.uk' in url_lower:
return 'sainsburys'
elif 'booker.co.uk' in url_lower:
return 'booker'
# Fall back to parent detection for other sites
return super()._detect_site(url)

src/utils.py (Normal file, +118 lines)

@@ -0,0 +1,118 @@
"""
Utility functions for the price tracker
"""
import logging
from typing import Dict, Any, List
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
def format_price(price: float, currency: str = 'GBP') -> str:
"""Format price with appropriate currency symbol."""
if currency == 'GBP':
return f"£{price:.2f}"
elif currency == 'USD':
return f"${price:.2f}"
elif currency == 'EUR':
return f"{price:.2f}"
else:
return f"{price:.2f} {currency}"
def calculate_price_change(old_price: float, new_price: float) -> Dict[str, Any]:
"""Calculate price change percentage and direction."""
if old_price == 0:
return {
'change': 0.0,
'percentage': 0.0,
'direction': 'stable'
}
change = new_price - old_price
percentage = (change / old_price) * 100
if percentage > 0.1:
direction = 'up'
elif percentage < -0.1:
direction = 'down'
else:
direction = 'stable'
return {
'change': change,
'percentage': percentage,
'direction': direction
}
def is_site_accessible(site_name: str, last_success: datetime = None) -> bool:
"""Check if a site is likely accessible based on recent success."""
if not last_success:
return True # Assume accessible if no data
# Consider site inaccessible if no success in last 24 hours
return (datetime.now() - last_success) < timedelta(hours=24)
def get_retry_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
"""Calculate exponential backoff delay with jitter."""
import random
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1) # Add 10% jitter
return delay + jitter
def clean_product_name(name: str) -> str:
"""Clean and normalize product name."""
import re
# Remove extra whitespace and normalize
name = re.sub(r'\s+', ' ', name.strip())
# Remove special characters that might cause issues
name = re.sub(r'[^\w\s\-\(\)&]', '', name)
return name
def is_valid_price(price: float) -> bool:
"""Check if a price is valid (positive and reasonable)."""
return price > 0 and price < 10000 # Max £10,000 seems reasonable for catering supplies
def get_price_alert_message(product_name: str, site_name: str, current_price: float,
target_price: float, currency: str = 'GBP') -> str:
"""Generate price alert message."""
current_formatted = format_price(current_price, currency)
target_formatted = format_price(target_price, currency)
return (f"Price Alert: {product_name} is now {current_formatted} on {site_name}, "
f"which is at or below your target price of {target_formatted}!")
def group_results_by_status(results: Dict[str, Dict[str, Any]]) -> Dict[str, List]:
"""Group scraping results by success/failure status."""
grouped = {
'successful': [],
'failed': [],
'blocked': []
}
for site_name, result in results.items():
if result.get('success'):
grouped['successful'].append({
'site': site_name,
'price': result.get('price'),
'currency': result.get('currency', 'GBP')
})
elif 'blocked' in str(result.get('error', '')).lower() or '403' in str(result.get('error', '')):
grouped['blocked'].append({
'site': site_name,
'error': result.get('error')
})
else:
grouped['failed'].append({
'site': site_name,
'error': result.get('error')
})
return grouped
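
A quick sketch of the price helpers above with made-up numbers:

from src.utils import calculate_price_change, format_price

change = calculate_price_change(old_price=25.00, new_price=23.49)
# direction is 'down', percentage is roughly -6.04
print(format_price(23.49), change["direction"], f"{change['percentage']:.2f}%")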

src/web_ui.py (Normal file, +271 lines)

@@ -0,0 +1,271 @@
"""
Web UI for the price tracker application
"""
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_from_directory
from flask_wtf import FlaskForm
from wtforms import StringField, FloatField, TextAreaField, SubmitField, URLField
from wtforms.validators import DataRequired, NumberRange, URL, Optional
import json
import asyncio
from datetime import datetime, timedelta
import plotly
import plotly.graph_objs as go
import pandas as pd
import os
from .database import DatabaseManager
from .config import Config
from .scraper_manager import ScraperManager
from .notification import NotificationManager
from .utils import format_price, group_results_by_status
def create_app():
"""Create Flask application."""
# Get the project root directory (parent of src)
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
template_dir = os.path.join(project_root, 'templates')
app = Flask(__name__, template_folder=template_dir)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key-change-this')  # prefer setting SECRET_KEY in the environment
# Initialize components
config = Config()
db_manager = DatabaseManager(config.database_path)
scraper_manager = ScraperManager(config)
notification_manager = NotificationManager(config)
class ProductForm(FlaskForm):
name = StringField('Product Name', validators=[DataRequired()])
description = TextAreaField('Description')
target_price = FloatField('Target Price (£)', validators=[Optional(), NumberRange(min=0)])
jjfoodservice_url = URLField('JJ Food Service URL', validators=[Optional(), URL()])
atoz_catering_url = URLField('A to Z Catering URL', validators=[Optional(), URL()])
amazon_uk_url = URLField('Amazon UK URL', validators=[Optional(), URL()])
submit = SubmitField('Add Product')
@app.route('/')
def index():
"""Home page showing all products."""
products = db_manager.get_all_products()
# Get latest prices for each product
for product in products:
latest_prices = db_manager.get_latest_prices(product['id'])
product['latest_prices'] = latest_prices
# Find best current price
if latest_prices:
best_price = min(latest_prices.values(), key=lambda x: x['price'])
product['best_price'] = best_price
else:
product['best_price'] = None
return render_template('index.html', products=products)
@app.route('/add_product', methods=['GET', 'POST'])
def add_product():
"""Add a new product to track."""
form = ProductForm()
if form.validate_on_submit():
urls = {}
if form.jjfoodservice_url.data:
urls['jjfoodservice'] = form.jjfoodservice_url.data
if form.atoz_catering_url.data:
urls['atoz_catering'] = form.atoz_catering_url.data
if form.amazon_uk_url.data:
urls['amazon_uk'] = form.amazon_uk_url.data
if not urls:
flash('Please provide at least one URL to track.', 'error')
return render_template('add_product.html', form=form)
try:
product_id = db_manager.add_product(
name=form.name.data,
description=form.description.data,
target_price=form.target_price.data,
urls=urls
)
flash(f'Product "{form.name.data}" added successfully!', 'success')
return redirect(url_for('product_detail', product_id=product_id))
except Exception as e:
flash(f'Error adding product: {str(e)}', 'error')
return render_template('add_product.html', form=form)
@app.route('/product/<int:product_id>')
def product_detail(product_id):
"""Show detailed information for a product."""
product = db_manager.get_product(product_id)
if not product:
flash('Product not found.', 'error')
return redirect(url_for('index'))
# Get price history
price_history = db_manager.get_price_history(product_id, days=30)
latest_prices = db_manager.get_latest_prices(product_id)
price_stats = db_manager.get_price_statistics(product_id, days=30)
# Create price chart
chart_json = create_price_chart(price_history, product['name'])
return render_template('product_detail.html',
product=product,
price_history=price_history,
latest_prices=latest_prices,
price_stats=price_stats,
chart_json=chart_json)
@app.route('/scrape/<int:product_id>', methods=['POST'])
def scrape_product(product_id):
"""Manually trigger scraping for a specific product."""
product = db_manager.get_product(product_id)
if not product:
return jsonify({'error': 'Product not found'}), 404
try:
# Run scraping in a new event loop (since we're in Flask)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(scraper_manager.scrape_product(product))
# Save results to database
for site_name, result in results.items():
if result['success']:
db_manager.save_price_history(
product_id=product_id,
site_name=site_name,
price=result['price'],
availability=result.get('availability', True),
timestamp=datetime.now()
)
loop.close()
return jsonify({
'success': True,
'results': results,
'message': 'Scraping completed successfully'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/scrape_all', methods=['POST'])
def scrape_all_products():
"""Trigger scraping for all products."""
try:
products = db_manager.get_all_products()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(scraper_manager.scrape_all_products(products))
# Save results to database
total_updated = 0
for product_id, site_results in results.items():
for site_name, result in site_results.items():
if result['success']:
db_manager.save_price_history(
product_id=product_id,
site_name=site_name,
price=result['price'],
availability=result.get('availability', True),
timestamp=datetime.now()
)
total_updated += 1
loop.close()
return jsonify({
'success': True,
'total_updated': total_updated,
'message': f'Updated prices for {total_updated} product-site combinations'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/products')
def api_products():
"""API endpoint to get all products."""
products = db_manager.get_all_products()
return jsonify(products)
@app.route('/api/product/<int:product_id>/prices')
def api_product_prices(product_id):
"""API endpoint to get price history for a product."""
days = request.args.get('days', 30, type=int)
price_history = db_manager.get_price_history(product_id, days)
return jsonify(price_history)
@app.route('/settings')
def settings():
"""Settings page."""
return render_template('settings.html', config=config)
@app.route('/test_notifications', methods=['POST'])
def test_notifications():
"""Test notification system."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(notification_manager.send_test_notification())
loop.close()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/favicon.ico')
def favicon():
"""Serve the favicon."""
return send_from_directory(os.path.join(app.root_path, 'static'),
'favicon.ico', mimetype='image/vnd.microsoft.icon')
def create_price_chart(price_history, product_name):
"""Create a price history chart using Plotly."""
if not price_history:
return json.dumps({})
# Convert to DataFrame for easier manipulation
df = pd.DataFrame(price_history)
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Create traces for each site
traces = []
sites = df['site_name'].unique()
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']
for i, site in enumerate(sites):
site_data = df[df['site_name'] == site].sort_values('timestamp')
trace = go.Scatter(
x=site_data['timestamp'],
y=site_data['price'],
mode='lines+markers',
name=site.title(),
line=dict(color=colors[i % len(colors)], width=2),
marker=dict(size=6)
)
traces.append(trace)
layout = go.Layout(
title=f'Price History - {product_name}',
xaxis=dict(title='Date'),
yaxis=dict(title='Price (GBP)'),
hovermode='closest',
margin=dict(l=50, r=50, t=50, b=50)
)
fig = go.Figure(data=traces, layout=layout)
return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
return app
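
This commit does not include the entry point that serves the Flask app; a minimal sketch of running it locally (file name, host, and port are assumptions):

# run_web_ui.py -- hypothetical entry point, not part of this commit
from src.web_ui import create_app

app = create_app()

if __name__ == "__main__":
    # Development server only; use a production WSGI server for deployment
    app.run(host="127.0.0.1", port=5000, debug=True)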