Datalake Util
Datalake util provides functionality to get the state of the platform.
Data load success check
Amorphic uses a multi-level data flow to ensure that data written to a dataset matches the expected schema. The flow is: the user writes data to the Landing Zone bucket (LZ bucket) of the respective dataset. The data is then loaded into a temporary table of the data warehouse (if the dataset is stored in a data warehouse) to validate the schema. On success it is copied to the Data Landing Zone bucket (DLZ bucket), from which it is copied to the final table of the data warehouse. So we can simply check whether the data has been copied to the DLZ bucket; if it has, we can be sure it will be successfully copied to the final target.
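Conceptually, the DLZ check boils down to asking whether any object exists under the dataset's epoch prefix in the DLZ bucket. A minimal sketch of that idea, assuming an illustrative `<domain>/<dataset>/<epoch>/` key layout (the real Amorphic layout may differ) and injecting the S3 client so it can be swapped for a stub:

```python
def data_exists_in_dlz(s3_client, dlz_bucket, domain, dataset, upload_epoch):
    """Return True if at least one object exists under the dataset's epoch prefix.

    The <domain>/<dataset>/<epoch>/ key layout is an assumption for
    illustration, not the documented Amorphic layout.
    """
    prefix = "{}/{}/{}/".format(domain, dataset, upload_epoch)
    # Listing at most one key is enough to answer an existence question.
    response = s3_client.list_objects_v2(
        Bucket=dlz_bucket, Prefix=prefix, MaxKeys=1
    )
    return response.get("KeyCount", 0) > 0
```

In production `s3_client` would typically be `boto3.client("s3", region_name=region)`; `amorphicutils` wraps this kind of check behind `is_data_load_complete`.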
Usage
To check if data was successfully copied
- Initialize DataLandingZoneUtil
from amorphicutils import datalakeutil
datalake_util = datalakeutil.DataLandingZoneUtil(lz_bucket, dlz_bucket)
- Check if the data load succeeded
datalake_util.is_data_load_complete(domain, dataset, upload_time)
- Use the bool_retry decorator to check the status with retries
@bool_retry(3, 2, True)
def check_if_data_load_complete(domain, dataset, upload_time):
return datalake_util.is_data_load_complete(domain, dataset, upload_time)
result = check_if_data_load_complete(self.domain, self.dataset, self.upload_time)
bool_retry accepts retry_count, sleep_time in seconds, and the outcome your function is expected to return (True or False). In the above example it will retry at most 3 times with a delay of 2 seconds between checks, and the check succeeds if check_if_data_load_complete returns True.
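bool_retry itself ships with amorphicutils; its behavior described above can be approximated with a short decorator like the following sketch (the parameter names retry_count, sleep_time, and expected mirror the description and are illustrative, not the library's actual implementation):

```python
import functools
import time


def bool_retry(retry_count, sleep_time, expected):
    """Retry the wrapped function until it returns `expected`.

    Gives up after `retry_count` attempts, sleeping `sleep_time` seconds
    between attempts. Illustrative sketch of the documented behavior,
    not the amorphicutils implementation.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = not expected
            for attempt in range(retry_count):
                result = func(*args, **kwargs)
                if result == expected:
                    break  # desired outcome reached, stop retrying
                if attempt < retry_count - 1:
                    time.sleep(sleep_time)
            return result
        return wrapper
    return decorator
```

With retry_count=3 and sleep_time=2 this matches the example above: at most three calls, two seconds apart, succeeding as soon as the wrapped check returns True.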
- Get last upload_date epoch folder
datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
last_epoch = datalake_util.get_last_upload_date_folder_epoch(<domain_name>, <dataset_name>)
- Get list of epoch folders
datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
epochs_list = datalake_util.get_epochs_list(<domain_name>, <dataset_name>)
- Get new epoch folders
datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
new_epochs_list = datalake_util.get_new_epochs(<domain_name>, <dataset_name>, <upload_time>)
- Get new files newer than an upload_date epoch
datalake_util = datalakeutil.DataLandingZoneUtil(<lz_bucket>, <dlz_bucket>)
new_files_prefixes = datalake_util.get_new_files_prefix(<domain_name>, <dataset_name>, last_processed_epoch=<upload_time>)
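The incremental-processing pattern behind get_new_epochs and get_new_files_prefix is simple to state: keep only epoch folders strictly greater than the last processed one, then derive file prefixes from them. A pure-Python sketch of that logic, assuming epoch folder names are integer strings and the same hypothetical `<domain>/<dataset>/<epoch>/` layout (both illustrative assumptions):

```python
def filter_new_epochs(epochs, last_processed_epoch=None):
    """Return epochs strictly greater than last_processed_epoch, sorted.

    With last_processed_epoch=None every epoch is considered new.
    Sketch of the documented behavior, not the library code.
    """
    epochs = sorted(int(e) for e in epochs)
    if last_processed_epoch is None:
        return epochs
    return [e for e in epochs if e > int(last_processed_epoch)]


def epoch_prefixes(domain, dataset, epochs):
    """Derive a per-epoch prefix for each new epoch; the key layout is an assumption."""
    return ["{}/{}/{}/".format(domain, dataset, e) for e in epochs]
```

A caller would persist the largest epoch it processed and pass it back as last_processed_epoch on the next run, so each epoch folder is handled exactly once.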
Implementation
class amorphicutils.datalakeutil.DataLandingZoneUtil(lz_bucket_name, dlz_bucket_name, region=None)
Util class to get state of DLZ
__init__(lz_bucket_name, dlz_bucket_name, region=None)
Initialize DLZ Util
- Parameters:
- lz_bucket_name – Lz bucket name
- dlz_bucket_name – Dlz bucket name
get_epochs_list(domain_name, dataset_name)
Returns the list of epochs
- Parameters:
- domain_name – domain name of dataset
- dataset_name – dataset name
- Returns: list of epochs
get_last_upload_date_folder_epoch(domain_name, dataset_name)
Get the epoch of the folder from the last upload
- Parameters:
- domain_name – domain name of dataset
- dataset_name – dataset name
- Returns: epoch of the last upload folder
get_new_epochs(domain_name, dataset_name, last_processed_epoch=None)
Returns the list of epochs which are greater than last_processed_epoch
- Parameters:
- domain_name – domain name of dataset
- dataset_name – dataset name
- last_processed_epoch – last epoch processed
- Returns: list of new epochs
get_new_files_prefix(domain_name, dataset_name, last_processed_epoch=None)
Returns the list of file locations whose epoch is greater than last_processed_epoch.
- Parameters:
- domain_name – domain name of dataset
- dataset_name – dataset name
- last_processed_epoch – last epoch processed
- Returns: list of new file locations
class amorphicutils.datalandingzonestate.DataLandingZoneState(lz_bucket_name, dlz_bucket_name)
Class to check state of DataLandingZone
__init__(lz_bucket_name, dlz_bucket_name)
Initialize the DataLandingZoneState object
- Parameters:
- lz_bucket_name – Lz bucket name
- dlz_bucket_name – Dlz bucket name
is_data_load_complete(domain_name, dataset_name, upload_date, region=None)
Returns True if the data load is complete
- Parameters:
- domain_name – domain name
- dataset_name – dataset name
- upload_date – upload date to check if exists
- region – region of aws
- Returns: True or False
is_dir_exists(paths=None, region=None)
Returns True if the dir with upload date exists
- Parameters:
- paths – list of paths for which to check dir
- region – region of aws
- Returns: True or False
static is_epoch_dir_exists(bucket_name, domain_name, dataset_name, upload_date, path=None, region=None)
Returns True if the dir with upload date exists
- Parameters:
- bucket_name – name of the bucket
- domain_name – domain name
- dataset_name – dataset name
- upload_date – upload date to check if exists
- path – path for which to check dir
- region – region of aws
- Returns: True or False