diff --git a/azure_client.py b/azure_client.py new file mode 100644 index 0000000..bd67483 --- /dev/null +++ b/azure_client.py @@ -0,0 +1,83 @@ +import os +from dotenv import load_dotenv +import logging +import json +import msal +import requests + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def get_app_registrations(): + logging.info("Authenticating to Microsoft Graph API") + + # Azure AD app credentials from environment variables + client_id = os.getenv('AZURE_CLIENT_ID') + client_secret = os.getenv('AZURE_CLIENT_SECRET') + tenant_id = os.getenv('AZURE_TENANT_ID') + + authority = f"https://login.microsoftonline.com/{tenant_id}" + scope = ["https://graph.microsoft.com/.default"] + + # Create a confidential client application + app = msal.ConfidentialClientApplication( + client_id, + authority=authority, + client_credential=client_secret + ) + + # Acquire a token + result = app.acquire_token_for_client(scopes=scope) + + if "access_token" in result: + logging.info("Successfully authenticated to Microsoft Graph API") + access_token = result["access_token"] + else: + logging.error("Failed to authenticate to Microsoft Graph API") + logging.error(result.get("error")) + logging.error(result.get("error_description")) + logging.error(result.get("correlation_id")) + return [] + + # Fetch app registrations with owners + # Updated URL to include both app registration and owner data + graph_url = ( + "https://graph.microsoft.com/v1.0/applications" + "?$select=id,appId,displayName,passwordCredentials" + "&$expand=owners($select=userPrincipalName)" + ) + + headers = { + "Authorization": f"Bearer {access_token}", + "ConsistencyLevel": "eventual" + } + + try: + response = requests.get(graph_url, headers=headers) + response.raise_for_status() + + # Debug log the raw response + #logging.info(f"API Response Status: {response.status_code}") + #logging.debug(f"API Response: {response.text[:1000]}...") # First 1000 chars + + app_registrations = response.json().get('value', []) + logging.info(f"Fetched {len(app_registrations)} app registrations") + + # Debug log the first app registration + if app_registrations: + logging.info(f"Sample app data: {json.dumps(app_registrations[0], indent=2)}") + + return app_registrations + + except requests.exceptions.RequestException as e: + logging.error(f"Error fetching app registrations: {e}") + return [] + +if __name__ == "__main__": + app_registrations = get_app_registrations() + # Write to JSON file for inspection + with open('debug_app_registrations.json', 'w') as f: + json.dump(app_registrations, f, indent=2) \ No newline at end of file diff --git a/data_export.py b/data_export.py new file mode 100644 index 0000000..0eed4f4 --- /dev/null +++ b/data_export.py @@ -0,0 +1,151 @@ +import json +import logging +from datetime import datetime + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def write_to_json(app_registrations, filename='app_registrations.json'): + """ + Write app registration data to a JSON file. + + :param app_registrations: List of app registration data + :param filename: Name of the JSON file to write to + """ + try: + with open(filename, 'w') as f: + json.dump(app_registrations, f, indent=4) + logging.info(f"App registration data successfully written to {filename}") + except Exception as e: + logging.error(f"Failed to write app registration data to {filename}: {e}") + +def generate_html(app_registrations): + """ + Generate an HTML representation of the app registration data. + + :param app_registrations: List of app registration data + :return: HTML string + """ + current_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') + html = f""" + + + App Registrations + + + +

App Registrations

+

Exported on: {current_time}

+ + + + + + + + """ + + for app in app_registrations: + password_credentials = app.get('passwordCredentials', []) + if not password_credentials: + continue + + expiry_date = password_credentials[0].get('endDateTime') + if expiry_date: + # Clean up the date string + if expiry_date.endswith('ZZ'): + expiry_date = expiry_date[:-1] + elif expiry_date.endswith('Z'): + expiry_date = expiry_date[:-1] + # Truncate the fractional seconds part to 6 digits if present + if '.' in expiry_date: + expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z' + else: + expiry_date += '.000000Z' + try: + expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ') + except ValueError as e: + logging.error(f"Error parsing expiry date for {app['displayName']}: {e}") + continue + days_to_expiry = (expiry_date - datetime.utcnow()).days + + # Determine row color class + if days_to_expiry > 30: + color_class = "green" + elif 7 < days_to_expiry <= 30: + color_class = "yellow" + elif 1 <= days_to_expiry <= 7: + color_class = "orange" + else: + color_class = "red" + days_to_expiry = "EXPIRED" + + # Get owner information + owners = app.get('owners', []) + owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')] + owner_list = ', '.join(owner_upns) if owner_upns else 'No owners' + + html += f""" + + + + + + + """ + + html += """ +
Display NameExpiry DateDays to ExpiryOwners
{app['displayName']}{expiry_date.strftime('%Y-%m-%d')}{days_to_expiry}{owner_list}
+ + + """ + + return html + +# Example usage +if __name__ == "__main__": + # Sample app registration data + app_registrations = [ + { + "displayName": "App1", + "passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}] + }, + { + "displayName": "App2", + "passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}] + } + ] + + # Write to JSON + write_to_json(app_registrations) + + # Generate HTML + html_content = generate_html(app_registrations) + with open('app_registrations.html', 'w') as f: + f.write(html_content) + logging.info("HTML content successfully written to app_registrations.html") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..f958748 --- /dev/null +++ b/main.py @@ -0,0 +1,16 @@ +from azure_client import get_app_registrations +from sharepoint_client import store_app_registrations # Uncomment this line +from notification import send_notifications +from data_export import write_to_json, generate_html + +def main(): + app_registrations = get_app_registrations() + store_app_registrations(app_registrations) + write_to_json(app_registrations) + html_content = generate_html(app_registrations) + with open('app_registrations.html', 'w') as f: + f.write(html_content) + send_notifications(app_registrations) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/notification.py b/notification.py new file mode 100644 index 0000000..4ec60be --- /dev/null +++ b/notification.py @@ -0,0 +1,142 @@ +import os +from dotenv import load_dotenv +import smtplib +from email.mime.text import MIMEText +from email.utils import formataddr +from datetime import datetime +import requests +import json +import logging +from data_export import generate_html + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def send_notifications(app_registrations): + # Email credentials from environment variables + smtp_server = os.getenv('SMTP_SERVER') + smtp_port = int(os.getenv('SMTP_PORT')) + smtp_username = os.getenv('SMTP_USERNAME') + smtp_password = os.getenv('SMTP_PASSWORD') + from_email = os.getenv('FROM_EMAIL') + from_name = os.getenv('FROM_NAME') + to_email = os.getenv('TO_EMAIL') + + # Teams webhook URL from environment variables + teams_webhook_url = os.getenv('TEAMS_WEBHOOK_URL') + + # Get the current date + current_date = datetime.utcnow() + notification_periods = [60, 30, 7, 1] + + # Generate HTML content + html_content = generate_html(app_registrations) + + # Send notification email and Teams message + for app in app_registrations: + # Debug log the entire app object + #logging.info(f"Processing app: {json.dumps(app, indent=2)}") + + password_credentials = app.get('passwordCredentials', []) + if not password_credentials: + logging.warning(f"No password credentials found for {app['displayName']}") + continue + + expiry_date = password_credentials[0].get('endDateTime') + if expiry_date: + # Clean up the date string + if expiry_date.endswith('ZZ'): + expiry_date = expiry_date[:-1] + elif expiry_date.endswith('Z'): + expiry_date = expiry_date[:-1] + # Truncate the fractional seconds part to 6 digits if present + if '.' in expiry_date: + expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z' + else: + expiry_date += '.000000Z' + try: + expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ') + except ValueError as e: + logging.error(f"Error parsing expiry date for {app['displayName']}: {e}") + continue + days_to_expiry = (expiry_date - current_date).days + if days_to_expiry in notification_periods or days_to_expiry < 0: + subject = f"App Registration Expiry Notification: {app['displayName']}" + body = f"The app registration {app['displayName']} is set to expire in {days_to_expiry} days on {expiry_date.strftime('%Y-%m-%d')}.

{html_content}" + + # Fetch and debug log owner information + owners = app.get('owners', []) + logging.info(f"Found owners for {app['displayName']}: {json.dumps(owners, indent=2)}") + + # Get CC emails from owners + cc_emails = [] + for owner in owners: + email = owner.get('userPrincipalName') or owner.get('mail') + if email: + cc_emails.append(email) + logging.info(f"Added owner email for {app['displayName']}: {email}") + + # Create email message + msg = MIMEText(body, 'html') + msg['Subject'] = subject + msg['From'] = formataddr((from_name, from_email)) + msg['To'] = to_email + + # Add CC recipients if any found + if cc_emails: + msg['Cc'] = ', '.join(cc_emails) + logging.info(f"Added CC recipients for {app['displayName']}: {cc_emails}") + + try: + # Include CC recipients in sendmail + all_recipients = [to_email] + cc_emails + logging.info(f"Sending email for {app['displayName']} to all recipients: {all_recipients}") + + with smtplib.SMTP(smtp_server, smtp_port) as server: + server.starttls() + server.login(smtp_username, smtp_password) + server.sendmail(from_email, all_recipients, msg.as_string()) + logging.info(f"Successfully sent email for {app['displayName']}") + except Exception as e: + logging.error(f"Failed to send email for {app['displayName']}: {e}") + + # Send Teams notification + teams_message = { + "@type": "MessageCard", + "@context": "http://schema.org/extensions", + "summary": subject, + "themeColor": "0076D7", + "title": subject, + "sections": [{ + "activityTitle": f"App Registration Expiry Notification", + "text": body + }] + } + + try: + response = requests.post(teams_webhook_url, headers={"Content-Type": "application/json"}, json=teams_message) + response.raise_for_status() + logging.info(f"Teams notification sent for {app['displayName']}") + except requests.exceptions.RequestException as e: + logging.error(f"Failed to send Teams notification for {app['displayName']}: {e}") + +if __name__ == "__main__": + # Sample app registration data for testing + app_registrations = [ + { + "displayName": "App1", + "passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}], + "owners": [{"userPrincipalName": "owner1@example.com"}] + }, + { + "displayName": "App2", + "passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}], + "owners": [{"userPrincipalName": "owner2@example.com"}] + } + ] + + # Send notifications + send_notifications(app_registrations) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..08651da --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +azure-identity==1.19.0 +azure-mgmt-resource==23.2.0 +msal==1.24.0 +python-dotenv==1.0.0 +requests==2.31.0 \ No newline at end of file diff --git a/sharepoint_client.py b/sharepoint_client.py new file mode 100644 index 0000000..0139f55 --- /dev/null +++ b/sharepoint_client.py @@ -0,0 +1,125 @@ +import os +from dotenv import load_dotenv +import requests +from datetime import datetime +import logging +import json + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def update_excel_file(file_id, data): + """ + Update Excel Online file with new data using OneDrive for Business API + """ + client_id = os.getenv('AZURE_CLIENT_ID') + client_secret = os.getenv('AZURE_CLIENT_SECRET') + tenant_id = os.getenv('AZURE_TENANT_ID') + user_email = os.getenv('USER_EMAIL') + + # Get access token + token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token" + token_data = { + 'grant_type': 'client_credentials', + 'client_id': client_id, + 'client_secret': client_secret, + 'scope': 'https://graph.microsoft.com/.default' + } + + token_response = requests.post(token_url, data=token_data) + access_token = token_response.json()['access_token'] + + headers = { + 'Authorization': f'Bearer {access_token}', + 'Content-Type': 'application/json' + } + + try: + # Get the drive ID first + drives_url = f"https://graph.microsoft.com/v1.0/users/{user_email}/drive" + drive_response = requests.get(drives_url, headers=headers) + drive_response.raise_for_status() + drive_id = drive_response.json().get('id') + logging.info(f"Found drive ID: {drive_id}") + + # Get the file details to verify it exists + file_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{file_id}" + file_response = requests.get(file_url, headers=headers) + file_response.raise_for_status() + logging.info(f"Found file: {file_response.json().get('name')}") + + # Clear existing data (except header) + clear_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D1000')" + clear_data = { + "values": [[""]*4]*999 + } + response = requests.patch(clear_url, headers=headers, json=clear_data) + response.raise_for_status() + logging.info("Cleared existing data from Excel file") + + # Write new data + update_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D{len(data)+1}')" + update_data = { + "values": data + } + response = requests.patch(update_url, headers=headers, json=update_data) + response.raise_for_status() + logging.info(f"Successfully wrote {len(data)} rows to Excel file") + + except requests.exceptions.RequestException as e: + logging.error(f"Error updating Excel file: {e}") + if hasattr(e.response, 'text'): + logging.error(f"Response content: {e.response.text}") + raise + +def store_app_registrations(app_registrations): + # Get current date + current_date = datetime.utcnow() + + # Prepare data for Excel + excel_data = [] + for app in app_registrations: + password_credentials = app.get('passwordCredentials', []) + if not password_credentials: + logging.warning(f"No password credentials found for {app['displayName']}") + continue + + expiry_date = password_credentials[0].get('endDateTime') + if expiry_date: + if (expiry_date.endswith('ZZ')): + expiry_date = expiry_date[:-1] + elif expiry_date.endswith('Z'): + expiry_date = expiry_date[:-1] + + try: + expiry_date_obj = datetime.strptime(expiry_date.split('.')[0], '%Y-%m-%dT%H:%M:%S') + days_to_expiry = (expiry_date_obj - current_date).days + + # Get owner information + owners = app.get('owners', []) + owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')] + owner_list = ', '.join(owner_upns) if owner_upns else 'No owners' + + excel_data.append([ + app['displayName'], + expiry_date_obj.strftime('%Y-%m-%d'), + str(days_to_expiry), + owner_list + ]) + + except ValueError as e: + logging.error(f"Error parsing expiry date for {app['displayName']}: {e}") + continue + + # Update Excel file + try: + excel_file_id = os.getenv('EXCEL_FILE_ID') + update_excel_file(excel_file_id, excel_data) + logging.info("Successfully updated Excel file") + except Exception as e: + logging.error(f"Failed to update Excel file: {e}") + + logging.info("Finished processing app registrations") \ No newline at end of file