v1
This commit is contained in:
5
.funcignore
Normal file
5
.funcignore
Normal file
@@ -0,0 +1,5 @@
|
||||
.git*
|
||||
.vscode
|
||||
local.settings.json
|
||||
test
|
||||
.venv
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1 +1,6 @@
|
||||
/aio/__pycache__
|
||||
|
||||
.python_packages
|
||||
.venv
|
||||
local.settings.json
|
||||
local.settings copy.json
|
||||
|
||||
254
aio/__init__.py
254
aio/__init__.py
@@ -13,12 +13,14 @@ import azure.functions as func
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def main(myTimer: func.TimerRequest) -> None:
|
||||
logging.info("Processing a request to fetch app registrations.")
|
||||
logging.info("Processing a request to fetch app registrations and other credentials.")
|
||||
|
||||
app_registrations = get_app_registrations()
|
||||
if app_registrations:
|
||||
entra_id_accounts = get_entra_id_accounts_password_expiry()
|
||||
|
||||
if app_registrations or entra_id_accounts:
|
||||
sorted_app_registrations = sort_app_registrations(app_registrations)
|
||||
send_notifications(sorted_app_registrations)
|
||||
send_notifications(sorted_app_registrations, entra_id_accounts)
|
||||
|
||||
def get_app_registrations():
|
||||
logging.info("Authenticating to Microsoft Graph API")
|
||||
@@ -75,11 +77,93 @@ def get_app_registrations():
|
||||
logging.error(f"Error fetching app registrations: {e}")
|
||||
return []
|
||||
|
||||
def get_entra_id_accounts_password_expiry():
|
||||
logging.info("Fetching Entra ID accounts password expiry")
|
||||
|
||||
# Azure AD app credentials from environment variables
|
||||
client_id = os.getenv('AZURE_CLIENT_ID')
|
||||
client_secret = os.getenv('AZURE_CLIENT_SECRET')
|
||||
tenant_id = os.getenv('AZURE_TENANT_ID')
|
||||
|
||||
# Get the list of specific accounts to monitor
|
||||
# Format should be comma-separated UPNs (user principal names)
|
||||
accounts_to_monitor = os.getenv('MONITORED_ACCOUNTS', '')
|
||||
accounts_list = [account.strip() for account in accounts_to_monitor.split(',') if account.strip()]
|
||||
|
||||
if not accounts_list:
|
||||
logging.warning("No specific Entra ID accounts configured for monitoring")
|
||||
return []
|
||||
|
||||
logging.info(f"Configured to monitor {len(accounts_list)} specific Entra ID accounts")
|
||||
|
||||
authority = f"https://login.microsoftonline.com/{tenant_id}"
|
||||
scope = ["https://graph.microsoft.com/.default"]
|
||||
|
||||
# Create a confidential client application
|
||||
app = msal.ConfidentialClientApplication(
|
||||
client_id,
|
||||
authority=authority,
|
||||
client_credential=client_secret
|
||||
)
|
||||
|
||||
# Acquire a token
|
||||
result = app.acquire_token_for_client(scopes=scope)
|
||||
|
||||
if "access_token" in result:
|
||||
logging.info("Successfully authenticated to Microsoft Graph API")
|
||||
access_token = result["access_token"]
|
||||
else:
|
||||
logging.error("Failed to authenticate to Microsoft Graph API")
|
||||
logging.error(result.get("error"))
|
||||
logging.error(result.get("error_description"))
|
||||
logging.error(result.get("correlation_id"))
|
||||
return []
|
||||
|
||||
# Create headers for the request
|
||||
headers = {
|
||||
"Authorization": f"Bearer {access_token}",
|
||||
"ConsistencyLevel": "eventual"
|
||||
}
|
||||
|
||||
# Collect account information for each user
|
||||
entra_id_accounts = []
|
||||
|
||||
for upn in accounts_list:
|
||||
try:
|
||||
# Use the user's UPN to query specific user details - now including mail field
|
||||
graph_url = f"https://graph.microsoft.com/v1.0/users/{upn}?$select=id,displayName,userPrincipalName,mail,passwordPolicies,passwordProfile"
|
||||
|
||||
logging.info(f"Fetching details for account: {upn}")
|
||||
response = requests.get(graph_url, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
account = response.json()
|
||||
entra_id_accounts.append(account)
|
||||
logging.info(f"Successfully fetched details for account: {upn}")
|
||||
else:
|
||||
logging.warning(f"Failed to fetch details for account {upn}: {response.status_code} {response.reason}")
|
||||
logging.warning(f"Response: {response.text[:200]}...")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Error fetching details for account {upn}: {e}")
|
||||
|
||||
logging.info(f"Fetched {len(entra_id_accounts)} Entra ID accounts out of {len(accounts_list)} configured")
|
||||
return entra_id_accounts
|
||||
|
||||
def sort_app_registrations(app_registrations):
|
||||
current_date = datetime.now(timezone.utc)
|
||||
for app in app_registrations:
|
||||
for credential in app["passwordCredentials"]:
|
||||
expiry_date_str = credential["endDateTime"]
|
||||
|
||||
# Include the credential display name or ID for identification
|
||||
credential_name = credential.get("displayName", "")
|
||||
if not credential_name:
|
||||
# Use the last 4 characters of the keyId as an identifier if no display name
|
||||
credential_name = f"Secret {credential.get('keyId', '')[-4:]}"
|
||||
|
||||
credential["name"] = credential_name
|
||||
|
||||
try:
|
||||
if '.' in expiry_date_str:
|
||||
expiry_date_str = expiry_date_str.split('.')[0] + '.' + expiry_date_str.split('.')[1][:6] + 'Z'
|
||||
@@ -95,12 +179,12 @@ def sort_app_registrations(app_registrations):
|
||||
sorted_apps = sorted(app_registrations, key=lambda x: (min([cred["days_to_expiry"] for cred in x["passwordCredentials"]]) if x["passwordCredentials"] else float('inf')), reverse=False)
|
||||
return sorted_apps
|
||||
|
||||
def generate_html(app_registrations):
|
||||
def generate_html(app_registrations, entra_id_accounts):
|
||||
current_time = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
|
||||
html = f"""
|
||||
<html>
|
||||
<head>
|
||||
<title>App Registrations</title>
|
||||
<title>App Registrations and Credentials</title>
|
||||
<style>
|
||||
body {{
|
||||
font-family: Arial, sans-serif;
|
||||
@@ -135,24 +219,27 @@ def generate_html(app_registrations):
|
||||
.red {{
|
||||
background-color: #f8d7da;
|
||||
}}
|
||||
.blue {{
|
||||
background-color: #cce5ff;
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="intro">
|
||||
<h2>Azure App Registration Expiry Notification</h2>
|
||||
<p>This is an automated notification regarding expiring Azure App Registrations that you own or manage.</p>
|
||||
<h2>Azure App Registration and Credential Expiry Notification</h2>
|
||||
<p>This is an automated notification regarding expiring Azure App Registrations and Entra ID account passwords that you own or manage.</p>
|
||||
|
||||
<p><strong>Why am I receiving this?</strong><br>
|
||||
You are receiving this email because you are listed as an owner of one or more Azure App Registrations that are approaching their expiration date or have already expired.</p>
|
||||
You are receiving this email because you are listed as an owner of one or more Azure App Registrations or Entra ID accounts that are approaching their expiration date or have already expired.</p>
|
||||
|
||||
<p><strong>Required Actions:</strong></p>
|
||||
<ul>
|
||||
<li>Review the list of app registrations below</li>
|
||||
<li>For any expiring or expired registrations:
|
||||
<li>Review the list of app registrations and accounts below</li>
|
||||
<li>For any expiring or expired credentials:
|
||||
<ul>
|
||||
<li>Verify if the app registration is still needed</li>
|
||||
<li>Verify if the credential is still needed</li>
|
||||
<li>If needed, renew the credentials before they expire</li>
|
||||
<li>If not needed, consider removing the app registration</li>
|
||||
<li>If not needed, consider removing the credential</li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
@@ -163,6 +250,7 @@ def generate_html(app_registrations):
|
||||
<li style="background-color: #fff3cd; padding: 3px;">Yellow: Between 8-30 days until expiry</li>
|
||||
<li style="background-color: #ffeeba; padding: 3px;">Orange: 7 days or less until expiry</li>
|
||||
<li style="background-color: #f8d7da; padding: 3px;">Red: Expired</li>
|
||||
<li style="background-color: #cce5ff; padding: 3px;">Blue: No expiration set / Password never expires</li>
|
||||
</ul>
|
||||
|
||||
<p>If you need assistance, please contact the IT Support team.</p>
|
||||
@@ -173,12 +261,14 @@ def generate_html(app_registrations):
|
||||
<table>
|
||||
<tr>
|
||||
<th>Display Name</th>
|
||||
<th>Secret Name</th>
|
||||
<th>Expiry Date</th>
|
||||
<th>Days to Expiry</th>
|
||||
<th>Owners</th>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
# Process app registrations
|
||||
for app in app_registrations:
|
||||
owners = app.get('owners', [])
|
||||
owner_upns = [owner.get('userPrincipalName') for owner in owners if owner.get('userPrincipalName')]
|
||||
@@ -187,6 +277,7 @@ def generate_html(app_registrations):
|
||||
for credential in app.get('passwordCredentials', []):
|
||||
expiry_date = credential.get('expiry_date')
|
||||
days_to_expiry = credential.get('days_to_expiry')
|
||||
secret_name = credential.get('name', 'Secret')
|
||||
|
||||
if days_to_expiry is not None:
|
||||
if days_to_expiry > 30:
|
||||
@@ -205,11 +296,118 @@ def generate_html(app_registrations):
|
||||
html += f"""
|
||||
<tr class="{color_class}">
|
||||
<td>{app['displayName']}</td>
|
||||
<td>{expiry_date.split('T')[0]}</td>
|
||||
<td>{secret_name}</td>
|
||||
<td>{expiry_date.split('T')[0] if expiry_date else 'N/A'}</td>
|
||||
<td>{days_to_expiry}</td>
|
||||
<td>{owner_list}</td>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
# Process Entra ID accounts section
|
||||
html += """
|
||||
</table>
|
||||
<h1>Entra ID Accounts</h1>
|
||||
<table>
|
||||
<tr>
|
||||
<th>Display Name</th>
|
||||
<th>User Principal Name</th>
|
||||
<th>Password Expiry Date</th>
|
||||
<th>Status</th>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
# Check if entra_id_accounts is not empty and has items
|
||||
if entra_id_accounts and len(entra_id_accounts) > 0:
|
||||
for account in entra_id_accounts:
|
||||
try:
|
||||
# Safe access to display name and userPrincipalName
|
||||
display_name = account.get('displayName', 'Unknown')
|
||||
upn = account.get('userPrincipalName', 'Unknown')
|
||||
|
||||
# Get the clean email - use mail property if available, otherwise use UPN
|
||||
email = account.get('mail') or upn
|
||||
|
||||
# If it's a guest account with #EXT#, display a cleaner version
|
||||
if '#EXT#' in upn and not account.get('mail'):
|
||||
# Extract the email portion before the #EXT# part
|
||||
try:
|
||||
# Format is typically: username_domain.com#EXT#@tenant.onmicrosoft.com
|
||||
# We want to convert to: username@domain.com
|
||||
parts = upn.split('#EXT#')[0]
|
||||
if '_' in parts:
|
||||
username, domain = parts.rsplit('_', 1)
|
||||
email = f"{username}@{domain}"
|
||||
except:
|
||||
# If parsing fails, keep the original
|
||||
email = upn
|
||||
|
||||
# Check password policies to see if password never expires
|
||||
password_policies = account.get('passwordPolicies', '')
|
||||
if password_policies is None:
|
||||
password_policies = ''
|
||||
password_never_expires = 'DisablePasswordExpiration' in password_policies
|
||||
|
||||
# Safe access to passwordProfile and passwordExpirationDateTime
|
||||
password_profile = account.get('passwordProfile')
|
||||
password_expiry_date_str = None
|
||||
|
||||
if password_profile and isinstance(password_profile, dict):
|
||||
password_expiry_date_str = password_profile.get('passwordExpirationDateTime')
|
||||
|
||||
if password_never_expires:
|
||||
color_class = "blue"
|
||||
expiry_date_display = "N/A"
|
||||
status = "Password Never Expires"
|
||||
elif password_expiry_date_str:
|
||||
password_expiry_date = datetime.strptime(password_expiry_date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
|
||||
expiry_date_display = password_expiry_date.strftime('%Y-%m-%d')
|
||||
days_to_expiry = (password_expiry_date - datetime.now(timezone.utc)).days
|
||||
|
||||
if days_to_expiry > 30:
|
||||
color_class = "green"
|
||||
status = f"{days_to_expiry} days remaining"
|
||||
elif 7 < days_to_expiry <= 30:
|
||||
color_class = "yellow"
|
||||
status = f"{days_to_expiry} days remaining"
|
||||
elif 1 <= days_to_expiry <= 7:
|
||||
color_class = "orange"
|
||||
status = f"{days_to_expiry} days remaining"
|
||||
else:
|
||||
color_class = "red"
|
||||
status = "EXPIRED"
|
||||
else:
|
||||
color_class = "blue"
|
||||
expiry_date_display = "N/A"
|
||||
status = "No Expiration Set"
|
||||
|
||||
html += f"""
|
||||
<tr class="{color_class}">
|
||||
<td>{display_name}</td>
|
||||
<td>{email}</td>
|
||||
<td>{expiry_date_display}</td>
|
||||
<td>{status}</td>
|
||||
</tr>
|
||||
"""
|
||||
except Exception as e:
|
||||
# Log full exception details for better debugging
|
||||
logging.error(f"Error processing Entra ID account {account.get('userPrincipalName', 'Unknown')}: {e}")
|
||||
logging.error(f"Account data: {account}")
|
||||
|
||||
# Add a row for the account with error information
|
||||
html += f"""
|
||||
<tr class="red">
|
||||
<td>{account.get('displayName', 'Unknown')}</td>
|
||||
<td>{account.get('userPrincipalName', 'Unknown')}</td>
|
||||
<td>Error</td>
|
||||
<td>Failed to process account information</td>
|
||||
</tr>
|
||||
"""
|
||||
else:
|
||||
html += """
|
||||
<tr>
|
||||
<td colspan="4">No Entra ID accounts found or unable to access accounts</td>
|
||||
</tr>
|
||||
"""
|
||||
|
||||
html += """
|
||||
</table>
|
||||
@@ -219,7 +417,7 @@ def generate_html(app_registrations):
|
||||
|
||||
return html
|
||||
|
||||
def send_notifications(app_registrations):
|
||||
def send_notifications(app_registrations, entra_id_accounts):
|
||||
# Email credentials from environment variables
|
||||
smtp_server = os.getenv('SMTP_SERVER')
|
||||
smtp_port = int(os.getenv('SMTP_PORT'))
|
||||
@@ -230,7 +428,7 @@ def send_notifications(app_registrations):
|
||||
to_email = os.getenv('TO_EMAIL')
|
||||
|
||||
# Generate HTML content
|
||||
html_content = generate_html(app_registrations)
|
||||
html_content = generate_html(app_registrations, entra_id_accounts)
|
||||
|
||||
# Collect unique owner email addresses
|
||||
unique_owner_emails = set()
|
||||
@@ -239,22 +437,40 @@ def send_notifications(app_registrations):
|
||||
for owner in owners:
|
||||
email = owner.get('userPrincipalName')
|
||||
if email:
|
||||
# Clean up guest user email addresses
|
||||
if '#EXT#' in email:
|
||||
try:
|
||||
# Format is typically: username_domain.com#EXT#@tenant.onmicrosoft.com
|
||||
# We want to convert to: username@domain.com
|
||||
parts = email.split('#EXT#')[0]
|
||||
if '_' in parts:
|
||||
username, domain = parts.rsplit('_', 1)
|
||||
email = f"{username}@{domain}"
|
||||
except Exception as e:
|
||||
logging.warning(f"Error cleaning up guest email {email}: {e}")
|
||||
|
||||
unique_owner_emails.add(email)
|
||||
|
||||
# Create email message
|
||||
subject = "App Registration Expiry Notification"
|
||||
subject = "App Registration and Credential Expiry Notification"
|
||||
msg = MIMEText(html_content, 'html')
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = formataddr((from_name, from_email))
|
||||
msg['To'] = to_email
|
||||
msg['Cc'] = ', '.join(unique_owner_emails)
|
||||
|
||||
# Only include CC if there are owner emails
|
||||
if unique_owner_emails:
|
||||
msg['Cc'] = ', '.join(unique_owner_emails)
|
||||
recipients = [to_email] + list(unique_owner_emails)
|
||||
else:
|
||||
recipients = [to_email]
|
||||
|
||||
try:
|
||||
logging.info(f"Sending email to {to_email} with CC to {', '.join(unique_owner_emails)}")
|
||||
logging.info(f"Sending email to {to_email}" + (f" with CC to {', '.join(unique_owner_emails)}" if unique_owner_emails else ""))
|
||||
with smtplib.SMTP(smtp_server, smtp_port) as server:
|
||||
server.starttls()
|
||||
server.login(smtp_username, smtp_password)
|
||||
server.sendmail(from_email, [to_email] + list(unique_owner_emails), msg.as_string())
|
||||
server.sendmail(from_email, recipients, msg.as_string())
|
||||
logging.info("Successfully sent email")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to send email: {e}")
|
||||
@@ -37,21 +37,39 @@ stages:
|
||||
displayName: 'Deploy Function App Code'
|
||||
steps:
|
||||
- checkout: self
|
||||
- task: UsePythonVersion@0
|
||||
inputs:
|
||||
versionSpec: '3.11'
|
||||
addToPath: true
|
||||
- script: |
|
||||
python3.11 -m venv .venv
|
||||
python -m pip install --upgrade pip
|
||||
python -m venv .venv
|
||||
source .venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
displayName: 'Install dependencies'
|
||||
- task: ArchiveFiles@2
|
||||
|
||||
# Create a proper structure for the function app
|
||||
- script: |
|
||||
# Create the zip file with proper structure
|
||||
echo "Creating function app archive..."
|
||||
zip -r $(Build.ArtifactStagingDirectory)/functionapp.zip .
|
||||
ls -la $(Build.ArtifactStagingDirectory)/
|
||||
displayName: 'Archive function app files'
|
||||
|
||||
# Publish the artifact with a specific name
|
||||
- task: PublishPipelineArtifact@1
|
||||
inputs:
|
||||
rootFolderOrFile: '$(System.DefaultWorkingDirectory)'
|
||||
includeRootFolder: false
|
||||
archiveType: 'zip'
|
||||
archiveFile: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
|
||||
replaceExistingArchive: true
|
||||
- task: AzureWebApp@1
|
||||
targetPath: '$(Build.ArtifactStagingDirectory)/functionapp.zip'
|
||||
artifact: 'functionapp'
|
||||
publishLocation: 'pipeline'
|
||||
displayName: 'Publish Function App Artifact'
|
||||
|
||||
# Deploy using the AzureFunctionApp task which is better for Function Apps
|
||||
- task: AzureFunctionApp@1
|
||||
inputs:
|
||||
azureSubscription: $(azureServiceConnection)
|
||||
appType: 'functionapp'
|
||||
appType: 'functionApp'
|
||||
appName: $(functionAppName)
|
||||
package: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
|
||||
package: '$(Build.ArtifactStagingDirectory)/functionapp.zip'
|
||||
deploymentMethod: 'zipDeploy'
|
||||
displayName: 'Deploy Function App'
|
||||
Reference in New Issue
Block a user