scrape fix

Oli Passey
2025-06-27 17:25:56 +01:00
parent ee0142121a
commit 5726183115
27 changed files with 2353 additions and 621 deletions


@@ -147,6 +147,15 @@ class DatabaseManager:
                UPDATE products SET active = 0, updated_at = ? WHERE id = ?
            ''', (datetime.now(), product_id))

    def delete_product(self, product_id: int):
        """Delete a product and all its associated price history."""
        with sqlite3.connect(self.db_path) as conn:
            # Delete price history first (due to foreign key constraints)
            conn.execute('DELETE FROM price_history WHERE product_id = ?', (product_id,))
            # Delete the product
            conn.execute('DELETE FROM products WHERE id = ?', (product_id,))

    def save_price_history(self, product_id: int, site_name: str, price: float,
                           currency: str = 'GBP', availability: bool = True,
                           timestamp: datetime = None):
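SQLite leaves foreign-key enforcement off unless PRAGMA foreign_keys is enabled, so deleting the child rows first is the safe order in either case. A minimal sketch of the cascade alternative (outside the diff), assuming a hypothetical database path, that the tables exist, and a schema recreated with ON DELETE CASCADE:

    import sqlite3

    conn = sqlite3.connect('app.db')          # hypothetical path
    conn.execute('PRAGMA foreign_keys = ON')  # SQLite ships with enforcement off
    # If price_history.product_id were declared REFERENCES products(id)
    # ON DELETE CASCADE, one statement would remove the history rows too:
    conn.execute('DELETE FROM products WHERE id = ?', (42,))
    conn.commit()
    conn.close()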


@@ -169,13 +169,21 @@ class PriceScraper:
"""Detect which site this URL belongs to."""
domain = urlparse(url).netloc.lower()
if 'amazon' in domain:
# UK Catering sites (handled by UKCateringScraper)
if 'jjfoodservice.com' in domain:
return 'jjfoodservice'
elif 'atoz-catering.co.uk' in domain:
return 'atoz_catering'
elif 'amazon.co.uk' in domain:
return 'amazon_uk'
# International sites (handled by base PriceScraper)
elif 'amazon.com' in domain or 'amazon.' in domain:
return 'amazon'
elif 'ebay' in domain:
return 'ebay'
elif 'walmart' in domain:
return 'walmart'
# Add more site detection logic here
return None
@@ -267,6 +275,17 @@ class PriceScraper:
            return False
        return True

    def should_use_uk_scraper(self, url: str) -> bool:
        """Determine if this URL should use the UK catering scraper."""
        site_name = self._detect_site(url)
        uk_sites = {'jjfoodservice', 'atoz_catering', 'amazon_uk'}
        return site_name in uk_sites

    @classmethod
    def get_uk_catering_sites(cls) -> set:
        """Get the list of UK catering sites."""
        return {'jjfoodservice', 'atoz_catering', 'amazon_uk'}

class ScraperManager:
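A minimal routing sketch built on these helpers (outside the diff), assuming an already-constructed PriceScraper named scraper and a hypothetical product URL:

    url = 'https://www.atoz-catering.co.uk/...'   # hypothetical product URL
    site = scraper._detect_site(url)              # -> 'atoz_catering'
    if scraper.should_use_uk_scraper(url):
        print(f'{site}: route to UKCateringScraper')
    else:
        print(f'{site}: route to the generic PriceScraper')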


@@ -17,6 +17,7 @@ class ScraperManager(BaseScraper):
    def __init__(self, config):
        super().__init__(config)
        self.active_tasks = {}
        self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)

    async def scrape_product_by_id(self, product_id: int, product_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """Scrape a specific product by ID with task tracking."""
@@ -36,6 +37,79 @@ class ScraperManager(BaseScraper):
        if product_id in self.active_tasks:
            del self.active_tasks[product_id]

    async def scrape_product(self, product: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """Scrape prices for a single product across all configured sites."""
        product_id = product['id']
        urls = product['urls']
        results = {}

        # Check if this product has UK catering sites
        uk_catering_sites = {'jjfoodservice', 'atoz_catering', 'amazon_uk'}
        has_uk_sites = any(site in uk_catering_sites for site in urls.keys())

        if has_uk_sites:
            # Use UK-specific scraper
            async with UKCateringScraper(self.config) as scraper:
                tasks = []
                for site_name, url in urls.items():
                    if self.config.is_site_enabled(site_name):
                        task = self._scrape_with_semaphore_uk(scraper, url, site_name)
                        tasks.append((site_name, task))
                        # Add delay between requests
                        await asyncio.sleep(self.config.delay_between_requests)

                # Wait for all tasks to complete
                for site_name, task in tasks:
                    try:
                        result = await task
                        results[site_name] = result
                    except Exception as e:
                        logger.error(f"Error scraping {site_name} for product {product_id}: {e}")
                        results[site_name] = {
                            'success': False,
                            'error': str(e)
                        }
        else:
            # Use generic scraper for non-UK sites
            from .scraper import PriceScraper
            async with PriceScraper(self.config) as scraper:
                tasks = []
                for site_name, url in urls.items():
                    if self.config.is_site_enabled(site_name):
                        task = self._scrape_with_semaphore(scraper, url, site_name)
                        tasks.append((site_name, task))
                        # Add delay between requests
                        await asyncio.sleep(self.config.delay_between_requests)

                # Wait for all tasks to complete
                for site_name, task in tasks:
                    try:
                        result = await task
                        results[site_name] = result
                    except Exception as e:
                        logger.error(f"Error scraping {site_name} for product {product_id}: {e}")
                        results[site_name] = {
                            'success': False,
                            'error': str(e)
                        }

        return results

    async def _scrape_with_semaphore_uk(self, scraper: UKCateringScraper, url: str, site_name: str):
        """Scrape with semaphore using UK scraper."""
        async with self.semaphore:
            return await scraper.scrape_product_price(url, site_name)

    async def _scrape_with_semaphore(self, scraper, url: str, site_name: str):
        """Scrape with semaphore using generic scraper."""
        async with self.semaphore:
            return await scraper.scrape_product_price(url, site_name)

    async def cancel_product_scraping(self, product_id: int) -> bool:
        """Cancel scraping for a specific product."""
        if product_id in self.active_tasks:
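A minimal driver sketch for the manager (outside the diff), assuming a config object exposing the attributes the manager reads above (max_concurrent_requests, delay_between_requests, is_site_enabled) and hypothetical product URLs:

    import asyncio

    async def main():
        manager = ScraperManager(config)  # config assumed to exist
        product = {
            'id': 1,
            'urls': {
                'jjfoodservice': 'https://www.jjfoodservice.com/...',  # hypothetical
                'amazon_uk': 'https://www.amazon.co.uk/...',           # hypothetical
            },
        }
        results = await manager.scrape_product(product)
        for site, outcome in results.items():
            print(site, outcome.get('price'), outcome.get('error'))

    asyncio.run(main())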


@@ -4,8 +4,8 @@ Specialized scrapers for UK catering supply sites
import re
import logging
-from typing import Dict, Any, Optional
-from bs4 import BeautifulSoup
from typing import Dict, Any, Optional, List, Tuple
from bs4 import BeautifulSoup, Tag

from .scraper import PriceScraper

logger = logging.getLogger(__name__)
@@ -14,35 +14,153 @@ logger = logging.getLogger(__name__)
class UKCateringScraper(PriceScraper):
    """Specialized scraper for UK catering supply websites."""

-   def _parse_uk_price(self, price_text: str) -> Optional[float]:
-       """Parse UK price format with £ symbol."""
    def _extract_special_pricing_context(self, element: Tag) -> Dict[str, Any]:
        """Extract special pricing context from an element and its surroundings."""
        context = {
            'has_strikethrough': False,
            'has_offer_label': False,
            'has_was_now': False,
            'prices': [],
            'price_types': []
        }

        # Get parent elements to check for special pricing context
        parents = [element] + [p for p in element.parents if p.name][:3]  # Check up to 3 levels up

        for parent in parents:
            parent_text = parent.get_text().lower() if parent else ""

            # Check for strikethrough pricing
            strikethrough_elements = parent.find_all(['del', 's', 'strike']) if parent else []
            if strikethrough_elements:
                context['has_strikethrough'] = True
                for strike_elem in strikethrough_elements:
                    strike_price = self._parse_uk_price(strike_elem.get_text())
                    if strike_price:
                        context['prices'].append(strike_price)
                        context['price_types'].append('was_price')

            # Check for offer/sale/discount labels
            offer_patterns = [
                r'\bsale\b', r'\boffer\b', r'\bdeal\b', r'\bdiscount\b',
                r'\bspecial\b', r'\bpromo\b', r'\breduced\b', r'\bsave\b',
                r'\bwas\s*£', r'\bnow\s*£', r'\b\d+%\s*off\b'
            ]
            for pattern in offer_patterns:
                if re.search(pattern, parent_text):
                    context['has_offer_label'] = True
                    break

            # Look for "was/now" pricing patterns
            was_now_match = re.search(r'was\s*£([\d.]+).*?now\s*£([\d.]+)', parent_text, re.IGNORECASE)
            if was_now_match:
                context['has_was_now'] = True
                was_price = float(was_now_match.group(1))
                now_price = float(was_now_match.group(2))
                context['prices'].extend([was_price, now_price])
                context['price_types'].extend(['was_price', 'now_price'])

        return context
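The was/now pattern used above can be exercised on its own; a quick standalone sketch:

    import re

    text = 'Was £12.99 Now £9.99'
    m = re.search(r'was\s*£([\d.]+).*?now\s*£([\d.]+)', text, re.IGNORECASE)
    if m:
        was_price, now_price = float(m.group(1)), float(m.group(2))
        print(was_price, now_price)  # 12.99 9.99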
    def _parse_uk_price(self, price_text: str, prefer_delivery: bool = False) -> Optional[float]:
        """Simple, conservative UK price parsing - just extract the first reasonable price."""
        if not price_text:
            return None

        # Remove common text and normalize
        price_text = price_text.lower()
        price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text)

        # Skip very long text blocks that are unlikely to contain just prices
        if len(price_text) > 100:
            return None

        # Check if this is delivery or collection pricing
        is_delivery = 'delivery' in price_text.lower()
        is_collection = 'collection' in price_text.lower()

        # If we prefer delivery and this is explicitly collection, skip it
        if prefer_delivery and is_collection and not is_delivery:
            return None

-       # Find price with £ symbol
-       price_match = re.search(r'£(\d+\.?\d*)', price_text)
        # Simple regex to find prices - be very specific
        price_match = re.search(r'£(\d{1,3}(?:\.\d{2})?)', price_text)
        if price_match:
            try:
                return float(price_match.group(1))
            except ValueError:
                pass

        # Try without £ symbol but with decimal
        price_match = re.search(r'(\d+\.\d{2})', price_text)
        if price_match:
            try:
-               return float(price_match.group(1))
                price_val = float(price_match.group(1))
                # Only accept reasonable food product prices
                if 2.0 <= price_val <= 100.0:
                    return price_val
            except ValueError:
                pass

        return None
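Both regex branches can be sanity-checked standalone; note that the 2.00 to 100.00 filter only guards the bare-decimal branch:

    import re

    assert re.search(r'£(\d{1,3}(?:\.\d{2})?)', 'delivery:£11.79').group(1) == '11.79'
    assert re.search(r'(\d+\.\d{2})', '4.62 per kg').group(1) == '4.62'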
    def _find_special_offer_prices(self, soup: BeautifulSoup, site_name: str) -> List[Tuple[float, str]]:
        """Find special offer prices using enhanced selectors."""
        special_prices = []

        # Enhanced selectors for special offers
        special_offer_selectors = [
            # General special offer containers
            '.special-offer', '.sale-price', '.offer-price', '.discount-price',
            '.promo-price', '.reduced-price', '.deal-price',
            # Strikethrough and comparison pricing
            'del:contains("£"), s:contains("£"), strike:contains("£")',
            '.was-price', '.original-price', '.rrp-price',
            # Was/Now pricing containers
            '.was-now-pricing', '.price-comparison', '.before-after-price',
            # Sale badges and labels
            '.sale-badge', '.offer-badge', '.discount-badge',
            '*[class*="sale"]:contains("£")',
            '*[class*="offer"]:contains("£")',
            '*[class*="discount"]:contains("£")',
            # Site-specific patterns
            '.product-price-wrapper', '.price-container', '.pricing-section'
        ]

        if site_name == 'atoz_catering':
            # A to Z specific selectors - prioritize the offer price class
            special_offer_selectors.extend([
                '.my-price.price-offer',  # Primary A to Z offer price selector
                'h3:contains("£")', 'h4:contains("£")',
                '.delivery-price-special', '.collection-price-special',
                '*[style*="text-decoration: line-through"]',
                '*[style*="text-decoration:line-through"]'
            ])
        elif site_name == 'jjfoodservice':
            # JJ Food Service specific selectors
            special_offer_selectors.extend([
                '.member-price', '.trade-price', '.bulk-price',
                '.quantity-discount', '.volume-discount'
            ])
        elif site_name == 'amazon_uk':
            # Amazon UK specific selectors
            special_offer_selectors.extend([
                '.a-price.a-text-price.a-size-medium.apexPriceToPay .a-offscreen',
                '.a-price-strike .a-offscreen',
                '#priceblock_dealprice', '#priceblock_saleprice',
                '.a-price-was', '.a-price-save'
            ])

        for selector in special_offer_selectors:
            try:
                elements = soup.select(selector)
                for element in elements:
                    price_text = element.get_text(strip=True)
                    if '£' in price_text:
                        price = self._parse_uk_price(price_text)
                        if price:
                            special_prices.append((price, selector))
            except Exception as e:
                logger.debug(f"Error with special offer selector {selector}: {e}")

        return special_prices
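One caution on the selectors above: :contains() is not standard CSS; BeautifulSoup's selector engine (soupsieve) accepts it but has deprecated it in favour of :-soup-contains(). A hedged equivalent for the strikethrough selector, assuming soupsieve 2.1 or later:

    from bs4 import BeautifulSoup

    soup = BeautifulSoup('<del>£12.99</del><span>£9.99</span>', 'html.parser')
    struck = soup.select('del:-soup-contains("£"), s:-soup-contains("£"), strike:-soup-contains("£")')
    print([el.get_text() for el in struck])  # ['£12.99']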
    def _extract_jjfoodservice_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
-       """Extract data specifically from JJ Food Service."""
        """Extract data specifically from JJ Food Service - simplified approach."""
        result = {
            'price': None,
            'title': None,
@@ -50,43 +168,85 @@ class UKCateringScraper(PriceScraper):
            'currency': 'GBP'
        }

-       # Try multiple selectors for price
-       price_selectors = [
-           '.price',
-           '.product-price',
-           '[data-testid="price"]',
-           '.price-value',
-           '.current-price',
-           '.product-card-price',
-           'span:contains("£")',
-           '.cost'
-       ]
        # First, try to find elements with Price in class name and extract delivery price
        price_elements = soup.select('[class*="Price"]')
        logger.debug(f"JJ Food Service: Found {len(price_elements)} price elements")

        for element in price_elements:
            text = element.get_text(strip=True)
            logger.debug(f"JJ Food Service: Checking price element text: '{text[:100]}'")
            # Look for delivery price in concatenated strings like
            # "Collection:£10.49£4.62 per kgDelivery:£11.79£5.19 per kg"
            delivery_match = re.search(r'Delivery:£(\d{1,3}\.\d{2})', text, re.IGNORECASE)
            if delivery_match:
                price_val = float(delivery_match.group(1))
                result['price'] = price_val
                logger.info(f"JJ Food Service: Found delivery price £{price_val} in price element")
                # extract title
                title_el = soup.select_one('h1')
                if title_el:
                    result['title'] = title_el.get_text(strip=True)
                return result

        # Second, attempt regex-based parsing of delivery price from raw page text
        page_text = soup.get_text(separator=' ')
        logger.debug(f"JJ Food Service page_text snippet: {page_text[:500]!r}")

        # Look for delivery price patterns in the text
        if 'DELIVERY' in page_text or 'delivery' in page_text:
            logger.debug("Found 'DELIVERY' in page text, looking for price patterns...")
            delivery_section = page_text[page_text.lower().find('delivery'):page_text.lower().find('delivery') + 100]
            logger.debug(f"Delivery section: {delivery_section!r}")

            # Try multiple patterns for delivery price (based on actual HTML structure)
            delivery_patterns = [
                r'Delivery:£(\d{1,3}\.\d{2})',     # Delivery:£11.79 (actual format found)
                r'DELIVERY:£(\d{1,3}\.\d{2})',     # DELIVERY:£11.79
                r'delivery:£(\d{1,3}\.\d{2})',     # delivery:£11.79
                r'DELIVERY:\s*£(\d{1,3}\.\d{2})',  # DELIVERY: £11.79 (with space)
                r'delivery:\s*£(\d{1,3}\.\d{2})',  # delivery: £11.79 (with space)
            ]

-           for selector in price_selectors:
            for pattern in delivery_patterns:
                logger.debug(f"JJ Food Service: Trying pattern: {pattern}")
                delivery_match = re.search(pattern, page_text, re.IGNORECASE)
                if delivery_match:
                    price_val = float(delivery_match.group(1))
                    result['price'] = price_val
                    logger.info(f"JJ Food Service: Parsed delivery price £{price_val} via regex pattern: {pattern}")
                    # extract title
                    title_el = soup.select_one('h1')
                    if title_el:
                        result['title'] = title_el.get_text(strip=True)
                    return result
                else:
                    logger.debug(f"JJ Food Service: Pattern {pattern} did not match")

        # Otherwise, try very specific selectors first - likely to contain prices
        specific_selectors = [
            '.price-delivery',  # Delivery price specifically
            '.delivery-price',  # Alternative delivery price
            '.price',           # General price class
        ]

        for selector in specific_selectors:
            try:
                elements = soup.select(selector)
                for element in elements:
                    price_text = element.get_text(strip=True)
-                   price = self._parse_uk_price(price_text)
-                   if price is not None:
-                       result['price'] = price
-                       logger.info(f"Successfully scraped jjfoodservice: £{price}")
-                       break
                    # Only process short text snippets that likely contain just prices
                    if '£' in price_text and len(price_text) < 30:
                        price = self._parse_uk_price(price_text, prefer_delivery=True)
                        if price is not None:
                            result['price'] = price
                            logger.info(f"JJ Food Service: Found price £{price} with selector '{selector}' from text: '{price_text}'")
                            break
                if result['price'] is not None:
                    break
            except Exception as e:
-               logger.debug(f"Error with JJ Food Service price selector {selector}: {e}")
-       # Try to extract title
-       title_selectors = [
-           'h1',
-           '.product-title',
-           '.product-name',
-           '[data-testid="product-title"]',
-           '.product-card-title',
-           'title'
-       ]
                logger.debug(f"Error with JJ Food Service selector {selector}: {e}")

        # Extract title
        title_selectors = ['h1', '.product-title', '.product-name']
        for selector in title_selectors:
            try:
                element = soup.select_one(selector)
@@ -96,61 +256,65 @@ class UKCateringScraper(PriceScraper):
            except Exception as e:
                logger.debug(f"Error with JJ Food Service title selector {selector}: {e}")

        # Check availability
        availability_indicators = [
            'out of stock',
            'unavailable',
            'not available',
            'temporarily unavailable'
        ]

        page_text = soup.get_text().lower()
        for indicator in availability_indicators:
            if indicator in page_text:
                result['availability'] = False
                break

        return result
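The concatenated price strings described in the comment above can be checked in isolation; a standalone sketch of the same pattern:

    import re

    text = 'Collection:£10.49£4.62 per kgDelivery:£11.79£5.19 per kg'
    m = re.search(r'Delivery:£(\d{1,3}\.\d{2})', text, re.IGNORECASE)
    print(float(m.group(1)))  # 11.79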
    def _extract_atoz_catering_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
-       """Extract data specifically from A to Z Catering."""
-       result = {
-           'price': None,
-           'title': None,
-           'availability': True,
-           'currency': 'GBP'
-       }
        """Extract data specifically from A to Z Catering - prioritize delivery pricing using regex parse."""
        result = {'price': None, 'title': None, 'availability': True, 'currency': 'GBP'}

        # First, attempt to parse delivery price directly from page text
        page_text = soup.get_text(separator=' ')
        delivery_match = re.search(r'Delivery:\s*£(\d{1,3}\.\d{2})', page_text)
        if delivery_match:
            price_val = float(delivery_match.group(1))
            result['price'] = price_val
            logger.info(f"A to Z Catering: Parsed delivery price £{price_val} via regex")
            # extract title
            title_el = soup.select_one('h1')
            if title_el:
                result['title'] = title_el.get_text(strip=True)
            return result

-       # A to Z Catering specific selectors
-       price_selectors = [
-           '.price',
-           '.product-price',
-           '.delivery-price',
-           '.collection-price',
-           'span:contains("£")',
-           '.price-value',
-           '.cost',
-           '.selling-price'
-       ]
-       for selector in price_selectors:
        # 1) Delivery-specific selectors
        for selector in ['.delivery-price', '.price-delivery']:
            try:
                elements = soup.select(selector)
                for element in elements:
-                   price_text = element.get_text(strip=True)
-                   # Skip if it contains "delivery" or "collection" but no price
-                   if ('delivery' in price_text.lower() or 'collection' in price_text.lower()) and '£' not in price_text:
-                       continue
-                   price = self._parse_uk_price(price_text)
                    text = element.get_text(strip=True)
                    price = self._parse_uk_price(text, prefer_delivery=True)
                    if price is not None:
                        result['price'] = price
-                       logger.info(f"Successfully scraped atoz_catering: £{price}")
-                       break
-               if result['price'] is not None:
-                   break
                        logger.info(f"A to Z Catering: Found delivery price £{price} from {selector}")
                        return result
            except Exception as e:
-               logger.debug(f"Error with A to Z price selector {selector}: {e}")
                logger.debug(f"Error with A to Z delivery selector {selector}: {e}")

        # 2) Main offer selector (fallback to collection price)
        for selector in ['.my-price.price-offer']:
            try:
                elements = soup.select(selector)
                for element in elements:
                    text = element.get_text(strip=True)
                    price = self._parse_uk_price(text)
                    if price is not None:
                        result['price'] = price
                        logger.info(f"A to Z Catering: Found collection price £{price} from {selector}")
                        return result
            except Exception as e:
                logger.debug(f"Error with A to Z main selector {selector}: {e}")

        # 3) Fallback general selectors
        for selector in ['.price', '.product-price']:
            try:
                elements = soup.select(selector)
                for element in elements:
                    text = element.get_text(strip=True)
                    price = self._parse_uk_price(text)
                    if price is not None:
                        result['price'] = price
                        logger.info(f"A to Z Catering: Fallback parsed price £{price} from {selector}")
                        return result
            except Exception as e:
                logger.debug(f"Error with A to Z fallback selector {selector}: {e}")

        # Extract title
        title_selectors = [
@@ -197,7 +361,7 @@ class UKCateringScraper(PriceScraper):
        return result
    def _extract_amazon_uk_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
-       """Extract data specifically from Amazon UK."""
        """Extract data specifically from Amazon UK with enhanced special pricing detection."""
        result = {
            'price': None,
            'title': None,
@@ -205,6 +369,15 @@ class UKCateringScraper(PriceScraper):
            'currency': 'GBP'
        }

        # First, check for special offer prices using enhanced detection
        special_prices = self._find_special_offer_prices(soup, 'amazon_uk')
        if special_prices:
            # Use the lowest special offer price found
            best_special_price = min(price for price, _ in special_prices)
            result['price'] = best_special_price
            logger.info(f"Successfully scraped amazon_uk special offer price: £{best_special_price}")
            return result

        # Amazon UK price selectors
        price_selectors = [
            '.a-price-whole',
@@ -222,7 +395,7 @@ class UKCateringScraper(PriceScraper):
                elements = soup.select(selector)
                for element in elements:
                    price_text = element.get_text(strip=True)
                    price = self._parse_uk_price(price_text)
                    if price is not None:
                        result['price'] = price
                        break
@@ -269,6 +442,122 @@ class UKCateringScraper(PriceScraper):
        return result

    def _extract_generic_data(self, soup: BeautifulSoup, site_name: str) -> Dict[str, Any]:
        """Generic data extraction for UK sites not specifically implemented."""
        result = {
            'price': None,
            'title': None,
            'availability': True,
            'currency': 'GBP'
        }

        # Generic price selectors
        price_selectors = [
            '.price',
            '.product-price',
            '[data-testid="price"]',
            '.price-value',
            '.current-price',
            'span:contains("£")',
            '.cost',
            '.selling-price'
        ]

        for selector in price_selectors:
            try:
                elements = soup.select(selector)
                for element in elements:
                    price_text = element.get_text(strip=True)
                    price = self._parse_uk_price(price_text)
                    if price is not None:
                        result['price'] = price
                        logger.info(f"Successfully scraped {site_name} generic price: £{price}")
                        break
                if result['price'] is not None:
                    break
            except Exception as e:
                logger.debug(f"Error with generic price selector {selector}: {e}")

        # Generic title selectors
        title_selectors = [
            'h1',
            '.product-title',
            '.product-name',
            '[data-testid="product-title"]',
            'title'
        ]

        for selector in title_selectors:
            try:
                element = soup.select_one(selector)
                if element:
                    result['title'] = element.get_text(strip=True)
                    break
            except Exception as e:
                logger.debug(f"Error with generic title selector {selector}: {e}")

        return result

    async def scrape_product_price(self, url: str, site_name: str = None) -> Dict[str, Any]:
        """Scrape price for a single product from a URL using UK-specific logic."""
        result = {
            'success': False,
            'price': None,
            'currency': 'GBP',
            'title': None,
            'availability': None,
            'url': url,
            'error': None
        }

        try:
            # Validate that this is a supported UK site
            if site_name not in ['jjfoodservice', 'atoz_catering', 'amazon_uk']:
                result['error'] = f"Unsupported site for UK scraper: {site_name}"
                return result

            # Check if site is enabled
            if not self.config.is_site_enabled(site_name):
                result['error'] = f"Site {site_name} is disabled"
                return result

            # Fetch page content
            html_content = await self._fetch_page(url)
            if not html_content:
                result['error'] = "Failed to fetch page content"
                return result

            # Parse HTML
            soup = BeautifulSoup(html_content, 'html.parser')

            # Route to appropriate extraction method
            if site_name == 'jjfoodservice':
                extracted_data = self._extract_jjfoodservice_data(soup)
            elif site_name == 'atoz_catering':
                extracted_data = self._extract_atoz_catering_data(soup)
            elif site_name == 'amazon_uk':
                extracted_data = self._extract_amazon_uk_data(soup)
            else:
                # Fallback to generic extraction
                extracted_data = self._extract_generic_data(soup, site_name)

            if extracted_data['price'] is not None:
                result.update({
                    'success': True,
                    'price': extracted_data['price'],
                    'title': extracted_data.get('title'),
                    'availability': extracted_data.get('availability')
                })
                logger.info(f"Successfully scraped {site_name}: £{extracted_data['price']}")
            else:
                result['error'] = "Could not extract price from page"

        except Exception as e:
            logger.error(f"Error scraping {url}: {e}")
            result['error'] = str(e)

        return result

    async def scrape_product(self, product_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """Scrape prices for a product from all configured sites."""
        results = {}
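A minimal driver sketch for the UK scraper (outside the diff), assuming a config object with is_site_enabled, that the class supports async-with (as the ScraperManager usage above implies), and a hypothetical product URL:

    import asyncio

    async def main():
        async with UKCateringScraper(config) as scraper:  # config assumed to exist
            result = await scraper.scrape_product_price(
                'https://www.jjfoodservice.com/...',  # hypothetical product URL
                site_name='jjfoodservice',
            )
            print(result['success'], result['price'], result['error'])

    asyncio.run(main())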


@@ -1,515 +0,0 @@
"""
Specialized scrapers for UK catering supply sites
"""
import re
import logging
from typing import Dict, Any, Optional
from bs4 import BeautifulSoup
from .scraper import PriceScraper
logger = logging.getLogger(__name__)
class UKCateringScraper(PriceScraper):
"""Specialized scraper for UK catering supply websites."""
def _parse_uk_price(self, price_text: str) -> Optional[float]:
"""Parse UK price format with £ symbol."""
if not price_text:
return None
# Remove common text and normalize
price_text = price_text.lower()
price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text)
# Find price with £ symbol
price_match = re.search(r'£(\d+\.?\d*)', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
pass
# Try without £ symbol but with decimal
price_match = re.search(r'(\d+\.\d{2})', price_text)
if price_match:
try:
return float(price_match.group(1))
except ValueError:
pass
return None
def _extract_jjfoodservice_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from JJ Food Service."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Try multiple selectors for price
price_selectors = [
'.price',
'.product-price',
'[data-testid="price"]',
'.price-value',
'.current-price',
'.product-card-price',
'span:contains("£")',
'.cost'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with JJ Food Service price selector {selector}: {e}")
# Try to extract title
title_selectors = [
'h1',
'.product-title',
'.product-name',
'[data-testid="product-title"]',
'.product-card-title',
'title'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with JJ Food Service title selector {selector}: {e}")
# Check availability
availability_indicators = [
'out of stock',
'unavailable',
'not available',
'sold out'
]
page_text = soup.get_text().lower()
for indicator in availability_indicators:
if indicator in page_text:
result['availability'] = False
break
return result
def _extract_atoz_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from A to Z Catering."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# A to Z Catering shows prices like "Delivery:£X.XX Collection:£Y.YY"
# We'll prioritize the lower price (usually collection)
price_text = soup.get_text()
# Look for delivery and collection prices
delivery_match = re.search(r'delivery:?\s*£(\d+\.?\d*)', price_text, re.IGNORECASE)
collection_match = re.search(r'collection:?\s*£(\d+\.?\d*)', price_text, re.IGNORECASE)
prices = []
if delivery_match:
try:
prices.append(float(delivery_match.group(1)))
except ValueError:
pass
if collection_match:
try:
prices.append(float(collection_match.group(1)))
except ValueError:
pass
# If we found prices, use the lowest one
if prices:
result['price'] = min(prices)
else:
# Fallback to general price extraction
price_selectors = [
'.price',
'.product-price',
'span:contains("£")',
'.price-value'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with A to Z price selector {selector}: {e}")
# Extract title - A to Z often has product names in links
title_selectors = [
'h1',
'.product-title',
'.product-name',
'a[href*="/products/product/"]',
'.product-link',
'title'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
title = element.get_text(strip=True)
# Clean up the title
if len(title) > 5 and 'A to Z' not in title:
result['title'] = title
break
except Exception as e:
logger.debug(f"Error with A to Z title selector {selector}: {e}")
# Check availability - look for "Add To Basket" button
add_to_basket = soup.find(text=re.compile('Add To Basket', re.IGNORECASE))
if not add_to_basket:
# Also check for out of stock indicators
out_of_stock_indicators = [
'out of stock',
'unavailable',
'not available',
'sold out'
]
page_text = soup.get_text().lower()
for indicator in out_of_stock_indicators:
if indicator in page_text:
result['availability'] = False
break
return result
def _extract_amazon_uk_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Amazon UK."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Amazon UK price selectors
price_selectors = [
'.a-price-whole',
'.a-price .a-offscreen',
'.a-price-current .a-offscreen',
'#priceblock_dealprice',
'#priceblock_ourprice',
'.a-price-range',
'.a-price.a-text-price.a-size-medium.apexPriceToPay .a-offscreen'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Amazon UK price selector {selector}: {e}")
# Extract title
title_selectors = [
'#productTitle',
'.product-title',
'h1.a-size-large'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Amazon UK title selector {selector}: {e}")
# Check availability
availability_text = soup.get_text().lower()
if any(phrase in availability_text for phrase in ['out of stock', 'currently unavailable', 'not available']):
result['availability'] = False
return result
def _extract_tesco_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Tesco."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Tesco price selectors
price_selectors = [
'.price-control-wrapper .value',
'.price-per-sellable-unit .value',
'.price-per-quantity-weight .value',
'[data-testid="price-current-value"]',
'.price-current',
'.product-price .price'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Tesco price selector {selector}: {e}")
# Extract title
title_selectors = [
'h1[data-testid="product-title"]',
'.product-details-tile h1',
'.product-title',
'h1.product-name'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Tesco title selector {selector}: {e}")
return result
def _extract_sainsburys_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Sainsburys."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Sainsburys price selectors
price_selectors = [
'.pd__cost__current-price',
'.pd__cost .pd__cost__retail-price',
'.pricing__now-price',
'.product-price__current',
'[data-testid="pd-retail-price"]',
'.price-per-unit'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Sainsburys price selector {selector}: {e}")
# Extract title
title_selectors = [
'.pd__header h1',
'h1[data-testid="pd-product-name"]',
'.product-name',
'.pd__product-name'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Sainsburys title selector {selector}: {e}")
return result
def _extract_booker_data(self, soup: BeautifulSoup) -> Dict[str, Any]:
"""Extract data specifically from Booker."""
result = {
'price': None,
'title': None,
'availability': True,
'currency': 'GBP'
}
# Booker price selectors
price_selectors = [
'.price',
'.product-price',
'.price-current',
'.selling-price',
'[data-testid="price"]',
'.product-tile-price'
]
for selector in price_selectors:
try:
elements = soup.select(selector)
for element in elements:
price_text = element.get_text(strip=True)
price = self._parse_uk_price(price_text)
if price is not None:
result['price'] = price
break
if result['price'] is not None:
break
except Exception as e:
logger.debug(f"Error with Booker price selector {selector}: {e}")
# Extract title
title_selectors = [
'h1',
'.product-title',
'.product-name',
'.product-description h1',
'[data-testid="product-title"]'
]
for selector in title_selectors:
try:
element = soup.select_one(selector)
if element:
result['title'] = element.get_text(strip=True)
break
except Exception as e:
logger.debug(f"Error with Booker title selector {selector}: {e}")
return result
async def scrape_product_price(self, url: str, site_name: str = None) -> Dict[str, Any]:
"""Enhanced scraping for UK catering sites."""
result = {
'success': False,
'price': None,
'currency': 'GBP',
'title': None,
'availability': None,
'url': url,
'error': None
}
try:
# Auto-detect site if not provided
if not site_name:
site_name = self._detect_site(url)
if not site_name:
result['error'] = "Could not detect site from URL"
return result
# Check if site is enabled
if not self.config.is_site_enabled(site_name):
result['error'] = f"Site {site_name} is disabled"
return result
# Fetch page content
html_content = await self._fetch_page(url)
if not html_content:
result['error'] = "Failed to fetch page content"
return result
# Parse HTML
soup = BeautifulSoup(html_content, 'html.parser')
# Use specialized extraction based on site
if site_name == 'jjfoodservice':
extracted_data = self._extract_jjfoodservice_data(soup)
elif site_name == 'atoz_catering':
extracted_data = self._extract_atoz_data(soup)
elif site_name == 'amazon_uk':
extracted_data = self._extract_amazon_uk_data(soup)
elif site_name == 'tesco':
extracted_data = self._extract_tesco_data(soup)
elif site_name == 'sainsburys':
extracted_data = self._extract_sainsburys_data(soup)
elif site_name == 'booker':
extracted_data = self._extract_booker_data(soup)
else:
# Fall back to general extraction
return await super().scrape_product_price(url, site_name)
if extracted_data['price'] is None:
result['error'] = "Could not extract price from page"
return result
result.update({
'success': True,
'price': extracted_data['price'],
'currency': extracted_data.get('currency', 'GBP'),
'title': extracted_data.get('title'),
'availability': extracted_data.get('availability', True)
})
logger.info(f"Successfully scraped {site_name}: £{extracted_data['price']}")
except Exception as e:
logger.error(f"Error scraping {url}: {e}")
result['error'] = str(e)
return result
def _detect_site(self, url: str) -> Optional[str]:
"""Detect which UK catering site this URL belongs to."""
url_lower = url.lower()
if 'jjfoodservice.com' in url_lower:
return 'jjfoodservice'
elif 'atoz-catering.co.uk' in url_lower:
return 'atoz_catering'
elif 'amazon.co.uk' in url_lower:
return 'amazon_uk'
elif 'tesco.com' in url_lower:
return 'tesco'
elif 'sainsburys.co.uk' in url_lower:
return 'sainsburys'
elif 'booker.co.uk' in url_lower:
return 'booker'
# Fall back to parent detection for other sites
return super()._detect_site(url)


@@ -268,4 +268,70 @@ def create_app():
        fig = go.Figure(data=traces, layout=layout)
        return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

    @app.route('/edit_product/<int:product_id>', methods=['GET', 'POST'])
    def edit_product(product_id):
        """Edit an existing product."""
        product = db_manager.get_product(product_id)
        if not product:
            flash('Product not found.', 'error')
            return redirect(url_for('index'))

        form = ProductForm()
        if form.validate_on_submit():
            urls = {}
            if form.jjfoodservice_url.data:
                urls['jjfoodservice'] = form.jjfoodservice_url.data
            if form.atoz_catering_url.data:
                urls['atoz_catering'] = form.atoz_catering_url.data
            if form.amazon_uk_url.data:
                urls['amazon_uk'] = form.amazon_uk_url.data

            if not urls:
                flash('Please provide at least one URL to track.', 'error')
                return render_template('edit_product.html', form=form, product=product)

            try:
                db_manager.update_product(
                    product_id=product_id,
                    name=form.name.data,
                    description=form.description.data,
                    target_price=form.target_price.data,
                    urls=urls
                )
                flash(f'Product "{form.name.data}" updated successfully!', 'success')
                return redirect(url_for('product_detail', product_id=product_id))
            except Exception as e:
                flash(f'Error updating product: {str(e)}', 'error')

        # Pre-populate form with existing data
        if request.method == 'GET':
            form.name.data = product['name']
            form.description.data = product['description']
            form.target_price.data = product['target_price']
            # URLs are already parsed as a dictionary by the database method
            urls = product['urls'] if product['urls'] else {}
            form.jjfoodservice_url.data = urls.get('jjfoodservice', '')
            form.atoz_catering_url.data = urls.get('atoz_catering', '')
            form.amazon_uk_url.data = urls.get('amazon_uk', '')

        return render_template('edit_product.html', form=form, product=product)

    @app.route('/delete_product/<int:product_id>', methods=['POST'])
    def delete_product(product_id):
        """Delete a product."""
        product = db_manager.get_product(product_id)
        if not product:
            flash('Product not found.', 'error')
            return redirect(url_for('index'))

        try:
            db_manager.delete_product(product_id)
            flash(f'Product "{product["name"]}" deleted successfully!', 'success')
        except Exception as e:
            flash(f'Error deleting product: {str(e)}', 'error')

        return redirect(url_for('index'))
    return app
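A quick smoke test of the new delete route with Flask's test client (outside the diff); a sketch assuming create_app() runs without extra setup and that CSRF comes from Flask-WTF, so it can be switched off for testing:

    app = create_app()
    app.config['WTF_CSRF_ENABLED'] = False  # assumes Flask-WTF; testing only

    with app.test_client() as client:
        resp = client.post('/delete_product/1', follow_redirects=True)
        assert resp.status_code == 200  # route flashes and redirects to index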