
Streams

With Streams, you can import real-time data from external sources and stream it into Amorphic. Amorphic uses Kinesis Data Streams to collect and process large streams of data in real time, and the streamed data can also be used for ETL and analytics.

To use the stream of data, you need to create Consumers that deliver the data to datasets. A stream provides one-to-many connections, so you can share the data with multiple consumers. You can also apply Data Transformations before the data is streamed to datasets.

Kinesis

Streams

  • To use the data stream, create "Consumers" to transfer data into datasets. A stream can connect to multiple consumers and data can be transformed before streaming.
  • To create a Kinesis data stream in Amorphic, use the '+' icon in the Streams section of Ingestion. The following fields are required for stream creation:
    • Stream Name: Name for the stream.
    • Stream Type: Only Kinesis is supported.
    • Stream Mode (Optional): Type of stream, e.g. On-Demand or Provisioned. Default is Provisioned.
    • Data Retention Period (Hrs): Time period for which the data stream retains data records.
    • Shard Count: Number of shards for the stream. Each shard ingests up to 1 MiB/second and 1,000 records/second.
  • After creating a stream, Amorphic provides AWS access keys to push data to the stream. Refer to the AWS documentation for different ways to push data.
  • To update a stream, use the edit option on the details page. Only metadata and configuration can be updated.
  • To retrieve all datasets associated with a stream, use the following API call (a request sketch follows this list):

GET /streams/{id}?request_type=get_datasets

  • Consumers use Kinesis Data Firehose delivery streams to continuously collect and load data into specified destinations. Each consumer is attached to a dataset as the final destination of collected data.
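
As referenced above, here is a minimal sketch of calling the get_datasets API with Python requests. The base URL, authorization header, and stream ID below are placeholders, not actual Amorphic values; only the path and the request_type parameter come from the call shown above.

import requests

# Hypothetical values; substitute your Amorphic API endpoint, token, and stream ID.
BASE_URL = "https://<amorphic-api-endpoint>"
STREAM_ID = "<stream-id>"
HEADERS = {"Authorization": "Bearer <access-token>"}

# GET /streams/{id}?request_type=get_datasets lists the datasets attached to the stream.
response = requests.get(
    f"{BASE_URL}/streams/{STREAM_ID}",
    params={"request_type": "get_datasets"},
    headers=HEADERS,
)
response.raise_for_status()
print(response.json())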

Consumer

Stream consumers

Consumers in Amorphic are backed by the Kinesis Data Firehose service, which continuously collects and loads streaming data into specified destinations. Each consumer is attached to a dataset, which serves as the final destination of the data collected from the stream.

To create a consumer in Amorphic, you will need to provide the following information:

  • Consumer Name: Name to be used for the consumer.
  • Buffer Size: Buffer incoming data to the specified size, in MiBs, before delivering it to the destination.
  • Buffer Interval: Period of time for which incoming data is buffered before it is delivered to the destination.
  • Target Location: Final destination where the streaming data is to be stored.
  • Create Dataset: Option to create a new dataset or use an existing dataset.
  • Dataset Configuration: Refer to Create Dataset in the Datasets documentation.

The dataset file types supported for each consumer target location are:

  • Redshift: CSV, PARQUET
  • Lakeformation: CSV, PARQUET
  • S3 Athena: CSV, PARQUET
  • S3: CSV, OTHERS

Refer to the AWS documentation for more detailed information on how to create and configure a consumer using Kinesis Data Firehose.
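
Amorphic provisions the delivery stream for you, so the snippet below is only an illustration of what the Buffer Size and Buffer Interval settings correspond to in the underlying Firehose API. The stream, role, and bucket ARNs are placeholders, and the exact configuration Amorphic applies may differ.

import boto3

firehose = boto3.client('firehose', region_name='eu-west-1')

# Buffer Size / Buffer Interval map to the BufferingHints of the Firehose destination.
firehose.create_delivery_stream(
    DeliveryStreamName='my-consumer',  # placeholder name
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:eu-west-1:111122223333:stream/My-Stream',
        'RoleARN': 'arn:aws:iam::111122223333:role/firehose-role',  # placeholder
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::111122223333:role/firehose-role',  # placeholder
        'BucketARN': 'arn:aws:s3:::my-target-bucket',  # placeholder
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
    },
)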

Data Transformations

You can use Data Transformations to customize how data is processed as it streams into Amorphic. These functions are defined on a per-stream basis and can be applied to any consumer of that stream.

The data is temporarily stored in a buffer before the Lambda function (user-defined processing) is applied. This transformation buffer is configured per consumer and is independent of the consumer's destination buffer settings. For more information, refer to the AWS documentation.

  • Function Name: Display name for the user-defined function.
  • Lambda Handler: The Lambda handler (e.g. file_name.function_name) for the user-defined function.
  • Memory Size (Optional): Memory to be allocated to the user-defined function. Default: 128 MB.
info

The number of invocations of the lambda function is directly proportional to the number of shards in the data stream and the number of consumers attached to the data transformation function.

Therefore, Amorphic limits this functionality to a maximum of 20 overall data transformation functions and 5 consumers per data transformation function.
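
For reference, transformation functions follow the standard Kinesis Data Firehose data-transformation contract: the function receives a batch of base64-encoded records and must return each record with its recordId, a result of Ok, Dropped, or ProcessingFailed, and the (possibly modified) data re-encoded as base64. A minimal pass-through sketch on the Python Lambda runtime:

import base64


def lambda_handler(event, context):
    """Return every record unchanged, marked as successfully processed."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}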

Stream use case

Streams chart

Let's walk through a use case where you feed streaming data into Amorphic and consume it for analytical purposes.

To start, access the stream My-Stream using the access key and secret access key given in the connection details.

Streams chart

Here, data is sent to the stream using a Python boto3 script.

import json
import csv
import boto3

data_records = []
client = boto3.client(
    'kinesis',
    region_name='eu-west-1',
    aws_access_key_id='ACCESS_KEY_ID',
    aws_secret_access_key='SECRET_ACCESS_KEY'
)
counter = 0

with open('./data.csv', encoding='utf-8') as csvf:
    lines = csvf.readlines()
    for line in lines:
        data_records.append(line)
        response = client.put_record(
            StreamName='My-Stream',
            Data=line,
            PartitionKey=str(hash(line))
        )
        counter = counter + 1
        print('Message sent #' + str(counter))
        # If the message was not successfully sent, print an error message
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            print('Error!')
            print(response)
print(data_records)

The Amorphic stream My-Stream then receives the data and passes it on to the consumers attached to the stream. Here, the attached consumers are:

  • finance-data-consumer
  • technology-data-consumer

Streams chart

Both consumers attached to My-Stream receive the same records. Data Transformations linked to these consumers then filter the records, allowing through only the ones specified by the custom code within the Data Transformations.

Here, the Data Transformations are:

Streams chart

  • filter-by-financial-sector: filters only the financial sector records
  • filter-by-technology-sector: filters only the technology sector records

After the records have been processed, the dataset associated with each consumer contains only the records allowed by that consumer's Data Transformation.

Here, the consumer finance-data-consumer's associated dataset my_financial_streaming_data will contain only the financial records, and the consumer technology-data-consumer's associated dataset my_technical_streaming_data will contain only the technology records.

The resulting datasets can then be used for further analysis in the Amorphic application using the various analytical services offered on the platform.

Streams chart

Streams chart

Data Transformation code for filtering technology sector records.

"""
File defines logic for transforming records.
"""
import base64
import json
import sys
OTHER_SECTORS = ["HEALTHCARE", "ENERGY", "RETAIL", "FINANCIAL"]


def lambda_handler(event, context):
"""
Entry point for lambda.update test.
"""
print("input size: ", sys.getsizeof(event["records"]))
output = []


for record in event["records"]:
payload = base64.b64decode(record["data"]).decode('utf-8')
record_info = payload.split(",")
if record_info[3].strip() == "TECHNOLOGY":
output_record = {
"recordId": record["recordId"],
"result": "Ok",
"data": base64.b64encode(payload.encode('utf-8'))
}
elif record_info[3].strip() in OTHER_SECTORS:
output_record = {
"recordId": record["recordId"],
"result": "Dropped",
"data": base64.b64encode(payload.encode('utf-8'))
}
else:
output_record = {
"recordId": record["recordId"],
"result": "ProcessingFailed",
"data": base64.b64encode(payload.encode('utf-8'))
}


output.append(output_record)


print("Processed {} records.".format(len(event["records"])))
print("output size: ", sys.getsizeof(output))
return {"records": output}


Data Transformation code for filtering financial sector records.

"""
File defines logic for transforming records.
"""
import base64
import json
import sys
OTHER_SECTORS = ["HEALTHCARE", "ENERGY", "RETAIL", "TECHNOLOGY"]




def lambda_handler(event, context):
"""
Entry point for lambda.update test.
"""
print("input size: ", sys.getsizeof(event["records"]))
output = []


for record in event["records"]:
payload = base64.b64decode(record["data"]).decode('utf-8')
record_info = payload.split(",")
if record_info[3].strip() == "FINANCIAL":
output_record = {
"recordId": record["recordId"],
"result": "Ok",
"data": base64.b64encode(payload.encode('utf-8'))
}
elif record_info[3].strip() in OTHER_SECTORS:
output_record = {
"recordId": record["recordId"],
"result": "Dropped",
"data": base64.b64encode(payload.encode('utf-8'))
}
else:
output_record = {
"recordId": record["recordId"],
"result": "ProcessingFailed",
"data": base64.b64encode(payload.encode('utf-8'))
}


output.append(output_record)


print("Processed {} records.".format(len(event["records"])))
print("output size: ", sys.getsizeof(output))
return {"records": output}
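
To sanity-check either transformation locally, you can invoke lambda_handler with a hand-built event. This is a minimal sketch: the sample rows are made up for illustration, and the assumption that the sector is the fourth comma-separated field follows the code above. Paste it below either transformation function and run it.

import base64

# Sample rows (hypothetical); the sector sits in the fourth comma-separated field.
sample_rows = [
    "1001,ACME Corp,2023-01-05,FINANCIAL,123.45",   # kept by the financial filter (result: Ok)
    "1002,TechCo,2023-01-05,TECHNOLOGY,67.89",      # dropped by the financial filter
    "1003,Unknown,2023-01-05,MISC,0.00",            # unknown sector: ProcessingFailed
]

# Build a Firehose-style event with base64-encoded record data.
event = {
    "records": [
        {"recordId": str(i), "data": base64.b64encode(row.encode("utf-8")).decode("utf-8")}
        for i, row in enumerate(sample_rows)
    ]
}

result = lambda_handler(event, None)
for rec in result["records"]:
    print(rec["recordId"], rec["result"])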