diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..4621e4c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,77 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +venv/ +env/ +ENV/ +.venv/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Git +.git/ +.gitignore + +# Logs +*.log +logs/ + +# Database files (will be mounted as volume) +*.db +*.sqlite +*.sqlite3 + +# Temporary files +*.tmp +*.temp +.cache/ + +# Documentation +README.md +*.md + +# Docker +Dockerfile* +docker-compose*.yml +.dockerignore + +# Scripts +setup.sh +scripts/ + +# Examples +examples/ + +# Old files +*_old.py +*.bak diff --git a/.gitignore b/.gitignore index 7b004e5..96ac300 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ parts/ sdist/ var/ wheels/ +pip-wheel-metadata/ share/python-wheels/ *.egg-info/ .installed.cfg @@ -49,7 +50,6 @@ coverage.xml *.py,cover .hypothesis/ .pytest_cache/ -cover/ # Translations *.mo @@ -72,7 +72,6 @@ instance/ docs/_build/ # PyBuilder -.pybuilder/ target/ # Jupyter Notebook @@ -83,9 +82,7 @@ profile_default/ ipython_config.py # pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version +.python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. @@ -94,30 +91,7 @@ ipython_config.py # install all needed dependencies. #Pipfile.lock -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock - -# pdm -# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/latest/usage/project/#working-with-version-control -.pdm.toml -.pdm-python -.pdm-build/ - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +# PEP 582; used by e.g. github.com/David-OConnor/pyflow __pypackages__/ # Celery stuff @@ -154,41 +128,9 @@ dmypy.json # Pyre type checker .pyre/ -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Abstra -# Abstra is an AI-powered process automation framework. -# Ignore directories containing user credentials, local state, and settings. 
-# Learn more at https://abstra.io/docs -.abstra/ - -# Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore -# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be added to the global gitignore or merged into this file. However, if you prefer, -# you could uncomment the following to ignore the enitre vscode folder -# .vscode/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc - -# Cursor -# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to -# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data -# refer to https://docs.cursor.com/context/ignore-files -.cursorignore -.cursorindexingignore \ No newline at end of file +# Price Tracker specific +price_tracker.db +*.log +config-local.json +.vscode/ +.idea/ diff --git a/DOCKER.md b/DOCKER.md new file mode 100644 index 0000000..d1734f1 --- /dev/null +++ b/DOCKER.md @@ -0,0 +1,208 @@ +# Price Tracker - Docker Deployment + +This guide covers how to build, deploy, and run the Price Tracker application using Docker. + +## Quick Start with Docker + +### 1. Build the Image + +```bash +# Build with default tag +./build.sh + +# Build with specific tag +./build.sh v1.0.0 + +# Build and tag for your registry +./build.sh latest your-registry.com +``` + +### 2. Run with Docker Compose (Recommended) + +```bash +# Start the application +docker-compose up -d + +# View logs +docker-compose logs -f + +# Stop the application +docker-compose down +``` + +### 3. Manual Docker Run + +```bash +# Create directories for persistence +mkdir -p data logs + +# Run the container +docker run -d \ + --name price-tracker \ + --restart unless-stopped \ + -p 5000:5000 \ + -v $(pwd)/data:/app/data \ + -v $(pwd)/logs:/app/logs \ + -v $(pwd)/config.json:/app/config.json:ro \ + -e FLASK_ENV=production \ + price-tracker:latest +``` + +## Registry Deployment + +### Push to Registry + +```bash +# Tag for your registry +docker tag price-tracker:latest your-registry.com/price-tracker:latest + +# Push to registry +docker push your-registry.com/price-tracker:latest +``` + +### Deploy from Registry + +```bash +# Deploy using script +./deploy.sh latest your-registry.com + +# Or manually +docker pull your-registry.com/price-tracker:latest +docker run -d \ + --name price-tracker \ + --restart unless-stopped \ + -p 5000:5000 \ + -v $(pwd)/data:/app/data \ + -v $(pwd)/logs:/app/logs \ + -v $(pwd)/config.json:/app/config.json:ro \ + -e FLASK_ENV=production \ + your-registry.com/price-tracker:latest +``` + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `FLASK_HOST` | `0.0.0.0` | Host to bind the Flask server | +| `FLASK_PORT` | `5000` | Port to bind the Flask server | +| `FLASK_ENV` | `production` | Flask environment (production/development) | +| `PYTHONUNBUFFERED` | `1` | Enable unbuffered Python output | + +## Volumes + +| Container Path | Description | +|----------------|-------------| +| `/app/data` | Database and persistent data | +| `/app/logs` | Application logs | +| `/app/config.json` | Configuration file (read-only) | + +## Health Check + +The container includes a health check that verifies the application is responding on port 5000. 
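+
+If you want to script the same probe the container runs, a minimal Python sketch (assuming the default `5000:5000` port mapping) is:
+
+```python
+import urllib.request
+
+# Probe the same endpoint the Dockerfile HEALTHCHECK curls;
+# a non-2xx response or a connection failure raises an exception.
+try:
+    with urllib.request.urlopen("http://localhost:5000/", timeout=10) as resp:
+        print(f"healthy: HTTP {resp.status}")
+except Exception as exc:
+    print(f"unhealthy: {exc}")
+```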
+ +```bash +# Check container health +docker ps + +# View health check logs +docker inspect price-tracker | grep -A 10 Health +``` + +## Monitoring + +### View Logs + +```bash +# Real-time logs +docker logs -f price-tracker + +# Last 100 lines +docker logs --tail 100 price-tracker + +# With docker-compose +docker-compose logs -f +``` + +### Container Stats + +```bash +# Resource usage +docker stats price-tracker + +# Container information +docker inspect price-tracker +``` + +## Troubleshooting + +### Container Won't Start + +1. Check logs: `docker logs price-tracker` +2. Verify config file exists and is valid JSON +3. Ensure data and logs directories exist with correct permissions + +### Application Not Accessible + +1. Verify port mapping: `docker ps` +2. Check firewall settings +3. Verify container is healthy: `docker ps` (should show "healthy") + +### Database Issues + +1. Check if data directory is properly mounted +2. Verify database file permissions +3. Check logs for database errors + +## Production Considerations + +### Security + +- Run container as non-root user (already configured) +- Use read-only config file mount +- Consider running behind a reverse proxy (nginx, traefik) +- Set up proper firewall rules + +### Performance + +- Allocate sufficient memory for scraping operations +- Consider scaling with multiple instances behind a load balancer +- Monitor resource usage and adjust limits as needed + +### Backup + +```bash +# Backup data directory +tar -czf price-tracker-backup-$(date +%Y%m%d).tar.gz data/ + +# Restore backup +tar -xzf price-tracker-backup-YYYYMMDD.tar.gz +``` + +## Development + +### Local Development with Docker + +```bash +# Build development image +docker build -t price-tracker:dev . + +# Run with development settings +docker run -it --rm \ + -p 5000:5000 \ + -v $(pwd):/app \ + -e FLASK_ENV=development \ + price-tracker:dev +``` + +### Debugging + +```bash +# Run container with bash shell +docker run -it --rm \ + -v $(pwd):/app \ + price-tracker:latest \ + /bin/bash + +# Execute commands in running container +docker exec -it price-tracker /bin/bash +``` diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e5e8a3f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,48 @@ +# Use Python 3.11 slim image for smaller size +FROM python:3.11-slim + +# Set working directory +WORKDIR /app + +# Set environment variables +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + FLASK_APP=main.py \ + FLASK_ENV=production + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Create non-root user for security +RUN useradd --create-home --shell /bin/bash tracker && \ + chown -R tracker:tracker /app + +# Copy application code +COPY . . 
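+# .dockerignore trims the build context for the COPY above (caches, venvs,
+# docs, and local database files are excluded)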
+
+# Create necessary directories
+RUN mkdir -p /app/logs && \
+    mkdir -p /app/data && \
+    chown -R tracker:tracker /app
+
+# Switch to non-root user
+USER tracker
+
+# Expose port
+EXPOSE 5000
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+    CMD curl -f http://localhost:5000/ || exit 1
+
+# Run the application
+CMD ["python", "main.py"]
diff --git a/README.md b/README.md
index 97fe051..882f661 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,205 @@
-# price-tracker
\ No newline at end of file
+# Price Tracker 🛒
+
+A comprehensive web scraper for tracking product prices across multiple e-commerce sites. Built with Python, Beautiful Soup, and Flask.
+
+## Features ✨
+
+- **Multi-site Price Tracking**: Monitor prices across Amazon, eBay, Walmart, and more
+- **Beautiful Web UI**: Clean, responsive interface for managing products and viewing price history
+- **Price Alerts**: Get notified when products reach your target price
+- **Historical Data**: View price trends with interactive charts
+- **Automated Scraping**: Schedule regular price checks
+- **Multiple Notifications**: Email and webhook notifications
+- **Robust Scraping**: Built-in retry logic, rotating user agents, and rate limiting
+
+## Quick Start 🚀
+
+1. **Clone and Setup**:
+   ```bash
+   git clone <repository-url>
+   cd price-tracker
+   chmod +x setup.sh
+   ./setup.sh
+   ```
+
+2. **Start the Web UI**:
+   ```bash
+   source venv/bin/activate
+   python main.py --mode web
+   ```
+
+3. **Visit**: http://localhost:5000
+
+## Usage 📋
+
+### Web Interface
+
+The web interface provides:
+- **Dashboard**: Overview of all tracked products with current prices
+- **Add Products**: Easy form to add new products with URLs from multiple sites
+- **Product Details**: Detailed view with price history charts and statistics
+- **Settings**: Configuration management and system health checks
+
+### Command Line
+
+```bash
+# Start web UI
+python main.py --mode web
+
+# Run scraping once
+python main.py --mode scrape
+
+# Add sample products for testing
+python examples/add_sample_products.py
+
+# Scheduled scraping (for cron jobs)
+python scripts/scheduled_scraping.py
+```
+
+### Scheduled Scraping
+
+Add to your crontab for automatic price checks:
+```bash
+# Every 6 hours
+0 */6 * * * cd /path/to/price-tracker && source venv/bin/activate && python scripts/scheduled_scraping.py
+
+# Daily at 8 AM
+0 8 * * * cd /path/to/price-tracker && source venv/bin/activate && python scripts/scheduled_scraping.py
+```
+
+## Configuration ⚙️
+
+Edit `config.json` to customize:
+
+### Scraping Settings
+```json
+{
+  "scraping": {
+    "delay_between_requests": 2,
+    "max_concurrent_requests": 5,
+    "timeout": 30,
+    "retry_attempts": 3
+  }
+}
+```
+
+### Email Notifications
+```json
+{
+  "notifications": {
+    "email": {
+      "enabled": true,
+      "smtp_server": "smtp.gmail.com",
+      "smtp_port": 587,
+      "sender_email": "your-email@gmail.com",
+      "sender_password": "your-app-password",
+      "recipient_email": "alerts@yourdomain.com"
+    }
+  }
+}
+```
+
+### Adding New Sites
+
+Add new e-commerce sites by extending the sites configuration:
+
+```json
+{
+  "sites": {
+    "your_site": {
+      "enabled": true,
+      "base_url": "https://www.yoursite.com",
+      "selectors": {
+        "price": [".price", ".cost"],
+        "title": [".product-title"],
+        "availability": [".stock-status"]
+      }
+    }
+  }
+}
+```
+
+## Architecture 🏗️
+
+- **`main.py`**: Application entry point
+- **`src/config.py`**: Configuration management
+- **`src/database.py`**: SQLite database operations
+- **`src/scraper.py`**: Core
scraping logic with Beautiful Soup +- **`src/scraper_manager.py`**: Scraping coordination and task management +- **`src/notification.py`**: Email and webhook notifications +- **`src/web_ui.py`**: Flask web interface +- **`templates/`**: HTML templates with Bootstrap styling + +## Features in Detail 🔍 + +### Smart Price Extraction +- Multiple CSS selectors per site for robust price detection +- Handles various price formats and currencies +- Availability detection (in stock/out of stock) +- Automatic retry with exponential backoff + +### Data Storage +- SQLite database for price history +- Product management with URLs and target prices +- Price statistics and trend analysis + +### Web Interface +- Responsive design with Bootstrap 5 +- Interactive price charts with Plotly +- Real-time scraping from the UI +- Product comparison and best price highlighting + +### Notifications +- Email alerts when target prices are reached +- Webhook integration for custom notifications +- Rich HTML email templates +- Test notification functionality + +## Tips for Best Results 📈 + +1. **Respectful Scraping**: The tool includes delays and rate limiting to be respectful to websites +2. **URL Selection**: Use direct product page URLs, not search results or category pages +3. **Target Prices**: Set realistic target prices based on historical data +4. **Multiple Sites**: Track the same product on multiple sites for best deals +5. **Regular Updates**: Run scraping regularly but not too frequently (every few hours is good) + +## Troubleshooting 🔧 + +### Common Issues + +1. **No prices found**: Check if the CSS selectors are correct for the site +2. **403/429 errors**: Sites may be blocking requests - try different user agents or increase delays +3. **Database errors**: Ensure the database file is writable +4. **Email not working**: Verify SMTP settings and app passwords for Gmail + +### Adding Debug Information + +Enable debug logging by modifying the logging level in `main.py`: +```python +logging.basicConfig(level=logging.DEBUG) +``` + +## Legal and Ethical Considerations ⚖️ + +- Respect robots.txt files +- Don't overload servers with too many requests +- Use for personal/educational purposes +- Check terms of service for each site +- Be mindful of rate limits + +## Contributing 🤝 + +Feel free to contribute by: +- Adding support for new e-commerce sites +- Improving CSS selectors for existing sites +- Adding new notification methods +- Enhancing the web UI +- Fixing bugs and improving performance + +## License 📄 + +This project is for educational purposes. Please review the terms of service of websites you scrape and use responsibly. + +--- + +**Happy price tracking! 🛍️** diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..2ca4a8a --- /dev/null +++ b/build.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# Build script for Price Tracker Docker container + +set -e + +# Configuration +IMAGE_NAME="price-tracker" +TAG="${1:-latest}" +REGISTRY="${2:-your-registry.com}" # Replace with your actual registry + +echo "Building Price Tracker Docker image..." + +# Build the Docker image +docker build -t "${IMAGE_NAME}:${TAG}" . + +# Tag for registry if provided +if [ "$REGISTRY" != "your-registry.com" ]; then + docker tag "${IMAGE_NAME}:${TAG}" "${REGISTRY}/${IMAGE_NAME}:${TAG}" + echo "Tagged image as ${REGISTRY}/${IMAGE_NAME}:${TAG}" +fi + +echo "Build completed successfully!" 
+echo "Image: ${IMAGE_NAME}:${TAG}" + +# Display image info +docker images | grep "${IMAGE_NAME}" + +echo "" +echo "To run locally:" +echo " docker run -p 5000:5000 ${IMAGE_NAME}:${TAG}" +echo "" +echo "To push to registry:" +echo " docker push ${REGISTRY}/${IMAGE_NAME}:${TAG}" +echo "" +echo "To run with docker-compose:" +echo " docker-compose up -d" diff --git a/config.json b/config.json new file mode 100644 index 0000000..ea4da65 --- /dev/null +++ b/config.json @@ -0,0 +1,113 @@ +{ + "database": { + "path": "price_tracker.db" + }, + "scraping": { + "delay_between_requests": 2, + "max_concurrent_requests": 1, + "timeout": 30, + "retry_attempts": 3, + "user_agents": [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + ] + }, + "notifications": { + "email": { + "enabled": false, + "smtp_server": "smtp.gmail.com", + "smtp_port": 587, + "sender_email": "", + "sender_password": "", + "recipient_email": "" + }, + "webhook": { + "enabled": false, + "url": "" + } + }, + "sites": { + "jjfoodservice": { + "enabled": true, + "base_url": "https://www.jjfoodservice.com", + "selectors": { + "price": [ + ".price", + ".product-price", + "[data-testid='price']", + ".price-value", + ".current-price", + ".product-card-price" + ], + "title": [ + "h1", + ".product-title", + ".product-name", + "[data-testid='product-title']", + ".product-card-title" + ], + "availability": [ + ".stock-status", + ".availability", + "[data-testid='availability']", + ".product-availability" + ] + } + }, + "atoz_catering": { + "enabled": true, + "base_url": "https://www.atoz-catering.co.uk", + "selectors": { + "price": [ + ".price", + ".product-price", + ".delivery-price", + ".collection-price", + "span:contains('£')", + ".price-value" + ], + "title": [ + "h1", + ".product-title", + ".product-name", + "a[href*='/products/product/']", + ".product-link" + ], + "availability": [ + ".stock-status", + ".availability", + ".add-to-basket", + "button:contains('Add To Basket')", + ".out-of-stock" + ] + } + }, + "amazon_uk": { + "enabled": true, + "base_url": "https://www.amazon.co.uk", + "selectors": { + "price": [ + ".a-price-whole", + ".a-price .a-offscreen", + "#priceblock_dealprice", + "#priceblock_ourprice", + ".a-price-range", + ".a-price.a-text-price.a-size-medium.apexPriceToPay", + ".a-price-current" + ], + "title": [ + "#productTitle", + ".product-title", + "h1.a-size-large" + ], + "availability": [ + "#availability span", + ".a-size-medium.a-color-success", + ".a-size-medium.a-color-state", + "#availability .a-declarative" + ] + } + } + } +} diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..5c1759f --- /dev/null +++ b/deploy.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +# Deployment script for Price Tracker + +set -e + +# Configuration +IMAGE_NAME="price-tracker" +TAG="${1:-latest}" +REGISTRY="${2:-your-registry.com}" # Replace with your actual registry +CONTAINER_NAME="price-tracker" + +echo "Deploying Price Tracker..." + +# Pull latest image if using registry +if [ "$REGISTRY" != "your-registry.com" ]; then + echo "Pulling latest image from registry..." 
docker pull "${REGISTRY}/${IMAGE_NAME}:${TAG}"
+fi
+
+# Use the local image name unless a real registry was supplied
+if [ "$REGISTRY" != "your-registry.com" ]; then
+    IMAGE_REF="${REGISTRY}/${IMAGE_NAME}:${TAG}"
+else
+    IMAGE_REF="${IMAGE_NAME}:${TAG}"
+fi
+
+# Stop and remove existing container if it exists
+if docker ps -a | grep -q "${CONTAINER_NAME}"; then
+    echo "Stopping existing container..."
+    docker stop "${CONTAINER_NAME}" || true
+    docker rm "${CONTAINER_NAME}" || true
+fi
+
+# Create data and logs directories if they don't exist
+mkdir -p ./data ./logs
+
+# Run the container
+echo "Starting new container..."
+docker run -d \
+    --name "${CONTAINER_NAME}" \
+    --restart unless-stopped \
+    -p 5000:5000 \
+    -v "$(pwd)/data:/app/data" \
+    -v "$(pwd)/logs:/app/logs" \
+    -v "$(pwd)/config.json:/app/config.json:ro" \
+    -e FLASK_ENV=production \
+    "${IMAGE_REF}"
+
+echo "Container started successfully!"
+echo "Access the application at: http://localhost:5000"
+echo ""
+echo "To view logs:"
+echo "  docker logs -f ${CONTAINER_NAME}"
+echo ""
+echo "To stop the container:"
+echo "  docker stop ${CONTAINER_NAME}"
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..dc1dd54
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,34 @@
+version: '3.8'
+
+services:
+  price-tracker:
+    build: .
+    container_name: price-tracker
+    restart: unless-stopped
+    ports:
+      - "5000:5000"
+    environment:
+      - FLASK_ENV=production
+      - PYTHONUNBUFFERED=1
+    volumes:
+      # Mount database and logs for persistence
+      - ./data:/app/data
+      - ./logs:/app/logs
+      # Mount config for easy updates
+      - ./config.json:/app/config.json:ro
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:5000/"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 40s
+    networks:
+      - price-tracker-network
+
+networks:
+  price-tracker-network:
+    driver: bridge
+
+volumes:
+  price-tracker-data:
+  price-tracker-logs:
diff --git a/examples/add_sample_products.py b/examples/add_sample_products.py
new file mode 100644
index 0000000..4ed7a5f
--- /dev/null
+++ b/examples/add_sample_products.py
@@ -0,0 +1,85 @@
+"""
+Example script to add sample products for testing
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from src.database import DatabaseManager
+from src.config import Config
+
+def add_sample_products():
+    """Add some sample products for testing."""
+    config = Config()
+    db_manager = DatabaseManager(config.database_path)
+    
+    # Sample products with real URLs (for demonstration)
+    sample_products = [
+        {
+            'name': 'AirPods Pro (2nd Generation)',
+            'description': 'Apple AirPods Pro with Active Noise Cancellation',
+            'target_price': 200.00,
+            'urls': {
+                'amazon': 'https://www.amazon.com/dp/B0BDHWDR12',
+                'walmart': 'https://www.walmart.com/ip/AirPods-Pro-2nd-generation/1952646965'
+            }
+        },
+        {
+            'name': 'Sony WH-1000XM4 Headphones',
+            'description': 'Wireless Noise Canceling Over-Ear Headphones',
+            'target_price': 250.00,
+            'urls': {
+                'amazon': 'https://www.amazon.com/dp/B0863TXGM3',
+                'ebay': 'https://www.ebay.com/itm/Sony-WH-1000XM4-Wireless-Headphones/324298765234'
+            }
+        },
+        {
+            'name': 'iPad Air (5th Generation)',
+            'description': '10.9-inch iPad Air with M1 Chip, 64GB',
+            'target_price': 500.00,
+            'urls': {
+                'amazon': 'https://www.amazon.com/dp/B09V3HN1KC',
+                'walmart': 'https://www.walmart.com/ip/iPad-Air-5th-Gen/612825603'
+            }
+        },
+        {
+            'name': 'Nintendo Switch OLED',
+            'description': 'Nintendo Switch OLED Model Gaming Console',
+            'target_price': 300.00,
+            'urls': {
+                'amazon': 'https://www.amazon.com/dp/B098RKWHHZ',
+                'walmart': 'https://www.walmart.com/ip/Nintendo-Switch-OLED/910582148'
+            }
+        },
+        {
+            'name':
'Samsung 55" 4K Smart TV', + 'description': 'Samsung 55-inch Crystal UHD 4K Smart TV', + 'target_price': 400.00, + 'urls': { + 'amazon': 'https://www.amazon.com/dp/B08T6F5H1Y', + 'walmart': 'https://www.walmart.com/ip/Samsung-55-Class-4K-Crystal-UHD/485926403' + } + } + ] + + print("Adding sample products...") + + for product_data in sample_products: + try: + product_id = db_manager.add_product( + name=product_data['name'], + description=product_data['description'], + target_price=product_data['target_price'], + urls=product_data['urls'] + ) + print(f"✓ Added: {product_data['name']} (ID: {product_id})") + except Exception as e: + print(f"✗ Failed to add {product_data['name']}: {e}") + + print("\nSample products added successfully!") + print("You can now run the web UI with: python main.py --mode web") + print("Or start scraping with: python main.py --mode scrape") + +if __name__ == "__main__": + add_sample_products() diff --git a/examples/add_uk_catering_products.py b/examples/add_uk_catering_products.py new file mode 100644 index 0000000..b915cb2 --- /dev/null +++ b/examples/add_uk_catering_products.py @@ -0,0 +1,99 @@ +""" +Example script to add UK catering sample products for testing +""" + +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from src.database import DatabaseManager +from src.config import Config + +def add_uk_catering_products(): + """Add some sample UK catering products for testing.""" + config = Config() + db_manager = DatabaseManager(config.database_path) + + # Sample UK catering products with example URLs + # Note: These are example URLs - you'll need to replace with real product URLs + sample_products = [ + { + 'name': 'McCain Straight Cut Oven Chips 2.5kg', + 'description': 'Frozen straight cut oven chips for catering use', + 'target_price': 4.50, + 'urls': { + 'jjfoodservice': 'https://www.jjfoodservice.com/products/mccain-straight-cut-oven-chips', + 'atoz_catering': 'https://www.atoz-catering.co.uk/products/product/mccain-straight-cut-oven-chips-25kg' + } + }, + { + 'name': 'Heinz Baked Beans 6x2.62kg', + 'description': 'Catering size baked beans in tomato sauce', + 'target_price': 25.00, + 'urls': { + 'atoz_catering': 'https://www.atoz-catering.co.uk/products/product/heinz-baked-beans--6x262kg', + 'jjfoodservice': 'https://www.jjfoodservice.com/products/heinz-baked-beans-catering' + } + }, + { + 'name': 'Chef Select Chicken Breast Fillets 2kg', + 'description': 'Fresh chicken breast fillets for professional kitchens', + 'target_price': 12.00, + 'urls': { + 'jjfoodservice': 'https://www.jjfoodservice.com/products/chicken-breast-fillets-2kg', + 'atoz_catering': 'https://www.atoz-catering.co.uk/products/product/chicken-breast-fillets-2kg' + } + }, + { + 'name': 'Whole Milk 2 Litre Bottles (Case of 6)', + 'description': 'Fresh whole milk in 2L bottles for catering', + 'target_price': 8.00, + 'urls': { + 'atoz_catering': 'https://www.atoz-catering.co.uk/products/product/cotteswold-whole-milk-1x2lt-blue', + 'jjfoodservice': 'https://www.jjfoodservice.com/products/whole-milk-2l-case' + } + }, + { + 'name': 'Vegetable Oil 20L Container', + 'description': 'Catering vegetable oil for deep frying and cooking', + 'target_price': 35.00, + 'urls': { + 'jjfoodservice': 'https://www.jjfoodservice.com/products/vegetable-oil-20l', + 'atoz_catering': 'https://www.atoz-catering.co.uk/products/product/vegetable-oil-20l-container' + } + }, + { + 'name': 'Plain Flour 16kg Sack', + 'description': 'Professional baking flour for 
commercial use',
+            'target_price': 18.00,
+            'urls': {
+                'atoz_catering': 'https://www.atoz-catering.co.uk/products/product/plain-flour-16kg-sack',
+                'jjfoodservice': 'https://www.jjfoodservice.com/products/plain-flour-16kg'
+            }
+        }
+    ]
+    
+    print("Adding UK catering sample products...")
+    
+    for product_data in sample_products:
+        try:
+            product_id = db_manager.add_product(
+                name=product_data['name'],
+                description=product_data['description'],
+                target_price=product_data['target_price'],
+                urls=product_data['urls']
+            )
+            print(f"✓ Added: {product_data['name']} (ID: {product_id})")
+        except Exception as e:
+            print(f"✗ Failed to add {product_data['name']}: {e}")
+    
+    print("\nUK catering sample products added successfully!")
+    print("Note: The URLs in this example are placeholders.")
+    print("You'll need to replace them with real product URLs from:")
+    print("- JJ Food Service: https://www.jjfoodservice.com/")
+    print("- A to Z Catering: https://www.atoz-catering.co.uk/")
+    print("\nYou can now run the web UI with: python main.py --mode web")
+    print("Or start scraping with: python main.py --mode scrape")
+
+if __name__ == "__main__":
+    add_uk_catering_products()
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..b17e48e
--- /dev/null
+++ b/main.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+"""
+Price Tracker - Web Scraper for Product Price Monitoring
+Tracks product prices across multiple e-commerce sites
+"""
+
+import asyncio
+import logging
+from datetime import datetime
+from typing import List, Dict, Optional
+import argparse
+
+from src.scraper_manager import ScraperManager
+from src.database import DatabaseManager
+from src.config import Config
+from src.notification import NotificationManager
+from src.web_ui import create_app
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('price_tracker.log'),
+        logging.StreamHandler()
+    ]
+)
+
+logger = logging.getLogger(__name__)
+
+async def run_scraper():
+    """Run the price scraping process."""
+    try:
+        config = Config()
+        db_manager = DatabaseManager(config.database_path)
+        scraper_manager = ScraperManager(config)
+        notification_manager = NotificationManager(config)
+        
+        logger.info("Starting price tracking session")
+        
+        # Load products from database
+        products = db_manager.get_all_products()
+        if not products:
+            logger.warning("No products found in database.
Add products first.")
+            return
+        
+        # Scrape prices for all products
+        results = await scraper_manager.scrape_all_products(products)
+        
+        # Process results and save to database
+        price_alerts = []
+        for product_id, site_prices in results.items():
+            for site_name, price_data in site_prices.items():
+                if price_data['success']:
+                    # Save price to database
+                    db_manager.save_price_history(
+                        product_id=product_id,
+                        site_name=site_name,
+                        price=price_data['price'],
+                        currency=price_data.get('currency', 'USD'),
+                        availability=price_data.get('availability', True),
+                        timestamp=datetime.now()
+                    )
+                    
+                    # Check for price alerts (products may have no target price set)
+                    product = db_manager.get_product(product_id)
+                    if product and product['target_price'] is not None and \
+                            price_data['price'] <= product['target_price']:
+                        price_alerts.append({
+                            'product': product,
+                            'site': site_name,
+                            'current_price': price_data['price'],
+                            'target_price': product['target_price']
+                        })
+        
+        # Send notifications for price alerts
+        if price_alerts:
+            await notification_manager.send_price_alerts(price_alerts)
+        
+        logger.info(f"Scraping completed. Found {len(price_alerts)} price alerts.")
+        
+    except Exception as e:
+        logger.error(f"Error during scraping: {e}")
+        raise
+
+
+def run_web_ui():
+    """Run the web UI for managing products and viewing price history."""
+    import os
+    
+    # Use environment variables for configuration
+    host = os.environ.get('FLASK_HOST', '0.0.0.0')
+    port = int(os.environ.get('FLASK_PORT', 5000))
+    debug = os.environ.get('FLASK_ENV', 'production').lower() != 'production'
+    
+    app = create_app()
+    logger.info(f"Starting Price Tracker web server on {host}:{port}")
+    app.run(host=host, port=port, debug=debug)
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Price Tracker')
+    parser.add_argument('--mode', choices=['scrape', 'web'], default='web',
+                       help='Run mode: scrape prices or start web UI')
+    parser.add_argument('--config', help='Path to config file')
+    
+    args = parser.parse_args()
+    
+    if args.mode == 'scrape':
+        asyncio.run(run_scraper())
+    else:
+        run_web_ui()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..0890ed4
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,15 @@
+beautifulsoup4==4.12.3
+requests==2.31.0
+aiohttp==3.9.1
+flask==3.0.0
+flask-wtf==1.2.1
+wtforms==3.1.1
+python-dotenv==1.0.0
+lxml==5.1.0
+fake-useragent==1.4.0
+email-validator==2.1.0
+jinja2==3.1.2
+plotly==5.17.0
+pandas==2.1.4
+numpy==1.26.2
+python-dateutil==2.8.2
diff --git a/scripts/scheduled_scraping.py b/scripts/scheduled_scraping.py
new file mode 100644
index 0000000..634d72e
--- /dev/null
+++ b/scripts/scheduled_scraping.py
@@ -0,0 +1,102 @@
+"""
+Scheduled price scraping script for cron jobs
+"""
+
+import sys
+import os
+import asyncio
+import logging
+from datetime import datetime
+
+# Add the parent directory to sys.path to import our modules
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from src.config import Config
+from src.database import DatabaseManager
+from src.scraper_manager import ScraperManager
+from src.notification import NotificationManager
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('scheduled_scraping.log'),
+        logging.StreamHandler()
+    ]
+)
+
+logger = logging.getLogger(__name__)
+
+async def run_scheduled_scraping():
+    """Run the scheduled price scraping."""
+    try:
+        logger.info("=== Starting scheduled price scraping ===")
+        
+        # Initialize
components + config = Config() + db_manager = DatabaseManager(config.database_path) + scraper_manager = ScraperManager(config) + notification_manager = NotificationManager(config) + + # Get all products + products = db_manager.get_all_products() + if not products: + logger.warning("No products found in database") + return + + logger.info(f"Found {len(products)} products to scrape") + + # Scrape all products + results = await scraper_manager.scrape_all_products(products) + + # Process results + total_success = 0 + total_failed = 0 + price_alerts = [] + + for product_id, site_results in results.items(): + product = db_manager.get_product(product_id) + + for site_name, result in site_results.items(): + if result['success']: + total_success += 1 + + # Save to database + db_manager.save_price_history( + product_id=product_id, + site_name=site_name, + price=result['price'], + currency=result.get('currency', 'USD'), + availability=result.get('availability', True), + timestamp=datetime.now() + ) + + # Check for price alerts + if product and product['target_price'] and result['price'] <= product['target_price']: + price_alerts.append({ + 'product': product, + 'site': site_name, + 'current_price': result['price'], + 'target_price': product['target_price'] + }) + + logger.info(f"Price alert: {product['name']} on {site_name} - ${result['price']:.2f}") + else: + total_failed += 1 + logger.error(f"Failed to scrape {product['name']} on {site_name}: {result.get('error', 'Unknown error')}") + + # Send notifications for price alerts + if price_alerts: + await notification_manager.send_price_alerts(price_alerts) + logger.info(f"Sent notifications for {len(price_alerts)} price alerts") + + logger.info(f"Scraping completed: {total_success} successful, {total_failed} failed") + logger.info(f"Found {len(price_alerts)} price alerts") + + except Exception as e: + logger.error(f"Error during scheduled scraping: {e}", exc_info=True) + raise + +if __name__ == "__main__": + asyncio.run(run_scheduled_scraping()) diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..7e84f8e --- /dev/null +++ b/setup.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +# Price Tracker Setup Script +# This script helps set up the price tracker environment + +echo "🛒 Price Tracker Setup" +echo "=====================" + +# Check if Python 3.8+ is installed +python_version=$(python3 -c 'import sys; print(".".join(map(str, sys.version_info[:2])))') +echo "Python version: $python_version" + +if python3 -c 'import sys; exit(0 if sys.version_info >= (3, 8) else 1)'; then + echo "✓ Python version is suitable" +else + echo "✗ Python 3.8+ is required" + exit 1 +fi + +# Create virtual environment +echo "" +echo "📦 Creating virtual environment..." +python3 -m venv venv + +# Activate virtual environment +echo "🔧 Activating virtual environment..." +source venv/bin/activate + +# Install requirements +echo "📥 Installing requirements..." +pip install --upgrade pip +pip install -r requirements.txt + +# Create initial database +echo "" +echo "🗄️ Initializing database..." +python3 -c " +from src.database import DatabaseManager +from src.config import Config +config = Config() +db = DatabaseManager(config.database_path) +print('Database initialized successfully!') +" + +# Ask if user wants to add sample products +echo "" +read -p "Would you like to add sample products for testing? (y/n): " -n 1 -r +echo +if [[ $REPLY =~ ^[Yy]$ ]]; then + echo "🏪 Adding sample products..." + python3 examples/add_sample_products.py +fi + +echo "" +echo "🎉 Setup complete!" 
+echo "" +echo "Next steps:" +echo "1. Activate the virtual environment: source venv/bin/activate" +echo "2. Configure settings in config.json if needed" +echo "3. Start the web UI: python main.py --mode web" +echo "4. Or run scraping: python main.py --mode scrape" +echo "" +echo "Web UI will be available at: http://localhost:5000" +echo "" +echo "For scheduled scraping, add this to your crontab:" +echo "0 */6 * * * cd $(pwd) && source venv/bin/activate && python scripts/scheduled_scraping.py" diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..5dd25a4 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,7 @@ +""" +Price Tracker - Web scraper for monitoring product prices across multiple sites +""" + +__version__ = "1.0.0" +__author__ = "Price Tracker Team" +__description__ = "A comprehensive price tracking system using Beautiful Soup" diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..30dbfab --- /dev/null +++ b/src/config.py @@ -0,0 +1,86 @@ +""" +Configuration management for the price tracker +""" + +import json +import os +from typing import Dict, Any, Optional +from pathlib import Path + + +class Config: + """Configuration manager for the price tracker application.""" + + def __init__(self, config_path: Optional[str] = None): + self.config_path = config_path or "config.json" + self._config = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + """Load configuration from JSON file.""" + config_file = Path(self.config_path) + if not config_file.exists(): + raise FileNotFoundError(f"Config file not found: {self.config_path}") + + with open(config_file, 'r') as f: + return json.load(f) + + @property + def database_path(self) -> str: + """Get database file path.""" + return self._config.get('database', {}).get('path', 'price_tracker.db') + + @property + def scraping_config(self) -> Dict[str, Any]: + """Get scraping configuration.""" + return self._config.get('scraping', {}) + + @property + def delay_between_requests(self) -> float: + """Get delay between requests in seconds.""" + return self.scraping_config.get('delay_between_requests', 2) + + @property + def max_concurrent_requests(self) -> int: + """Get maximum concurrent requests.""" + return self.scraping_config.get('max_concurrent_requests', 5) + + @property + def timeout(self) -> int: + """Get request timeout in seconds.""" + return self.scraping_config.get('timeout', 30) + + @property + def retry_attempts(self) -> int: + """Get number of retry attempts.""" + return self.scraping_config.get('retry_attempts', 3) + + @property + def user_agents(self) -> list: + """Get list of user agents.""" + return self.scraping_config.get('user_agents', [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + ]) + + @property + def notification_config(self) -> Dict[str, Any]: + """Get notification configuration.""" + return self._config.get('notifications', {}) + + @property + def sites_config(self) -> Dict[str, Any]: + """Get sites configuration.""" + return self._config.get('sites', {}) + + def get_site_config(self, site_name: str) -> Optional[Dict[str, Any]]: + """Get configuration for a specific site.""" + return self.sites_config.get(site_name) + + def is_site_enabled(self, site_name: str) -> bool: + """Check if a site is enabled.""" + site_config = self.get_site_config(site_name) + return site_config.get('enabled', False) if site_config else False + + def get_enabled_sites(self) -> list: + """Get list of enabled sites.""" + return [site for site, config in 
self.sites_config.items() + if config.get('enabled', False)] diff --git a/src/database.py b/src/database.py new file mode 100644 index 0000000..70b5de8 --- /dev/null +++ b/src/database.py @@ -0,0 +1,228 @@ +""" +Database management for price tracking +""" + +import sqlite3 +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json +import logging + +logger = logging.getLogger(__name__) + + +class DatabaseManager: + """Manages SQLite database operations for price tracking.""" + + def __init__(self, db_path: str): + self.db_path = db_path + self._init_database() + + def _init_database(self): + """Initialize database tables.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(''' + CREATE TABLE IF NOT EXISTS products ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + description TEXT, + target_price REAL, + urls TEXT NOT NULL, -- JSON string of site URLs + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + active BOOLEAN DEFAULT 1 + ) + ''') + + conn.execute(''' + CREATE TABLE IF NOT EXISTS price_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + product_id INTEGER NOT NULL, + site_name TEXT NOT NULL, + price REAL NOT NULL, + currency TEXT DEFAULT 'GBP', + availability BOOLEAN DEFAULT 1, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (product_id) REFERENCES products (id) + ) + ''') + + conn.execute(''' + CREATE TABLE IF NOT EXISTS price_alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + product_id INTEGER NOT NULL, + site_name TEXT NOT NULL, + alert_price REAL NOT NULL, + triggered_at TIMESTAMP, + notified BOOLEAN DEFAULT 0, + FOREIGN KEY (product_id) REFERENCES products (id) + ) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_price_history_product_id + ON price_history (product_id) + ''') + + conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_price_history_timestamp + ON price_history (timestamp) + ''') + + def add_product(self, name: str, urls: Dict[str, str], + description: str = None, target_price: float = None) -> int: + """Add a new product to track.""" + urls_json = json.dumps(urls) + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.execute(''' + INSERT INTO products (name, description, target_price, urls) + VALUES (?, ?, ?, ?) + ''', (name, description, target_price, urls_json)) + + product_id = cursor.lastrowid + logger.info(f"Added product: {name} (ID: {product_id})") + return product_id + + def get_product(self, product_id: int) -> Optional[Dict[str, Any]]: + """Get product by ID.""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(''' + SELECT * FROM products WHERE id = ? 
AND active = 1 + ''', (product_id,)) + + row = cursor.fetchone() + if row: + product = dict(row) + product['urls'] = json.loads(product['urls']) + return product + return None + + def get_all_products(self) -> List[Dict[str, Any]]: + """Get all active products.""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(''' + SELECT * FROM products WHERE active = 1 ORDER BY name + ''') + + products = [] + for row in cursor.fetchall(): + product = dict(row) + product['urls'] = json.loads(product['urls']) + products.append(product) + + return products + + def update_product(self, product_id: int, **kwargs): + """Update product information.""" + allowed_fields = ['name', 'description', 'target_price', 'urls'] + updates = [] + values = [] + + for field, value in kwargs.items(): + if field in allowed_fields: + if field == 'urls': + value = json.dumps(value) + updates.append(f"{field} = ?") + values.append(value) + + if not updates: + return + + updates.append("updated_at = ?") + values.append(datetime.now()) + values.append(product_id) + + with sqlite3.connect(self.db_path) as conn: + conn.execute(f''' + UPDATE products SET {', '.join(updates)} WHERE id = ? + ''', values) + + def deactivate_product(self, product_id: int): + """Deactivate a product (soft delete).""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(''' + UPDATE products SET active = 0, updated_at = ? WHERE id = ? + ''', (datetime.now(), product_id)) + + def save_price_history(self, product_id: int, site_name: str, price: float, + currency: str = 'GBP', availability: bool = True, + timestamp: datetime = None): + """Save price history entry.""" + if timestamp is None: + timestamp = datetime.now() + + with sqlite3.connect(self.db_path) as conn: + conn.execute(''' + INSERT INTO price_history + (product_id, site_name, price, currency, availability, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (product_id, site_name, price, currency, availability, timestamp)) + + def get_price_history(self, product_id: int, days: int = 30) -> List[Dict[str, Any]]: + """Get price history for a product.""" + start_date = datetime.now() - timedelta(days=days) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(''' + SELECT * FROM price_history + WHERE product_id = ? AND timestamp >= ? + ORDER BY timestamp DESC + ''', (product_id, start_date)) + + return [dict(row) for row in cursor.fetchall()] + + def get_latest_prices(self, product_id: int) -> Dict[str, Dict[str, Any]]: + """Get latest price for each site for a product.""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(''' + SELECT DISTINCT site_name, + FIRST_VALUE(price) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as price, + FIRST_VALUE(currency) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as currency, + FIRST_VALUE(availability) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as availability, + FIRST_VALUE(timestamp) OVER (PARTITION BY site_name ORDER BY timestamp DESC) as timestamp + FROM price_history + WHERE product_id = ? 
+ ''', (product_id,)) + + result = {} + for row in cursor.fetchall(): + result[row['site_name']] = { + 'price': row['price'], + 'currency': row['currency'], + 'availability': bool(row['availability']), + 'timestamp': row['timestamp'] + } + + return result + + def get_price_statistics(self, product_id: int, days: int = 30) -> Dict[str, Any]: + """Get price statistics for a product.""" + start_date = datetime.now() - timedelta(days=days) + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.execute(''' + SELECT site_name, + MIN(price) as min_price, + MAX(price) as max_price, + AVG(price) as avg_price, + COUNT(*) as data_points + FROM price_history + WHERE product_id = ? AND timestamp >= ? + GROUP BY site_name + ''', (product_id, start_date)) + + stats = {} + for row in cursor.fetchall(): + stats[row[0]] = { + 'min_price': row[1], + 'max_price': row[2], + 'avg_price': round(row[3], 2), + 'data_points': row[4] + } + + return stats diff --git a/src/notification.py b/src/notification.py new file mode 100644 index 0000000..f412bb8 --- /dev/null +++ b/src/notification.py @@ -0,0 +1,192 @@ +""" +Notification system for price alerts +""" + +import smtplib +import logging +import aiohttp +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from typing import List, Dict, Any +from datetime import datetime + +logger = logging.getLogger(__name__) + + +class NotificationManager: + """Manages notifications for price alerts.""" + + def __init__(self, config): + self.config = config + self.notification_config = config.notification_config + + async def send_price_alerts(self, alerts: List[Dict[str, Any]]): + """Send notifications for price alerts.""" + if not alerts: + return + + # Send email notifications + if self.notification_config.get('email', {}).get('enabled', False): + await self._send_email_alerts(alerts) + + # Send webhook notifications + if self.notification_config.get('webhook', {}).get('enabled', False): + await self._send_webhook_alerts(alerts) + + async def _send_email_alerts(self, alerts: List[Dict[str, Any]]): + """Send email notifications for price alerts.""" + email_config = self.notification_config.get('email', {}) + + try: + # Create email content + subject = f"Price Alert: {len(alerts)} product(s) at target price!" 
+            body = self._create_email_body(alerts)
+            
+            # Create message
+            msg = MIMEMultipart()
+            msg['From'] = email_config.get('sender_email')
+            msg['To'] = email_config.get('recipient_email')
+            msg['Subject'] = subject
+            
+            msg.attach(MIMEText(body, 'html'))
+            
+            # Send email
+            server = smtplib.SMTP(email_config.get('smtp_server'), email_config.get('smtp_port'))
+            server.starttls()
+            server.login(email_config.get('sender_email'), email_config.get('sender_password'))
+            
+            text = msg.as_string()
+            server.sendmail(email_config.get('sender_email'),
+                          email_config.get('recipient_email'), text)
+            server.quit()
+            
+            logger.info(f"Email alert sent for {len(alerts)} products")
+            
+        except Exception as e:
+            logger.error(f"Failed to send email alert: {e}")
+    
+    async def _send_webhook_alerts(self, alerts: List[Dict[str, Any]]):
+        """Send webhook notifications for price alerts."""
+        webhook_config = self.notification_config.get('webhook', {})
+        webhook_url = webhook_config.get('url')
+        
+        if not webhook_url:
+            return
+        
+        try:
+            payload = {
+                'timestamp': datetime.now().isoformat(),
+                'alert_count': len(alerts),
+                'alerts': []
+            }
+            
+            for alert in alerts:
+                payload['alerts'].append({
+                    'product_name': alert['product']['name'],
+                    'site': alert['site'],
+                    'current_price': alert['current_price'],
+                    'target_price': alert['target_price'],
+                    'savings': alert['target_price'] - alert['current_price']
+                })
+            
+            async with aiohttp.ClientSession() as session:
+                async with session.post(webhook_url, json=payload) as response:
+                    if response.status == 200:
+                        logger.info(f"Webhook alert sent for {len(alerts)} products")
+                    else:
+                        logger.error(f"Webhook failed with status {response.status}")
+                        
+        except Exception as e:
+            logger.error(f"Failed to send webhook alert: {e}")
+    
+    def _create_email_body(self, alerts: List[Dict[str, Any]]) -> str:
+        """Create HTML email body for price alerts."""
+        html = """
+        <html>
+        <body>
+        <h2>🎉 Price Alert!</h2>
+        <p>Great news! We found products at your target price!</p>
+        """
+        
+        for alert in alerts:
+            product = alert['product']
+            savings = alert['target_price'] - alert['current_price']
+            
+            html += f"""
+            <div>
+                <h3>{product['name']}</h3>
+                <p>
+                    <strong>{alert['site'].upper()}</strong>
+                </p>
+                <p>
+                    Current Price: £{alert['current_price']:.2f}<br>
+                    Your Target: £{alert['target_price']:.2f}<br>
+                    <strong>You Save: £{savings:.2f}</strong>
+                </p>
+            </div>
+            """
+        
+        html += """
+        </body>
+        </html>
+        """
+        
+        return html
+    
+    async def send_test_notification(self) -> Dict[str, Any]:
+        """Send a test notification to verify configuration."""
+        test_result = {
+            'email': {'enabled': False, 'success': False, 'error': None},
+            'webhook': {'enabled': False, 'success': False, 'error': None}
+        }
+        
+        # Test email
+        if self.notification_config.get('email', {}).get('enabled', False):
+            test_result['email']['enabled'] = True
+            try:
+                test_alerts = [{
+                    'product': {'name': 'Test Product'},
+                    'site': 'test-site',
+                    'current_price': 19.99,
+                    'target_price': 25.00
+                }]
+                await self._send_email_alerts(test_alerts)
+                test_result['email']['success'] = True
+            except Exception as e:
+                test_result['email']['error'] = str(e)
+        
+        # Test webhook
+        if self.notification_config.get('webhook', {}).get('enabled', False):
+            test_result['webhook']['enabled'] = True
+            try:
+                test_alerts = [{
+                    'product': {'name': 'Test Product'},
+                    'site': 'test-site',
+                    'current_price': 19.99,
+                    'target_price': 25.00
+                }]
+                await self._send_webhook_alerts(test_alerts)
+                test_result['webhook']['success'] = True
+            except Exception as e:
+                test_result['webhook']['error'] = str(e)
+        
+        return test_result
diff --git a/src/scraper.py b/src/scraper.py
new file mode 100644
index 0000000..ffd6c27
--- /dev/null
+++ b/src/scraper.py
@@ -0,0 +1,334 @@
+"""
+Web scraping functionality for price tracking
+"""
+
+import asyncio
+import aiohttp
+import logging
+import random
+import re
+from typing import Dict, List, Optional, Any, Tuple
+from urllib.parse import urljoin, urlparse
+from bs4 import BeautifulSoup
+from fake_useragent import UserAgent
+
+from .config import Config
+
+logger = logging.getLogger(__name__)
+
+
+class PriceScraper:
+    """Base class for price scraping functionality."""
+    
+    def __init__(self, config: Config):
+        self.config = config
+        self.ua = UserAgent()
+        self.session = None
+    
+    async def __aenter__(self):
+        """Async context manager entry."""
+        connector = aiohttp.TCPConnector(limit=self.config.max_concurrent_requests)
+        timeout = aiohttp.ClientTimeout(total=self.config.timeout)
+        self.session = aiohttp.ClientSession(
+            connector=connector,
+            timeout=timeout,
+            headers={'User-Agent': self.ua.random}
+        )
+        return self
+    
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """Async context manager exit."""
+        if self.session:
+            await self.session.close()
+    
+    def _get_headers(self, url: str = None) -> Dict[str, str]:
+        """Get request headers with random user agent and site-specific headers."""
+        user_agents = self.config.user_agents
+        if user_agents:
+            user_agent = random.choice(user_agents)
+        else:
+            user_agent = self.ua.random
+        
+        headers = {
+            'User-Agent': user_agent,
+            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
+            'Accept-Language': 'en-GB,en-US;q=0.9,en;q=0.8',
+            'Accept-Encoding': 'gzip, deflate, br',
+            'DNT': '1',
+            'Connection': 'keep-alive',
+            'Upgrade-Insecure-Requests': '1',
+            'Sec-Fetch-Dest': 'document',
+            'Sec-Fetch-Mode': 'navigate',
+            'Sec-Fetch-Site': 'none',
+        }
+        
+        # Add site-specific headers
+        if url:
+            if 'amazon.co.uk' in url:
+                headers.update({
+                    'Referer': 'https://www.amazon.co.uk/',
+                })
+            elif 'jjfoodservice.com' in url:
+                headers.update({
+                    'Referer': 'https://www.jjfoodservice.com/',
+                })
+            elif 'atoz-catering.co.uk' in url:
+                headers.update({
+                    'Referer': 'https://www.atoz-catering.co.uk/',
+                })
+        
+        return headers
+    
+    async def _fetch_page(self, url: str) -> Optional[str]:
+        """Fetch a web page with retry logic
and anti-bot measures.""" + base_delay = random.uniform(1, 3) # Random delay between 1-3 seconds + + for attempt in range(self.config.retry_attempts): + try: + # Add delay before each request (except first) + if attempt > 0: + delay = base_delay * (2 ** attempt) + random.uniform(0, 1) + await asyncio.sleep(delay) + + headers = self._get_headers(url) + + async with self.session.get(url, headers=headers) as response: + if response.status == 200: + return await response.text() + elif response.status == 403: + logger.warning(f"Access denied (403) for {url} - may be blocked by anti-bot measures") + # For 403 errors, wait longer before retry + if attempt < self.config.retry_attempts - 1: + await asyncio.sleep(random.uniform(5, 10)) + elif response.status == 429: + logger.warning(f"Rate limited (429) for {url}") + # For rate limiting, wait even longer + if attempt < self.config.retry_attempts - 1: + await asyncio.sleep(random.uniform(10, 20)) + else: + logger.warning(f"HTTP {response.status} for {url}") + + except Exception as e: + logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}") + if attempt < self.config.retry_attempts - 1: + await asyncio.sleep(base_delay * (2 ** attempt)) + + logger.error(f"Failed to fetch {url} after {self.config.retry_attempts} attempts") + return None + + def _extract_price(self, soup: BeautifulSoup, selectors: List[str]) -> Optional[float]: + """Extract price from HTML using CSS selectors.""" + for selector in selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_price(price_text) + if price is not None: + return price + except Exception as e: + logger.debug(f"Error with selector {selector}: {e}") + continue + + return None + + def _parse_price(self, price_text: str) -> Optional[float]: + """Parse price from text string.""" + if not price_text: + return None + + # Remove common currency symbols and clean text + price_text = re.sub(r'[^\d.,]+', '', price_text) + price_text = price_text.replace(',', '') + + # Try to extract price as float + try: + return float(price_text) + except (ValueError, TypeError): + # Try to find price pattern + price_match = re.search(r'(\d+\.?\d*)', price_text) + if price_match: + return float(price_match.group(1)) + + return None + + def _extract_text(self, soup: BeautifulSoup, selectors: List[str]) -> Optional[str]: + """Extract text from HTML using CSS selectors.""" + for selector in selectors: + try: + element = soup.select_one(selector) + if element: + return element.get_text(strip=True) + except Exception as e: + logger.debug(f"Error with selector {selector}: {e}") + continue + + return None + + def _detect_site(self, url: str) -> Optional[str]: + """Detect which site this URL belongs to.""" + domain = urlparse(url).netloc.lower() + + if 'amazon' in domain: + return 'amazon' + elif 'ebay' in domain: + return 'ebay' + elif 'walmart' in domain: + return 'walmart' + # Add more site detection logic here + + return None + + async def scrape_product_price(self, url: str, site_name: str = None) -> Dict[str, Any]: + """Scrape price for a single product from a URL.""" + result = { + 'success': False, + 'price': None, + 'currency': 'GBP', + 'title': None, + 'availability': None, + 'url': url, + 'error': None + } + + try: + # Auto-detect site if not provided + if not site_name: + site_name = self._detect_site(url) + if not site_name: + result['error'] = "Could not detect site from URL" + return result + + # Get site configuration + site_config = 
self.config.get_site_config(site_name)
+            if not site_config:
+                result['error'] = f"No configuration found for site: {site_name}"
+                return result
+
+            if not self.config.is_site_enabled(site_name):
+                result['error'] = f"Site {site_name} is disabled"
+                return result
+
+            # Fetch page content
+            html_content = await self._fetch_page(url)
+            if not html_content:
+                result['error'] = "Failed to fetch page content"
+                return result
+
+            # Parse HTML
+            soup = BeautifulSoup(html_content, 'html.parser')
+
+            # Extract price
+            price_selectors = site_config.get('selectors', {}).get('price', [])
+            price = self._extract_price(soup, price_selectors)
+
+            if price is None:
+                result['error'] = "Could not extract price from page"
+                return result
+
+            # Extract additional information
+            title_selectors = site_config.get('selectors', {}).get('title', [])
+            title = self._extract_text(soup, title_selectors)
+
+            availability_selectors = site_config.get('selectors', {}).get('availability', [])
+            availability_text = self._extract_text(soup, availability_selectors)
+            availability = self._parse_availability(availability_text)
+
+            result.update({
+                'success': True,
+                'price': price,
+                'title': title,
+                'availability': availability
+            })
+
+            logger.info(f"Successfully scraped {site_name}: £{price}")
+
+        except Exception as e:
+            logger.error(f"Error scraping {url}: {e}")
+            result['error'] = str(e)
+
+        return result
+
+    def _parse_availability(self, availability_text: str) -> bool:
+        """Parse availability from text."""
+        if not availability_text:
+            return True  # Assume available if no info
+
+        availability_text = availability_text.lower()
+
+        # Common out of stock indicators
+        out_of_stock_indicators = [
+            'out of stock', 'unavailable', 'sold out', 'not available',
+            'temporarily out of stock', 'currently unavailable'
+        ]
+
+        for indicator in out_of_stock_indicators:
+            if indicator in availability_text:
+                return False
+
+        return True
+
+
+class ScraperManager:
+    """Manages multiple price scrapers and coordinates scraping tasks."""
+
+    def __init__(self, config: Config):
+        self.config = config
+        self.semaphore = asyncio.Semaphore(config.max_concurrent_requests)
+
+    async def scrape_product(self, product: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
+        """Scrape prices for a single product across all configured sites."""
+        product_id = product['id']
+        urls = product['urls']
+
+        results = {}
+
+        async with PriceScraper(self.config) as scraper:
+            tasks = []
+
+            for site_name, url in urls.items():
+                if self.config.is_site_enabled(site_name):
+                    task = self._scrape_with_semaphore(scraper, url, site_name)
+                    tasks.append((site_name, task))
+
+                    # Add delay between requests
+                    await asyncio.sleep(self.config.delay_between_requests)
+
+            # Wait for all tasks to complete
+            for site_name, task in tasks:
+                try:
+                    result = await task
+                    results[site_name] = result
+                except Exception as e:
+                    logger.error(f"Error scraping {site_name} for product {product_id}: {e}")
+                    results[site_name] = {
+                        'success': False,
+                        'error': str(e)
+                    }
+
+        return results
+
+    async def _scrape_with_semaphore(self, scraper: PriceScraper, url: str, site_name: str):
+        """Scrape with semaphore to limit concurrent requests."""
+        async with self.semaphore:
+            return await scraper.scrape_product_price(url, site_name)
+
+    async def scrape_all_products(self, products: List[Dict[str, Any]]) -> Dict[int, Dict[str, Dict[str, Any]]]:
+        """Scrape prices for all products."""
+        results = {}
+
+        for product in products:
+            try:
+                product_id = product['id']
+                logger.info(f"Scraping product:
{product['name']} (ID: {product_id})") + + product_results = await self.scrape_product(product) + results[product_id] = product_results + + # Add delay between products + await asyncio.sleep(self.config.delay_between_requests) + + except Exception as e: + logger.error(f"Error scraping product {product.get('id', 'unknown')}: {e}") + + return results diff --git a/src/scraper_manager.py b/src/scraper_manager.py new file mode 100644 index 0000000..9d1a670 --- /dev/null +++ b/src/scraper_manager.py @@ -0,0 +1,139 @@ +""" +Scraper manager for coordinating price scraping tasks +""" + +import asyncio +import logging +from typing import Dict, List, Any +from .scraper import ScraperManager as BaseScraper +from .uk_scraper import UKCateringScraper + +logger = logging.getLogger(__name__) + + +class ScraperManager(BaseScraper): + """Enhanced scraper manager with additional coordination features.""" + + def __init__(self, config): + super().__init__(config) + self.active_tasks = {} + + async def scrape_product_by_id(self, product_id: int, product_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + """Scrape a specific product by ID with task tracking.""" + if product_id in self.active_tasks: + logger.info(f"Product {product_id} is already being scraped") + return await self.active_tasks[product_id] + + # Create and track the scraping task + task = asyncio.create_task(self.scrape_product(product_data)) + self.active_tasks[product_id] = task + + try: + result = await task + return result + finally: + # Clean up completed task + if product_id in self.active_tasks: + del self.active_tasks[product_id] + + async def cancel_product_scraping(self, product_id: int) -> bool: + """Cancel scraping for a specific product.""" + if product_id in self.active_tasks: + task = self.active_tasks[product_id] + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + del self.active_tasks[product_id] + logger.info(f"Cancelled scraping for product {product_id}") + return True + return False + + def get_active_scraping_tasks(self) -> List[int]: + """Get list of product IDs currently being scraped.""" + return list(self.active_tasks.keys()) + + async def health_check(self) -> Dict[str, Any]: + """Perform a health check on the scraping system.""" + health_status = { + 'status': 'healthy', + 'active_tasks': len(self.active_tasks), + 'enabled_sites': len(self.config.get_enabled_sites()), + 'site_checks': {} + } + + # Test each enabled site with a simple request + enabled_sites = self.config.get_enabled_sites() + + for site_name in enabled_sites: + site_config = self.config.get_site_config(site_name) + base_url = site_config.get('base_url', '') + + try: + from .scraper import PriceScraper + async with PriceScraper(self.config) as scraper: + html_content = await scraper._fetch_page(base_url) + if html_content: + health_status['site_checks'][site_name] = 'accessible' + else: + health_status['site_checks'][site_name] = 'inaccessible' + except Exception as e: + health_status['site_checks'][site_name] = f'error: {str(e)}' + + # Determine overall health + failed_sites = [site for site, status in health_status['site_checks'].items() + if status != 'accessible'] + + if len(failed_sites) == len(enabled_sites): + health_status['status'] = 'unhealthy' + elif failed_sites: + health_status['status'] = 'degraded' + + return health_status + + async def scrape_product(self, product: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + """Scrape prices for a single product across all configured sites.""" + product_id = product['id'] 
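+        # Dispatch note: products whose URLs include any UK catering site
+        # (jjfoodservice, atoz_catering, amazon_uk) are routed to the
+        # specialised UKCateringScraper below; everything else falls back to
+        # the generic ScraperManager.scrape_product from src/scraper.py.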
+ urls = product['urls'] + + results = {} + + # Determine which scraper to use based on the sites + uk_catering_sites = {'jjfoodservice', 'atoz_catering', 'amazon_uk'} + has_uk_sites = any(site in uk_catering_sites for site in urls.keys()) + + if has_uk_sites: + # Use UK catering scraper + async with UKCateringScraper(self.config) as scraper: + tasks = [] + + for site_name, url in urls.items(): + if self.config.is_site_enabled(site_name): + task = self._scrape_with_semaphore_uk(scraper, url, site_name) + tasks.append((site_name, task)) + + # Add delay between requests + await asyncio.sleep(self.config.delay_between_requests) + + # Wait for all tasks to complete + for site_name, task in tasks: + try: + result = await task + results[site_name] = result + except Exception as e: + logger.error(f"Error scraping {site_name} for product {product_id}: {e}") + results[site_name] = { + 'success': False, + 'error': str(e) + } + else: + # Use standard scraper for other sites + results = await super().scrape_product(product) + + return results + + async def _scrape_with_semaphore_uk(self, scraper: UKCateringScraper, url: str, site_name: str): + """Scrape with semaphore using UK scraper.""" + async with self.semaphore: + return await scraper.scrape_product_price(url, site_name) diff --git a/src/uk_scraper.py b/src/uk_scraper.py new file mode 100644 index 0000000..cc7b72a --- /dev/null +++ b/src/uk_scraper.py @@ -0,0 +1,332 @@ +""" +Specialized scrapers for UK catering supply sites +""" + +import re +import logging +from typing import Dict, Any, Optional +from bs4 import BeautifulSoup +from .scraper import PriceScraper + +logger = logging.getLogger(__name__) + + +class UKCateringScraper(PriceScraper): + """Specialized scraper for UK catering supply websites.""" + + def _parse_uk_price(self, price_text: str) -> Optional[float]: + """Parse UK price format with £ symbol.""" + if not price_text: + return None + + # Remove common text and normalize + price_text = price_text.lower() + price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text) + + # Find price with £ symbol + price_match = re.search(r'£(\d+\.?\d*)', price_text) + if price_match: + try: + return float(price_match.group(1)) + except ValueError: + pass + + # Try without £ symbol but with decimal + price_match = re.search(r'(\d+\.\d{2})', price_text) + if price_match: + try: + return float(price_match.group(1)) + except ValueError: + pass + + return None + + def _extract_jjfoodservice_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from JJ Food Service.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Try multiple selectors for price + price_selectors = [ + '.price', + '.product-price', + '[data-testid="price"]', + '.price-value', + '.current-price', + '.product-card-price', + 'span:contains("£")', + '.cost' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + logger.info(f"Successfully scraped jjfoodservice: £{price}") + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with JJ Food Service price selector {selector}: {e}") + + # Try to extract title + title_selectors = [ + 'h1', + '.product-title', + '.product-name', + '[data-testid="product-title"]', + '.product-card-title', + 'title' + ] + + for 
selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with JJ Food Service title selector {selector}: {e}") + + # Check availability + availability_indicators = [ + 'out of stock', + 'unavailable', + 'not available', + 'temporarily unavailable' + ] + + page_text = soup.get_text().lower() + for indicator in availability_indicators: + if indicator in page_text: + result['availability'] = False + break + + return result + + def _extract_atoz_catering_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from A to Z Catering.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # A to Z Catering specific selectors + price_selectors = [ + '.price', + '.product-price', + '.delivery-price', + '.collection-price', + 'span:contains("£")', + '.price-value', + '.cost', + '.selling-price' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + # Skip if it contains "delivery" or "collection" but no price + if ('delivery' in price_text.lower() or 'collection' in price_text.lower()) and '£' not in price_text: + continue + + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + logger.info(f"Successfully scraped atoz_catering: £{price}") + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with A to Z price selector {selector}: {e}") + + # Extract title + title_selectors = [ + 'h1', + '.product-title', + '.product-name', + 'a[href*="/products/product/"]', + '.product-link', + 'title' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with A to Z title selector {selector}: {e}") + + # Check availability - A to Z specific indicators + availability_indicators = [ + 'out of stock', + 'unavailable', + 'not available', + 'temporarily unavailable', + 'contact us for availability' + ] + + page_text = soup.get_text().lower() + for indicator in availability_indicators: + if indicator in page_text: + result['availability'] = False + break + + # Check if "Add to Basket" button is present (indicates availability) + add_to_basket = soup.select_one('.add-to-basket, button:contains("Add To Basket")') + if not add_to_basket and result['availability']: + # If no add to basket button and no explicit availability info, assume unavailable + out_of_stock_indicators = soup.select('.out-of-stock, .unavailable') + if out_of_stock_indicators: + result['availability'] = False + + return result + + def _extract_amazon_uk_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from Amazon UK.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Amazon UK price selectors + price_selectors = [ + '.a-price-whole', + '.a-price .a-offscreen', + '#priceblock_dealprice', + '#priceblock_ourprice', + '.a-price-range', + '.a-price.a-text-price.a-size-medium.apexPriceToPay', + '.a-price-current', + 'span.a-price.a-text-price.a-size-medium' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = 
self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with Amazon UK price selector {selector}: {e}") + + # Extract title + title_selectors = [ + '#productTitle', + '.product-title', + 'h1.a-size-large', + 'h1' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with Amazon UK title selector {selector}: {e}") + + # Check availability + availability_selectors = [ + '#availability span', + '.a-size-medium.a-color-success', + '.a-size-medium.a-color-state', + '#availability .a-declarative' + ] + + for selector in availability_selectors: + try: + element = soup.select_one(selector) + if element: + availability_text = element.get_text().lower() + if any(phrase in availability_text for phrase in ['out of stock', 'unavailable', 'not available']): + result['availability'] = False + break + except Exception as e: + logger.debug(f"Error with Amazon UK availability selector {selector}: {e}") + + return result + + async def scrape_product(self, product_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + """Scrape prices for a product from all configured sites.""" + results = {} + urls = product_data.get('urls', {}) + + for site_name, url in urls.items(): + try: + # Only process sites we support + if site_name not in ['jjfoodservice', 'atoz_catering', 'amazon_uk']: + logger.warning(f"Skipping unsupported site: {site_name}") + continue + + html_content = await self._fetch_page(url) + if not html_content: + results[site_name] = { + 'success': False, + 'error': 'Failed to fetch page', + 'price': None, + 'currency': 'GBP' + } + continue + + soup = BeautifulSoup(html_content, 'html.parser') + + # Route to appropriate extraction method + if site_name == 'jjfoodservice': + extracted_data = self._extract_jjfoodservice_data(soup) + elif site_name == 'atoz_catering': + extracted_data = self._extract_atoz_catering_data(soup) + elif site_name == 'amazon_uk': + extracted_data = self._extract_amazon_uk_data(soup) + else: + # Fallback to generic extraction + extracted_data = self._extract_generic_data(soup, site_name) + + if extracted_data['price'] is not None: + results[site_name] = { + 'success': True, + 'price': extracted_data['price'], + 'currency': extracted_data['currency'], + 'title': extracted_data.get('title'), + 'availability': extracted_data.get('availability', True) + } + else: + results[site_name] = { + 'success': False, + 'error': 'Could not extract price', + 'price': None, + 'currency': 'GBP' + } + + except Exception as e: + logger.error(f"Error scraping {site_name}: {e}") + results[site_name] = { + 'success': False, + 'error': str(e), + 'price': None, + 'currency': 'GBP' + } + + return results diff --git a/src/uk_scraper_old.py b/src/uk_scraper_old.py new file mode 100644 index 0000000..9ab1bde --- /dev/null +++ b/src/uk_scraper_old.py @@ -0,0 +1,515 @@ +""" +Specialized scrapers for UK catering supply sites +""" + +import re +import logging +from typing import Dict, Any, Optional +from bs4 import BeautifulSoup +from .scraper import PriceScraper + +logger = logging.getLogger(__name__) + + +class UKCateringScraper(PriceScraper): + """Specialized scraper for UK catering supply websites.""" + + def _parse_uk_price(self, price_text: str) -> Optional[float]: + """Parse UK price format with £ symbol.""" + if not price_text: + 
return None + + # Remove common text and normalize + price_text = price_text.lower() + price_text = re.sub(r'delivery:|collection:|was:|now:|offer:|from:', '', price_text) + + # Find price with £ symbol + price_match = re.search(r'£(\d+\.?\d*)', price_text) + if price_match: + try: + return float(price_match.group(1)) + except ValueError: + pass + + # Try without £ symbol but with decimal + price_match = re.search(r'(\d+\.\d{2})', price_text) + if price_match: + try: + return float(price_match.group(1)) + except ValueError: + pass + + return None + + def _extract_jjfoodservice_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from JJ Food Service.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Try multiple selectors for price + price_selectors = [ + '.price', + '.product-price', + '[data-testid="price"]', + '.price-value', + '.current-price', + '.product-card-price', + 'span:contains("£")', + '.cost' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with JJ Food Service price selector {selector}: {e}") + + # Try to extract title + title_selectors = [ + 'h1', + '.product-title', + '.product-name', + '[data-testid="product-title"]', + '.product-card-title', + 'title' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with JJ Food Service title selector {selector}: {e}") + + # Check availability + availability_indicators = [ + 'out of stock', + 'unavailable', + 'not available', + 'sold out' + ] + + page_text = soup.get_text().lower() + for indicator in availability_indicators: + if indicator in page_text: + result['availability'] = False + break + + return result + + def _extract_atoz_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from A to Z Catering.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # A to Z Catering shows prices like "Delivery:£X.XX Collection:£Y.YY" + # We'll prioritize the lower price (usually collection) + + price_text = soup.get_text() + + # Look for delivery and collection prices + delivery_match = re.search(r'delivery:?\s*£(\d+\.?\d*)', price_text, re.IGNORECASE) + collection_match = re.search(r'collection:?\s*£(\d+\.?\d*)', price_text, re.IGNORECASE) + + prices = [] + if delivery_match: + try: + prices.append(float(delivery_match.group(1))) + except ValueError: + pass + + if collection_match: + try: + prices.append(float(collection_match.group(1))) + except ValueError: + pass + + # If we found prices, use the lowest one + if prices: + result['price'] = min(prices) + else: + # Fallback to general price extraction + price_selectors = [ + '.price', + '.product-price', + 'span:contains("£")', + '.price-value' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with A to 
Z price selector {selector}: {e}") + + # Extract title - A to Z often has product names in links + title_selectors = [ + 'h1', + '.product-title', + '.product-name', + 'a[href*="/products/product/"]', + '.product-link', + 'title' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + title = element.get_text(strip=True) + # Clean up the title + if len(title) > 5 and 'A to Z' not in title: + result['title'] = title + break + except Exception as e: + logger.debug(f"Error with A to Z title selector {selector}: {e}") + + # Check availability - look for "Add To Basket" button + add_to_basket = soup.find(text=re.compile('Add To Basket', re.IGNORECASE)) + if not add_to_basket: + # Also check for out of stock indicators + out_of_stock_indicators = [ + 'out of stock', + 'unavailable', + 'not available', + 'sold out' + ] + + page_text = soup.get_text().lower() + for indicator in out_of_stock_indicators: + if indicator in page_text: + result['availability'] = False + break + + return result + + def _extract_amazon_uk_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from Amazon UK.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Amazon UK price selectors + price_selectors = [ + '.a-price-whole', + '.a-price .a-offscreen', + '.a-price-current .a-offscreen', + '#priceblock_dealprice', + '#priceblock_ourprice', + '.a-price-range', + '.a-price.a-text-price.a-size-medium.apexPriceToPay .a-offscreen' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with Amazon UK price selector {selector}: {e}") + + # Extract title + title_selectors = [ + '#productTitle', + '.product-title', + 'h1.a-size-large' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with Amazon UK title selector {selector}: {e}") + + # Check availability + availability_text = soup.get_text().lower() + if any(phrase in availability_text for phrase in ['out of stock', 'currently unavailable', 'not available']): + result['availability'] = False + + return result + + def _extract_tesco_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from Tesco.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Tesco price selectors + price_selectors = [ + '.price-control-wrapper .value', + '.price-per-sellable-unit .value', + '.price-per-quantity-weight .value', + '[data-testid="price-current-value"]', + '.price-current', + '.product-price .price' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with Tesco price selector {selector}: {e}") + + # Extract title + title_selectors = [ + 'h1[data-testid="product-title"]', + '.product-details-tile h1', + '.product-title', + 'h1.product-name' + ] + + for selector in 
title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with Tesco title selector {selector}: {e}") + + return result + + def _extract_sainsburys_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from Sainsburys.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Sainsburys price selectors + price_selectors = [ + '.pd__cost__current-price', + '.pd__cost .pd__cost__retail-price', + '.pricing__now-price', + '.product-price__current', + '[data-testid="pd-retail-price"]', + '.price-per-unit' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with Sainsburys price selector {selector}: {e}") + + # Extract title + title_selectors = [ + '.pd__header h1', + 'h1[data-testid="pd-product-name"]', + '.product-name', + '.pd__product-name' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with Sainsburys title selector {selector}: {e}") + + return result + + def _extract_booker_data(self, soup: BeautifulSoup) -> Dict[str, Any]: + """Extract data specifically from Booker.""" + result = { + 'price': None, + 'title': None, + 'availability': True, + 'currency': 'GBP' + } + + # Booker price selectors + price_selectors = [ + '.price', + '.product-price', + '.price-current', + '.selling-price', + '[data-testid="price"]', + '.product-tile-price' + ] + + for selector in price_selectors: + try: + elements = soup.select(selector) + for element in elements: + price_text = element.get_text(strip=True) + price = self._parse_uk_price(price_text) + if price is not None: + result['price'] = price + break + if result['price'] is not None: + break + except Exception as e: + logger.debug(f"Error with Booker price selector {selector}: {e}") + + # Extract title + title_selectors = [ + 'h1', + '.product-title', + '.product-name', + '.product-description h1', + '[data-testid="product-title"]' + ] + + for selector in title_selectors: + try: + element = soup.select_one(selector) + if element: + result['title'] = element.get_text(strip=True) + break + except Exception as e: + logger.debug(f"Error with Booker title selector {selector}: {e}") + + return result + + async def scrape_product_price(self, url: str, site_name: str = None) -> Dict[str, Any]: + """Enhanced scraping for UK catering sites.""" + result = { + 'success': False, + 'price': None, + 'currency': 'GBP', + 'title': None, + 'availability': None, + 'url': url, + 'error': None + } + + try: + # Auto-detect site if not provided + if not site_name: + site_name = self._detect_site(url) + if not site_name: + result['error'] = "Could not detect site from URL" + return result + + # Check if site is enabled + if not self.config.is_site_enabled(site_name): + result['error'] = f"Site {site_name} is disabled" + return result + + # Fetch page content + html_content = await self._fetch_page(url) + if not html_content: + result['error'] = "Failed to fetch page content" + return result + + # Parse HTML + soup = BeautifulSoup(html_content, 
'html.parser') + + # Use specialized extraction based on site + if site_name == 'jjfoodservice': + extracted_data = self._extract_jjfoodservice_data(soup) + elif site_name == 'atoz_catering': + extracted_data = self._extract_atoz_data(soup) + elif site_name == 'amazon_uk': + extracted_data = self._extract_amazon_uk_data(soup) + elif site_name == 'tesco': + extracted_data = self._extract_tesco_data(soup) + elif site_name == 'sainsburys': + extracted_data = self._extract_sainsburys_data(soup) + elif site_name == 'booker': + extracted_data = self._extract_booker_data(soup) + else: + # Fall back to general extraction + return await super().scrape_product_price(url, site_name) + + if extracted_data['price'] is None: + result['error'] = "Could not extract price from page" + return result + + result.update({ + 'success': True, + 'price': extracted_data['price'], + 'currency': extracted_data.get('currency', 'GBP'), + 'title': extracted_data.get('title'), + 'availability': extracted_data.get('availability', True) + }) + + logger.info(f"Successfully scraped {site_name}: £{extracted_data['price']}") + + except Exception as e: + logger.error(f"Error scraping {url}: {e}") + result['error'] = str(e) + + return result + + def _detect_site(self, url: str) -> Optional[str]: + """Detect which UK catering site this URL belongs to.""" + url_lower = url.lower() + + if 'jjfoodservice.com' in url_lower: + return 'jjfoodservice' + elif 'atoz-catering.co.uk' in url_lower: + return 'atoz_catering' + elif 'amazon.co.uk' in url_lower: + return 'amazon_uk' + elif 'tesco.com' in url_lower: + return 'tesco' + elif 'sainsburys.co.uk' in url_lower: + return 'sainsburys' + elif 'booker.co.uk' in url_lower: + return 'booker' + + # Fall back to parent detection for other sites + return super()._detect_site(url) diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..c896097 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,118 @@ +""" +Utility functions for the price tracker +""" + +import logging +from typing import Dict, Any, List +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + + +def format_price(price: float, currency: str = 'GBP') -> str: + """Format price with appropriate currency symbol.""" + if currency == 'GBP': + return f"£{price:.2f}" + elif currency == 'USD': + return f"${price:.2f}" + elif currency == 'EUR': + return f"€{price:.2f}" + else: + return f"{price:.2f} {currency}" + + +def calculate_price_change(old_price: float, new_price: float) -> Dict[str, Any]: + """Calculate price change percentage and direction.""" + if old_price == 0: + return { + 'change': 0.0, + 'percentage': 0.0, + 'direction': 'stable' + } + + change = new_price - old_price + percentage = (change / old_price) * 100 + + if percentage > 0.1: + direction = 'up' + elif percentage < -0.1: + direction = 'down' + else: + direction = 'stable' + + return { + 'change': change, + 'percentage': percentage, + 'direction': direction + } + + +def is_site_accessible(site_name: str, last_success: datetime = None) -> bool: + """Check if a site is likely accessible based on recent success.""" + if not last_success: + return True # Assume accessible if no data + + # Consider site inaccessible if no success in last 24 hours + return (datetime.now() - last_success) < timedelta(hours=24) + + +def get_retry_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float: + """Calculate exponential backoff delay with jitter.""" + import random + delay = min(base_delay * (2 ** attempt), max_delay) + 
jitter = random.uniform(0, delay * 0.1) # Add 10% jitter + return delay + jitter + + +def clean_product_name(name: str) -> str: + """Clean and normalize product name.""" + import re + # Remove extra whitespace and normalize + name = re.sub(r'\s+', ' ', name.strip()) + # Remove special characters that might cause issues + name = re.sub(r'[^\w\s\-\(\)&]', '', name) + return name + + +def is_valid_price(price: float) -> bool: + """Check if a price is valid (positive and reasonable).""" + return price > 0 and price < 10000 # Max £10,000 seems reasonable for catering supplies + + +def get_price_alert_message(product_name: str, site_name: str, current_price: float, + target_price: float, currency: str = 'GBP') -> str: + """Generate price alert message.""" + current_formatted = format_price(current_price, currency) + target_formatted = format_price(target_price, currency) + + return (f"Price Alert: {product_name} is now {current_formatted} on {site_name}, " + f"which is at or below your target price of {target_formatted}!") + + +def group_results_by_status(results: Dict[str, Dict[str, Any]]) -> Dict[str, List]: + """Group scraping results by success/failure status.""" + grouped = { + 'successful': [], + 'failed': [], + 'blocked': [] + } + + for site_name, result in results.items(): + if result.get('success'): + grouped['successful'].append({ + 'site': site_name, + 'price': result.get('price'), + 'currency': result.get('currency', 'GBP') + }) + elif 'blocked' in str(result.get('error', '')).lower() or '403' in str(result.get('error', '')): + grouped['blocked'].append({ + 'site': site_name, + 'error': result.get('error') + }) + else: + grouped['failed'].append({ + 'site': site_name, + 'error': result.get('error') + }) + + return grouped diff --git a/src/web_ui.py b/src/web_ui.py new file mode 100644 index 0000000..abd6923 --- /dev/null +++ b/src/web_ui.py @@ -0,0 +1,271 @@ +""" +Web UI for the price tracker application +""" + +from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_from_directory +from flask_wtf import FlaskForm +from wtforms import StringField, FloatField, TextAreaField, SubmitField, URLField +from wtforms.validators import DataRequired, NumberRange, URL, Optional +import json +import asyncio +from datetime import datetime, timedelta +import plotly +import plotly.graph_objs as go +import pandas as pd +import os + +from .database import DatabaseManager +from .config import Config +from .scraper_manager import ScraperManager +from .notification import NotificationManager +from .utils import format_price, group_results_by_status + + +def create_app(): + """Create Flask application.""" + # Get the project root directory (parent of src) + project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + template_dir = os.path.join(project_root, 'templates') + + app = Flask(__name__, template_folder=template_dir) + app.config['SECRET_KEY'] = 'your-secret-key-change-this' + + # Initialize components + config = Config() + db_manager = DatabaseManager(config.database_path) + scraper_manager = ScraperManager(config) + notification_manager = NotificationManager(config) + + class ProductForm(FlaskForm): + name = StringField('Product Name', validators=[DataRequired()]) + description = TextAreaField('Description') + target_price = FloatField('Target Price (£)', validators=[Optional(), NumberRange(min=0)]) + jjfoodservice_url = URLField('JJ Food Service URL', validators=[Optional(), URL()]) + atoz_catering_url = URLField('A to Z Catering URL', 
validators=[Optional(), URL()])
+        amazon_uk_url = URLField('Amazon UK URL', validators=[Optional(), URL()])
+        submit = SubmitField('Add Product')
+
+    @app.route('/')
+    def index():
+        """Home page showing all products."""
+        products = db_manager.get_all_products()
+
+        # Get latest prices for each product
+        for product in products:
+            latest_prices = db_manager.get_latest_prices(product['id'])
+            product['latest_prices'] = latest_prices
+
+            # Find best current price
+            if latest_prices:
+                best_price = min(latest_prices.values(), key=lambda x: x['price'])
+                product['best_price'] = best_price
+            else:
+                product['best_price'] = None
+
+        return render_template('index.html', products=products)
+
+    @app.route('/add_product', methods=['GET', 'POST'])
+    def add_product():
+        """Add a new product to track."""
+        form = ProductForm()
+
+        if form.validate_on_submit():
+            urls = {}
+            if form.jjfoodservice_url.data:
+                urls['jjfoodservice'] = form.jjfoodservice_url.data
+            if form.atoz_catering_url.data:
+                urls['atoz_catering'] = form.atoz_catering_url.data
+            if form.amazon_uk_url.data:
+                urls['amazon_uk'] = form.amazon_uk_url.data
+
+            if not urls:
+                flash('Please provide at least one URL to track.', 'error')
+                return render_template('add_product.html', form=form)
+
+            try:
+                product_id = db_manager.add_product(
+                    name=form.name.data,
+                    description=form.description.data,
+                    target_price=form.target_price.data,
+                    urls=urls
+                )
+                flash(f'Product "{form.name.data}" added successfully!', 'success')
+                return redirect(url_for('product_detail', product_id=product_id))
+            except Exception as e:
+                flash(f'Error adding product: {str(e)}', 'error')
+
+        return render_template('add_product.html', form=form)
+
+    @app.route('/product/<int:product_id>')
+    def product_detail(product_id):
+        """Show detailed information for a product."""
+        product = db_manager.get_product(product_id)
+        if not product:
+            flash('Product not found.', 'error')
+            return redirect(url_for('index'))
+
+        # Get price history
+        price_history = db_manager.get_price_history(product_id, days=30)
+        latest_prices = db_manager.get_latest_prices(product_id)
+        price_stats = db_manager.get_price_statistics(product_id, days=30)
+
+        # Create price chart
+        chart_json = create_price_chart(price_history, product['name'])
+
+        return render_template('product_detail.html',
+                               product=product,
+                               price_history=price_history,
+                               latest_prices=latest_prices,
+                               price_stats=price_stats,
+                               chart_json=chart_json)
+
+    @app.route('/scrape/<int:product_id>', methods=['POST'])
+    def scrape_product(product_id):
+        """Manually trigger scraping for a specific product."""
+        product = db_manager.get_product(product_id)
+        if not product:
+            return jsonify({'error': 'Product not found'}), 404
+
+        try:
+            # Run scraping in a new event loop (since we're in Flask)
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+
+            results = loop.run_until_complete(scraper_manager.scrape_product(product))
+
+            # Save results to database
+            for site_name, result in results.items():
+                if result['success']:
+                    db_manager.save_price_history(
+                        product_id=product_id,
+                        site_name=site_name,
+                        price=result['price'],
+                        availability=result.get('availability', True),
+                        timestamp=datetime.now()
+                    )
+
+            loop.close()
+
+            return jsonify({
+                'success': True,
+                'results': results,
+                'message': 'Scraping completed successfully'
+            })
+
+        except Exception as e:
+            return jsonify({'error': str(e)}), 500
+
+    @app.route('/scrape_all', methods=['POST'])
+    def scrape_all_products():
+        """Trigger scraping for all products."""
+        try:
+            products = db_manager.get_all_products()
+
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+
+            results = loop.run_until_complete(scraper_manager.scrape_all_products(products))
+
+            # Save results to database
+            total_updated = 0
+            for product_id, site_results in results.items():
+                for site_name, result in site_results.items():
+                    if result['success']:
+                        db_manager.save_price_history(
+                            product_id=product_id,
+                            site_name=site_name,
+                            price=result['price'],
+                            availability=result.get('availability', True),
+                            timestamp=datetime.now()
+                        )
+                        total_updated += 1
+
+            loop.close()
+
+            return jsonify({
+                'success': True,
+                'total_updated': total_updated,
+                'message': f'Updated prices for {total_updated} product-site combinations'
+            })
+
+        except Exception as e:
+            return jsonify({'error': str(e)}), 500
+
+    @app.route('/api/products')
+    def api_products():
+        """API endpoint to get all products."""
+        products = db_manager.get_all_products()
+        return jsonify(products)
+
+    @app.route('/api/product/<int:product_id>/prices')
+    def api_product_prices(product_id):
+        """API endpoint to get price history for a product."""
+        days = request.args.get('days', 30, type=int)
+        price_history = db_manager.get_price_history(product_id, days)
+        return jsonify(price_history)
+
+    @app.route('/settings')
+    def settings():
+        """Settings page."""
+        return render_template('settings.html', config=config)
+
+    @app.route('/test_notifications', methods=['POST'])
+    def test_notifications():
+        """Test notification system."""
+        try:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+
+            result = loop.run_until_complete(notification_manager.send_test_notification())
+            loop.close()
+
+            return jsonify(result)
+        except Exception as e:
+            return jsonify({'error': str(e)}), 500
+
+    @app.route('/favicon.ico')
+    def favicon():
+        """Serve the favicon."""
+        return send_from_directory(os.path.join(app.root_path, 'static'),
+                                   'favicon.ico', mimetype='image/vnd.microsoft.icon')
+
+    def create_price_chart(price_history, product_name):
+        """Create a price history chart using Plotly."""
+        if not price_history:
+            return json.dumps({})
+
+        # Convert to DataFrame for easier manipulation
+        df = pd.DataFrame(price_history)
+        df['timestamp'] = pd.to_datetime(df['timestamp'])
+
+        # Create traces for each site
+        traces = []
+        sites = df['site_name'].unique()
+
+        colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']
+
+        for i, site in enumerate(sites):
+            site_data = df[df['site_name'] == site].sort_values('timestamp')
+
+            trace = go.Scatter(
+                x=site_data['timestamp'],
+                y=site_data['price'],
+                mode='lines+markers',
+                name=site.title(),
+                line=dict(color=colors[i % len(colors)], width=2),
+                marker=dict(size=6)
+            )
+            traces.append(trace)
+
+        layout = go.Layout(
+            title=f'Price History - {product_name}',
+            xaxis=dict(title='Date'),
+            yaxis=dict(title='Price (£)'),
+            hovermode='closest',
+            margin=dict(l=50, r=50, t=50, b=50)
+        )
+
+        fig = go.Figure(data=traces, layout=layout)
+        return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
+
+    return app
diff --git a/static/favicon.ico b/static/favicon.ico
new file mode 100644
index 0000000..0f1e27b
--- /dev/null
+++ b/static/favicon.ico
@@ -0,0 +1,2 @@
+# Simple placeholder favicon
+# This prevents 404 errors in the browser logs
diff --git a/templates/add_product.html b/templates/add_product.html
new file mode 100644
index 0000000..ac43c38
--- /dev/null
+++ b/templates/add_product.html
@@ -0,0 +1,184 @@
+{% extends "base.html" %}
+
+{% block title %}Add Product - Price Tracker{% endblock %}
+
+{%
block content %} +
+
+
+
+

+ Add New Product +

+
+
+
+ {{ form.hidden_tag() }} + +
+
+ {{ form.name.label(class="form-label fw-bold") }} + {{ form.name(class="form-control form-control-lg") }} + {% if form.name.errors %} +
+ {% for error in form.name.errors %} +
{{ error }}
+ {% endfor %} +
+ {% endif %} +
+
+ {{ form.target_price.label(class="form-label fw-bold") }} +
+ £ + {{ form.target_price(class="form-control form-control-lg") }} +
+ {% if form.target_price.errors %} +
+ {% for error in form.target_price.errors %} +
{{ error }}
+ {% endfor %} +
+ {% endif %} + Optional: Alert when price drops below this +
+
+ +
+ {{ form.description.label(class="form-label fw-bold") }} + {{ form.description(class="form-control", rows="3") }} + {% if form.description.errors %} +
+ {% for error in form.description.errors %} +
{{ error }}
+ {% endfor %} +
+ {% endif %} + Optional: Brief description of the product +
+ +
+ +

+ Product URLs +

+

Add URLs from the sites you want to track. At least one URL is required.

+ +
+
+ {{ form.jjfoodservice_url.label(class="form-label fw-bold") }} +
+ + JJ Food Service + + {{ form.jjfoodservice_url(class="form-control", placeholder="https://www.jjfoodservice.com/...") }} +
+ {% if form.jjfoodservice_url.errors %} +
+ {% for error in form.jjfoodservice_url.errors %} +
{{ error }}
+ {% endfor %} +
+ {% endif %} +
+ +
+ {{ form.atoz_catering_url.label(class="form-label fw-bold") }} +
+ + A to Z Catering + + {{ form.atoz_catering_url(class="form-control", placeholder="https://www.atoz-catering.co.uk/...") }} +
+ {% if form.atoz_catering_url.errors %} +
+ {% for error in form.atoz_catering_url.errors %} +
{{ error }}
+ {% endfor %} +
+ {% endif %} +
+ +
+ {{ form.amazon_uk_url.label(class="form-label fw-bold") }} +
+ + Amazon UK + + {{ form.amazon_uk_url(class="form-control", placeholder="https://www.amazon.co.uk/...") }} +
+ {% if form.amazon_uk_url.errors %} +
+ {% for error in form.amazon_uk_url.errors %} +
{{ error }}
+ {% endfor %} +
+ {% endif %} +
+
+ +
+ + Tips: +
    +
  • Make sure URLs point to the specific product page
  • Test URLs in your browser first to ensure they work
  • Some sites may block automated requests - we'll handle this gracefully
  • For best results, use direct product page URLs
+
+ +
+ + Cancel + + {{ form.submit(class="btn btn-primary btn-lg") }} +
+
+
+
+
+
+ +
+
+
+
+
+ How to Find Product URLs +
+
+
+
+
+
JJ Food Service
+

+ Navigate to the specific product page on JJ Food Service and copy the URL. + Make sure you're logged in for accurate pricing. +

+ +
A to Z Catering
+

+ Go to the product page on A to Z Catering and copy the URL. + URLs typically contain "/products/product/" followed by the product name. +

+
+
+
Amazon UK
+

+ Navigate to the product page on Amazon.co.uk and copy the URL. + The URL should contain "/dp/" followed by the product identifier. +

+ +
Note
+

+ We focus on UK catering supply websites that work well with automated price tracking. + This provides reliable price monitoring for your business needs. +

+
+
+
+
+
+
+{% endblock %} diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..0255808 --- /dev/null +++ b/templates/base.html @@ -0,0 +1,225 @@ + + + + + + {% block title %}Price Tracker{% endblock %} + + + + + + + +
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} + + {% endfor %} + {% endif %} + {% endwith %} + + {% block content %}{% endblock %} +
+ +
+
+

© 2025 Price Tracker. Built with Beautiful Soup & Flask.

+
+
+ + + + + {% block scripts %}{% endblock %} + + diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..49a183a --- /dev/null +++ b/templates/index.html @@ -0,0 +1,184 @@ +{% extends "base.html" %} + +{% block title %}Dashboard - Price Tracker{% endblock %} + +{% block content %} +
+

+ Dashboard +

+ + Add Product + +
+ +{% if not products %} +
+
+
+ +

No Products Yet

+

Start tracking prices by adding your first product!

+ + Add Your First Product + +
+
+
+{% else %} +
+ {% for product in products %} +
+
+
+
+
{{ product.name }}
+ {% if product.target_price %} + + Target: £{{ "%.2f"|format(product.target_price) }} + + {% endif %} +
+ + {% if product.description %} +

{{ product.description[:100] }}{% if product.description|length > 100 %}...{% endif %}

+ {% endif %} + + +
+ Tracking on:
+ {% for site_name in product.urls.keys() %} + {{ site_name.title() }} + {% endfor %} +
+ + + {% if product.latest_prices %} +
+ {% for site_name, price_data in product.latest_prices.items() %} +
+
+ {{ site_name.title() }} +
+ £{{ "%.2f"|format(price_data.price) }} +
{{ price_data.timestamp[:10] }} +
+
+
+ {% endfor %} +
+ + + {% if product.best_price %} +
+ + Best Price: £{{ "%.2f"|format(product.best_price.price) }} + {% if product.target_price and product.best_price.price <= product.target_price %} + + Target Reached! + + {% endif %} +
+ {% endif %} + {% else %} +
+ + No price data yet. Click "Scrape Now" to get prices. +
+ {% endif %} + + +
+
+ + Details + + +
+
+
+ + + +
+
+ {% endfor %} +
+ + +
+
+
+
+ +

{{ products|length }}

+

Products Tracked

+
+
+
+
+
+
+ +

+ {% if products %}
+ {% set ns = namespace(total_urls=0) %}
+ {% for product in products %}
+ {% set ns.total_urls = ns.total_urls + product.urls|length %}
+ {% endfor %}
+ {{ ns.total_urls }}
+ {% else %}
+ 0
+ {% endif %}

+

Total URLs

+
+
+
+
+
+
+ +

+ {% set alerts = [] %} + {% for product in products %} + {% if product.target_price and product.best_price and product.best_price.price <= product.target_price %} + {% set _ = alerts.append(1) %} + {% endif %} + {% endfor %} + {{ alerts|length }} +

+

Price Alerts

+
+
+
+
+
+
+ +

+ {% set ns = namespace(total_savings=0) %}
+ {% for product in products %}
+ {% if product.target_price and product.best_price %}
+ {% set savings = product.target_price - product.best_price.price %}
+ {% if savings > 0 %}
+ {% set ns.total_savings = ns.total_savings + savings %}
+ {% endif %}
+ {% endif %}
+ {% endfor %}
+ £{{ "%.0f"|format(ns.total_savings) }}

+

Potential Savings

+
+
+
+
+{% endif %} +{% endblock %} diff --git a/templates/product_detail.html b/templates/product_detail.html new file mode 100644 index 0000000..676c128 --- /dev/null +++ b/templates/product_detail.html @@ -0,0 +1,234 @@ +{% extends "base.html" %} + +{% block title %}{{ product.name }} - Price Tracker{% endblock %} + +{% block content %} +
+
+

{{ product.name }}

+ {% if product.description %} +

{{ product.description }}

+ {% endif %} +
+
+ + + Back to Dashboard + +
+
+ +
+ +
+
+
+
+ Current Prices +
+
+
+ {% if latest_prices %} + {% set price_list = latest_prices.values() | list %} + {% set min_price = price_list | min(attribute='price') %} + {% set max_price = price_list | max(attribute='price') %} + + {% for site_name, price_data in latest_prices.items() %} +
+
+ {{ site_name.title() }} + {% if not price_data.availability %} + Out of Stock + {% endif %} + {% if price_data.price == min_price.price %} + + Best Price + + {% endif %} +
+
+
£{{ "%.2f"|format(price_data.price) }}
+ {{ price_data.timestamp[:10] }} +
+
+ {% endfor %} + + {% if product.target_price %} +
+
+ Target Price: + £{{ "%.2f"|format(product.target_price) }} +
+ + {% if min_price.price <= product.target_price %} +
+ + Target Reached! Best price is at or below your target. +
+ {% else %} +
+ + You could save £{{ "%.2f"|format(min_price.price - product.target_price) }} + when price drops to target. +
+ {% endif %} + {% endif %} + + {% else %} +

+
+ No price data available yet.
+ +

+ {% endif %} +
+
+ + +
+
+
+ Tracked URLs +
+
+
+ {% for site_name, url in product.urls.items() %} +
+ {{ site_name.title() }} + + View + +
+ {% endfor %} +
+
+
+ + +
+
+
+
+ Price History (Last 30 Days) +
+
+
+ {% if price_history %} +
+ {% else %} +

+
+ No price history available yet. Price data will appear here after scraping. +

+ {% endif %} +
+
+ + + {% if price_stats %} +
+
+
+ Price Statistics (Last 30 Days) +
+
+
+
+ {% for site_name, stats in price_stats.items() %} +
+
+
+
+ {{ site_name.title() }} +
+
+
+ Min Price +
£{{ "%.2f"|format(stats.min_price) }}
+
+
+ Max Price +
£{{ "%.2f"|format(stats.max_price) }}
+
+
+ Avg Price +
£{{ "%.2f"|format(stats.avg_price) }}
+
+
+ Data Points +
{{ stats.data_points }}
+
+
+
+
+
+ {% endfor %} +
+
+
+ {% endif %} + + + {% if price_history %} +
+
+
+ Recent Price Updates +
+
+
+
+ + + + + + + + + + + {% for entry in price_history[:20] %} + + + + + + + {% endfor %} + +
SitePriceAvailableDate
+ {{ entry.site_name.title() }} + £{{ "%.2f"|format(entry.price) }} + {% if entry.availability %} + Available + {% else %} + Out of Stock + {% endif %} + {{ entry.timestamp[:16] }}
+
+ {% if price_history|length > 20 %} +

+ Showing 20 most recent entries of {{ price_history|length }} total. +

+ {% endif %} +
+
+ {% endif %} +
+
+{% endblock %} + +{% block scripts %} +{% if chart_json %} + +{% endif %} +{% endblock %} diff --git a/templates/settings.html b/templates/settings.html new file mode 100644 index 0000000..10de9a4 --- /dev/null +++ b/templates/settings.html @@ -0,0 +1,217 @@ +{% extends "base.html" %} + +{% block title %}Settings - Price Tracker{% endblock %} + +{% block content %} +
+
+

+ Settings +

+ + +
+
+
+ Scraping Configuration +
+
+
+
+
+
Request Settings
+
    +
  • Delay between requests: {{ config.delay_between_requests }}s
  • Max concurrent requests: {{ config.max_concurrent_requests }}
  • Request timeout: {{ config.timeout }}s
  • Retry attempts: {{ config.retry_attempts }}
+
+
+
User Agents
+

{{ config.user_agents|length }} user agents configured

+
+ View user agents +
+ {% for ua in config.user_agents %} +
{{ ua[:80] }}...
+ {% endfor %} +
+
+
+
+
+
+ + +
+
+
+ Supported Sites +
+
+
+
+ {% for site_name, site_config in config.sites_config.items() %} +
+
+
+
+ {{ site_name.title() }} + {% if site_config.enabled %} + Enabled + {% else %} + Disabled + {% endif %} +
+

+ Base URL: {{ site_config.base_url }}
+ Price selectors: {{ site_config.selectors.price|length }}
+ Title selectors: {{ site_config.selectors.title|length }} +

+
+
+
+ {% endfor %} +
+
+
+ + +
+
+
+ Notification Settings +
+ +
+
+
+
+
+ Email Notifications + {% if config.notification_config.email.enabled %} + Enabled + {% else %} + Disabled + {% endif %} +
+ {% if config.notification_config.email.enabled %} +
    +
  • SMTP Server: {{ config.notification_config.email.smtp_server }}
  • Port: {{ config.notification_config.email.smtp_port }}
  • Sender: {{ config.notification_config.email.sender_email }}
  • Recipient: {{ config.notification_config.email.recipient_email }}
+ {% else %} +

Email notifications are disabled. Configure in config.json to enable.

+ {% endif %} +
+
+
+ Webhook Notifications + {% if config.notification_config.webhook.enabled %} + Enabled + {% else %} + Disabled + {% endif %} +
+ {% if config.notification_config.webhook.enabled %} +

+ Webhook URL:
+ {{ config.notification_config.webhook.url }} +

+ {% else %} +

Webhook notifications are disabled. Configure in config.json to enable.

+ {% endif %} +
+
+
+
+ + +
+
+
+ Database Information +
+
+
+

Database Path: {{ config.database_path }}

+

+ The SQLite database stores all product information and price history. +

+
+
+
+ + +
+
+
+
+ Quick Actions +
+
+
+
+ + + +
+
+
+ + +
+
+
+ Configuration Help +
+
+
+
Configuration File
+

+ Settings are stored in config.json. + Edit this file to customize scraping behavior, add new sites, or configure notifications. +

+ +
Adding New Sites
+

+ To add support for new e-commerce sites, add a new section to the "sites" + configuration with CSS selectors for price, title, and availability. +

+ +
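+ For example, a minimal "sites" entry might look like the sketch below (key
+ names inferred from how src/scraper.py reads the site config — an enabled
+ flag, a base_url, and lists of CSS selectors — so treat it as illustrative
+ rather than the exact schema):
+
+   "example_shop": {
+     "enabled": true,
+     "base_url": "https://www.example-shop.co.uk",
+     "selectors": {
+       "price": [".price", ".product-price"],
+       "title": ["h1", ".product-title"],
+       "availability": [".stock-status"]
+     }
+   }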
Email Setup
+

+ For Gmail, use smtp.gmail.com:587 with an app-specific password
+ (app passwords require 2-Step Verification on the Google account), or use OAuth2.

+ +
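+ A matching block in config.json might look like this sketch (keys taken
+ from what this settings page reads; any SMTP credential key is omitted
+ because notification.py's login code is not part of this diff):
+
+   "email": {
+     "enabled": true,
+     "smtp_server": "smtp.gmail.com",
+     "smtp_port": 587,
+     "sender_email": "alerts@example.com",
+     "recipient_email": "you@example.com"
+   }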
Webhooks
+

+ Webhook notifications send JSON payloads to your specified URL. + Useful for integrating with Slack, Discord, or custom applications. +
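+ As a rough sketch of the content, each alert built in notification.py's
+ send_test_notification carries at least these fields (the exact wrapper
+ around them depends on _send_webhook_alerts, which is outside this diff):
+
+   {
+     "product": {"name": "Test Product"},
+     "site": "test-site",
+     "current_price": 19.99,
+     "target_price": 25.00
+   }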

+
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %}
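A quick way to exercise the new scraping stack outside the Flask UI is a minimal driver like the sketch below. Assumptions: `Config()` loads config.json from the working directory (as `create_app` does), and the product dict mirrors the shape `ScraperManager.scrape_product` expects — an `id` plus a site-name-to-URL mapping under `urls`; the URL is a placeholder.

```python
import asyncio

from src.config import Config
from src.scraper_manager import ScraperManager


async def main() -> None:
    # Same wiring as create_app() in src/web_ui.py.
    config = Config()
    manager = ScraperManager(config)

    # Hypothetical product record; replace the URL with a real product page.
    product = {
        'id': 1,
        'name': 'Example Product',
        'urls': {'jjfoodservice': 'https://www.jjfoodservice.com/...'},
    }

    # Routes to UKCateringScraper because 'jjfoodservice' is a UK site.
    results = await manager.scrape_product(product)
    for site, result in results.items():
        print(site, result)


asyncio.run(main())
```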