Sending a Custom Payload via Kinesis Firehose to OpenObserve

TL;DR: Asher struggled to send a custom payload via Kinesis Firehose to OpenObserve; Firehose reported success, but nothing appeared. Prabhat provided code examples and explained the payload format needed for the data to be transmitted successfully.

Photo of Asher
Asher
Wed, 01 Nov 2023 22:58:45 UTC

Is there an example somewhere of sending a custom payload via Kinesis to OpenObserve? My Kinesis Firehose says successful, but I'm not seeing anything in OpenObserve. I've tried a bunch of different approaches but nothing seems to be working. Normal things from AWS directly, like log groups, work, but when trying to send custom payloads to Kinesis and then to OO, no luck. I read in the OO docs that maybe it should be base64 encoded. Is there any documentation on custom Kinesis Firehose payloads and what format they should be in so they stream into OpenObserve correctly? This is an AWS Lambda excerpt:

```
records = []

# Extract the records from the event
for record in event['Records']:
    message_body = record['body']
    print(f"SQS message {record['messageId']}: {message_body}")

    # Prepare the message for Kinesis Firehose
    firehose_record = {
        'Data': message_body + "\n"
    }
    records.append(firehose_record)

# Put records into Firehose stream
if records:
    response = firehose_client.put_record_batch(
        DeliveryStreamName=FIREHOSE_STREAM_NAME,
        Records=records
    )
    print(f"Firehose Response: {json.dumps(response)}")

return {
    'statusCode': 200,
    'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
}
```

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 22:59:22 UTC

Kinesis firehose sends data in a different format

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 22:59:49 UTC

You can find details about it here -

Photo of Asher
Asher
Wed, 01 Nov 2023 23:03:02 UTC

Right, in this case I'm sending to Kinesis Firehose in the format it expects, a blob. It should encode it on its own before sending to OO, no?

Photo of Asher
Asher
Wed, 01 Nov 2023 23:03:07 UTC

Or do I have to encode it myself first?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:03:16 UTC

ah sorry

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:03:32 UTC

what I gave you is about sending from Kinesis Firehose to OpenObserve

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:03:44 UTC

I think you are trying to send from lambda to firehose

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:04:18 UTC

Are you trying to use kinesis directly?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:04:22 UTC

or kinesis firehose?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:04:28 UTC

Ah, I see the code

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:04:30 UTC

its firehose

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:04:50 UTC

Let me get you a sample code

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:05:08 UTC

Are you able to send from lambda to firehose?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:05:20 UTC

did you get a successful response from Firehose?

Photo of Asher
Asher
Wed, 01 Nov 2023 23:06:12 UTC

For example, my payload to Kinesis Firehose is: `[{'Data': "Hello from SQS, trying to put this in the logs. Why isn't this working!!\n"}]`

Photo of Asher
Asher
Wed, 01 Nov 2023 23:06:34 UTC

Yes, lambda is sending fine, says success and everything

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:07:35 UTC

so firehose accepts it but you don't see that in OpenObserve

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:08:19 UTC

By default, messages from Firehose will not reach OpenObserve immediately, because Firehose batches them: by default, until 5 MB accumulates or 5 minutes pass.
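
A quick way to confirm those buffering settings is to describe the delivery stream with boto3. A minimal sketch, assuming an HTTP-endpoint destination (as used for OpenObserve) and a placeholder stream name:

```
import boto3

# Placeholder stream name and region: adjust to your setup
firehose = boto3.client('firehose', region_name='us-west-2')
desc = firehose.describe_delivery_stream(DeliveryStreamName='my-stream')

for dest in desc['DeliveryStreamDescription']['Destinations']:
    # HTTP endpoint destinations (e.g. OpenObserve) keep their buffering
    # hints under HttpEndpointDestinationDescription
    http_dest = dest.get('HttpEndpointDestinationDescription')
    if http_dest:
        print(http_dest.get('BufferingHints'))
        # e.g. {'SizeInMBs': 5, 'IntervalInSeconds': 300}
```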

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:08:51 UTC

So if you have configured things correctly then you should be able to see data in OpenObserve after 5 minutes

Photo of Asher
Asher
Wed, 01 Nov 2023 23:09:08 UTC

I thought so too but has been many minutes now

Photo of Asher
Asher
Wed, 01 Nov 2023 23:09:13 UTC

and many tries

Photo of Asher
Asher
Wed, 01 Nov 2023 23:09:53 UTC

Yes, and other things are going in fine from log groups, etc to the same firehose, just not my custom messages

Photo of Asher
Asher
Wed, 01 Nov 2023 23:11:00 UTC

Do I need more things in the data payload, like JSON components, a timestamp, etc.?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:13:10 UTC

```
response = client.put_record_batch(
    DeliveryStreamName='string',
    Records=[
        {
            'Data': b'bytes'
        },
    ]
)
```

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:13:18 UTC

this is how it needs to look
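
For reference, a runnable version of that skeleton with a placeholder stream name. Note that `put_record_batch` can return success overall while individual records fail, so it's worth checking `FailedPutCount`:

```
import boto3

# Placeholder stream name and region: adjust to your setup
firehose = boto3.client('firehose', region_name='us-west-2')

response = firehose.put_record_batch(
    DeliveryStreamName='my-stream',
    Records=[{'Data': b'hello from put_record_batch\n'}]
)

# A successful response alone doesn't mean every record made it
print("Failed records:", response['FailedPutCount'])
```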

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:13:34 UTC

and you got it right

Photo of Asher
Asher
Wed, 01 Nov 2023 23:14:13 UTC

Yes, typical cryptic AWS docs. Should I add the newline to the message? Do I need to base64 encode beforehand?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:14:44 UTC

It seems that `Data` is not supposed to be a string but `bytes`
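
In Python that just means encoding the string before handing it to Firehose, for example:

```
# A str must be encoded to bytes before it goes into 'Data'
message = "Hello from SQS"
record = {'Data': message.encode('utf-8')}
```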

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:15:26 UTC

Let me write a quick sample

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:15:32 UTC

and check for myself

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:15:42 UTC

but what are you trying to do?

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:15:50 UTC

I mean what is your final goal?

Photo of Asher
Asher
Wed, 01 Nov 2023 23:15:53 UTC

Thanks :slightly_smiling_face: You should also add a canonical example in the docs for custom streaming.

Photo of Asher
Asher
Wed, 01 Nov 2023 23:16:02 UTC

I have an SQS filled with logs from all over

Photo of Asher
Asher
Wed, 01 Nov 2023 23:16:20 UTC

that SQS is being processed via lambda to send them over to kinesis firehose

Photo of Asher
Asher
Wed, 01 Nov 2023 23:16:28 UTC

which is already hooked up to stream into Open Observe

Photo of Asher
Asher
Wed, 01 Nov 2023 23:17:16 UTC

and plan is throughout AWS where it's native send to firehose, and where it's not, send directly to firehose via SQS and lambda.

Photo of Asher
Asher
Wed, 01 Nov 2023 23:17:48 UTC

for example, CloudWatch Log Groups is native and is hooked into the same firehose and works fine

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:18:07 UTC

got it

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:21:57 UTC

Are you triggering lambda from SQS or polling SQS using lambda periodically?

Photo of Asher
Asher
Wed, 01 Nov 2023 23:25:43 UTC

It's triggering from SQS, already sending the messages directly to lambda via event.

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:25:52 UTC

ok

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:49:34 UTC

so the reason it is failing is that you are sending custom data to kinesis firehose

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:49:36 UTC

```{ "records": [ { "data": "VGhpcyBpcyBhIHNpbXBsZSBzdHJpbmcu" } ] }```

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:49:56 UTC

this is the format in which custom data that you send will be sent to OpenObserve
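
You can check what's inside such a record by base64-decoding the `data` field. A quick sketch using the sample above:

```
import base64

# The request body that Firehose delivers to OpenObserve
payload = {"records": [{"data": "VGhpcyBpcyBhIHNpbXBsZSBzdHJpbmcu"}]}

for rec in payload["records"]:
    print(base64.b64decode(rec["data"]).decode("utf-8"))
    # -> This is a simple string.   (a bare string, not a JSON document)
```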

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:50:28 UTC

AWS services send data in different formats

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:51:13 UTC

Currently OpenObserve can parse CloudWatch logs and VPC flow logs through Firehose. Each of these had to be implemented separately in OpenObserve

Photo of Prabhat
Prabhat
Wed, 01 Nov 2023 23:51:33 UTC

you would be better off sending data directly to OpenObserve from Lambda in this case for now
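
For the direct route, OpenObserve exposes a JSON ingestion endpoint. A minimal sketch, assuming a local instance, the default `_json` API, placeholder org/stream names and credentials, and that the `requests` library is available in your Lambda package:

```
import json

import requests

# Placeholder URL, org, stream, and credentials: adjust to your deployment
url = "http://localhost:5080/api/default/my_stream/_json"
auth = ("root@example.com", "password")

logs = [{"level": "info", "message": "Hello from Lambda"}]
resp = requests.post(url, auth=auth, data=json.dumps(logs))
print(resp.status_code, resp.text)
```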

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:05:36 UTC

```
import json
import boto3
import os


def send_to_firehose(data):
    # Initialize a Kinesis Firehose client in the us-west-2 region
    firehose = boto3.client('firehose', region_name='us-west-2')

    # Get the delivery stream name from environment variables
    delivery_stream_name = 'asher'

    # Prepare the records for batch sending
    records = [{'Data': bytes(record, 'utf-8')} for record in data]

    # Send the records to Kinesis Firehose
    response = firehose.put_record_batch(
        DeliveryStreamName=delivery_stream_name,
        Records=records
    )
    print("Number of records sent:", len(records))
    return response


def lambda_handler(event, context):
    # Define the data to be sent to Kinesis Firehose
    data = ["This is a simple string.", "Another string.", "Yet another one."]
    response = send_to_firehose(data)
    return {
        'statusCode': 200,
        'body': json.dumps(f'{len(data)} records sent to Kinesis Firehose successfully!')
    }


if __name__ == "__main__":
    # Test the function locally
    event = {}    # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = lambda_handler(event, context)
    print("Lambda function response:", response)
```

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:05:42 UTC

I used this code for testing

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:05:49 UTC

you can try running this locally on your laptop

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:06:16 UTC

It will send data to Firehose. However, since the format is not what it expects, OpenObserve rejects it

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:07:01 UTC

In the issue you will notice the 2 different formats of data `cloudwatch` and `vpc flow logs`

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:07:30 UTC

Each AWS service sends data according to its own format

Photo of Asher
Asher
Thu, 02 Nov 2023 00:18:19 UTC

Right, but can't I just emulate that format, put in my own message, and specify my own values for the fields it would normally send?

Photo of Asher
Asher
Thu, 02 Nov 2023 00:18:25 UTC

OpenObserve somehow decodes it and renders it

Photo of Asher
Asher
Thu, 02 Nov 2023 00:19:30 UTC

so I presume I should be able to encode the message in the same way, and you can then decode it as expected, and keep the single firehose.

Photo of Asher
Asher
Thu, 02 Nov 2023 00:20:04 UTC

You just need a doc example showing what it should look like beforehand: the structure, and how to encode it so it's valid for OpenObserve to decode. That's it, it would work with OO :slightly_smiling_face:

Photo of Asher
Asher
Thu, 02 Nov 2023 00:20:57 UTC

Yes, if I have to curl I will, but was hoping to use Firehose for everything. Please add to your queue as it's super useful and nice to send custom things through kinesis :slightly_smiling_face:

Photo of Asher
Asher
Thu, 02 Nov 2023 00:21:50 UTC

Thanks for the quick assistance on this matter, now I won't have to spend any more hours on it. I'm not crazy :slightly_smiling_face:. It was sending, just not being parsed on the OO side, since it didn't match the formats you currently support and have mapped.

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:29:09 UTC

You could emulate the format and it would work

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:29:20 UTC

Will get you some docs on it

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 00:30:12 UTC

We just did not expect anyone to send messages directly to Firehose and then to OpenObserve

Photo of Asher
Asher
Thu, 02 Nov 2023 01:46:15 UTC

Thanks, would love some docs. I'd like not to get into the habit of additional curls, and I can easily ask GPT to match the format with the information I'm sending :slightly_smiling_face: Haha, that's interesting. It seemed like the most logical thing to do honestly. Especially if one already has a hose running.

Photo of Asher
Asher
Thu, 02 Nov 2023 12:28:08 UTC

Hello hello! Any luck on that doc? Or think I'll have to curl in the meantime?

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 13:32:35 UTC

We fixed the kinesis data response format issue.

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 13:32:40 UTC

getting you docs shortly

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 14:34:00 UTC

I need to run some more tests on that

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 16:19:24 UTC

```
import json
import base64
import boto3
import os


def send_to_firehose(data):
    # Initialize a Kinesis Firehose client in the us-west-2 region
    firehose = boto3.client('firehose', region_name='us-west-2')

    # Get the delivery stream name from environment variables
    delivery_stream_name = 'asher'

    # Prepare the records for batch sending
    records = []
    for record in data:
        encoded_value = json.dumps(record).encode('utf-8')
        records.append({'Data': bytes(encoded_value)})
    # records = [{'Data': bytes(base64.b64encode(record.encode('utf-8')), 'utf-8')} for record in data]

    # Send the records to Kinesis Firehose
    response = firehose.put_record_batch(
        DeliveryStreamName=delivery_stream_name,
        Records=records
    )
    print("Number of records sent:", len(records))
    return response


def lambda_handler(event, context):
    # Define the data to be sent to Kinesis Firehose
    data = [
        {"message": "Hello from Lambda 1"}
    ]
    response = send_to_firehose(data)
    return {
        'statusCode': 200,
        'body': json.dumps(f'{len(data)} records sent to Kinesis Firehose successfully!')
    }


if __name__ == "__main__":
    # Test the function locally
    event = {}    # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = lambda_handler(event, context)
    print("Lambda function response:", response)
```

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 16:19:28 UTC

You could use this example

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 16:19:53 UTC

This script sends data to Kinesis firehose

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 16:20:07 UTC

which you can then point at OpenObserve, which will accept the data

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 16:24:37 UTC

Essentially the data that you are sending to OpenObserve has to be in this format:

```
{
  "records": [
    {
      "data": "eyAibW9yZSI6ICJkYXRhIiB9"
    }
  ],
  "requestId": "46f65bae-3b62-4d32-a022-c5952f3a0c3f",
  "timestamp": 1698890892000
}
```

Photo of Prabhat
Prabhat
Thu, 02 Nov 2023 16:25:34 UTC

`data` in this case will be `base64_encode(string(json_document))`
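
In Python terms, that transformation looks like this:

```
import base64
import json

doc = {"more": "data"}

# string(json_document), then base64_encode(...)
encoded = base64.b64encode(json.dumps(doc).encode("utf-8")).decode("utf-8")
print(encoded)  # -> eyJtb3JlIjogImRhdGEifQ== (decodes back to the JSON document)
```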

Photo of Asher
Asher
Fri, 03 Nov 2023 11:43:49 UTC

Thank you!!!!

Photo of Asher
Asher
Fri, 03 Nov 2023 12:56:44 UTC

I tried that, I believe; still not seeing any items come in. Maybe you can catch what I'm doing wrong. I tried both variants below, with and without `body`.

Photo of Asher
Asher
Fri, 03 Nov 2023 12:57:00 UTC

```
import json
import os
import boto3

# Initialize the Firehose client
firehose_client = boto3.client('firehose')

# The name of your Firehose stream
FIREHOSE_STREAM_NAME = os.environ['LOG_OO_FIREHOSE_STREAM_NAME']


def main_handler(event, context):
    try:
        print(FIREHOSE_STREAM_NAME)
        records = []

        # Extract the records from the event
        for record in event['Records']:
            encoded_message_body = json.dumps(record['body']).encode('utf-8')
            print(f"SQS message {record['messageId']}: {record['body']}")

            # Prepare the message for Kinesis Firehose
            firehose_record = {
                'Data': bytes(encoded_message_body)
            }
            records.append(firehose_record)

        print("Records are:")
        print(records)

        # Put records into Firehose stream
        if records:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records
            )

        return {
            'statusCode': 200,
            'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps("An error occurred.")
        }
```

Photo of Asher
Asher
Fri, 03 Nov 2023 12:57:25 UTC

Response without body was:

```
START RequestId: REDACTED Version: $LATEST
LOG_BRB_TO_OPENOBSERVE
SQS message REDACTED: Hello from SQS, trying to put this in the logs. Why isn't this working!!
Records are:
[{'Data': b'{"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78", "receiptHandle": "MessageReceiptHandle", "body": "Hello from SQS, trying to put this in the logs. Why isn\'t this working!!", "attributes": {"ApproximateReceiveCount": "1", "SentTimestamp": "1523232000000", "SenderId": "123456789012", "ApproximateFirstReceiveTimestamp": "1523232000001"}, "messageAttributes": {}, "md5OfBody": "{{{md5_of_body}}}", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue", "awsRegion": "us-east-1"}'}]
END RequestId: REDACTED
REPORT RequestId: REDACTED Duration: 241.25 ms Billed Duration: 242 ms Memory Size: 128 MB Max Memory Used: 73 MB Init Duration: 362.72 ms
```

Photo of Asher
Asher
Fri, 03 Nov 2023 12:57:44 UTC

Response with body was:

```
START RequestId: REDACTED Version: $LATEST
LOG_BRB_TO_OPENOBSERVE
SQS message REDACTED: Hello from SQS, trying to put this in the logs. Why isn't this working!!
Records are:
[{'Data': b'"Hello from SQS, trying to put this in the logs. Why isn\'t this working!!"'}]
END RequestId: REDACTED
REPORT RequestId: REDACTED Duration: 235.79 ms Billed Duration: 236 ms Memory Size: 128 MB Max Memory Used: 73 MB Init Duration: 303.27 ms
```

Photo of Asher
Asher
Fri, 03 Nov 2023 12:58:27 UTC

They both gave responses:

```
Response
{
  "statusCode": 200,
  "body": "\"Successfully processed 1 messages.\""
}
```

Photo of Asher
Asher
Fri, 03 Nov 2023 12:59:03 UTC

Nothing shows in the stream

Photo of Prabhat
Prabhat
Fri, 03 Nov 2023 13:06:51 UTC

Give me a couple hours. Will test this and come back.

Photo of Prabhat
Prabhat
Fri, 03 Nov 2023 21:05:14 UTC

Here is working code for you:

```
import json
import os
import boto3

# Initialize the Firehose client
firehose_client = boto3.client('firehose')

# The name of your Firehose stream
# FIREHOSE_STREAM_NAME = os.environ['LOG_OO_FIREHOSE_STREAM_NAME']
FIREHOSE_STREAM_NAME = 'asher'


def main_handler(event, context):
    try:
        # print(FIREHOSE_STREAM_NAME)
        records = []
        # print("Event is:", event)

        # Extract the records from the event
        for record in event['Records']:
            # record is stringified json. Let's parse it
            record = json.loads(record['Data'])
            record_data_to_send = {
                "body": record['body']
            }
            encoded_record_data_to_send = json.dumps(
                record_data_to_send).encode('utf-8')
            # encoded_message_body = json.dumps(record['body']).encode('utf-8')
            # print(f"SQS message {record['messageId']}: {record['body']}")

            # Prepare the message for Kinesis Firehose
            firehose_record = {
                'Data': bytes(encoded_record_data_to_send)
            }
            records.append(firehose_record)

        print("Records are:")
        print(records)

        # Put records into Firehose stream
        if records:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records
            )

        return {
            'statusCode': 200,
            'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps("An error occurred.")
        }


if __name__ == "__main__":
    # Test the function locally
    event = {
        "Records": [{'Data': b'{"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78", "receiptHandle": "MessageReceiptHandle", "body": "Hello from SQS, trying to put this in the logs. Why isn\'t this working!!", "attributes": {"ApproximateReceiveCount": "1", "SentTimestamp": "1523232000000", "SenderId": "123456789012", "ApproximateFirstReceiveTimestamp": "1523232000001"}, "messageAttributes": {}, "md5OfBody": "{{{md5_of_body}}}", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue", "awsRegion": "us-east-1"}'}]
    }  # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = main_handler(event, context)
    print("Lambda function response:", response)
```

Photo of Prabhat
Prabhat
Fri, 03 Nov 2023 21:06:55 UTC

Your records were: `[{'Data': b'"Hello from SQS, trying to put this in the logs. Why isn\'t this working!!"'}]`

Photo of Prabhat
Prabhat
Fri, 03 Nov 2023 21:07:10 UTC

What was needed was: `[{'Data': b'{"body": "Hello from SQS, trying to put this in the logs. Why isn\'t this working!!"}'}]`
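
In other words, the one-line fix is wrapping the raw SQS body in a JSON object before encoding, roughly:

```
import json

message_body = "Hello from SQS, trying to put this in the logs. Why isn't this working!!"

# Wrap the raw string in a JSON document so OpenObserve gets an object, not a bare string
record = {'Data': json.dumps({"body": message_body}).encode('utf-8')}
```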

Photo of Asher
Asher
Sat, 04 Nov 2023 17:56:29 UTC

Worked!!!! Thank you so much. Above and beyond! Write a doc article on it for Kinesis streams, a custom section :slightly_smiling_face:

Photo of Asher
Asher
Sat, 04 Nov 2023 17:56:43 UTC

:hugging_face:

Photo of Asher
Asher
Sat, 04 Nov 2023 18:17:57 UTC

Next step will be to figure out how I can send structured JSON in the body (or as a replacement for body) so the JSON fields show up as columns :slightly_smiling_face:. If I replace body with just the object it doesn't work, but good enough for now. Just curious for the future as you're able, thanks!!! For example, now we get in the log:

```
{"_timestamp":1699121596482414,"body":"{\"log-type\":\"event_log\",\"payload\":{\"EventLogId\":0,\"EventDate\":\"2023-11-04T11:12:49.0100876-07:00\",\"Login\":\"452\",\"EventCode\":\"API\",\"Description\":\"Payload {\\\"productCode\\\":\\\"REDATED\\\",\\\"promoCode\\\":\\\"\\\",\\\"userLogin\\\":\\\"REDACTED\\\",\\\"userPassword\\\":\\\"REDACTED\\\",\\\"customerReferenceNumber\\\":\\\"REDACTED\\\",\\\"replaceServiceNumberWithCRN\\\":false,\\\"flightDetails\\\":\\\"Flight Details\\\",\\\"departureDt\\\":\\\"09/09/2023 10:20:00\\\",\\\"lastArrivalDt\\\":\\\"09/09/2023 15:50:00\\\",\\\"currencyCode\\\":\\\"USD\\\",\\\"REDACTED\\\":\\\"\\\",\\\"passengerList\\\":[{\\\"orderSequence\\\":1,\\\"lastName\\\":\\\"REDACTED\\\",\\\"firstName\\\":\\\"ASD QWEDRF\\\",\\\"airlineCode\\\":\\\"REDACTED\\\",\\\"REDACTED\\\":\\\"REDACTED\\\",\\\"REDACTED\\\":\\\"REDACTED\\\",\\\"sendSMS\\\":true}]}\"}}"}
```

Photo of Prabhat
Prabhat
Sat, 04 Nov 2023 18:19:02 UTC

`Data` should be JSON

Photo of Prabhat
Prabhat
Sat, 04 Nov 2023 18:19:15 UTC

Then it will work with any number of fields

Photo of Prabhat
Prabhat
Sat, 04 Nov 2023 18:19:38 UTC

Add more fields alongside body
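
For instance, a sketch with a couple of extra (hypothetical) top-level fields, which OpenObserve can then surface as separate columns:

```
import json
import time

record_data_to_send = {
    "body": "Hello from SQS",
    "log_type": "event_log",            # hypothetical extra field
    "sent_at": int(time.time() * 1000)  # hypothetical timestamp, in ms
}
record = {'Data': json.dumps(record_data_to_send).encode('utf-8')}
```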

Photo of Asher
Asher
Sat, 04 Nov 2023 18:21:32 UTC

But what if I just want to assign it? For example, if the actual message body consists of a JSON object like so:

Photo of Asher
Asher
Sat, 04 Nov 2023 18:22:47 UTC

I thought I could just replace the `record_data_to_send` with the JSON object and do everything else as we had, but that doesn't stream in for some reason.

Photo of Asher
Asher
Sat, 04 Nov 2023 18:23:02 UTC

But if you're saying it should work, I'll test a bit more with some variations and see.

Photo of Asher
Asher
Sat, 04 Nov 2023 18:24:18 UTC

Technically:

```
record_data_to_send = {
    "body": record['body']
}
```

should be replaceable with

```
record_data_to_send = record['body']
```

if record['body'] is a valid JSON object, right?

Photo of Asher
Asher
Sat, 04 Nov 2023 18:24:47 UTC

So maybe it's a string already and I need to decode it before assignment. Will investigate.
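
That is likely it: SQS delivers the message body as a string even when it holds JSON, so it needs a `json.loads` before assignment. A sketch:

```
import json

# SQS delivers the message body as a string, even when it holds JSON
body = '{"log-type": "event_log", "payload": {"EventLogId": 0}}'

try:
    # Parse stringified JSON so its fields become real top-level keys
    record_data_to_send = json.loads(body)
except json.JSONDecodeError:
    # Fall back to wrapping a plain-text body
    record_data_to_send = {"body": body}

encoded = json.dumps(record_data_to_send).encode('utf-8')
```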

Photo of Asher
Asher
Sat, 04 Nov 2023 18:25:10 UTC

But hey, good stuff either way. If I get the JSON column mapping, that's just nice gravy, since then I can sort, search, use columns, etc.

Photo of Asher
Asher
Sat, 04 Nov 2023 18:25:28 UTC

And who knows, maybe one day you'll add DataDog Logs-style Patterns, then I'll be super happy :))

Photo of Prabhat
Prabhat
Sat, 04 Nov 2023 18:31:38 UTC

Am out. Will respond shortly.

Photo of Prabhat
Prabhat
Sat, 04 Nov 2023 19:47:10 UTC

Try this:

```
import json
import os
import boto3

# Initialize the Firehose client
firehose_client = boto3.client('firehose')

# The name of your Firehose stream
# FIREHOSE_STREAM_NAME = os.environ['LOG_OO_FIREHOSE_STREAM_NAME']
FIREHOSE_STREAM_NAME = 'asher'


def main_handler(event, context):
    try:
        # print(FIREHOSE_STREAM_NAME)
        records = []

        # Extract the records from the event
        for record in event['Records']:
            # record is stringified json. Let's parse it
            record = json.loads(record['Data'])
            record_data_to_send = record['body']
            encoded_record_data_to_send = json.dumps(
                record_data_to_send).encode('utf-8')

            # Prepare the message for Kinesis Firehose
            firehose_record = {
                'Data': bytes(encoded_record_data_to_send)
            }
            records.append(firehose_record)

        print("Records are:")
        print(records)

        # Put records into Firehose stream
        if records:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records
            )

        return {
            'statusCode': 200,
            'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps("An error occurred.")
        }


if __name__ == "__main__":
    # Test the function locally
    log_message = {
        "hello": "world",
        "asher": "snyder",
        "king": "of the world"
    }
    sqs_message = {
        "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
        "receiptHandle": "MessageReceiptHandle",
        "body": log_message,
        "attributes": {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1523232000000",
            "SenderId": "123456789012",
            "ApproximateFirstReceiveTimestamp": "1523232000001"
        },
        "messageAttributes": {},
        "md5OfBody": "{{{md5_of_body}}}",
        "eventSource": "aws:sqs",
        "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
        "awsRegion": "us-east-1"
    }
    event = {
        "Records": [{'Data': bytes(json.dumps(sqs_message).encode('utf-8'))}]
    }  # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = main_handler(event, context)
    print("Lambda function response:", response)
```

Photo of Asher
Asher
Sat, 04 Nov 2023 22:57:41 UTC

Wow, thanks so much. Will try it out and let you know!!! Really appreciate the help here. Above and beyond, really. Thanks!!!

Photo of Prabhat
Prabhat
Mon, 06 Nov 2023 04:38:07 UTC

Also, if you are not able to do it in your Lambda function, or from wherever you are sending data, you can always use VRL functions to parse and massage the data within OpenObserve