This commit is contained in:
Oli Passey
2024-12-19 16:46:59 +00:00
parent 361bf80a18
commit 868cf236ab
6 changed files with 522 additions and 0 deletions

83
azure_client.py Normal file
View File

@@ -0,0 +1,83 @@
import os
from dotenv import load_dotenv
import logging
import json
import msal
import requests
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_app_registrations():
logging.info("Authenticating to Microsoft Graph API")
# Azure AD app credentials from environment variables
client_id = os.getenv('AZURE_CLIENT_ID')
client_secret = os.getenv('AZURE_CLIENT_SECRET')
tenant_id = os.getenv('AZURE_TENANT_ID')
authority = f"https://login.microsoftonline.com/{tenant_id}"
scope = ["https://graph.microsoft.com/.default"]
# Create a confidential client application
app = msal.ConfidentialClientApplication(
client_id,
authority=authority,
client_credential=client_secret
)
# Acquire a token
result = app.acquire_token_for_client(scopes=scope)
if "access_token" in result:
logging.info("Successfully authenticated to Microsoft Graph API")
access_token = result["access_token"]
else:
logging.error("Failed to authenticate to Microsoft Graph API")
logging.error(result.get("error"))
logging.error(result.get("error_description"))
logging.error(result.get("correlation_id"))
return []
# Fetch app registrations with owners
# Updated URL to include both app registration and owner data
graph_url = (
"https://graph.microsoft.com/v1.0/applications"
"?$select=id,appId,displayName,passwordCredentials"
"&$expand=owners($select=userPrincipalName)"
)
headers = {
"Authorization": f"Bearer {access_token}",
"ConsistencyLevel": "eventual"
}
try:
response = requests.get(graph_url, headers=headers)
response.raise_for_status()
# Debug log the raw response
#logging.info(f"API Response Status: {response.status_code}")
#logging.debug(f"API Response: {response.text[:1000]}...") # First 1000 chars
app_registrations = response.json().get('value', [])
logging.info(f"Fetched {len(app_registrations)} app registrations")
# Debug log the first app registration
if app_registrations:
logging.info(f"Sample app data: {json.dumps(app_registrations[0], indent=2)}")
return app_registrations
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching app registrations: {e}")
return []
if __name__ == "__main__":
app_registrations = get_app_registrations()
# Write to JSON file for inspection
with open('debug_app_registrations.json', 'w') as f:
json.dump(app_registrations, f, indent=2)

151
data_export.py Normal file
View File

@@ -0,0 +1,151 @@
import json
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def write_to_json(app_registrations, filename='app_registrations.json'):
"""
Write app registration data to a JSON file.
:param app_registrations: List of app registration data
:param filename: Name of the JSON file to write to
"""
try:
with open(filename, 'w') as f:
json.dump(app_registrations, f, indent=4)
logging.info(f"App registration data successfully written to {filename}")
except Exception as e:
logging.error(f"Failed to write app registration data to {filename}: {e}")
def generate_html(app_registrations):
"""
Generate an HTML representation of the app registration data.
:param app_registrations: List of app registration data
:return: HTML string
"""
current_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
html = f"""
<html>
<head>
<title>App Registrations</title>
<style>
table {{
width: 100%;
border-collapse: collapse;
}}
th, td {{
border: 1px solid black;
padding: 8px;
text-align: left;
}}
th {{
background-color: #f2f2f2;
}}
.green {{
background-color: #d4edda;
}}
.yellow {{
background-color: #fff3cd;
}}
.orange {{
background-color: #ffeeba;
}}
.red {{
background-color: #f8d7da;
}}
</style>
</head>
<body>
<h1>App Registrations</h1>
<p>Exported on: {current_time}</p>
<table>
<tr>
<th>Display Name</th>
<th>Expiry Date</th>
<th>Days to Expiry</th>
<th>Owners</th>
</tr>
"""
for app in app_registrations:
password_credentials = app.get('passwordCredentials', [])
if not password_credentials:
continue
expiry_date = password_credentials[0].get('endDateTime')
if expiry_date:
# Clean up the date string
if expiry_date.endswith('ZZ'):
expiry_date = expiry_date[:-1]
elif expiry_date.endswith('Z'):
expiry_date = expiry_date[:-1]
# Truncate the fractional seconds part to 6 digits if present
if '.' in expiry_date:
expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z'
else:
expiry_date += '.000000Z'
try:
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ')
except ValueError as e:
logging.error(f"Error parsing expiry date for {app['displayName']}: {e}")
continue
days_to_expiry = (expiry_date - datetime.utcnow()).days
# Determine row color class
if days_to_expiry > 30:
color_class = "green"
elif 7 < days_to_expiry <= 30:
color_class = "yellow"
elif 1 <= days_to_expiry <= 7:
color_class = "orange"
else:
color_class = "red"
days_to_expiry = "EXPIRED"
# Get owner information
owners = app.get('owners', [])
owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')]
owner_list = ', '.join(owner_upns) if owner_upns else 'No owners'
html += f"""
<tr class="{color_class}">
<td>{app['displayName']}</td>
<td>{expiry_date.strftime('%Y-%m-%d')}</td>
<td>{days_to_expiry}</td>
<td>{owner_list}</td>
</tr>
"""
html += """
</table>
</body>
</html>
"""
return html
# Example usage
if __name__ == "__main__":
# Sample app registration data
app_registrations = [
{
"displayName": "App1",
"passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}]
},
{
"displayName": "App2",
"passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}]
}
]
# Write to JSON
write_to_json(app_registrations)
# Generate HTML
html_content = generate_html(app_registrations)
with open('app_registrations.html', 'w') as f:
f.write(html_content)
logging.info("HTML content successfully written to app_registrations.html")

16
main.py Normal file
View File

@@ -0,0 +1,16 @@
from azure_client import get_app_registrations
from sharepoint_client import store_app_registrations # Uncomment this line
from notification import send_notifications
from data_export import write_to_json, generate_html
def main():
app_registrations = get_app_registrations()
store_app_registrations(app_registrations)
write_to_json(app_registrations)
html_content = generate_html(app_registrations)
with open('app_registrations.html', 'w') as f:
f.write(html_content)
send_notifications(app_registrations)
if __name__ == "__main__":
main()

142
notification.py Normal file
View File

@@ -0,0 +1,142 @@
import os
from dotenv import load_dotenv
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from datetime import datetime
import requests
import json
import logging
from data_export import generate_html
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def send_notifications(app_registrations):
# Email credentials from environment variables
smtp_server = os.getenv('SMTP_SERVER')
smtp_port = int(os.getenv('SMTP_PORT'))
smtp_username = os.getenv('SMTP_USERNAME')
smtp_password = os.getenv('SMTP_PASSWORD')
from_email = os.getenv('FROM_EMAIL')
from_name = os.getenv('FROM_NAME')
to_email = os.getenv('TO_EMAIL')
# Teams webhook URL from environment variables
teams_webhook_url = os.getenv('TEAMS_WEBHOOK_URL')
# Get the current date
current_date = datetime.utcnow()
notification_periods = [60, 30, 7, 1]
# Generate HTML content
html_content = generate_html(app_registrations)
# Send notification email and Teams message
for app in app_registrations:
# Debug log the entire app object
#logging.info(f"Processing app: {json.dumps(app, indent=2)}")
password_credentials = app.get('passwordCredentials', [])
if not password_credentials:
logging.warning(f"No password credentials found for {app['displayName']}")
continue
expiry_date = password_credentials[0].get('endDateTime')
if expiry_date:
# Clean up the date string
if expiry_date.endswith('ZZ'):
expiry_date = expiry_date[:-1]
elif expiry_date.endswith('Z'):
expiry_date = expiry_date[:-1]
# Truncate the fractional seconds part to 6 digits if present
if '.' in expiry_date:
expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z'
else:
expiry_date += '.000000Z'
try:
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ')
except ValueError as e:
logging.error(f"Error parsing expiry date for {app['displayName']}: {e}")
continue
days_to_expiry = (expiry_date - current_date).days
if days_to_expiry in notification_periods or days_to_expiry < 0:
subject = f"App Registration Expiry Notification: {app['displayName']}"
body = f"The app registration {app['displayName']} is set to expire in {days_to_expiry} days on {expiry_date.strftime('%Y-%m-%d')}.<br><br>{html_content}"
# Fetch and debug log owner information
owners = app.get('owners', [])
logging.info(f"Found owners for {app['displayName']}: {json.dumps(owners, indent=2)}")
# Get CC emails from owners
cc_emails = []
for owner in owners:
email = owner.get('userPrincipalName') or owner.get('mail')
if email:
cc_emails.append(email)
logging.info(f"Added owner email for {app['displayName']}: {email}")
# Create email message
msg = MIMEText(body, 'html')
msg['Subject'] = subject
msg['From'] = formataddr((from_name, from_email))
msg['To'] = to_email
# Add CC recipients if any found
if cc_emails:
msg['Cc'] = ', '.join(cc_emails)
logging.info(f"Added CC recipients for {app['displayName']}: {cc_emails}")
try:
# Include CC recipients in sendmail
all_recipients = [to_email] + cc_emails
logging.info(f"Sending email for {app['displayName']} to all recipients: {all_recipients}")
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(smtp_username, smtp_password)
server.sendmail(from_email, all_recipients, msg.as_string())
logging.info(f"Successfully sent email for {app['displayName']}")
except Exception as e:
logging.error(f"Failed to send email for {app['displayName']}: {e}")
# Send Teams notification
teams_message = {
"@type": "MessageCard",
"@context": "http://schema.org/extensions",
"summary": subject,
"themeColor": "0076D7",
"title": subject,
"sections": [{
"activityTitle": f"App Registration Expiry Notification",
"text": body
}]
}
try:
response = requests.post(teams_webhook_url, headers={"Content-Type": "application/json"}, json=teams_message)
response.raise_for_status()
logging.info(f"Teams notification sent for {app['displayName']}")
except requests.exceptions.RequestException as e:
logging.error(f"Failed to send Teams notification for {app['displayName']}: {e}")
if __name__ == "__main__":
# Sample app registration data for testing
app_registrations = [
{
"displayName": "App1",
"passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}],
"owners": [{"userPrincipalName": "owner1@example.com"}]
},
{
"displayName": "App2",
"passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}],
"owners": [{"userPrincipalName": "owner2@example.com"}]
}
]
# Send notifications
send_notifications(app_registrations)

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
azure-identity==1.19.0
azure-mgmt-resource==23.2.0
msal==1.24.0
python-dotenv==1.0.0
requests==2.31.0

125
sharepoint_client.py Normal file
View File

@@ -0,0 +1,125 @@
import os
from dotenv import load_dotenv
import requests
from datetime import datetime
import logging
import json
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def update_excel_file(file_id, data):
"""
Update Excel Online file with new data using OneDrive for Business API
"""
client_id = os.getenv('AZURE_CLIENT_ID')
client_secret = os.getenv('AZURE_CLIENT_SECRET')
tenant_id = os.getenv('AZURE_TENANT_ID')
user_email = os.getenv('USER_EMAIL')
# Get access token
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
token_data = {
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret,
'scope': 'https://graph.microsoft.com/.default'
}
token_response = requests.post(token_url, data=token_data)
access_token = token_response.json()['access_token']
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
try:
# Get the drive ID first
drives_url = f"https://graph.microsoft.com/v1.0/users/{user_email}/drive"
drive_response = requests.get(drives_url, headers=headers)
drive_response.raise_for_status()
drive_id = drive_response.json().get('id')
logging.info(f"Found drive ID: {drive_id}")
# Get the file details to verify it exists
file_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{file_id}"
file_response = requests.get(file_url, headers=headers)
file_response.raise_for_status()
logging.info(f"Found file: {file_response.json().get('name')}")
# Clear existing data (except header)
clear_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D1000')"
clear_data = {
"values": [[""]*4]*999
}
response = requests.patch(clear_url, headers=headers, json=clear_data)
response.raise_for_status()
logging.info("Cleared existing data from Excel file")
# Write new data
update_url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D{len(data)+1}')"
update_data = {
"values": data
}
response = requests.patch(update_url, headers=headers, json=update_data)
response.raise_for_status()
logging.info(f"Successfully wrote {len(data)} rows to Excel file")
except requests.exceptions.RequestException as e:
logging.error(f"Error updating Excel file: {e}")
if hasattr(e.response, 'text'):
logging.error(f"Response content: {e.response.text}")
raise
def store_app_registrations(app_registrations):
# Get current date
current_date = datetime.utcnow()
# Prepare data for Excel
excel_data = []
for app in app_registrations:
password_credentials = app.get('passwordCredentials', [])
if not password_credentials:
logging.warning(f"No password credentials found for {app['displayName']}")
continue
expiry_date = password_credentials[0].get('endDateTime')
if expiry_date:
if (expiry_date.endswith('ZZ')):
expiry_date = expiry_date[:-1]
elif expiry_date.endswith('Z'):
expiry_date = expiry_date[:-1]
try:
expiry_date_obj = datetime.strptime(expiry_date.split('.')[0], '%Y-%m-%dT%H:%M:%S')
days_to_expiry = (expiry_date_obj - current_date).days
# Get owner information
owners = app.get('owners', [])
owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')]
owner_list = ', '.join(owner_upns) if owner_upns else 'No owners'
excel_data.append([
app['displayName'],
expiry_date_obj.strftime('%Y-%m-%d'),
str(days_to_expiry),
owner_list
])
except ValueError as e:
logging.error(f"Error parsing expiry date for {app['displayName']}: {e}")
continue
# Update Excel file
try:
excel_file_id = os.getenv('EXCEL_FILE_ID')
update_excel_file(excel_file_id, excel_data)
logging.info("Successfully updated Excel file")
except Exception as e:
logging.error(f"Failed to update Excel file: {e}")
logging.info("Finished processing app registrations")