Streams
You can import real-time data from external sources and stream it into Amorphic with Streams. Amorphic uses Amazon Kinesis Data Streams to collect and process large streams of data in real time, and the data can then be used for ETL and analytics.
To consume the data in a stream, you create Consumers that deliver the data to datasets. A stream supports one-to-many connections, so the same data can be shared with multiple consumers. You can also apply Data Transformations before the data is delivered to datasets.
Kinesis
- To create a Kinesis data stream in Amorphic, click the '+' icon in the Streams section under Ingestion. The table below lists the fields required to create a stream:
Attribute | Description |
---|---|
Stream Name | Name for the stream. |
Stream Type | Only Kinesis is supported. |
Stream Mode (Optional) | Capacity mode of the stream: On-Demand or Provisioned. Default is Provisioned. |
Data Retention Period (Hrs) | Time period for which the data stream retains data records. |
Shard Count | Number of shards for the stream. Each shard ingests up to 1 MiB/second and 1000 records/second. |
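As a rough sizing aid, the shard count can be estimated from the expected ingest rate. The sketch below uses the per-shard limits from the table above; the traffic numbers in the example are hypothetical:

```python
import math

# Per-shard ingest limits for a Kinesis data stream.
MAX_MIB_PER_SEC = 1         # 1 MiB/second per shard
MAX_RECORDS_PER_SEC = 1000  # 1000 records/second per shard

def shards_needed(mib_per_sec: float, records_per_sec: float) -> int:
    """Return the minimum shard count that covers both per-shard limits."""
    by_size = math.ceil(mib_per_sec / MAX_MIB_PER_SEC)
    by_count = math.ceil(records_per_sec / MAX_RECORDS_PER_SEC)
    return max(by_size, by_count, 1)

# Example: 3.5 MiB/s of traffic arriving as 2,500 records/s.
print(shards_needed(3.5, 2500))  # 4 (throughput in MiB/s is the binding limit)
```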
- After a stream is created, Amorphic provides AWS access keys that can be used to push data to the stream. Refer to the AWS documentation for the different ways to push data.
- To update a stream, use the edit option on the stream details page. Only metadata and configuration can be updated.
- To retrieve all datasets associated with a stream, use the following API call with the `GET` method:
`GET /streams/{id}?request_type=get_datasets`
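The call above can be sketched as follows; the base URL and stream ID here are placeholders for your deployment's values, and authentication is omitted:

```python
from urllib.parse import urlencode, urljoin

# Hypothetical values -- substitute your deployment's API endpoint and stream ID.
BASE_URL = "https://api.example-amorphic.com/"
stream_id = "my-stream-id"

# Build the GET request URL for retrieving the stream's associated datasets.
query = urlencode({"request_type": "get_datasets"})
url = urljoin(BASE_URL, f"streams/{stream_id}?{query}")
print(url)  # https://api.example-amorphic.com/streams/my-stream-id?request_type=get_datasets
```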
Consumer
Consumers in Amorphic are built on the Kinesis Data Firehose service, which continuously collects streaming data and loads it into a specified destination. Each consumer is attached to a dataset, which serves as the final destination of the data collected from the stream.
To create a consumer in Amorphic, provide the following information:
Attribute | Description |
---|---|
Consumer Name | Name to be used for the consumer. |
Buffer Size | Size, in MiBs, to which incoming data is buffered before it is delivered to the destination. |
Buffer Interval | Time, in seconds, for which incoming data is buffered before it is delivered to the destination. |
Target Location | Final destination where the streaming data is stored. |
Create Dataset | Option to create a new dataset or use an existing dataset. |
Dataset Configuration | Refer to Create Dataset in Datasets. |
The dataset FileTypes supported for each consumer target location are:
- Redshift: CSV, PARQUET
- Lakeformation: CSV, PARQUET
- S3 Athena: CSV, PARQUET
- S3: CSV, OTHERS
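The mapping above can be captured in a small lookup, for example to validate a consumer configuration before submitting it. This is a sketch; the function name is illustrative, not part of the Amorphic API:

```python
# Supported dataset file types per consumer target location (from the list above).
SUPPORTED_FILE_TYPES = {
    "Redshift": {"CSV", "PARQUET"},
    "Lakeformation": {"CSV", "PARQUET"},
    "S3 Athena": {"CSV", "PARQUET"},
    "S3": {"CSV", "OTHERS"},
}

def is_supported(target_location: str, file_type: str) -> bool:
    """Check whether a dataset file type is valid for a target location."""
    return file_type in SUPPORTED_FILE_TYPES.get(target_location, set())

print(is_supported("S3", "OTHERS"))      # True
print(is_supported("Redshift", "JSON"))  # False
```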
Refer to the AWS documentation for more detailed information on how to create and configure a consumer using Kinesis Data Firehose.
Data Transformations
You can use Data Transformations to customize how data is processed as it streams into Amorphic. These functions are defined on a per-stream basis and can be applied to any consumer of that stream.
Before the Lambda function (the user-defined processing) is applied, the data is temporarily held in a buffer. This buffer size can be set per consumer, independently of the consumer's own buffer settings. For more information, refer to the AWS documentation.
Attribute | Description |
---|---|
Function Name | Display name for the user-defined function. |
Lambda Handler | The fully qualified Lambda handler for the user-defined function. |
Memory Size (Optional) | Memory allocated to the user-defined function. Default: 128 MB. |
The number of invocations of the Lambda function grows with the number of shards in the data stream and the number of consumers attached to the data transformation function.
Amorphic therefore limits this feature to a maximum of 20 data transformation functions overall and 5 consumers per data transformation function.
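The scaling behavior can be illustrated with a rough model, assuming one in-flight invocation per shard per consumer (an assumption for illustration, not a documented Amorphic guarantee):

```python
def peak_concurrent_invocations(shard_count: int, consumer_count: int) -> int:
    """Rough model: invocations scale with shards x attached consumers."""
    return shard_count * consumer_count

# A 4-shard stream at the 5-consumers-per-function limit.
print(peak_concurrent_invocations(4, 5))  # 20
```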
Stream use case
Let's walk through a use case where streaming data is fed into a stream and consumed for analytical purposes.
To start, access the stream My-Stream using the access key and secret access key given in the connection details.
Here, data is sent to the stream with a Python boto3 script.
```python
import boto3

# Credentials from the stream's connection details in Amorphic.
client = boto3.client(
    'kinesis',
    region_name='eu-west-1',
    aws_access_key_id='ACCESS_KEY_ID',
    aws_secret_access_key='SECRET_ACCESS_KEY',
)

data_records = []
counter = 0
with open('./data.csv', encoding='utf-8') as csvf:
    for line in csvf:
        data_records.append(line)
        # Send each CSV line to the stream as one record.
        response = client.put_record(
            StreamName='My-Stream',
            Data=line,
            PartitionKey=str(hash(line)),
        )
        counter += 1
        print('Message sent #' + str(counter))
        # If the message was not successfully sent, print an error message
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            print('Error!')
            print(response)
print(data_records)
```
The Amorphic stream My-Stream receives the data and passes it on to the consumers attached to the stream. Here, the attached consumers are:
- finance-data-consumer
- technology-data-consumer
Both consumers receive the same records, but when Data Transformations are attached, each transformation filters the records and passes on only those selected by its custom code. Here, the Data Transformations are:
- filter-by-financial-sector, which passes only financial-sector records
- filter-by-technology-sector, which passes only technology-sector records
After the records pass through each consumer's Data Transformation, the dataset associated with that consumer contains only the records its transformation selected. Here, the dataset my_financial_streaming_data associated with finance-data-consumer holds only the financial records, and the dataset my_technical_streaming_data associated with technology-data-consumer holds only the technology records.
The resulting datasets can then be analyzed further using the analytical services offered on the Amorphic platform.
Data Transformation code for filtering technology records:
```python
"""
File defines logic for transforming records.
"""
import base64
import sys

# Sectors whose records this transformation drops.
OTHER_SECTORS = ["HEALTHCARE", "ENERGY", "RETAIL", "FINANCIAL"]

def lambda_handler(event, context):
    """Entry point for the Lambda function."""
    print("input size: ", sys.getsizeof(event["records"]))
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode('utf-8')
        record_info = payload.split(",")
        # The sector is expected in the fourth CSV column.
        if record_info[3].strip() == "TECHNOLOGY":
            result = "Ok"
        elif record_info[3].strip() in OTHER_SECTORS:
            result = "Dropped"
        else:
            result = "ProcessingFailed"
        output.append({
            "recordId": record["recordId"],
            "result": result,
            # Firehose expects the returned data to be base64-encoded.
            "data": base64.b64encode(payload.encode('utf-8')).decode('utf-8'),
        })
    print("Processed {} records.".format(len(event["records"])))
    print("output size: ", sys.getsizeof(output))
    return {"records": output}
```
Data Transformation code for filtering financial records:
```python
"""
File defines logic for transforming records.
"""
import base64
import sys

# Sectors whose records this transformation drops.
OTHER_SECTORS = ["HEALTHCARE", "ENERGY", "RETAIL", "TECHNOLOGY"]

def lambda_handler(event, context):
    """Entry point for the Lambda function."""
    print("input size: ", sys.getsizeof(event["records"]))
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode('utf-8')
        record_info = payload.split(",")
        # The sector is expected in the fourth CSV column.
        if record_info[3].strip() == "FINANCIAL":
            result = "Ok"
        elif record_info[3].strip() in OTHER_SECTORS:
            result = "Dropped"
        else:
            result = "ProcessingFailed"
        output.append({
            "recordId": record["recordId"],
            "result": result,
            # Firehose expects the returned data to be base64-encoded.
            "data": base64.b64encode(payload.encode('utf-8')).decode('utf-8'),
        })
    print("Processed {} records.".format(len(event["records"])))
    print("output size: ", sys.getsizeof(output))
    return {"records": output}
```