-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
150 lines (122 loc) · 4.93 KB
/
lambda_function.py
File metadata and controls
150 lines (122 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/python
import sys
import boto3
import time
import random
import string
import os
import json
# --- Module configuration (loaded once per Lambda container) ---
# s3_bucket_for_logging: destination bucket for archived log objects
# queue_url:             URL of the SQS queue to drain
# log_folder_prefix:     S3 "folder" prefix; trailing '/' stripped below
# log_object_prefix:     object-name prefix; trailing '_' stripped below
# gzip_enabled:          truthy string ("true"/"1"/"t"/"y"/"yes") enables gzip
s3_bucket_for_logging = os.environ.get("s3_bucket_for_logging")
queue_url = os.environ.get("queue_url")
# Default to "" so a missing/empty variable cannot crash at import time:
# the original indexed [-1], which raises TypeError on None and
# IndexError on "". rstrip also normalizes repeated trailing characters.
log_folder_prefix = os.environ.get("log_folder_prefix", "").rstrip("/")
log_object_prefix = os.environ.get("log_object_prefix", "").rstrip("_")
# Default "false" so the later .lower() call is always safe.
gzip_enabled = os.environ.get("gzip_enabled", "false")
def process_messages():
    """Drain the SQS queue, archiving each message body to S3 as a log object.

    Reads module-level configuration (queue_url, s3_bucket_for_logging,
    log_folder_prefix, log_object_prefix, gzip_enabled). Messages are
    received in batches of up to 10 and written to keys of the form
    <log_folder_prefix>/YYYY/MM/DD/<log_object_prefix>_<stamp>_<rand16>.<ext>;
    each message is deleted from the queue only after a successful S3 put,
    so failed writes remain visible for retry.

    Returns:
        dict report with keys 'queue_size', 'batches',
        'processed_messages', 'avg_batch_size'.

    Raises:
        Any boto3 client error, after printing it.
    """
    processed_msg_count = 0
    s3_client = boto3.client('s3')
    sqs_client = boto3.client('sqs')
    try:
        queue_attrib = sqs_client.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['ApproximateNumberOfMessages']
        )
    except sqs_client.exceptions.QueueDoesNotExist as e:
        print(f"QueueDoesNotExist: {e}")
        raise  # bare raise preserves the original traceback
    except Exception as e:
        print(f'Unexpected error from SQS client: {e}')
        raise
    queue_size = int(queue_attrib['Attributes']['ApproximateNumberOfMessages'])
    if queue_size > 0:
        print(f"Queue Size: {queue_size}")
    else:
        print("Queue is empty. Finalizing and generating report...")
        # Return report content
        return {'queue_size': 0, 'batches': 0, 'processed_messages': 0, 'avg_batch_size': 0}
    # Evaluate the compression flag once instead of per message.
    use_gzip = gzip_enabled.lower() in ('true', '1', 't', 'y', 'yes')
    if use_gzip:
        print("GZIP Compression Enabled.")
        # BUG FIX: the original used zlib.compress, which emits raw
        # zlib-format data, NOT the gzip format implied by the .gz
        # extension and content type — gunzip cannot read it.
        # gzip.compress writes a proper gzip member.
        import gzip
        object_ext = 'json.gz'
        object_content_type = 'application/x-gzip'
    else:
        print("GZIP Compression Disabled.")
        object_ext = 'json'
        object_content_type = 'application/json'
    batch = 0
    # ApproximateNumberOfMessages is only an upper-bound hint; the loop
    # also exits early when a receive returns no messages.
    while batch < queue_size:
        batch += 1
        msg_batch = sqs_client.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['SentTimestamp'],
            MaxNumberOfMessages=10,  # SQS maximum per receive call
            MessageAttributeNames=['All'],
            VisibilityTimeout=30,
            WaitTimeSeconds=0
        )
        if 'Messages' not in msg_batch:
            print("Batch size of 0. Finalizing and generating report...")
            break
        msg_count = len(msg_batch['Messages'])
        print(f"Batch size: {msg_count}")
        for msg in msg_batch['Messages']:
            # SentTimestamp is epoch milliseconds; convert to UTC.
            log_timestamp = time.gmtime(int(msg['Attributes']['SentTimestamp']) / 1000.)
            # BUG FIX: strftime zero-pads month/day/hour/minute; the
            # original interpolated tm_mon etc. unpadded, yielding keys
            # like .../2024/1/5/ that do not sort lexicographically.
            s3_log_folder = f"{log_folder_prefix}/{time.strftime('%Y/%m/%d', log_timestamp)}/"
            # 16-char random suffix avoids key collisions within a minute.
            alphanum_key = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
            s3_object_name = (
                f"{log_object_prefix}_"
                f"{time.strftime('%Y%m%dT%H%MZ', log_timestamp)}_"
                f"{alphanum_key}.{object_ext}"
            )
            # The SQS body is an SNS envelope; the actual log payload is
            # its 'Message' field.
            this_msg = json.loads(msg['Body'])['Message']
            msg_body = gzip.compress(this_msg.encode('utf-8')) if use_gzip else this_msg
            try:
                # Write log file to S3
                s3_client.put_object(
                    Body=msg_body,
                    Bucket=s3_bucket_for_logging,
                    Key=s3_log_folder + s3_object_name,
                    ContentType=object_content_type
                )
            except Exception as e:
                print(f'Unexpected error from S3 client: {e}')
                raise
            try:
                # Delete only after a successful put so failed writes
                # become visible again for retry.
                sqs_client.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=msg['ReceiptHandle']
                )
            except Exception as e:
                print(f'Unexpected error from SQS client: {e}')
                raise
        # Add message total of this batch to report
        processed_msg_count += msg_count
    # batch >= 1 here (queue_size > 0 guaranteed one iteration), so the
    # average is never a division by zero.
    return {
        'queue_size': queue_size,
        'batches': batch,
        'processed_messages': processed_msg_count,
        'avg_batch_size': round(processed_msg_count / batch, 3),
    }
def main():
    """CLI entry point: drain the queue and print the run report."""
    print(process_messages())
def lambda_handler(event, context):
    """AWS Lambda entry point.

    The incoming *event* and *context* are unused; the handler simply
    drains the queue and wraps the run report in the response payload.
    """
    return {'report': process_messages()}
# Allow running the archiver directly from the command line.
if __name__ == '__main__':
    main()