simplify
This commit is contained in:
239
azure_client.py
239
azure_client.py
@@ -1,239 +0,0 @@
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
import logging
|
||||
import json
|
||||
import msal
|
||||
import requests
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def get_app_registrations():
|
||||
logging.info("Authenticating to Microsoft Graph API")
|
||||
|
||||
# Azure AD app credentials from environment variables
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
authority = f"https://login.microsoftonline.com/{tenant_id}"
|
||||
scope = ["https://graph.microsoft.com/.default"]
|
||||
|
||||
# Create a confidential client application
|
||||
app = msal.ConfidentialClientApplication(
|
||||
client_id,
|
||||
authority=authority,
|
||||
client_credential=client_secret
|
||||
)
|
||||
|
||||
# Acquire a token
|
||||
result = app.acquire_token_for_client(scopes=scope)
|
||||
|
||||
if "access_token" in result:
|
||||
logging.info("Successfully authenticated to Microsoft Graph API")
|
||||
access_token = result["access_token"]
|
||||
else:
|
||||
logging.error("Failed to authenticate to Microsoft Graph API")
|
||||
logging.error(result.get("error"))
|
||||
logging.error(result.get("error_description"))
|
||||
logging.error(result.get("correlation_id"))
|
||||
return []
|
||||
|
||||
# Fetch app registrations with owners
|
||||
# Updated URL to include both app registration and owner data
|
||||
graph_url = (
|
||||
"https://graph.microsoft.com/v1.0/applications"
|
||||
"?$select=id,appId,displayName,passwordCredentials"
|
||||
"&$expand=owners($select=userPrincipalName)"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {access_token}",
|
||||
"ConsistencyLevel": "eventual"
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.get(graph_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
# Debug log the raw response
|
||||
#logging.info(f"API Response Status: {response.status_code}")
|
||||
#logging.debug(f"API Response: {response.text[:1000]}...") # First 1000 chars
|
||||
|
||||
app_registrations = response.json().get('value', [])
|
||||
logging.info(f"Fetched {len(app_registrations)} app registrations")
|
||||
return app_registrations
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Error fetching app registrations: {e}")
|
||||
return []
|
||||
|
||||
def sort_app_registrations(app_registrations):
|
||||
current_date = datetime.now(timezone.utc)
|
||||
for app in app_registrations:
|
||||
if app["passwordCredentials"]:
|
||||
expiry_date_str = app["passwordCredentials"][0]["endDateTime"]
|
||||
try:
|
||||
if '.' in expiry_date_str:
|
||||
expiry_date_str = expiry_date_str.split('.')[0] + '.' + expiry_date_str.split('.')[1][:6] + 'Z'
|
||||
if expiry_date_str.endswith('ZZ'):
|
||||
expiry_date_str = expiry_date_str[:-1]
|
||||
expiry_date = datetime.strptime(expiry_date_str, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
expiry_date = datetime.strptime(expiry_date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
|
||||
days_to_expiry = (expiry_date - current_date).days
|
||||
app["days_to_expiry"] = days_to_expiry
|
||||
app["expiry_date"] = expiry_date.isoformat()
|
||||
else:
|
||||
app["days_to_expiry"] = None
|
||||
app["expiry_date"] = None
|
||||
|
||||
sorted_apps = sorted(app_registrations, key=lambda x: (x["days_to_expiry"] is None, x["days_to_expiry"]), reverse=False)
|
||||
return sorted_apps
|
||||
|
||||
def generate_html(app_registrations):
|
||||
current_time = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
|
||||
html = f"""
|
||||
<html>
|
||||
<head>
|
||||
<title>App Registrations</title>
|
||||
<style>
|
||||
body {{
|
||||
font-family: Arial, sans-serif;
|
||||
margin: 20px;
|
||||
color: #333;
|
||||
}}
|
||||
.intro {{
|
||||
margin-bottom: 20px;
|
||||
line-height: 1.5;
|
||||
}}
|
||||
table {{
|
||||
width: 100%;
|
||||
border-collapse: collapse;
|
||||
}}
|
||||
th, td {{
|
||||
border: 1px solid black;
|
||||
padding: 8px;
|
||||
text-align: left;
|
||||
}}
|
||||
th {{
|
||||
background-color: #f2f2f2;
|
||||
}}
|
||||
.green {{
|
||||
background-color: #d4edda;
|
||||
}}
|
||||
.yellow {{
|
||||
background-color: #fff3cd;
|
||||
}}
|
||||
.orange {{
|
||||
background-color: #ffeeba;
|
||||
}}
|
||||
.red {{
|
||||
background-color: #f8d7da;
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="intro">
|
||||
<h2>Azure App Registration Expiry Notification</h2>
|
||||
<p>This is an automated notification regarding expiring Azure App Registrations that you own or manage.</p>
|
||||
|
||||
<p><strong>Why am I receiving this?</strong><br>
|
||||
You are receiving this email because you are listed as an owner of one or more Azure App Registrations that are approaching their expiration date or have already expired.</p>
|
||||
|
||||
<p><strong>Required Actions:</strong></p>
|
||||
<ul>
|
||||
<li>Review the list of app registrations below</li>
|
||||
<li>For any expiring or expired registrations:
|
||||
<ul>
|
||||
<li>Verify if the app registration is still needed</li>
|
||||
<li>If needed, renew the credentials before they expire</li>
|
||||
<li>If not needed, consider removing the app registration</li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
|
||||
<p><strong>Color Coding:</strong></p>
|
||||
<ul>
|
||||
<li style="background-color: #d4edda; padding: 3px;">Green: More than 30 days until expiry</li>
|
||||
<li style="background-color: #fff3cd; padding: 3px;">Yellow: Between 8-30 days until expiry</li>
|
||||
<li style="background-color: #ffeeba; padding: 3px;">Orange: 7 days or less until expiry</li>
|
||||
<li style="background-color: #f8d7da; padding: 3px;">Red: Expired</li>
|
||||
</ul>
|
||||
|
||||
<p>If you need assistance, please contact the IT Support team.</p>
|
||||
</div>
|
||||
|
||||
<h1>App Registrations</h1>
|
||||
<p>Exported on: {current_time}</p>
|
||||
<table>
|
||||
<tr>
|
||||
<th>Display Name</th>
|
||||
<th>Expiry Date</th>
|
||||
<th>Days to Expiry</th>
|
||||
<th>Owners</th>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
for app in app_registrations:
|
||||
password_credentials = app.get('passwordCredentials', [])
|
||||
if not password_credentials:
|
||||
continue
|
||||
|
||||
expiry_date = password_credentials[0].get('endDateTime')
|
||||
if expiry_date:
|
||||
try:
|
||||
if '.' in expiry_date:
|
||||
expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z'
|
||||
if expiry_date.endswith('ZZ'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
|
||||
days_to_expiry = (expiry_date - datetime.now(timezone.utc)).days
|
||||
|
||||
if days_to_expiry > 30:
|
||||
color_class = "green"
|
||||
elif 7 < days_to_expiry <= 30:
|
||||
color_class = "yellow"
|
||||
elif 1 <= days_to_expiry <= 7:
|
||||
color_class = "orange"
|
||||
else:
|
||||
color_class = "red"
|
||||
days_to_expiry = "EXPIRED"
|
||||
|
||||
owners = app.get('owners', [])
|
||||
owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')]
|
||||
owner_list = ', '.join(owner_upns) if owner_upns else 'No owners'
|
||||
|
||||
html += f"""
|
||||
<tr class="{color_class}">
|
||||
<td>{app['displayName']}</td>
|
||||
<td>{expiry_date.strftime('%Y-%m-%d')}</td>
|
||||
<td>{days_to_expiry}</td>
|
||||
<td>{owner_list}</td>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
html += """
|
||||
</table>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
return html
|
||||
|
||||
if __name__ == "__main__":
|
||||
app_registrations = get_app_registrations()
|
||||
sorted_app_registrations = sort_app_registrations(app_registrations)
|
||||
# Write to JSON file for inspection
|
||||
with open('debug_app_registrations.json', 'w') as f:
|
||||
json.dump(sorted_app_registrations, f, indent=2)
|
||||
# Write to HTML file for inspection
|
||||
html_output = generate_html(sorted_app_registrations)
|
||||
with open('app_registrations.html', 'w') as f:
|
||||
f.write(html_output)
|
||||
234
data_export.py
234
data_export.py
@@ -1,234 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
# def write_to_json(app_registrations, filename='app_registrations.json'):
|
||||
# """
|
||||
# Write app registration data to a JSON file.
|
||||
|
||||
# :param app_registrations: List of app registration data
|
||||
# :param filename: Name of the JSON file to write to
|
||||
# """
|
||||
# try:
|
||||
# with open(filename, 'w') as f:
|
||||
# json.dump(app_registrations, f, indent=4)
|
||||
# logging.info(f"App registration data successfully written to {filename}")
|
||||
# except Exception as e:
|
||||
# logging.error(f"Failed to write app registration data to {filename}: {e}")
|
||||
|
||||
def generate_html(app_registrations):
|
||||
"""
|
||||
Generate an HTML representation of the app registration data.
|
||||
|
||||
:param app_registrations: List of app registration data
|
||||
:return: HTML string
|
||||
"""
|
||||
current_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
|
||||
html = f"""
|
||||
<html>
|
||||
<head>
|
||||
<title>App Registrations</title>
|
||||
<style>
|
||||
body {{
|
||||
font-family: Arial, sans-serif;
|
||||
margin: 20px;
|
||||
color: #333;
|
||||
}}
|
||||
.intro {{
|
||||
margin-bottom: 20px;
|
||||
line-height: 1.5;
|
||||
}}
|
||||
table {{
|
||||
width: 100%;
|
||||
border-collapse: collapse;
|
||||
}}
|
||||
th, td {{
|
||||
border: 1px solid black;
|
||||
padding: 8px;
|
||||
text-align: left;
|
||||
}}
|
||||
th {{
|
||||
background-color: #f2f2f2;
|
||||
}}
|
||||
.green {{
|
||||
background-color: #d4edda;
|
||||
}}
|
||||
.yellow {{
|
||||
background-color: #fff3cd;
|
||||
}}
|
||||
.orange {{
|
||||
background-color: #ffeeba;
|
||||
}}
|
||||
.red {{
|
||||
background-color: #f8d7da;
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="intro">
|
||||
<h2>Azure App Registration Expiry Notification</h2>
|
||||
<p>This is an automated notification regarding expiring Azure App Registrations that you own or manage.</p>
|
||||
|
||||
<p><strong>Why am I receiving this?</strong><br>
|
||||
You are receiving this email because you are listed as an owner of one or more Azure App Registrations that are approaching their expiration date or have already expired.</p>
|
||||
|
||||
<p><strong>Required Actions:</strong></p>
|
||||
<ul>
|
||||
<li>Review the list of app registrations below</li>
|
||||
<li>For any expiring or expired registrations:
|
||||
<ul>
|
||||
<li>Verify if the app registration is still needed</li>
|
||||
<li>If needed, renew the credentials before they expire</li>
|
||||
<li>If not needed, consider removing the app registration</li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
|
||||
<p><strong>Color Coding:</strong></p>
|
||||
<ul>
|
||||
<li style="background-color: #d4edda; padding: 3px;">Green: More than 30 days until expiry</li>
|
||||
<li style="background-color: #fff3cd; padding: 3px;">Yellow: Between 8-30 days until expiry</li>
|
||||
<li style="background-color: #ffeeba; padding: 3px;">Orange: 7 days or less until expiry</li>
|
||||
<li style="background-color: #f8d7da; padding: 3px;">Red: Expired</li>
|
||||
</ul>
|
||||
|
||||
<p>If you need assistance, please contact the IT Support team.</p>
|
||||
</div>
|
||||
|
||||
<h1>App Registrations</h1>
|
||||
<p>Exported on: {current_time}</p>
|
||||
<table>
|
||||
<tr>
|
||||
<th>Display Name</th>
|
||||
<th>Expiry Date</th>
|
||||
<th>Days to Expiry</th>
|
||||
<th>Owners</th>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
for app in app_registrations:
|
||||
password_credentials = app.get('passwordCredentials', [])
|
||||
if not password_credentials:
|
||||
continue
|
||||
|
||||
expiry_date = password_credentials[0].get('endDateTime')
|
||||
if expiry_date:
|
||||
# Clean up the date string
|
||||
if expiry_date.endswith('ZZ'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
elif expiry_date.endswith('Z'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
# Truncate the fractional seconds part to 6 digits if present
|
||||
if '.' in expiry_date:
|
||||
expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z'
|
||||
else:
|
||||
expiry_date += '.000000Z'
|
||||
try:
|
||||
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ')
|
||||
except ValueError as e:
|
||||
logging.error(f"Error parsing expiry date for {app['displayName']}: {e}")
|
||||
continue
|
||||
days_to_expiry = (expiry_date - datetime.utcnow()).days
|
||||
|
||||
# Determine row color class
|
||||
if days_to_expiry > 30:
|
||||
color_class = "green"
|
||||
elif 7 < days_to_expiry <= 30:
|
||||
color_class = "yellow"
|
||||
elif 1 <= days_to_expiry <= 7:
|
||||
color_class = "orange"
|
||||
else:
|
||||
color_class = "red"
|
||||
days_to_expiry = "EXPIRED"
|
||||
|
||||
# Get owner information
|
||||
owners = app.get('owners', [])
|
||||
owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')]
|
||||
owner_list = ', '.join(owner_upns) if owner_upns else 'No owners'
|
||||
|
||||
html += f"""
|
||||
<tr class="{color_class}">
|
||||
<td>{app['displayName']}</td>
|
||||
<td>{expiry_date.strftime('%Y-%m-%d')}</td>
|
||||
<td>{days_to_expiry}</td>
|
||||
<td>{owner_list}</td>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
html += """
|
||||
</table>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
return html
|
||||
|
||||
def generate_expiry_text(app_name, days_to_expiry, expiry_date):
|
||||
if days_to_expiry > 30:
|
||||
color = "#28a745" # green
|
||||
elif days_to_expiry > 7:
|
||||
color = "#ffc107" # yellow
|
||||
elif days_to_expiry > 0:
|
||||
color = "#ff9800" # orange
|
||||
else:
|
||||
color = "#dc3545" # red
|
||||
days_to_expiry = "EXPIRED"
|
||||
|
||||
return f"""
|
||||
<div style="font-size: 26px; margin-bottom: 20px;">
|
||||
<p>The app registration <strong>{app_name}</strong> is set to expire in
|
||||
<span style="color: {color}; font-weight: bold; font-size: 28px;">
|
||||
{days_to_expiry}
|
||||
</span>
|
||||
days on <strong>{expiry_date.strftime('%Y-%m-%d')}</strong></p>
|
||||
</div>
|
||||
"""
|
||||
|
||||
def sort_app_registrations(app_registrations):
|
||||
current_date = datetime.utcnow()
|
||||
for app in app_registrations:
|
||||
expiry_date_str = app["passwordCredentials"][0]["endDateTime"]
|
||||
expiry_date = datetime.strptime(expiry_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S')
|
||||
days_to_expiry = (expiry_date - current_date).days
|
||||
app["days_to_expiry"] = days_to_expiry
|
||||
app["expiry_date"] = expiry_date
|
||||
|
||||
sorted_apps = sorted(app_registrations, key=lambda x: x["days_to_expiry"], reverse=False)
|
||||
return sorted_apps
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
# Sample app registration data
|
||||
app_registrations = [
|
||||
{
|
||||
"displayName": "App1",
|
||||
"passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}]
|
||||
},
|
||||
{
|
||||
"displayName": "App2",
|
||||
"passwordCredentials": [{"endDateTime": "2023-12-31T23:59:59.9999999Z"}]
|
||||
},
|
||||
{
|
||||
"displayName": "App3",
|
||||
"passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}]
|
||||
}
|
||||
]
|
||||
|
||||
sorted_apps = sort_app_registrations(app_registrations)
|
||||
html_output = ""
|
||||
for app in sorted_apps:
|
||||
html_output += generate_expiry_text(app["displayName"], app["days_to_expiry"], app["expiry_date"])
|
||||
|
||||
print(html_output)
|
||||
|
||||
# Write to JSON
|
||||
write_to_json(app_registrations)
|
||||
|
||||
# Generate HTML
|
||||
html_content = generate_html(app_registrations)
|
||||
with open('app_registrations.html', 'w') as f:
|
||||
f.write(html_content)
|
||||
logging.info("HTML content successfully written to app_registrations.html")
|
||||
75
getdrive.py
75
getdrive.py
@@ -1,75 +0,0 @@
|
||||
import os
|
||||
import requests
|
||||
import logging
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(asctime)s - %(message)s')
|
||||
|
||||
def get_access_token():
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
token_data = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret,
|
||||
'scope': 'https://graph.microsoft.com/.default'
|
||||
}
|
||||
|
||||
token_response = requests.post(token_url, data=token_data)
|
||||
access_token = token_response.json().get('access_token')
|
||||
if not access_token:
|
||||
logging.error("Failed to obtain access token")
|
||||
logging.error(token_response.json())
|
||||
return None
|
||||
|
||||
return access_token
|
||||
|
||||
def list_drive_items(site_id, drive_id, parent_id=None):
|
||||
access_token = get_access_token()
|
||||
if not access_token:
|
||||
return None
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
if parent_id:
|
||||
items_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{parent_id}/children"
|
||||
else:
|
||||
items_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/root/children"
|
||||
|
||||
response = requests.get(items_url, headers=headers)
|
||||
if response.status_code != 200:
|
||||
logging.error(f"Failed to list drive items: {response.status_code}")
|
||||
logging.error(response.json())
|
||||
return None
|
||||
|
||||
items = response.json().get('value', [])
|
||||
if not items:
|
||||
logging.error("No items found in the drive")
|
||||
return None
|
||||
|
||||
for item in items:
|
||||
logging.info(f"Found item: {item.get('name')} with ID: {item.get('id')}")
|
||||
if item.get('folder'):
|
||||
# Recursively list items in the folder
|
||||
list_drive_items(site_id, drive_id, item.get('id'))
|
||||
|
||||
return items
|
||||
|
||||
if __name__ == "__main__":
|
||||
site_id = "opassey.sharepoint.com,8e038e2a-139b-4e46-893a-bcf76062e063,5dcd71be-16b7-42ba-add6-4068b88ef3aa"
|
||||
drive_id = os.getenv('SHAREPOINT_DRIVE_ID')
|
||||
items = list_drive_items(site_id, drive_id)
|
||||
if items:
|
||||
logging.info("Listed drive items successfully")
|
||||
else:
|
||||
logging.error("Failed to list drive items")
|
||||
66
getsites.py
66
getsites.py
@@ -1,66 +0,0 @@
|
||||
import os
|
||||
import requests
|
||||
import logging
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def get_access_token():
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
token_data = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret,
|
||||
'scope': 'https://graph.microsoft.com/.default'
|
||||
}
|
||||
|
||||
token_response = requests.post(token_url, data=token_data)
|
||||
access_token = token_response.json().get('access_token')
|
||||
if not access_token:
|
||||
logging.error("Failed to obtain access token")
|
||||
logging.error(token_response.json())
|
||||
return None
|
||||
|
||||
return access_token
|
||||
|
||||
def list_sites():
|
||||
access_token = get_access_token()
|
||||
if not access_token:
|
||||
return None
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
sites_url = "https://graph.microsoft.com/v1.0/sites?search=*"
|
||||
response = requests.get(sites_url, headers=headers)
|
||||
if response.status_code != 200:
|
||||
logging.error(f"Failed to list sites: {response.status_code}")
|
||||
logging.error(response.json())
|
||||
return None
|
||||
|
||||
sites = response.json().get('value', [])
|
||||
if not sites:
|
||||
logging.error("No sites found")
|
||||
return None
|
||||
|
||||
for site in sites:
|
||||
logging.info(f"Found site: {site.get('name')} with ID: {site.get('id')}")
|
||||
|
||||
return sites
|
||||
|
||||
if __name__ == "__main__":
|
||||
sites = list_sites()
|
||||
if sites:
|
||||
logging.info("Listed sites successfully")
|
||||
else:
|
||||
logging.error("Failed to list sites")
|
||||
67
gettoken.py
67
gettoken.py
@@ -1,67 +0,0 @@
|
||||
import os
|
||||
import requests
|
||||
import logging
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def get_access_token():
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
token_data = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret,
|
||||
'scope': 'https://graph.microsoft.com/.default'
|
||||
}
|
||||
|
||||
token_response = requests.post(token_url, data=token_data)
|
||||
access_token = token_response.json().get('access_token')
|
||||
if not access_token:
|
||||
logging.error("Failed to obtain access token")
|
||||
logging.error(token_response.json())
|
||||
return None
|
||||
|
||||
return access_token
|
||||
|
||||
def get_drive_id(site_id):
|
||||
access_token = get_access_token()
|
||||
if not access_token:
|
||||
return None
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
|
||||
response = requests.get(drives_url, headers=headers)
|
||||
if response.status_code != 200:
|
||||
logging.error(f"Failed to get drives: {response.status_code}")
|
||||
logging.error(response.json())
|
||||
return None
|
||||
|
||||
drives = response.json().get('value', [])
|
||||
if not drives:
|
||||
logging.error("No drives found")
|
||||
return None
|
||||
|
||||
# Assuming you want the first drive in the list
|
||||
drive_id = drives[0].get('id')
|
||||
logging.info(f"Found drive ID: {drive_id}")
|
||||
return drive_id
|
||||
|
||||
if __name__ == "__main__":
|
||||
site_id = os.getenv('SHAREPOINT_SITE_ID')
|
||||
drive_id = get_drive_id(site_id)
|
||||
if drive_id:
|
||||
logging.info(f"Drive ID: {drive_id}")
|
||||
else:
|
||||
logging.error("Failed to obtain drive ID")
|
||||
16
main.py
16
main.py
@@ -1,16 +0,0 @@
|
||||
from azure_client import get_app_registrations
|
||||
from sharepoint_client import store_app_registrations
|
||||
from notification import send_notifications
|
||||
from data_export import write_to_json, generate_html
|
||||
|
||||
def main():
|
||||
app_registrations = get_app_registrations()
|
||||
store_app_registrations(app_registrations)
|
||||
write_to_json(app_registrations)
|
||||
html_content = generate_html(app_registrations)
|
||||
with open('app_registrations.html', 'w') as f:
|
||||
f.write(html_content)
|
||||
send_notifications(app_registrations)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
142
notification.py
142
notification.py
@@ -1,142 +0,0 @@
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import formataddr
|
||||
from datetime import datetime
|
||||
import requests
|
||||
import json
|
||||
import logging
|
||||
from data_export import generate_html, generate_expiry_text
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def send_notifications(app_registrations):
|
||||
# Email credentials from environment variables
|
||||
smtp_server = os.getenv('SMTP_SERVER')
|
||||
smtp_port = int(os.getenv('SMTP_PORT'))
|
||||
smtp_username = os.getenv('SMTP_USERNAME')
|
||||
smtp_password = os.getenv('SMTP_PASSWORD')
|
||||
from_email = os.getenv('FROM_EMAIL')
|
||||
from_name = os.getenv('FROM_NAME')
|
||||
to_email = os.getenv('TO_EMAIL')
|
||||
|
||||
# Teams webhook URL from environment variables
|
||||
teams_webhook_url = os.getenv('TEAMS_WEBHOOK_URL')
|
||||
|
||||
# Get the current date
|
||||
current_date = datetime.utcnow()
|
||||
notification_periods = [60, 30, 7, 3, 2, 1]
|
||||
|
||||
# Generate HTML content
|
||||
html_content = generate_html(app_registrations)
|
||||
|
||||
# Send notification email and Teams message
|
||||
for app in app_registrations:
|
||||
# Debug log the entire app object
|
||||
#logging.info(f"Processing app: {json.dumps(app, indent=2)}")
|
||||
|
||||
password_credentials = app.get('passwordCredentials', [])
|
||||
if not password_credentials:
|
||||
logging.warning(f"No password credentials found for {app['displayName']}")
|
||||
continue
|
||||
|
||||
expiry_date = password_credentials[0].get('endDateTime')
|
||||
if expiry_date:
|
||||
# Clean up the date string
|
||||
if expiry_date.endswith('ZZ'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
elif expiry_date.endswith('Z'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
# Truncate the fractional seconds part to 6 digits if present
|
||||
if '.' in expiry_date:
|
||||
expiry_date = expiry_date.split('.')[0] + '.' + expiry_date.split('.')[1][:6] + 'Z'
|
||||
else:
|
||||
expiry_date += '.000000Z'
|
||||
try:
|
||||
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%dT%H:%M:%S.%fZ')
|
||||
except ValueError as e:
|
||||
logging.error(f"Error parsing expiry date for {app['displayName']}: {e}")
|
||||
continue
|
||||
days_to_expiry = (expiry_date - current_date).days
|
||||
if days_to_expiry in notification_periods or days_to_expiry < 0:
|
||||
subject = f"App Registration Expiry Notification: {app['displayName']}"
|
||||
body = generate_expiry_text(app['displayName'], days_to_expiry, expiry_date) + html_content
|
||||
|
||||
# Fetch and debug log owner information
|
||||
owners = app.get('owners', [])
|
||||
#logging.info(f"Found owners for {app['displayName']}: {json.dumps(owners, indent=2)}")
|
||||
|
||||
# Get CC emails from owners
|
||||
cc_emails = []
|
||||
for owner in owners:
|
||||
email = owner.get('userPrincipalName') or owner.get('mail')
|
||||
if email:
|
||||
cc_emails.append(email)
|
||||
logging.info(f"Added owner email for {app['displayName']}: {email}")
|
||||
|
||||
# Create email message
|
||||
msg = MIMEText(body, 'html')
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = formataddr((from_name, from_email))
|
||||
msg['To'] = to_email
|
||||
|
||||
# Add CC recipients if any found
|
||||
if cc_emails:
|
||||
msg['Cc'] = ', '.join(cc_emails)
|
||||
logging.info(f"Added CC recipients for {app['displayName']}: {cc_emails}")
|
||||
|
||||
try:
|
||||
# Include CC recipients in sendmail
|
||||
all_recipients = [to_email] + cc_emails
|
||||
logging.info(f"Sending email for {app['displayName']} to all recipients: {all_recipients}")
|
||||
|
||||
with smtplib.SMTP(smtp_server, smtp_port) as server:
|
||||
server.starttls()
|
||||
server.login(smtp_username, smtp_password)
|
||||
server.sendmail(from_email, all_recipients, msg.as_string())
|
||||
logging.info(f"Successfully sent email for {app['displayName']}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to send email for {app['displayName']}: {e}")
|
||||
|
||||
# Send Teams notification
|
||||
teams_message = {
|
||||
"@type": "MessageCard",
|
||||
"@context": "http://schema.org/extensions",
|
||||
"summary": subject,
|
||||
"themeColor": "0076D7",
|
||||
"title": subject,
|
||||
"sections": [{
|
||||
"activityTitle": f"App Registration Expiry Notification",
|
||||
"text": body
|
||||
}]
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(teams_webhook_url, headers={"Content-Type": "application/json"}, json=teams_message)
|
||||
response.raise_for_status()
|
||||
logging.info(f"Teams notification sent for {app['displayName']}")
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Failed to send Teams notification for {app['displayName']}: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Sample app registration data for testing
|
||||
app_registrations = [
|
||||
{
|
||||
"displayName": "App1",
|
||||
"passwordCredentials": [{"endDateTime": "2024-12-31T23:59:59.9999999Z"}],
|
||||
"owners": [{"userPrincipalName": "owner1@example.com"}]
|
||||
},
|
||||
{
|
||||
"displayName": "App2",
|
||||
"passwordCredentials": [{"endDateTime": "2025-01-15T23:59:59.9999999Z"}],
|
||||
"owners": [{"userPrincipalName": "owner2@example.com"}]
|
||||
}
|
||||
]
|
||||
|
||||
# Send notifications
|
||||
send_notifications(app_registrations)
|
||||
@@ -1,180 +0,0 @@
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import json
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def get_access_token():
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
token_data = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret,
|
||||
'scope': 'https://graph.microsoft.com/.default'
|
||||
}
|
||||
|
||||
token_response = requests.post(token_url, data=token_data)
|
||||
access_token = token_response.json().get('access_token')
|
||||
if not access_token:
|
||||
logging.error("Failed to obtain access token")
|
||||
logging.error(token_response.json())
|
||||
return None
|
||||
|
||||
return access_token
|
||||
|
||||
def get_drive_id(site_id):
|
||||
access_token = get_access_token()
|
||||
if not access_token:
|
||||
return None
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
drives_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
|
||||
response = requests.get(drives_url, headers=headers)
|
||||
if response.status_code != 200:
|
||||
logging.error(f"Failed to get drives: {response.status_code}")
|
||||
logging.error(response.json())
|
||||
return None
|
||||
|
||||
drives = response.json().get('value', [])
|
||||
if not drives:
|
||||
logging.error("No drives found")
|
||||
return None
|
||||
|
||||
# Assuming you want the first drive in the list
|
||||
drive_id = drives[0].get('id')
|
||||
logging.info(f"Found drive ID: {drive_id}")
|
||||
return drive_id
|
||||
|
||||
def update_excel_file(site_id, drive_id, file_id, data):
|
||||
"""
|
||||
Update Excel Online file with new data using OneDrive for Business API
|
||||
"""
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
# Get access token
|
||||
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
|
||||
token_data = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret,
|
||||
'scope': 'https://graph.microsoft.com/.default'
|
||||
}
|
||||
|
||||
token_response = requests.post(token_url, data=token_data)
|
||||
access_token = token_response.json().get('access_token')
|
||||
if not access_token:
|
||||
logging.error("Failed to obtain access token")
|
||||
logging.error(token_response.json())
|
||||
return
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
try:
|
||||
# Get the file details to verify it exists
|
||||
file_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{file_id}"
|
||||
file_response = requests.get(file_url, headers=headers)
|
||||
file_response.raise_for_status()
|
||||
logging.info(f"Found file: {file_response.json().get('name')}")
|
||||
|
||||
# Clear existing data (except header)
|
||||
clear_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D1000')"
|
||||
clear_data = {
|
||||
"values": [[""]*4]*999
|
||||
}
|
||||
response = requests.patch(clear_url, headers=headers, json=clear_data)
|
||||
response.raise_for_status()
|
||||
logging.info("Cleared existing data from Excel file")
|
||||
|
||||
# Write new data
|
||||
update_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}/items/{file_id}/workbook/worksheets/Sheet1/range(address='A2:D{len(data)+1}')"
|
||||
update_data = {
|
||||
"values": data
|
||||
}
|
||||
response = requests.patch(update_url, headers=headers, json=update_data)
|
||||
response.raise_for_status()
|
||||
logging.info(f"Successfully wrote {len(data)} rows to Excel file")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Error updating Excel file: {e}")
|
||||
if hasattr(e.response, 'text'):
|
||||
logging.error(f"Response content: {e.response.text}")
|
||||
raise
|
||||
|
||||
def store_app_registrations(app_registrations):
|
||||
# Get current date
|
||||
current_date = datetime.utcnow()
|
||||
|
||||
# Prepare data for Excel
|
||||
excel_data = []
|
||||
for app in app_registrations:
|
||||
password_credentials = app.get('passwordCredentials', [])
|
||||
if not password_credentials:
|
||||
logging.warning(f"No password credentials found for {app['displayName']}")
|
||||
continue
|
||||
|
||||
expiry_date = password_credentials[0].get('endDateTime')
|
||||
if expiry_date:
|
||||
if expiry_date.endswith('ZZ'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
elif expiry_date.endswith('Z'):
|
||||
expiry_date = expiry_date[:-1]
|
||||
|
||||
try:
|
||||
expiry_date_obj = datetime.strptime(expiry_date.split('.')[0], '%Y-%m-%dT%H:%M:%S')
|
||||
days_to_expiry = (expiry_date_obj - current_date).days
|
||||
|
||||
# Get owner information
|
||||
owners = app.get('owners', [])
|
||||
owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')]
|
||||
owner_list = ', '.join(owner_upns) if owner_upns else 'No owners'
|
||||
|
||||
excel_data.append([
|
||||
app['displayName'],
|
||||
expiry_date_obj.strftime('%Y-%m-%d'),
|
||||
str(days_to_expiry),
|
||||
owner_list
|
||||
])
|
||||
|
||||
except ValueError as e:
|
||||
logging.error(f"Error parsing expiry date for {app['displayName']}: {e}")
|
||||
continue
|
||||
|
||||
# Update Excel file
|
||||
try:
|
||||
site_id = os.getenv('SHAREPOINT_SITE_ID')
|
||||
drive_id = os.getenv('SHAREPOINT_DRIVE_ID')
|
||||
file_id = os.getenv('EXCEL_FILE_ID')
|
||||
update_excel_file(site_id, drive_id, file_id, excel_data)
|
||||
logging.info("Successfully updated Excel file")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to update Excel file: {e}")
|
||||
|
||||
logging.info("Finished processing app registrations")
|
||||
|
||||
if __name__ == "__main__":
|
||||
site_id = os.getenv('SHAREPOINT_SITE_ID')
|
||||
drive_id = get_drive_id(site_id)
|
||||
if drive_id:
|
||||
logging.info(f"Drive ID: {drive_id}")
|
||||
else:
|
||||
logging.error("Failed to obtain drive ID")
|
||||
@@ -1,64 +0,0 @@
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import formataddr
|
||||
import smtplib
|
||||
import logging
|
||||
|
||||
def check_client_id_expiry():
|
||||
# Load environment variables
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
smtp_server = os.getenv('SMTP_SERVER')
|
||||
smtp_port = int(os.getenv('SMTP_PORT'))
|
||||
smtp_username = os.getenv('SMTP_USERNAME')
|
||||
smtp_password = os.getenv('SMTP_PASSWORD')
|
||||
from_email = os.getenv('FROM_EMAIL')
|
||||
from_name = os.getenv('FROM_NAME')
|
||||
admin_email = os.getenv('ADMIN_EMAIL')
|
||||
|
||||
# Check if the client ID is expiring soon
|
||||
expiry_date_str = os.getenv('CLIENT_ID_EXPIRY_DATE')
|
||||
if not expiry_date_str:
|
||||
logging.error("CLIENT_ID_EXPIRY_DATE not set in environment variables")
|
||||
return
|
||||
|
||||
try:
|
||||
expiry_date = datetime.strptime(expiry_date_str, '%Y-%m-%d')
|
||||
except ValueError as e:
|
||||
logging.error(f"Error parsing CLIENT_ID_EXPIRY_DATE: {e}")
|
||||
return
|
||||
|
||||
days_to_expiry = (expiry_date - datetime.utcnow()).days
|
||||
|
||||
if days_to_expiry <= 30:
|
||||
subject = "Warning: Azure Client ID Expiry Notification"
|
||||
body = f"""
|
||||
<html>
|
||||
<body>
|
||||
<p>The Azure Client ID <strong>{client_id}</strong> is set to expire in
|
||||
<span style="color: red; font-weight: bold;">{days_to_expiry} days</span>
|
||||
on <strong>{expiry_date.strftime('%Y-%m-%d')}</strong>.</p>
|
||||
<p>Please take the necessary actions to renew the client ID before it expires.</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
# Create email message
|
||||
msg = MIMEText(body, 'html')
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = formataddr((from_name, from_email))
|
||||
msg['To'] = admin_email
|
||||
|
||||
try:
|
||||
with smtplib.SMTP(smtp_server, smtp_port) as server:
|
||||
server.starttls()
|
||||
server.login(smtp_username, smtp_password)
|
||||
server.sendmail(from_email, admin_email, msg.as_string())
|
||||
logging.info(f"Successfully sent client ID expiry warning to {admin_email}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to send client ID expiry warning: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
check_client_id_expiry()
|
||||
Reference in New Issue
Block a user