import base64
import datetime
import hashlib
import hmac
import json
import logging
import math
import os
import zlib

import requests

# Hostname of the OpenSearch domain that receives the bulk requests.
# buildRequest() derives the signing region and service from the second and
# third dot-separated labels of this name.
endpoint = 'search-opensearch-p54yhgwanxupfliov2e63qnjeq.us-east-1.es.amazonaws.com'

# When True, logFailure() writes the failed response and the submitted bulk
# body to the log; when False it stays silent.
logFailedResponses = False

def handler(event, context):
    """Lambda entry point: decode an awslogs event and bulk-index it.

    The event carries a base64-encoded, gzip-compressed JSON document under
    ``event['awslogs']['data']``. It is decoded, converted to a bulk request
    body by ``transform`` and sent with ``post``.

    Args:
        event: Invocation payload containing ``awslogs.data``.
        context: Lambda context object (unused).

    Returns:
        ``'Control message handled successfully'`` when ``transform`` yields
        nothing to index, otherwise ``'Success'``.

    Raises:
        Exception: with the raw response text when the request did not return
            HTTP 200, the response body is not a JSON object, or the bulk
            response reports per-item errors.
    """
    compressed = base64.b64decode(event['awslogs']['data'])
    # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
    decompressed = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
    awslogs_data = json.loads(decompressed)

    elasticsearch_bulk_data = transform(awslogs_data)

    if not elasticsearch_bulk_data:
        logging.info('Received a control message')
        return 'Control message handled successfully'

    response = post(elasticsearch_bulk_data)

    # Parse the body once and tolerate non-JSON bodies (e.g. an HTML error
    # page from a proxy) so the real failure is reported instead of a JSON
    # decoding error.
    try:
        response_body = response.json()
    except ValueError:
        response_body = None

    failed = (
        response.status_code != 200
        or not isinstance(response_body, dict)
        or response_body.get('errors')
    )
    if failed:
        details = response_body if response_body is not None else response.text
        logFailure(details, elasticsearch_bulk_data)
        raise Exception(response.text)

    logging.info('Success: ' + response.text)
    return 'Success'

def transform(payload):
    """Convert a decoded awslogs payload into a bulk-index request body.

    Every log event becomes two newline-terminated JSON lines: an ``index``
    action naming the daily ``cwl-YYYY.MM.DD`` index and the event id, followed
    by the document produced by ``buildSource`` plus ``@``-prefixed metadata.

    Args:
        payload: Dict with ``messageType``, ``owner``, ``logGroup``,
            ``logStream`` and ``logEvents`` (each event having ``id``,
            ``timestamp`` in epoch milliseconds, ``message`` and optionally
            ``extractedFields``).

    Returns:
        ``None`` for a ``CONTROL_MESSAGE`` payload, otherwise the bulk body
        string (empty when there are no log events).
    """
    if payload['messageType'] == 'CONTROL_MESSAGE':
        return None

    bulk_lines = []

    for log_event in payload['logEvents']:
        # Convert in UTC explicitly so the index name and @timestamp do not
        # depend on the host's local time zone. tzinfo is dropped afterwards
        # to keep the emitted ISO string free of an offset suffix.
        timestamp = datetime.datetime.fromtimestamp(
            log_event['timestamp'] / 1000.0, tz=datetime.timezone.utc
        ).replace(tzinfo=None)

        index_name = 'cwl-' + timestamp.strftime('%Y.%m.%d')

        # 'extractedFields' may be absent from an event; treat that the same
        # as "no extracted fields" instead of raising KeyError.
        source = buildSource(log_event['message'], log_event.get('extractedFields'))
        source['@id'] = log_event['id']
        source['@timestamp'] = timestamp.isoformat()
        source['@message'] = log_event['message']
        source['@owner'] = payload['owner']
        source['@log_group'] = payload['logGroup']
        source['@log_stream'] = payload['logStream']

        action = {'index': {'_index': index_name, '_id': log_event['id']}}

        bulk_lines.append(json.dumps(action))
        bulk_lines.append(json.dumps(source))

    # Every line, including the last one, is terminated by a newline.
    return ''.join(line + '\n' for line in bulk_lines)

def buildSource(message, extractedFields):
    """Build the document to index for a single log event.

    When ``extractedFields`` is non-empty, each non-empty field is copied into
    the result: numeric values (including numeric strings) are stored as
    numbers, and a field whose text contains a valid JSON object additionally
    gets the parsed object stored under ``'$' + key``. Otherwise the JSON
    object embedded in ``message`` (if any) is returned.

    Args:
        message: Raw log message text.
        extractedFields: Mapping of field name to value, or a falsy value.

    Returns:
        A dict; empty when nothing could be extracted.
    """
    if extractedFields:
        source = {}

        for key, value in extractedFields.items():
            if not value:
                continue

            if isNumeric(value):
                source[key] = _toNumber(value)
                continue

            json_substring = extractJson(value)
            if json_substring is not None:
                source['$' + key] = json.loads(json_substring)

            source[key] = value

        return source

    json_substring = extractJson(message)
    if json_substring is not None:
        return json.loads(json_substring)

    return {}


def _toNumber(value):
    """Convert a value accepted by ``isNumeric`` into an int or float."""
    if not isinstance(value, str):
        return 1 * value

    text = value.strip()
    try:
        number = int(text)
    except ValueError:
        return float(text)

    # Integers outside the signed 64-bit range are emitted as floats so the
    # serialized document does not carry an arbitrarily large integer.
    if -(2 ** 63) <= number < 2 ** 63:
        return number
    return float(text)


def extractJson(message):
    """Return the substring of ``message`` from its first ``{`` if it is valid JSON.

    Returns ``None`` when there is no ``{`` or the remainder does not parse.
    """
    json_start = message.find('{')
    if json_start < 0:
        return None

    json_substring = message[json_start:]
    return json_substring if isValidJson(json_substring) else None


def isValidJson(message):
    """Return True when ``message`` parses as JSON, False otherwise."""
    try:
        json.loads(message)
    except ValueError:
        return False
    return True


def isNumeric(n):
    """Return True when ``n`` is a number or a string holding a finite number.

    ``int`` and ``float`` instances are always accepted. Strings are accepted
    when, after trimming whitespace, they are plain ASCII decimal or
    exponent notation for a finite value; ``'nan'``, ``'inf'``, digit
    separators (``'1_000'``) and empty strings are rejected.
    """
    if isinstance(n, (int, float)):
        return True
    if not isinstance(n, str):
        return False

    text = n.strip()
    # float() would accept underscores and non-ASCII digits; neither is
    # wanted in log fields.
    if '_' in text or not text.isascii():
        return False

    try:
        return math.isfinite(float(text))
    except ValueError:
        return False

def post(body, timeout=(5, 60)):
    """Send a bulk request body to the configured endpoint.

    Args:
        body: Bulk request body as text.
        timeout: ``(connect, read)`` timeout in seconds handed to
            ``requests.post``. Without one the call could block indefinitely
            on an unresponsive endpoint.

    Returns:
        The ``requests`` response object.
    """
    request_params = buildRequest(endpoint, body)

    # Send the same UTF-8 bytes that buildRequest hashed for the signature,
    # instead of letting the HTTP stack choose an encoding for a str body.
    return requests.post(
        request_params['url'],
        data=body.encode('utf-8'),
        headers=request_params['headers'],
        timeout=timeout,
    )

def buildRequest(endpoint, body):
    """Build a Signature Version 4 signed ``POST /_bulk`` request description.

    The region and service are taken from the second and third dot-separated
    labels of ``endpoint``. Credentials are read from the
    ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and ``AWS_SESSION_TOKEN``
    environment variables; when the first two are unset the original
    placeholder strings are used, which will not authenticate.

    Args:
        endpoint: Hostname of the form ``<domain>.<region>.<service>....``.
        body: Request body as text; it is hashed as UTF-8.

    Returns:
        Dict with ``method``, ``url``, ``headers`` (including the computed
        ``Authorization`` header) and ``data``.
    """
    endpoint_parts = endpoint.split('.')
    region = endpoint_parts[1]
    service = endpoint_parts[2]

    access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') or 'YOUR_AWS_ACCESS_KEY_ID'
    secret_access_key = (
        os.environ.get('AWS_SECRET_ACCESS_KEY') or 'YOUR_AWS_SECRET_ACCESS_KEY'
    )
    session_token = os.environ.get('AWS_SESSION_TOKEN')

    datetime_now = datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    date = datetime_now[:8]

    payload = body.encode('utf-8')

    request = {
        'method': 'POST',
        'url': 'https://' + endpoint + '/_bulk',
        'headers': {
            'Content-Type': 'application/json',
            'Host': endpoint,
            # Content-Length counts bytes, not characters.
            'Content-Length': str(len(payload)),
            'X-Amz-Date': datetime_now,
            'Authorization': '',
        },
        'data': body,
    }

    # Headers covered by the signature, keyed by lower-case name.
    headers_to_sign = {
        'content-type': request['headers']['Content-Type'],
        'host': request['headers']['Host'],
        'x-amz-date': request['headers']['X-Amz-Date'],
    }
    if session_token:
        # Temporary credentials must send the session token, and it has to be
        # part of the signed headers.
        request['headers']['X-Amz-Security-Token'] = session_token
        headers_to_sign['x-amz-security-token'] = session_token

    header_names = sorted(headers_to_sign)
    canonical_headers = ''.join(
        name + ':' + headers_to_sign[name] + '\n' for name in header_names
    )
    signed_headers = ';'.join(header_names)

    canonical_request = '\n'.join([
        request['method'],
        '/_bulk',
        '',  # empty query string
        canonical_headers,
        signed_headers,
        hashlib.sha256(payload).hexdigest(),
    ])

    credential_scope = date + '/' + region + '/' + service + '/aws4_request'

    string_to_sign = '\n'.join([
        'AWS4-HMAC-SHA256',
        datetime_now,
        credential_scope,
        hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
    ])

    # Derive the signing key by chaining HMACs over date, region, service and
    # the fixed terminator.
    signing_key = ('AWS4' + secret_access_key).encode('utf-8')
    for scope_part in (date, region, service, 'aws4_request'):
        signing_key = hmac.new(
            signing_key, scope_part.encode('utf-8'), hashlib.sha256
        ).digest()

    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    request['headers']['Authorization'] = (
        'AWS4-HMAC-SHA256 Credential=' + access_key_id + '/' + credential_scope + ', '
        + 'SignedHeaders=' + signed_headers + ', '
        + 'Signature=' + signature
    )

    return request

def logFailure(error, failedItems):
    """Log a failed bulk request when ``logFailedResponses`` is enabled.

    Args:
        error: JSON-serializable description of the failure.
        failedItems: JSON-serializable data that was submitted; only logged
            when non-empty.
    """
    # Nothing is logged unless the module-level switch is turned on.
    if not logFailedResponses:
        return

    error_dump = json.dumps(error, indent=2)
    logging.error('Error: ' + error_dump)

    if failedItems and len(failedItems) > 0:
        items_dump = json.dumps(failedItems, indent=2)
        logging.error('Failed Items: ' + items_dump)
# Converted from a JavaScript AWS Lambda function to Python 3.

# Original source: https://www.cveoy.top/t/topic/p234 (copyright belongs to the author; do not repost or scrape).

免费AI点我,无需注册和登录