diff --git a/azure_client.py b/azure_client.py deleted file mode 100644 index c4b67ff..0000000 --- a/azure_client.py +++ /dev/null @@ -1,239 +0,0 @@ -import os -from dotenv import load_dotenv -import logging -import json -import msal -import requests -from datetime import datetime, timezone - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def get_app_registrations(): - logging.info("Authenticating to Microsoft Graph API") - - # Azure AD app credentials from environment variables - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - - authority = f"https://login.microsoftonline.com/{tenant_id}" - scope = ["https://graph.microsoft.com/.default"] - - # Create a confidential client application - app = msal.ConfidentialClientApplication( - client_id, - authority=authority, - client_credential=client_secret - ) - - # Acquire a token - result = app.acquire_token_for_client(scopes=scope) - - if "access_token" in result: - logging.info("Successfully authenticated to Microsoft Graph API") - access_token = result["access_token"] - else: - logging.error("Failed to authenticate to Microsoft Graph API") - logging.error(result.get("error")) - logging.error(result.get("error_description")) - logging.error(result.get("correlation_id")) - return [] - - # Fetch app registrations with owners - # Updated URL to include both app registration and owner data - graph_url = ( - "https://graph.microsoft.com/v1.0/applications" - "?$select=id,appId,displayName,passwordCredentials" - "&$expand=owners($select=userPrincipalName)" - ) - - headers = { - "Authorization": f"Bearer {access_token}", - "ConsistencyLevel": "eventual" - } - - try: - response = requests.get(graph_url, headers=headers) - response.raise_for_status() - - # Debug log the raw response - #logging.info(f"API Response Status: {response.status_code}") - #logging.debug(f"API Response: {response.text[:1000]}...") # First 1000 chars - - app_registrations = response.json().get('value', []) - logging.info(f"Fetched {len(app_registrations)} app registrations") - return app_registrations - - except requests.exceptions.RequestException as e: - logging.error(f"Error fetching app registrations: {e}") - return [] - -def sort_app_registrations(app_registrations): - current_date = datetime.now(timezone.utc) - for app in app_registrations: - if app["passwordCredentials"]: - expiry_date_str = app["passwordCredentials"][0]["endDateTime"] - try: - if '.' in expiry_date_str: - expiry_date_str = expiry_date_str.split('.')[0] + '.' + expiry_date_str.split('.')[1][:6] + 'Z' - if expiry_date_str.endswith('ZZ'): - expiry_date_str = expiry_date_str[:-1] - expiry_date = datetime.strptime(expiry_date_str, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) - except ValueError: - expiry_date = datetime.strptime(expiry_date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc) - days_to_expiry = (expiry_date - current_date).days - app["days_to_expiry"] = days_to_expiry - app["expiry_date"] = expiry_date.isoformat() - else: - app["days_to_expiry"] = None - app["expiry_date"] = None - - sorted_apps = sorted(app_registrations, key=lambda x: (x["days_to_expiry"] is None, x["days_to_expiry"]), reverse=False) - return sorted_apps - -def generate_html(app_registrations): - current_time = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') - html = f""" - - - App Registrations - - - -
-

Azure App Registration Expiry Notification

-

This is an automated notification regarding expiring Azure App Registrations that you own or manage.

- -

Why am I receiving this?
- You are receiving this email because you are listed as an owner of one or more Azure App Registrations that are approaching their expiration date or have already expired.

- -

Required Actions:

- - -

Color Coding:

- - -

If you need assistance, please contact the IT Support team.

-
- -

App Registrations

-

Exported on: {current_time}

- - - - - - - - """ - - for app in app_registrations: - password_credentials = app.get('passwordCredentials', []) - if not password_credentials: - continue - - expiry_date = password_credentials[0].get('endDateTime') - if expiry_date: - try: - if '.' in expiry_date: - expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z' - if expiry_date.endswith('ZZ'): - expiry_date = expiry_date[:-1] - expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc) - except ValueError: - expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc) - days_to_expiry = (expiry_date - datetime.now(timezone.utc)).days - - if days_to_expiry > 30: - color_class = "green" - elif 7 < days_to_expiry <= 30: - color_class = "yellow" - elif 1 <= days_to_expiry <= 7: - color_class = "orange" - else: - color_class = "red" - days_to_expiry = "EXPIRED" - - owners = app.get('owners', []) - owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')] - owner_list = ', '.join(owner_upns) if owner_upns else 'No owners' - - html += f""" - - - - - - - """ - - html += """ -
Display NameExpiry DateDays to ExpiryOwners
{app['displayName']}{expiry_date.strftime('%Y-%m-%d')}{days_to_expiry}{owner_list}
- - - """ - - return html - -if __name__ == "__main__": - app_registrations = get_app_registrations() - sorted_app_registrations = sort_app_registrations(app_registrations) - # Write to JSON file for inspection - with open('debug_app_registrations.json', 'w') as f: - json.dump(sorted_app_registrations, f, indent=2) - # Write to HTML file for inspection - html_output = generate_html(sorted_app_registrations) - with open('app_registrations.html', 'w') as f: - f.write(html_output) \ No newline at end of file diff --git a/data_export.py b/data_export.py deleted file mode 100644 index ab37fbd..0000000 --- a/data_export.py +++ /dev/null @@ -1,234 +0,0 @@ -import json -import logging -from datetime import datetime - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -# def write_to_json(app_registrations, filename='app_registrations.json'): -# """ -# Write app registration data to a JSON file. - -# :param app_registrations: List of app registration data -# :param filename: Name of the JSON file to write to -# """ -# try: -# with open(filename, 'w') as f: -# json.dump(app_registrations, f, indent=4) -# logging.info(f"App registration data successfully written to {filename}") -# except Exception as e: -# logging.error(f"Failed to write app registration data to {filename}: {e}") - -def generate_html(app_registrations): - """ - Generate an HTML representation of the app registration data. - - :param app_registrations: List of app registration data - :return: HTML string - """ - current_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') - html = f""" - - - App Registrations - - - -
-

Azure App Registration Expiry Notification

-

This is an automated notification regarding expiring Azure App Registrations that you own or manage.

- -

Why am I receiving this?
- You are receiving this email because you are listed as an owner of one or more Azure App Registrations that are approaching their expiration date or have already expired.

- -

Required Actions:

- - -

Color Coding:

- - -

If you need assistance, please contact the IT Support team.

-
- -

App Registrations

-

Exported on: {current_time}

- - - - - - - - """ - - for app in app_registrations: - password_credentials = app.get('passwordCredentials', []) - if not password_credentials: - continue - - expiry_date = password_credentials[0].get('endDateTime') - if expiry_date: - # Clean up the date string - if expiry_date.endswith('ZZ'): - expiry_date = expiry_date[:-1] - elif expiry_date.endswith('Z'): - expiry_date = expiry_date[:-1] - # Truncate the fractional seconds part to 6 digits if present - if '.' in expiry_date: - expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z' - else: - expiry_date += '.000000Z' - try: - expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ') - except ValueError as e: - logging.error(f"Error parsing expiry date for {app['displayName']}: {e}") - continue - days_to_expiry = (expiry_date - datetime.utcnow()).days - - # Determine row color class - if days_to_expiry > 30: - color_class = "green" - elif 7 < days_to_expiry <= 30: - color_class = "yellow" - elif 1 <= days_to_expiry <= 7: - color_class = "orange" - else: - color_class = "red" - days_to_expiry = "EXPIRED" - - # Get owner information - owners = app.get('owners', []) - owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')] - owner_list = ', '.join(owner_upns) if owner_upns else 'No owners' - - html += f""" - - - - - - - """ - - html += """ -
Display NameExpiry DateDays to ExpiryOwners
{app['displayName']}{expiry_date.strftime('%Y-%m-%d')}{days_to_expiry}{owner_list}
- - - """ - - return html - -def generate_expiry_text(app_name, days_to_expiry, expiry_date): - if days_to_expiry > 30: - color = "#28a745" # green - elif days_to_expiry > 7: - color = "#ffc107" # yellow - elif days_to_expiry > 0: - color = "#ff9800" # orange - else: - color = "#dc3545" # red - days_to_expiry = "EXPIRED" - - return f""" -
-

The app registration {app_name} is set to expire in - - {days_to_expiry} - - days on {expiry_date.strftime('%Y-%m-%d')}

-
- """ - -def sort_app_registrations(app_registrations): - current_date = datetime.utcnow() - for app in app_registrations: - expiry_date_str = app["passwordCredentials"][0]["endDateTime"] - expiry_date = datetime.strptime(expiry_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S') - days_to_expiry = (expiry_date - current_date).days - app["days_to_expiry"] = days_to_expiry - app["expiry_date"] = expiry_date - - sorted_apps = sorted(app_registrations, key=lambda x: x["days_to_expiry"], reverse=False) - return sorted_apps - -# Example usage -if __name__ == "__main__": - # Sample app registration data - app_registrations = [ - { - "displayName": "App1", - "passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}] - }, - { - "displayName": "App2", - "passwordCredentials": [{"endDateTime": "2023-12-31T23:59:59.9999999Z"}] - }, - { - "displayName": "App3", - "passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}] - } - ] - - sorted_apps = sort_app_registrations(app_registrations) - html_output = "" - for app in sorted_apps: - html_output += generate_expiry_text(app["displayName"], app["days_to_expiry"], app["expiry_date"]) - - print(html_output) - - # Write to JSON - write_to_json(app_registrations) - - # Generate HTML - html_content = generate_html(app_registrations) - with open('app_registrations.html', 'w') as f: - f.write(html_content) - logging.info("HTML content successfully written to app_registrations.html") \ No newline at end of file diff --git a/getdrive.py b/getdrive.py deleted file mode 100644 index 056222c..0000000 --- a/getdrive.py +++ /dev/null @@ -1,75 +0,0 @@ -import os -import requests -import logging -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(asctime)s - %(message)s') - -def get_access_token(): - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - - token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" - token_data = { - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret, - 'scope': 'https://graph.microsoft.com/.default' - } - - token_response = requests.post(token_url, data=token_data) - access_token = token_response.json().get('access_token') - if not access_token: - logging.error("Failed to obtain access token") - logging.error(token_response.json()) - return None - - return access_token - -def list_drive_items(site_id, drive_id, parent_id=None): - access_token = get_access_token() - if not access_token: - return None - - headers = { - 'Authorization': f'Bearer {access_token}', - 'Content-Type': 'application/json' - } - - if parent_id: - items_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{parent_id}/children" - else: - items_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/root/children" - - response = requests.get(items_url, headers=headers) - if response.status_code != 200: - logging.error(f"Failed to list drive items: {response.status_code}") - logging.error(response.json()) - return None - - items = response.json().get('value', []) - if not items: - logging.error("No items found in the drive") - return None - - for item in items: - logging.info(f"Found item: {item.get('name')} with ID: {item.get('id')}") - if item.get('folder'): - # Recursively list items in the folder - list_drive_items(site_id, drive_id, item.get('id')) - - return items - -if __name__ == "__main__": - site_id = "opassey.sharepoint.com,8e038e2a-139b-4e46-893a-bcf76062e063,5dcd71be-16b7-42ba-add6-4068b88ef3aa" - drive_id = os.getenv('SHAREPOINT_DRIVE_ID') - items = list_drive_items(site_id, drive_id) - if items: - logging.info("Listed drive items successfully") - else: - logging.error("Failed to list drive items") \ No newline at end of file diff --git a/getsites.py b/getsites.py deleted file mode 100644 index 5c8f10c..0000000 --- a/getsites.py +++ /dev/null @@ -1,66 +0,0 @@ -import os -import requests -import logging -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def get_access_token(): - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - - token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" - token_data = { - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret, - 'scope': 'https://graph.microsoft.com/.default' - } - - token_response = requests.post(token_url, data=token_data) - access_token = token_response.json().get('access_token') - if not access_token: - logging.error("Failed to obtain access token") - logging.error(token_response.json()) - return None - - return access_token - -def list_sites(): - access_token = get_access_token() - if not access_token: - return None - - headers = { - 'Authorization': f'Bearer {access_token}', - 'Content-Type': 'application/json' - } - - sites_url = "https://graph.microsoft.com/v1.0/sites?search=*" - response = requests.get(sites_url, headers=headers) - if response.status_code != 200: - logging.error(f"Failed to list sites: {response.status_code}") - logging.error(response.json()) - return None - - sites = response.json().get('value', []) - if not sites: - logging.error("No sites found") - return None - - for site in sites: - logging.info(f"Found site: {site.get('name')} with ID: {site.get('id')}") - - return sites - -if __name__ == "__main__": - sites = list_sites() - if sites: - logging.info("Listed sites successfully") - else: - logging.error("Failed to list sites") \ No newline at end of file diff --git a/gettoken.py b/gettoken.py deleted file mode 100644 index 7d6935f..0000000 --- a/gettoken.py +++ /dev/null @@ -1,67 +0,0 @@ -import os -import requests -import logging -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def get_access_token(): - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - - token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" - token_data = { - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret, - 'scope': 'https://graph.microsoft.com/.default' - } - - token_response = requests.post(token_url, data=token_data) - access_token = token_response.json().get('access_token') - if not access_token: - logging.error("Failed to obtain access token") - logging.error(token_response.json()) - return None - - return access_token - -def get_drive_id(site_id): - access_token = get_access_token() - if not access_token: - return None - - headers = { - 'Authorization': f'Bearer {access_token}', - 'Content-Type': 'application/json' - } - - drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives" - response = requests.get(drives_url, headers=headers) - if response.status_code != 200: - logging.error(f"Failed to get drives: {response.status_code}") - logging.error(response.json()) - return None - - drives = response.json().get('value', []) - if not drives: - logging.error("No drives found") - return None - - # Assuming you want the first drive in the list - drive_id = drives[0].get('id') - logging.info(f"Found drive ID: {drive_id}") - return drive_id - -if __name__ == "__main__": - site_id = os.getenv('SHAREPOINT_SITE_ID') - drive_id = get_drive_id(site_id) - if drive_id: - logging.info(f"Drive ID: {drive_id}") - else: - logging.error("Failed to obtain drive ID") \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 8820a2e..0000000 --- a/main.py +++ /dev/null @@ -1,16 +0,0 @@ -from azure_client import get_app_registrations -from sharepoint_client import store_app_registrations -from notification import send_notifications -from data_export import write_to_json, generate_html - -def main(): - app_registrations = get_app_registrations() - store_app_registrations(app_registrations) - write_to_json(app_registrations) - html_content = generate_html(app_registrations) - with open('app_registrations.html', 'w') as f: - f.write(html_content) - send_notifications(app_registrations) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/notification.py b/notification.py deleted file mode 100644 index 9e68025..0000000 --- a/notification.py +++ /dev/null @@ -1,142 +0,0 @@ -import os -from dotenv import load_dotenv -import smtplib -from email.mime.text import MIMEText -from email.utils import formataddr -from datetime import datetime -import requests -import json -import logging -from data_export import generate_html, generate_expiry_text - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def send_notifications(app_registrations): - # Email credentials from environment variables - smtp_server = os.getenv('SMTP_SERVER') - smtp_port = int(os.getenv('SMTP_PORT')) - smtp_username = os.getenv('SMTP_USERNAME') - smtp_password = os.getenv('SMTP_PASSWORD') - from_email = os.getenv('FROM_EMAIL') - from_name = os.getenv('FROM_NAME') - to_email = os.getenv('TO_EMAIL') - - # Teams webhook URL from environment variables - teams_webhook_url = os.getenv('TEAMS_WEBHOOK_URL') - - # Get the current date - current_date = datetime.utcnow() - notification_periods = [60, 30, 7, 3, 2, 1] - - # Generate HTML content - html_content = generate_html(app_registrations) - - # Send notification email and Teams message - for app in app_registrations: - # Debug log the entire app object - #logging.info(f"Processing app: {json.dumps(app, indent=2)}") - - password_credentials = app.get('passwordCredentials', []) - if not password_credentials: - logging.warning(f"No password credentials found for {app['displayName']}") - continue - - expiry_date = password_credentials[0].get('endDateTime') - if expiry_date: - # Clean up the date string - if expiry_date.endswith('ZZ'): - expiry_date = expiry_date[:-1] - elif expiry_date.endswith('Z'): - expiry_date = expiry_date[:-1] - # Truncate the fractional seconds part to 6 digits if present - if '.' in expiry_date: - expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z' - else: - expiry_date += '.000000Z' - try: - expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ') - except ValueError as e: - logging.error(f"Error parsing expiry date for {app['displayName']}: {e}") - continue - days_to_expiry = (expiry_date - current_date).days - if days_to_expiry in notification_periods or days_to_expiry < 0: - subject = f"App Registration Expiry Notification: {app['displayName']}" - body = generate_expiry_text(app['displayName'], days_to_expiry, expiry_date) + html_content - - # Fetch and debug log owner information - owners = app.get('owners', []) - #logging.info(f"Found owners for {app['displayName']}: {json.dumps(owners, indent=2)}") - - # Get CC emails from owners - cc_emails = [] - for owner in owners: - email = owner.get('userPrincipalName') or owner.get('mail') - if email: - cc_emails.append(email) - logging.info(f"Added owner email for {app['displayName']}: {email}") - - # Create email message - msg = MIMEText(body, 'html') - msg['Subject'] = subject - msg['From'] = formataddr((from_name, from_email)) - msg['To'] = to_email - - # Add CC recipients if any found - if cc_emails: - msg['Cc'] = ', '.join(cc_emails) - logging.info(f"Added CC recipients for {app['displayName']}: {cc_emails}") - - try: - # Include CC recipients in sendmail - all_recipients = [to_email] + cc_emails - logging.info(f"Sending email for {app['displayName']} to all recipients: {all_recipients}") - - with smtplib.SMTP(smtp_server, smtp_port) as server: - server.starttls() - server.login(smtp_username, smtp_password) - server.sendmail(from_email, all_recipients, msg.as_string()) - logging.info(f"Successfully sent email for {app['displayName']}") - except Exception as e: - logging.error(f"Failed to send email for {app['displayName']}: {e}") - - # Send Teams notification - teams_message = { - "@type": "MessageCard", - "@context": "http://schema.org/extensions", - "summary": subject, - "themeColor": "0076D7", - "title": subject, - "sections": [{ - "activityTitle": f"App Registration Expiry Notification", - "text": body - }] - } - - try: - response = requests.post(teams_webhook_url, headers={"Content-Type": "application/json"}, json=teams_message) - response.raise_for_status() - logging.info(f"Teams notification sent for {app['displayName']}") - except requests.exceptions.RequestException as e: - logging.error(f"Failed to send Teams notification for {app['displayName']}: {e}") - -if __name__ == "__main__": - # Sample app registration data for testing - app_registrations = [ - { - "displayName": "App1", - "passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}], - "owners": [{"userPrincipalName": "owner1@example.com"}] - }, - { - "displayName": "App2", - "passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}], - "owners": [{"userPrincipalName": "owner2@example.com"}] - } - ] - - # Send notifications - send_notifications(app_registrations) \ No newline at end of file diff --git a/sharepoint_client.py b/sharepoint_client.py deleted file mode 100644 index 0c4bab8..0000000 --- a/sharepoint_client.py +++ /dev/null @@ -1,180 +0,0 @@ -import os -from dotenv import load_dotenv -import requests -from datetime import datetime -import logging -import json - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def get_access_token(): - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - - token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" - token_data = { - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret, - 'scope': 'https://graph.microsoft.com/.default' - } - - token_response = requests.post(token_url, data=token_data) - access_token = token_response.json().get('access_token') - if not access_token: - logging.error("Failed to obtain access token") - logging.error(token_response.json()) - return None - - return access_token - -def get_drive_id(site_id): - access_token = get_access_token() - if not access_token: - return None - - headers = { - 'Authorization': f'Bearer {access_token}', - 'Content-Type': 'application/json' - } - - drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives" - response = requests.get(drives_url, headers=headers) - if response.status_code != 200: - logging.error(f"Failed to get drives: {response.status_code}") - logging.error(response.json()) - return None - - drives = response.json().get('value', []) - if not drives: - logging.error("No drives found") - return None - - # Assuming you want the first drive in the list - drive_id = drives[0].get('id') - logging.info(f"Found drive ID: {drive_id}") - return drive_id - -def update_excel_file(site_id, drive_id, file_id, data): - """ - Update Excel Online file with new data using OneDrive for Business API - """ - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - - # Get access token - token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" - token_data = { - 'grant_type': 'client_credentials', - 'client_id': client_id, - 'client_secret': client_secret, - 'scope': 'https://graph.microsoft.com/.default' - } - - token_response = requests.post(token_url, data=token_data) - access_token = token_response.json().get('access_token') - if not access_token: - logging.error("Failed to obtain access token") - logging.error(token_response.json()) - return - - headers = { - 'Authorization': f'Bearer {access_token}', - 'Content-Type': 'application/json' - } - - try: - # Get the file details to verify it exists - file_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{file_id}" - file_response = requests.get(file_url, headers=headers) - file_response.raise_for_status() - logging.info(f"Found file: {file_response.json().get('name')}") - - # Clear existing data (except header) - clear_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D1000')" - clear_data = { - "values": [[""]*4]*999 - } - response = requests.patch(clear_url, headers=headers, json=clear_data) - response.raise_for_status() - logging.info("Cleared existing data from Excel file") - - # Write new data - update_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D{len(data)+1}')" - update_data = { - "values": data - } - response = requests.patch(update_url, headers=headers, json=update_data) - response.raise_for_status() - logging.info(f"Successfully wrote {len(data)} rows to Excel file") - - except requests.exceptions.RequestException as e: - logging.error(f"Error updating Excel file: {e}") - if hasattr(e.response, 'text'): - logging.error(f"Response content: {e.response.text}") - raise - -def store_app_registrations(app_registrations): - # Get current date - current_date = datetime.utcnow() - - # Prepare data for Excel - excel_data = [] - for app in app_registrations: - password_credentials = app.get('passwordCredentials', []) - if not password_credentials: - logging.warning(f"No password credentials found for {app['displayName']}") - continue - - expiry_date = password_credentials[0].get('endDateTime') - if expiry_date: - if expiry_date.endswith('ZZ'): - expiry_date = expiry_date[:-1] - elif expiry_date.endswith('Z'): - expiry_date = expiry_date[:-1] - - try: - expiry_date_obj = datetime.strptime(expiry_date.split('.')[0], '%Y-%m-%dT%H:%M:%S') - days_to_expiry = (expiry_date_obj - current_date).days - - # Get owner information - owners = app.get('owners', []) - owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')] - owner_list = ', '.join(owner_upns) if owner_upns else 'No owners' - - excel_data.append([ - app['displayName'], - expiry_date_obj.strftime('%Y-%m-%d'), - str(days_to_expiry), - owner_list - ]) - - except ValueError as e: - logging.error(f"Error parsing expiry date for {app['displayName']}: {e}") - continue - - # Update Excel file - try: - site_id = os.getenv('SHAREPOINT_SITE_ID') - drive_id = os.getenv('SHAREPOINT_DRIVE_ID') - file_id = os.getenv('EXCEL_FILE_ID') - update_excel_file(site_id, drive_id, file_id, excel_data) - logging.info("Successfully updated Excel file") - except Exception as e: - logging.error(f"Failed to update Excel file: {e}") - - logging.info("Finished processing app registrations") - -if __name__ == "__main__": - site_id = os.getenv('SHAREPOINT_SITE_ID') - drive_id = get_drive_id(site_id) - if drive_id: - logging.info(f"Drive ID: {drive_id}") - else: - logging.error("Failed to obtain drive ID") \ No newline at end of file diff --git a/sys_status.py b/sys_status.py deleted file mode 100644 index 575a7c8..0000000 --- a/sys_status.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -from datetime import datetime, timedelta -from email.mime.text import MIMEText -from email.utils import formataddr -import smtplib -import logging - -def check_client_id_expiry(): - # Load environment variables - client_id = os.getenv('AZURE_CLIENT_ID') - client_secret = os.getenv('AZURE_CLIENT_SECRET') - tenant_id = os.getenv('AZURE_TENANT_ID') - smtp_server = os.getenv('SMTP_SERVER') - smtp_port = int(os.getenv('SMTP_PORT')) - smtp_username = os.getenv('SMTP_USERNAME') - smtp_password = os.getenv('SMTP_PASSWORD') - from_email = os.getenv('FROM_EMAIL') - from_name = os.getenv('FROM_NAME') - admin_email = os.getenv('ADMIN_EMAIL') - - # Check if the client ID is expiring soon - expiry_date_str = os.getenv('CLIENT_ID_EXPIRY_DATE') - if not expiry_date_str: - logging.error("CLIENT_ID_EXPIRY_DATE not set in environment variables") - return - - try: - expiry_date = datetime.strptime(expiry_date_str, '%Y-%m-%d') - except ValueError as e: - logging.error(f"Error parsing CLIENT_ID_EXPIRY_DATE: {e}") - return - - days_to_expiry = (expiry_date - datetime.utcnow()).days - - if days_to_expiry <= 30: - subject = "Warning: Azure Client ID Expiry Notification" - body = f""" - - -

The Azure Client ID {client_id} is set to expire in - {days_to_expiry} days - on {expiry_date.strftime('%Y-%m-%d')}.

-

Please take the necessary actions to renew the client ID before it expires.

- - - """ - - # Create email message - msg = MIMEText(body, 'html') - msg['Subject'] = subject - msg['From'] = formataddr((from_name, from_email)) - msg['To'] = admin_email - - try: - with smtplib.SMTP(smtp_server, smtp_port) as server: - server.starttls() - server.login(smtp_username, smtp_password) - server.sendmail(from_email, admin_email, msg.as_string()) - logging.info(f"Successfully sent client ID expiry warning to {admin_email}") - except Exception as e: - logging.error(f"Failed to send client ID expiry warning: {e}") - -if __name__ == "__main__": - check_client_id_expiry() \ No newline at end of file