Advanced usage of file load validation node
In advanced use cases where the preceding ETL job can have concurrent runs, users have to provide an input manifest file from the ETL job so that the file load validation node can identify the correct files. Another use case is when users want to add multiple partitions in addition to the default 'upload_date' partition. The example below combines these two cases.
The following are the constraints for a valid input manifest file:
- The file should be of type JSON
- The file should contain the schema shown below
[
    {
        "Domain": "string",
        "DatasetName": "string",
        "Partitions": [
            {"upload_date": "string", "custom_partition": "string", ......},
            {"upload_date": "string", "custom_partition": "string", ......},
            ......
        ]
    },
    ............
]
- The file should be written to the following S3 location:
  s3://<etl_bucket_name>/<preceding_etl_job_id>/temp/manifest-files/<workflow_name>/<workflow_execution_id>-input_manifest_file.json
Important: the multi-partition concept only applies to datasets in the S3-Athena Target Location.
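For reference, here is a minimal sketch of how a Glue ETL job can construct the manifest location described above at runtime. The getResolvedOptions call is standard Glue; the manifest_key variable name is illustrative, and the <preceding_etl_job_id> and <etl_bucket_name> placeholders must be replaced with your own values, as in the full example at the end of this section:

import sys
from awsglue.utils import getResolvedOptions

# WORKFLOW_NAME and WORKFLOW_RUN_ID are supplied to the job when it runs as part of a Glue workflow
args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])

# Illustrative sketch: build the manifest key expected by the file load validation node
manifest_key = "<preceding_etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(
    args['WORKFLOW_NAME'], args['WORKFLOW_RUN_ID'])
# The manifest is then written to s3://<etl_bucket_name>/ followed by manifest_key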
Here are some sample input manifest files for the two cases, an ETL job writing to a single dataset and an ETL job writing to multiple datasets.
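For illustration, the manifests below assume an insurance domain with customers and employees datasets and an additional 'uploaded_by' partition; all values are examples only.

Input manifest file for an ETL job writing to a single dataset:

[
    {
        "Domain": "insurance",
        "DatasetName": "customers",
        "Partitions": [
            {"upload_date": "1638429142", "uploaded_by": "Jeebu"}
        ]
    }
]

Input manifest file for an ETL job writing to multiple datasets (one entry per domain/dataset combination):

[
    {
        "Domain": "insurance",
        "DatasetName": "customers",
        "Partitions": [
            {"upload_date": "1638429142", "uploaded_by": "Jeebu"}
        ]
    },
    {
        "Domain": "insurance",
        "DatasetName": "employees",
        "Partitions": [
            {"upload_date": "1638429142", "uploaded_by": "Jeebu"}
        ]
    }
]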
After the workflow execution completes, the workflow execution properties of the file load validation node include the generated output manifest file. When you click the 'Download' button next to the output manifest file, you can see the load status details for each file.
Sample code snippet that writes data to multiple datasets with multiple partitions:
- Data is written to the customers dataset in the insurance domain
- The corresponding partitions of the customers dataset are stored in a dict
- Data is written to the employees dataset in the insurance domain
- The corresponding partitions of the employees dataset are stored in a dict
- The partition information of both datasets is written to the ETL bucket as the input manifest file
Note that all values written between < > in the sample code below should be replaced with the appropriate values.
import sys
import time
import json
from io import StringIO

import boto3
import pandas as pd
from awsglue.utils import getResolvedOptions
def write_csv_dataframe_to_s3(s3_resource, dataframe, bucket, file_path):
    """
    Writes a csv file to s3
    """
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, index=False)
    s3_resource.Object(bucket, file_path).put(Body=csv_buffer.getvalue())

def write_json_manifest_file_to_s3(s3, json_data, file_path):
    """
    Writes a json file to the s3 ETL bucket
    """
    s3object = s3.Object('cdap-<region>-<aws_account_id>-master-etl', file_path)
    s3object.put(Body=json.dumps(json_data).encode('UTF-8'))
json_data = []

# The workflow name and run id are passed to the job as arguments by the Glue workflow
args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']

s3 = boto3.resource('s3')
# Write data to the customers dataset in the insurance domain
domain_name = 'insurance'
dataset_name = 'customers'
epoch_time = int(time.time())
epoch_time_str = str(epoch_time)
data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, 'Jeebu', epoch_time_str)
write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/customers_1.csv'.format(path))
# Create a dict for the domain, dataset combination and partition information of customers
# The partition values must match the values used in the file path above
customers_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
partitions = []
partitions.append({"uploaded_by": "Jeebu", "upload_date": epoch_time_str})
customers_partition_dict.update({"Partitions": partitions})
json_data.append(customers_partition_dict)
# Write data to the employees dataset in the insurance domain
domain_name = 'insurance'
dataset_name = 'employees'
epoch_time = int(time.time())
epoch_time_str = str(epoch_time)
data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, 'Jeebu', epoch_time_str)
write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/employees_1.csv'.format(path))
# Create a dict for the domain, dataset combination and partition information of employees
employees_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
partitions = []
partitions.append({"uploaded_by": "Jeebu", "upload_date": epoch_time_str})
employees_partition_dict.update({"Partitions": partitions})
json_data.append(employees_partition_dict)
# Write json_data as input manifest file
file_path = "<etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(workflow_name, workflow_run_id)
write_json_manifest_file_to_s3(s3, json_data, file_path)
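With this manifest in place, the file load validation node that follows the ETL job in the same workflow can identify exactly the files written under the listed domain, dataset and partition combinations, even when the job has concurrent runs.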