AWS Lambda script to delete Inactive Users

1 year ago 79
BOOK THIS SPACE FOR AD
ARTICLE AD

I was learning AWS and playing around with AWS Lambda functions. This particular Lambda function uses the boto3 library to delete the access keys of all inactive users in an AWS account.

We can also package the Lambda function as a StackSet to deploy it across all AWS accounts.

import json
import boto3
from datetime import datetime
from datetime import timedelta
from botocore.exceptions import ClientError
import requests
import os

# Clock reading taken once, when the module is imported; check_access_keys
# measures each key's idle time against this value.
# NOTE(review): module globals presumably survive between invocations on a
# warm Lambda container, which would make this timestamp stale - confirm.
date_now = datetime.now()
# IAM client used for the list/get calls, and the IAM resource interface used
# to obtain AccessKey objects for deletion.
iam_client = boto3.client('iam')
iam = boto3.resource('iam')
# A key must have been idle for more than this many days to be cleaned up.
max_idle_days = 30
# Passed as MaxItems to list_users, i.e. the number of users fetched per call.
max_items = 20

# Slack settings. Both environment variables are required: a missing one
# raises KeyError as soon as the module is imported.

# Webhook that the per-key notifications are POSTed to.
url = os.environ['WEBHOOK_URL']
# Channel name without the leading '#'; get_slack_payload adds the '#'.
slack_channel = os.environ['SLACK_CHANNEL_NAME']
slack_emoji = ":aws-iam:"
slack_bot_username = "IAM Bot" # Slackbot Username
# Not referenced anywhere else in the visible code.
slack_message_title = ""

# HTTP headers sent with the Slack webhook POST made by check_access_keys.
#
# Three entries that had been copied from a Postman capture are intentionally
# absent:
#   * Content-Length - was frozen at "497", the size of one particular body;
#     the HTTP library derives the correct value from the payload it sends.
#   * Host - was frozen at "hooks.slack.com", which would be wrong for any
#     WEBHOOK_URL on another host; it is derived from the URL instead.
#   * cache-control - duplicated 'Cache-Control' under a second spelling.
headers = {
    'Content-Type': "application/json",
    'User-Agent': "PostmanRuntime/7.19.0",
    'Accept': "*/*",
    'Cache-Control': "no-cache",
    'Accept-Encoding': "gzip, deflate",
    'Connection': "keep-alive",
}

def get_sensored_access_key(access_key):
    """Return *access_key* with its middle replaced by a fixed run of asterisks.

    Only the first four and last four characters are kept, so the value can be
    shown in a notification without disclosing the full key ID.

    Args:
        access_key: The access key ID as a string.

    Returns:
        ``<first 4>*********<last 4>``. For keys of eight characters or fewer
        the two visible slices would overlap and reveal the entire key, so
        only the mask is returned in that case.
    """
    mask = "*********"
    if len(access_key) <= 8:
        return mask
    return access_key[:4] + mask + access_key[-4:]

def get_account_alias():
    """Return a human-readable label for the current AWS account.

    Returns:
        The first account alias reported by IAM, or, when the account has no
        alias, the account ID reported by STS ``get_caller_identity``.
    """
    aliases = iam_client.list_account_aliases()['AccountAliases']
    if aliases:
        return aliases[0]
    # No alias configured: fall back to the account ID.
    return boto3.client('sts').get_caller_identity().get('Account')

def get_slack_payload(user, is_access_key, access_key="", arn="", diff=-1):
    """Build the JSON body of the Slack notification for one access key.

    The payload is assembled as a dictionary and serialized with
    ``json.dumps`` so that quotes, backslashes and the line breaks inside the
    message text are always escaped correctly.

    Args:
        user: IAM user name shown in the message.
        is_access_key: Not used; kept so existing callers keep working.
        access_key: Access key ID to display (callers pass a masked value).
        arn: ARN of the user.
        diff: Number of days since the key was last used. ``-1`` is rendered
            as "Never".

    Returns:
        A JSON string ready to be sent as the body of the webhook POST.
    """
    last_accessed = "Never" if diff == -1 else str(diff) + " days"

    details = "\n".join([
        "*Account:* " + get_account_alias(),
        "*User:* " + user,
        "*ARN:* " + arn,
        "*Access Key:* " + access_key,
        "*Last Accessed:* " + last_accessed,
    ])

    payload = {
        "channel": "#" + slack_channel,
        "username": slack_bot_username,
        "icon_emoji": slack_emoji,
        "attachments": [
            {
                "fallback": "Access Key Deactivated",
                "pretext": "Access Key Deactivated",
                "color": "#34bb13",
                "fields": [{"value": details}],
            },
        ],
    }
    return json.dumps(payload)

####main code #####
def lambda_handler(event, context):
    """Lambda entry point: check every IAM user's access keys, then report to Slack.

    All pages of ``list_users`` are walked with a paginator and each user is
    handed to ``check_access_keys``. Once every page has been processed a
    short "Lambda Ran" message is posted to the configured webhook.

    If listing users fails, the error is printed and the handler returns
    without posting the message (a failed call can therefore neither leave the
    result unbound nor be retried forever with the same marker).

    Args:
        event: Lambda invocation event (not used).
        context: Lambda context object (not used).

    Raises:
        ValueError: If Slack answers the final message with a non-200 status.
            The same error may also propagate from ``check_access_keys``.
    """
    print("Running Lambda")
    try:
        pages = iam_client.get_paginator('list_users').paginate(
            PaginationConfig={'PageSize': max_items}
        )
        for page in pages:
            for user in page['Users']:
                check_access_keys(user)
    except ClientError as error:
        print('An error occurred while fetching user list.', error)
        return

    # General "run finished" alert, sent only after all pages were handled.
    # It goes to the webhook from WEBHOOK_URL rather than a URL in the source.
    slack_data = {'text': "Lambda Ran"}
    r = requests.post(
        url,
        data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'},
        timeout=10,  # do not let a stalled connection use up the Lambda run time
    )
    if r.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (r.status_code, r.text))

def check_access_keys(userData):
    """Delete a user's access keys that are Inactive and have been idle too long.

    A key is deleted when its status is "Inactive" and more than
    ``max_idle_days`` days have passed since it was last used, or since it was
    created if it has never been used. The key is deleted first; the log lines
    and the Slack notification follow, so a notice is only sent for a deletion
    that actually went through.

    Args:
        userData: One entry of the ``Users`` list returned by ``list_users``;
            its ``UserName`` and ``Arn`` values are read.

    Raises:
        ValueError: If Slack answers a notification with a non-200 status.

    ``ClientError`` from the IAM calls is caught and printed, not raised.
    """
    from datetime import timezone  # not among the file's top-level imports

    username = userData['UserName']
    user_arn = userData['Arn']

    # Take "now" per call rather than from a value captured at import time,
    # and in UTC. It is made naive because the API timestamps below have their
    # tzinfo dropped as well - this assumes they are reported in UTC.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    try:
        res_keys = iam_client.list_access_keys(UserName=username, MaxItems=2)
        for key in res_keys.get('AccessKeyMetadata', []):
            access_key_id = key.get('AccessKeyId')
            if access_key_id is None:
                continue
            # Only keys that are already Inactive are candidates, so the
            # last-used lookup is skipped for every other key.
            if key.get('Status') != "Inactive":
                continue

            res_last_used_key = iam_client.get_access_key_last_used(
                AccessKeyId=access_key_id)
            reference_date = res_last_used_key['AccessKeyLastUsed'].get('LastUsedDate')
            if reference_date is None:
                # Never used: measure from this key's own creation date.
                reference_date = key.get('CreateDate')
            if reference_date is None:
                idle_days = 0
            else:
                idle_days = (now - reference_date.replace(tzinfo=None)).days

            if idle_days <= max_idle_days:
                continue

            iam.AccessKey(username, access_key_id).delete()

            # for logging help in cloudwatch
            print(username)
            print(access_key_id + " deleted")

            response = requests.request(
                "POST",
                url,
                data=get_slack_payload(
                    username,
                    True,
                    get_sensored_access_key(access_key_id),
                    user_arn,
                    diff=idle_days,
                ),
                headers=headers,
                timeout=10,
            )
            if response.status_code != 200:
                raise ValueError(
                    'Request to slack returned an error %s, the response is:\n%s'
                    % (response.status_code, response.text))

    except ClientError as error:
        print('An error occurred while listing access keys', error)

So the logic here is: if an access key has been inactive for more than 30 days, we clean it up by deleting it directly. Do note the assumption that we only act on users whose access key is already marked inactive. We could also change the logic to delete access keys that are still active but have not been used for more than 30 days; that needs only a few basic changes, which anyone reading should be able to figure out.

Feel free to let me know in comments if any help is needed to modify the lambda function.

Read Entire Article