将 JavaScript AWS Lambda 函数转换为 Python3 代码
import json import base64 import zlib import requests import datetime import hashlib import hmac import logging
endpoint = 'search-opensearch-p54yhgwanxupfliov2e63qnjeq.us-east-1.es.amazonaws.com' logFailedResponses = False
def handler(event, context): input_data = base64.b64decode(event['awslogs']['data']) decompressed_data = zlib.decompress(input_data, 16+zlib.MAX_WBITS) awslogs_data = json.loads(decompressed_data)
elasticsearch_bulk_data = transform(awslogs_data)
if not elasticsearch_bulk_data:
logging.info('Received a control message')
return 'Control message handled successfully'
response = post(elasticsearch_bulk_data)
if response.status_code != 200 or response.json().get('errors'):
logFailure(response.json(), elasticsearch_bulk_data)
raise Exception(response.text)
logging.info('Success: ' + response.text)
return 'Success'
def transform(payload): if payload['messageType'] == 'CONTROL_MESSAGE': return None
bulk_request_body = ''
for log_event in payload['logEvents']:
timestamp = datetime.datetime.fromtimestamp(log_event['timestamp']/1000.0)
index_name = 'cwl-' + timestamp.strftime('%Y.%m.%d')
source = buildSource(log_event['message'], log_event['extractedFields'])
source['@id'] = log_event['id']
source['@timestamp'] = timestamp.isoformat()
source['@message'] = log_event['message']
source['@owner'] = payload['owner']
source['@log_group'] = payload['logGroup']
source['@log_stream'] = payload['logStream']
action = { 'index': {} }
action['index']['_index'] = index_name
action['index']['_id'] = log_event['id']
bulk_request_body += json.dumps(action) + '\n' + json.dumps(source) + '\n'
return bulk_request_body
def buildSource(message, extractedFields): if extractedFields: source = {}
for key, value in extractedFields.items():
if value:
if isNumeric(value):
source[key] = 1 * value
continue
json_substring = extractJson(value)
if json_substring is not None:
source['$' + key] = json.loads(json_substring)
source[key] = value
return source
json_substring = extractJson(message)
if json_substring is not None:
return json.loads(json_substring)
return {}
def extractJson(message): json_start = message.find('{') if json_start < 0: return None json_substring = message[json_start:] return json_substring if isValidJson(json_substring) else None
def isValidJson(message): try: json.loads(message) return True except ValueError: return False
def isNumeric(n): return isinstance(n, (int, float))
def post(body): request_params = buildRequest(endpoint, body)
response = requests.post(request_params['url'], data=body, headers=request_params['headers'])
return response
def buildRequest(endpoint, body): endpoint_parts = endpoint.split('.') region = endpoint_parts[1] service = endpoint_parts[2] datetime_now = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ') date = datetime_now[:8] k_date = hmac.new(('AWS4' + 'YOUR_AWS_SECRET_ACCESS_KEY').encode('utf-8'), date.encode('utf-8'), hashlib.sha256).digest() k_region = hmac.new(k_date, region.encode('utf-8'), hashlib.sha256).digest() k_service = hmac.new(k_region, service.encode('utf-8'), hashlib.sha256).digest() k_signing = hmac.new(k_service, b'aws4_request', hashlib.sha256).digest()
request = {
'method': 'POST',
'url': 'https://' + endpoint + '/_bulk',
'headers': {
'Content-Type': 'application/json',
'Host': endpoint,
'Content-Length': str(len(body)),
'X-Amz-Date': datetime_now,
'Authorization': ''
},
'data': body
}
canonical_headers = 'content-type:' + request['headers']['Content-Type'] + '\n' + 'host:' + request['headers']['Host'] + '\n' + 'x-amz-date:' + request['headers']['X-Amz-Date'] + '\n'
signed_headers = 'content-type;host;x-amz-date'
canonical_request = request['method'] + '\n' + '/_bulk' + '\n' + '\n' + canonical_headers + '\n' + signed_headers + '\n' + hashlib.sha256(request['data'].encode('utf-8')).hexdigest()
credential_scope = date + '/' + region + '/' + service + '/aws4_request'
string_to_sign = 'AWS4-HMAC-SHA256' + '\n' + datetime_now + '\n' + credential_scope + '\n' + hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
signature = hmac.new(k_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
authorization_header = 'AWS4-HMAC-SHA256 Credential=' + 'YOUR_AWS_ACCESS_KEY_ID' + '/' + credential_scope + ', ' + 'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
request['headers']['Authorization'] = authorization_header
return request
def logFailure(error, failedItems): if logFailedResponses: logging.error('Error: ' + json.dumps(error, indent=2))
if failedItems and len(failedItems) > 0:
logging.error('Failed Items: ' + json.dumps(failedItems, indent=2))
原文地址: https://www.cveoy.top/t/topic/p234 著作权归作者所有。请勿转载和采集!