Version: v2.7

Advanced usage of file load validation node

In advanced use cases where preceding ETL jobs can run concurrently, users must supply an input manifest file to the file load validation node so that it can identify the correct files. A second scenario arises when users want to add further partitions alongside the default 'upload_date' partition. The following example combines these two cases.

A valid input manifest file must satisfy the following constraints:

  1. The file must be of type JSON.
  2. The file must follow the schema shown below.
Schema

    [
      {
        "Domain": "string",
        "DatasetName": "string",
        "Partitions": [
          {"upload_date": "string", "custom_partition": "string", ......},
          {"upload_date": "string", "custom_partition": "string", ......},
          ......
        ]
      },
      ............
    ]

  3. The file must be written to the following S3 location:

    s3://<etl_bucket_name>/<preceding_etl_job_id>/temp/manifest-files/<workflow_name>/<workflow_execution_id>-input_manifest_file.json
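
The constraints above can be checked before the file is uploaded. The following is a minimal sketch, not part of the product: `manifest_key` and `validate_manifest` are hypothetical helper names, and treating 'upload_date' as mandatory in every partition entry is an assumption based on it being described as the default partition.

```python
def manifest_key(preceding_etl_job_id, workflow_name, workflow_execution_id):
    """Build the S3 key (bucket name excluded) of the input manifest file."""
    return "{}/temp/manifest-files/{}/{}-input_manifest_file.json".format(
        preceding_etl_job_id, workflow_name, workflow_execution_id
    )


def validate_manifest(manifest):
    """Return a list of problems; an empty list means the shape looks valid."""
    if not isinstance(manifest, list):
        return ["top level must be a list of dataset entries"]
    problems = []
    for i, entry in enumerate(manifest):
        if not isinstance(entry, dict):
            problems.append("entry {}: must be an object".format(i))
            continue
        # Domain and DatasetName are plain strings in the schema
        for key in ("Domain", "DatasetName"):
            if not isinstance(entry.get(key), str):
                problems.append("entry {}: '{}' must be a string".format(i, key))
        partitions = entry.get("Partitions")
        if not isinstance(partitions, list):
            problems.append("entry {}: 'Partitions' must be a list".format(i))
            continue
        for j, partition in enumerate(partitions):
            if not isinstance(partition, dict):
                problems.append("entry {}, partition {}: must be an object".format(i, j))
                continue
            # Assumption: every partition entry carries the default 'upload_date'
            if "upload_date" not in partition:
                problems.append("entry {}, partition {}: missing 'upload_date'".format(i, j))
            for name, value in partition.items():
                if not isinstance(value, str):
                    problems.append(
                        "entry {}, partition {}: '{}' must be a string".format(i, j, name)
                    )
    return problems
```

Calling `validate_manifest(json_data)` just before the upload and aborting the job when the returned list is non-empty would surface a malformed manifest inside the ETL job rather than at the validation node.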

Note: The multi-partition concept applies only to datasets whose target location is S3-Athena.

Here are some sample input manifest files:

Input manifest file for an ETL job writing to a single dataset: (Figure: Sample_input_manifest_file)

Input manifest file for an ETL job writing to multiple datasets: (Figure: Sample_input_manifest_file_2)
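
As a rough illustration of what such files might contain, the sketch below builds one manifest for a single dataset and one for two datasets, following the schema given earlier. All values are made-up placeholders; the domain, dataset and partition names are simply reused from the sample code on this page.

```python
import json

# Manifest for an ETL job that writes to one dataset (placeholder values)
single_dataset_manifest = [
    {
        "Domain": "insurance",
        "DatasetName": "customers",
        "Partitions": [
            {"upload_date": "1700000000", "uploaded_by": "Sam"},
        ],
    }
]

# Manifest for an ETL job that writes to two datasets (placeholder values)
multi_dataset_manifest = single_dataset_manifest + [
    {
        "Domain": "insurance",
        "DatasetName": "employees",
        "Partitions": [
            {"upload_date": "1700000000", "uploaded_by": "Sam"},
        ],
    }
]

# Serialize to the JSON text that would be uploaded as the manifest file
print(json.dumps(multi_dataset_manifest, indent=2))
```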

After the workflow has executed, the workflow execution properties of the file load validation node are shown as below: (Figure: file_load_validation_workflow_execution_properties)

When you click the 'Download' button next to the output manifest file, you can see the details of the file load statuses as shown below: (Figure: file_load_validation_output_manifest_file)

Sample code snippet writing data to multiple datasets with multi-partition:

  1. Data is written to customers dataset in insurance domain

  2. Corresponding partitions of customers dataset are stored in a dict

  3. Data is written to employees dataset in insurance domain

  4. Corresponding partitions of employees are stored in a dict

  5. Partition information of both datasets is written to the ETL bucket as the input manifest file

Note: In the sample code below, replace every value written between < > with the appropriate value.

    import sys
    import time
    import json
    from io import StringIO

    import boto3
    import pandas as pd
    from awsglue.utils import getResolvedOptions


    def write_csv_dataframe_to_s3(s3_resource, dataframe, bucket, file_path):
        """
        Writes a DataFrame to S3 as a CSV file
        """
        csv_buffer = StringIO()
        dataframe.to_csv(csv_buffer, index=False)
        s3_resource.Object(bucket, file_path).put(Body=csv_buffer.getvalue())


    def write_json_manifest_file_to_s3(s3_resource, json_data, file_path):
        """
        Writes the input manifest (JSON) to the ETL bucket
        """
        s3object = s3_resource.Object('cdap-<region>-<aws_account_id>-master-etl', file_path)
        s3object.put(Body=json.dumps(json_data).encode('UTF-8'))


    json_data = []
    args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
    workflow_name = args['WORKFLOW_NAME']
    workflow_run_id = args['WORKFLOW_RUN_ID']
    s3 = boto3.resource('s3')

    # Custom partition value; it must be identical in the S3 path and in the manifest
    uploaded_by = 'Jeebu'

    # Write data to the customers dataset in the insurance domain
    domain_name = 'insurance'
    dataset_name = 'customers'
    epoch_time = int(time.time())
    epoch_time_str = str(epoch_time)
    data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
    df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
    path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
    write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/customers_1.csv'.format(path))

    # Create a dict for domain, dataset combination and partition information
    customers_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
    partitions = []
    partitions.append({"uploaded_by": uploaded_by, "upload_date": epoch_time_str})
    customers_partition_dict.update({"Partitions": partitions})
    json_data.append(customers_partition_dict)

    # Write data to the employees dataset in the insurance domain
    domain_name = 'insurance'
    dataset_name = 'employees'
    epoch_time = int(time.time())
    epoch_time_str = str(epoch_time)
    data = [['1', 'Alfreds Futterkiste', 'Maria Anders', 'Germany'], ['2', 'Ana Trujillo Emparedados y helados', 'Ana Trujillo', 'Mexico']]
    df = pd.DataFrame(data, columns=['CustomerID', 'CustomerName', 'ContactName', 'Country'])
    path = '{}/{}/uploaded_by={}/upload_date={}/<valid-amorphic-userID>/csv'.format(domain_name, dataset_name, uploaded_by, epoch_time_str)
    write_csv_dataframe_to_s3(s3, df, 'cdap-<region>-<aws_account_id>-master-lz', '{}/employees_1.csv'.format(path))

    # Create a dict for domain, dataset combination and partition information
    employees_partition_dict = {"Domain": domain_name, "DatasetName": dataset_name}
    partitions = []
    partitions.append({"uploaded_by": uploaded_by, "upload_date": epoch_time_str})
    employees_partition_dict.update({"Partitions": partitions})
    json_data.append(employees_partition_dict)

    # Write json_data as input manifest file
    file_path = "<etl_job_id>/temp/manifest-files/{}/{}-input_manifest_file.json".format(workflow_name, workflow_run_id)
    write_json_manifest_file_to_s3(s3, json_data, file_path)
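
The sample above builds the landing-zone path and the manifest partition entry in two separate places, so the two can drift apart. One possible way to keep them in sync is to derive both from a single partition dict. This is only a sketch: `partition_prefix` and `manifest_entry` are hypothetical helpers, and the path layout is assumed to be the one used in the sample (partition folders in dict order, followed by the user ID and the file type).

```python
def partition_prefix(domain, dataset, partitions, user_id, file_type="csv"):
    """Build '<domain>/<dataset>/<k1>=<v1>/.../<user_id>/<file_type>'.

    Partition folders follow the insertion order of the dict.
    """
    partition_part = "/".join(
        "{}={}".format(name, value) for name, value in partitions.items()
    )
    return "/".join([domain, dataset, partition_part, user_id, file_type])


def manifest_entry(domain, dataset, partitions_list):
    """Build one dataset entry of the input manifest from the same dicts."""
    return {
        "Domain": domain,
        "DatasetName": dataset,
        # Copy each dict so later changes do not alter the manifest
        "Partitions": [dict(p) for p in partitions_list],
    }


# Usage with placeholder values: one dict drives both the path and the manifest
partitions = {"uploaded_by": "Sam", "upload_date": "1700000000"}
prefix = partition_prefix("insurance", "customers", partitions, "<valid-amorphic-userID>")
entry = manifest_entry("insurance", "customers", [partitions])
```

With this arrangement a partition value changed in one place is reflected in both the file location and the manifest.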