TLDR Asher struggled to send a custom payload via Kinesis Firehose to OpenObserve and see it there. Prabhat provided code examples and explained the payload format OpenObserve expects, after which the data came through successfully.
Kinesis firehose sends data in a different format
You can find details about it here -
Right, in this case I'm sending to Kinesis Firehose in the format it expects, a blob. It should encode it on its own before sending to OO, no?
Or do I have to encode it myself first?
ah sorry
what I gave you is about sending from Kinesis Firehose to OpenObserve
I think you are trying to send from lambda to firehose
Are you trying to use kinesis directly?
or kinesis firehose?
Ah, I see the code
it's Firehose
Let me get you a sample code
Are you able to send from lambda to firehose?
Did you get a successful response from Firehose?
For example my payload to Kinesis Firehose is: `[{'Data': "Hello from SQS, trying to put this in the logs. Why isn't this working!!\n"}]`
Yes, lambda is sending fine, says success and everything
so firehose accepts it but you don't see that in OpenObserve
By default messages from Firehose will not show up in OpenObserve immediately, because Firehose batches them: by default, 5 MB or 5 minutes, whichever comes first
So if you have configured things correctly then you should be able to see data in OpenObserve after 5 minutes
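If you're testing and don't want to wait, you can lower the buffering hints on the delivery stream. A rough sketch with boto3 (assuming your stream uses the HTTP endpoint destination that OpenObserve setups typically use; the version and destination IDs come from describe_delivery_stream):
```
import boto3

firehose = boto3.client('firehose', region_name='us-west-2')

# Look up the current version and destination of the delivery stream
desc = firehose.describe_delivery_stream(
    DeliveryStreamName='asher')['DeliveryStreamDescription']

# Lower the buffering hints so batches flush sooner
# (60 seconds is the minimum interval for HTTP endpoint destinations)
firehose.update_destination(
    DeliveryStreamName='asher',
    CurrentDeliveryStreamVersionId=desc['VersionId'],
    DestinationId=desc['Destinations'][0]['DestinationId'],
    HttpEndpointDestinationUpdate={
        'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60}
    }
)
```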
I thought so too but has been many minutes now
and many tries
Yes, and other things are going in fine from log groups, etc to the same firehose, just not my custom messages
Do I need more things in the data payload like json components, timestamp, etc, etc.?
```
response = client.put_record_batch(
    DeliveryStreamName='string',
    Records=[
        {'Data': b'bytes'},
    ]
)
```
this is how it needs to look
and you got it right
Yes, typical cryptic AWS docs. Should I add the newline to the message? Do I need to base64 encode beforehand?
It seems that Data is not supposed to be string but `bytes`
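i.e. something like this (a minimal sketch):
```
# 'Data' must be bytes, not str
record = {'Data': "Hello from SQS\n".encode('utf-8')}
```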
Let me write a quick sample
and check for myself
but what are you trying to do?
I mean what is your final goal?
Thanks :slightly_smiling_face: You should also add a canonical example in the docs for custom streaming.
I have an SQS filled with logs from all over
that SQS is being processed via lambda to send them over to kinesis firehose
which is already hooked up to stream into Open Observe
and the plan throughout AWS is: where a service supports it natively, send to Firehose directly, and where it doesn't, send to Firehose via SQS and Lambda.
for example, CloudWatch Log Groups is native and is hooked into the same firehose and works fine
got it
Are you triggering lambda from SQS or polling SQS using lambda periodically?
It's triggering from SQS, already sending the messages directly to lambda via event.
ok
so the reason it is failing is that you are sending custom data to kinesis firehose
```{ "records": [ { "data": "VGhpcyBpcyBhIHNpbXBsZSBzdHJpbmcu" } ] }```
this is the format in which custom data that you send will be sent to OpenObserve
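The "data" value is just your payload, base64-encoded. A quick sanity check (a minimal sketch):
```
import base64

# decodes back to the original record payload
print(base64.b64decode("VGhpcyBpcyBhIHNpbXBsZSBzdHJpbmcu").decode("utf-8"))
# -> This is a simple string.
```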
AWS services send data in different formats
Currently OpenObserve can parse CloudWatch logs and VPC flow logs coming through Firehose. Each of these had to be implemented separately in OpenObserve
you would be better off sending data directly to OpenObserve from Lambda in this case for now
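Direct ingestion would look roughly like this (a sketch; host, org, stream, and credentials are placeholders for your own values):
```
import requests

# OpenObserve accepts a JSON array of records at /api/{org}/{stream}/_json
resp = requests.post(
    "https://openobserve.example.com/api/default/mystream/_json",
    auth=("root@example.com", "password"),  # basic auth - placeholder credentials
    json=[{"message": "Hello from Lambda"}],
)
print(resp.status_code, resp.text)
```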
```
import json

import boto3


def send_to_firehose(data):
    # Initialize a Kinesis Firehose client in the us-west-2 region
    firehose = boto3.client('firehose', region_name='us-west-2')

    # Delivery stream name (hardcoded here for testing)
    delivery_stream_name = 'asher'

    # Prepare the records for batch sending
    records = [{'Data': bytes(record, 'utf-8')} for record in data]

    # Send the records to Kinesis Firehose
    response = firehose.put_record_batch(
        DeliveryStreamName=delivery_stream_name,
        Records=records
    )
    print("Number of records sent:", len(records))
    return response


def lambda_handler(event, context):
    # Define the data to be sent to Kinesis Firehose
    data = ["This is a simple string.", "Another string.", "Yet another one."]
    response = send_to_firehose(data)
    return {
        'statusCode': 200,
        'body': json.dumps(f'{len(data)} records sent to Kinesis Firehose successfully!')
    }


if __name__ == "__main__":
    # Test the function locally
    event = {}  # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = lambda_handler(event, context)
    print("Lambda function response:", response)
```
I used this code for testing
you can try running this locally on your laptop
It will send data to Firehose. However since the format is not expected OpenObserve rejects it
The issue is that each AWS service sends data according to its own format
Right, but can't I just emulate that format, put in my own message, and specify my own values for the fields it would normally send?
OpenObserve somehow decodes it and renders it
so I presume I should be able to encode the message in the same way, and you can then decode it as expected and keep the single firehose.
You just need a doc example showing what the structure should look like beforehand: how to encode it so it's valid for OpenObserve to decode, and that's that, it would work with OO :slightly_smiling_face:
Yes, if I have to curl I will, but was hoping to use Firehose for everything. Please add to your queue as it's super useful and nice to send custom things through kinesis :slightly_smiling_face:
Thanks for the quick assistance on this matter, now won't have to spend any more hours on it. I'm not crazy :slightly_smiling_face: . It was sending, just not being parsed on OO side since it didn't match the formats you currently support and have it mapped to.
You could emulate the format and it would work
Will get you some docs on it
We just did not expect anyone to send messages directly to Firehose and then to OpenObserve
Thanks, would love some docs. I'd like not to get into the habit of additional curls, and I can easily ask GPT to match the format with the information I'm sending :slightly_smiling_face: Haha, that's interesting. It seemed like the most logical thing to do honestly. Especially if one already has a hose running.
Hello hello! Any luck on that doc? Or think I'll have to curl in the meantime?
We fixed the kinesis data response format issue.
getting you docs shortly
I need to run some more tests on that
```
import base64
import json

import boto3


def send_to_firehose(data):
    # Initialize a Kinesis Firehose client in the us-west-2 region
    firehose = boto3.client('firehose', region_name='us-west-2')

    # Delivery stream name (hardcoded here for testing)
    delivery_stream_name = 'asher'

    # Prepare the records for batch sending
    records = []
    for record in data:
        encoded_value = json.dumps(record).encode('utf-8')
        records.append({'Data': bytes(encoded_value)})
    # records = [{'Data': bytes(base64.b64encode(record.encode('utf-8')), 'utf-8')} for record in data]

    # Send the records to Kinesis Firehose
    response = firehose.put_record_batch(
        DeliveryStreamName=delivery_stream_name,
        Records=records
    )
    print("Number of records sent:", len(records))
    return response


def lambda_handler(event, context):
    # Define the data to be sent to Kinesis Firehose
    data = [{"message": "Hello from Lambda 1"}]
    response = send_to_firehose(data)
    return {
        'statusCode': 200,
        'body': json.dumps(f'{len(data)} records sent to Kinesis Firehose successfully!')
    }


if __name__ == "__main__":
    # Test the function locally
    event = {}  # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = lambda_handler(event, context)
    print("Lambda function response:", response)
```
You could use this example
This script sends data to Kinesis firehose
which you can then point at OpenObserve, and it will accept the data
Essentially the data that you are sending to OpenObserve has to be in this format:
```
{
  "records": [
    {
      "data": "eyAibW9yZSI6ICJkYXRhIiB9"
    }
  ],
  "requestId": "46f65bae-3b62-4d32-a022-c5952f3a0c3f",
  "timestamp": 1698890892000
}
```
`data` in this case will be `base64_encode(string(json_document))`
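So building that envelope by hand would look roughly like this (a minimal sketch; the requestId and timestamp values are just illustrative):
```
import base64
import json
import time
import uuid

record = {"message": "Hello from Lambda"}

envelope = {
    "records": [
        # data = base64_encode(string(json_document))
        {"data": base64.b64encode(json.dumps(record).encode("utf-8")).decode("ascii")}
    ],
    "requestId": str(uuid.uuid4()),
    "timestamp": int(time.time() * 1000),  # epoch milliseconds
}
print(json.dumps(envelope, indent=2))
```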
Thank you!!!!
I tried that, I believe; still not seeing any items come in. Maybe you can catch what I'm doing wrong. I tried both variants below, with and without 'body'
```
import json
import os

import boto3

# Initialize the Firehose client
firehose_client = boto3.client('firehose')

# The name of your Firehose stream
FIREHOSE_STREAM_NAME = os.environ['LOG_OO_FIREHOSE_STREAM_NAME']


def main_handler(event, context):
    try:
        print(FIREHOSE_STREAM_NAME)
        records = []

        # Extract the records from the event
        for record in event['Records']:
            encoded_message_body = json.dumps(record['body']).encode('utf-8')
            print(f"SQS message {record['messageId']}: {record['body']}")

            # Prepare the message for Kinesis Firehose
            firehose_record = {
                'Data': bytes(encoded_message_body)
            }
            records.append(firehose_record)

        print("Records are:")
        print(records)

        # Put records into Firehose stream
        if records:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records
            )

        return {
            'statusCode': 200,
            'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps("An error occurred.")
        }
```
Response without body was:
```
START RequestId: REDACTED Version: $LATEST
LOG_BRB_TO_OPENOBSERVE
SQS message REDACTED: Hello from SQS, trying to put this in the logs. Why isn't this working!!
Records are:
[{'Data': b'{"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78", "receiptHandle": "MessageReceiptHandle", "body": "Hello from SQS, trying to put this in the logs. Why isn\'t this working!!", "attributes": {"ApproximateReceiveCount": "1", "SentTimestamp": "1523232000000", "SenderId": "123456789012", "ApproximateFirstReceiveTimestamp": "1523232000001"}, "messageAttributes": {}, "md5OfBody": "{{{md5_of_body}}}", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue", "awsRegion": "us-east-1"}'}]
END RequestId: REDACTED
REPORT RequestId: REDACTED  Duration: 241.25 ms  Billed Duration: 242 ms  Memory Size: 128 MB  Max Memory Used: 73 MB  Init Duration: 362.72 ms
```
Response with body was:
```
START RequestId: REDACTED Version: $LATEST
LOG_BRB_TO_OPENOBSERVE
SQS message REDACTED: Hello from SQS, trying to put this in the logs. Why isn't this working!!
Records are:
[{'Data': b'"Hello from SQS, trying to put this in the logs. Why isn\'t this working!!"'}]
END RequestId: REDACTED
REPORT RequestId: REDACTED  Duration: 235.79 ms  Billed Duration: 236 ms  Memory Size: 128 MB  Max Memory Used: 73 MB  Init Duration: 303.27 ms
```
They both gave responses:
```
Response
{
  "statusCode": 200,
  "body": "\"Successfully processed 1 messages.\""
}
```
Nothing shows in the stream
Give me a couple hours. Will test this and come back.
Here is working code for you:
```
import json

import boto3

# Initialize the Firehose client
firehose_client = boto3.client('firehose')

# The name of your Firehose stream
# FIREHOSE_STREAM_NAME = os.environ['LOG_OO_FIREHOSE_STREAM_NAME']
FIREHOSE_STREAM_NAME = 'asher'


def main_handler(event, context):
    try:
        # print(FIREHOSE_STREAM_NAME)
        records = []
        # print("Event is:", event)

        # Extract the records from the event
        for record in event['Records']:
            # record is stringified JSON. Let's parse it
            record = json.loads(record['Data'])
            record_data_to_send = {
                "body": record['body']
            }
            encoded_record_data_to_send = json.dumps(
                record_data_to_send).encode('utf-8')
            # encoded_message_body = json.dumps(record['body']).encode('utf-8')
            # print(f"SQS message {record['messageId']}: {record['body']}")

            # Prepare the message for Kinesis Firehose
            firehose_record = {
                'Data': bytes(encoded_record_data_to_send)
            }
            records.append(firehose_record)

        print("Records are:")
        print(records)

        # Put records into Firehose stream
        if records:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records
            )

        return {
            'statusCode': 200,
            'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps("An error occurred.")
        }


if __name__ == "__main__":
    # Test the function locally
    event = {
        "Records": [{'Data': b'{"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78", "receiptHandle": "MessageReceiptHandle", "body": "Hello from SQS, trying to put this in the logs. Why isn\'t this working!!", "attributes": {"ApproximateReceiveCount": "1", "SentTimestamp": "1523232000000", "SenderId": "123456789012", "ApproximateFirstReceiveTimestamp": "1523232000001"}, "messageAttributes": {}, "md5OfBody": "{{{md5_of_body}}}", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue", "awsRegion": "us-east-1"}'}]
    }  # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = main_handler(event, context)
    print("Lambda function response:", response)
```
Your records was: `[{'Data': b'"Hello from SQS, trying to put this in the logs. Why isn\'t this working!!"'}]`
What was needed was: `[{'Data': b'{"body": "Hello from SQS, trying to put this in the logs. Why isn\'t this working!!"}'}]`
Worked!!!! Thank you so much. Above and beyond! Write a doc article on it for Kinesis streams, a custom section :slightly_smiling_face:
:hugging_face:
Next step will be to figure out how I can send structured JSON in the body, or a replacement for body, so the JSON fields show up as columns :slightly_smiling_face:. If I replace body with just the object it doesn't work, but good enough for now. Just curious for the future as you're able, thanks!!! For example, now we get in the log:
{"_timestamp":1699121596482414,"body":"{\"log-type\":\"event_log\",\"payload\":{\"EventLogId\":0,\"EventDate\":\"2023-11-04T11:12:49.0100876-07:00\",\"Login\":\"452\",\"EventCode\":\"API\",\"Description\":\"Payload {\\\"productCode\\\":\\\"REDATED\\\",\\\"promoCode\\\":\\\"\\\",\\\"userLogin\\\":\\\"REDACTED\\\",\\\"userPassword\\\":\\\"REDACTED\\\",\\\"customerReferenceNumber\\\":\\\"REDACTED\\\",\\\"replaceServiceNumberWithCRN\\\":false,\\\"flightDetails\\\":\\\"Flight Details\\\",\\\"departureDt\\\":\\\"09/09/2023 10:20:00\\\",\\\"lastArrivalDt\\\":\\\"09/09/2023 15:50:00\\\",\\\"currencyCode\\\":\\\"USD\\\",\\\"REDACTED\\\":\\\"\\\",\\\"passengerList\\\":[{\\\"orderSequence\\\":1,\\\"lastName\\\":\\\"REDACTED\\\",\\\"firstName\\\":\\\"ASD QWEDRF\\\",\\\"airlineCode\\\":\\\"REDACTED\\\",\\\"REDACTED\\\":\\\"REDACTED\\\",\\\"REDACTED\\\":\\\"REDACTED\\\",\\\"sendSMS\\\":true}]}\"}}"}
'Data' should be JSON
Then it will work with any number of fields
Add more fields alongside body
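e.g. something like this (field names are just illustrative):
```
record = {'body': "Hello from SQS"}  # stand-in for the parsed SQS record

record_data_to_send = {
    "body": record['body'],
    "log_type": "event_log",   # extra fields become their own columns
    "login": "452",
    "event_code": "API",
}
```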
But what if I just want to assign it? For example, if the actual message body consists of a JSON object like so:
I thought I could just replace the `record_data_to_send` with the JSON object and do everything else as we had, but that doesn't stream in for some reason.
But if you're saying it should work, I'll test a bit more with some variations and see.
Technically: ``` record_data_to_send = { "body": record['body'] }``` should be replaceable with ``` record_data_to_send = record['body']``` if record['body'] is a valid JSON object, right?
So maybe it's a string already and I need to parse it before assignment. Will investigate.
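Probably something like this (a sketch, assuming the body arrives as stringified JSON):
```
import json

record = {'body': '{"log-type": "event_log", "login": "452"}'}  # stand-in SQS record

body = record['body']
if isinstance(body, str):
    # SQS delivers the body as a string; parse it if it's stringified JSON
    body = json.loads(body)
record_data_to_send = body  # now a dict, so its fields become columns
```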
But hey, good stuff either way. If I get the json columns mapping that's just nice gravy. Since then can sort, search, columns, etc.
And who knows, maybe one day you'll add DataDog Logs like Patterns, then I'll be super happy :))
Am out. Will respond shortly.
Try this:
```
import json

import boto3

# Initialize the Firehose client
firehose_client = boto3.client('firehose')

# The name of your Firehose stream
# FIREHOSE_STREAM_NAME = os.environ['LOG_OO_FIREHOSE_STREAM_NAME']
FIREHOSE_STREAM_NAME = 'asher'


def main_handler(event, context):
    try:
        # print(FIREHOSE_STREAM_NAME)
        records = []

        # Extract the records from the event
        for record in event['Records']:
            # record is stringified JSON. Let's parse it
            record = json.loads(record['Data'])
            record_data_to_send = record['body']
            encoded_record_data_to_send = json.dumps(
                record_data_to_send).encode('utf-8')

            # Prepare the message for Kinesis Firehose
            firehose_record = {
                'Data': bytes(encoded_record_data_to_send)
            }
            records.append(firehose_record)

        print("Records are:")
        print(records)

        # Put records into Firehose stream
        if records:
            response = firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records
            )

        return {
            'statusCode': 200,
            'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps("An error occurred.")
        }


if __name__ == "__main__":
    # Test the function locally
    log_message = {
        "hello": "world",
        "asher": "snyder",
        "king": "of the world"
    }
    sqs_message = {
        "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
        "receiptHandle": "MessageReceiptHandle",
        "body": log_message,
        "attributes": {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1523232000000",
            "SenderId": "123456789012",
            "ApproximateFirstReceiveTimestamp": "1523232000001"
        },
        "messageAttributes": {},
        "md5OfBody": "{{{md5_of_body}}}",
        "eventSource": "aws:sqs",
        "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
        "awsRegion": "us-east-1"
    }
    event = {
        "Records": [{'Data': bytes(json.dumps(sqs_message).encode('utf-8'))}]
    }  # Define your test event if necessary
    context = {}  # Define your test context if necessary
    response = main_handler(event, context)
    print("Lambda function response:", response)
```
Wow, thanks so much. Will try it out and let you know!!! Really appreciate the help here. Above and beyond, really. Thanks!!!
Also, if you are not able to do it in your lambda function or from wherever you are sending data, you can always use VRL functions with OpenObserve to parse and massage the data within OpenObserve
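A rough sketch of such a VRL function (assuming the stringified JSON arrives in a field named body):
```
# parse the stringified JSON in .body and merge its fields into the event
parsed = parse_json!(string!(.body))
. = merge(., object!(parsed))
del(.body)
.
```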
Asher
Wed, 01 Nov 2023 22:58:45 UTC
Is there an example somewhere of sending a custom payload via Kinesis to OpenObserve? My Kinesis Firehose says successful, but I'm not seeing anything in OpenObserve. Tried a bunch of different approaches but nothing seems to be working. Normal things from AWS directly, like log groups, work, but when trying to send custom payloads to Kinesis and then to OO, no luck. I read in the OO docs that maybe it should be base64 encoded. Is there any documentation on custom Kinesis Firehose payloads and what format they should be in so they stream into OpenObserve correctly? This is an AWS Lambda excerpt:
```
records = []

# Extract the records from the event
for record in event['Records']:
    message_body = record['body']
    print(f"SQS message {record['messageId']}: {message_body}")

    # Prepare the message for Kinesis Firehose
    firehose_record = {
        'Data': message_body + "\n"
    }
    records.append(firehose_record)

# Put records into Firehose stream
if records:
    response = firehose_client.put_record_batch(
        DeliveryStreamName=FIREHOSE_STREAM_NAME,
        Records=records
    )
    print(f"Firehose Response: {json.dumps(response)}")

return {
    'statusCode': 200,
    'body': json.dumps(f"Successfully processed {len(event['Records'])} messages.")
}
```