Bookmarking In Amorphic
Since many ETL jobs depend on different datasets, the state must be kept in a persistent location so that it can be accessed later and jobs can be synchronised. This package provides a utility to store that state in Amorphic with S3 as the backend. The data is stored in S3 as a dictionary with a key-value structure; a value can be a dict, a list, or any primitive type.
The state is stored per job name, so one needs to use a consistent job_name for the respective state data.
Usage
- Pre-requisite: create a dataset backed by S3 to store the state data.
- Initialize the state management class and get a StateData instance
from amorphicutils import statemanagement
state_mgm = statemanagement.StateStore(lz_bucket, dlz_bucket, state_domain, state_dataset, username)
state_data = state_mgm.get_or_create(test_job_name)
test_job_name is the name of the job for which the state is maintained.
- Add new element to state data.
state_data.add_element(test_statedata_key_1, test_statedata_value_1)
We add a new key and value to the existing StateData object with the add_element method. The key should be unique, so
<domain name>.<dataset name>
is a good choice of key.
- Save the state data in persistent storage
state_mgm.update(state_data)
The update method stores the data in S3 with the suffix _running.json, which indicates that the jobs related to the respective state store are not yet complete and the state has only been saved to the persistent location.
- Complete the state data
state_mgm.complete(state_data)
This stores the final copy of state_data in persistent storage with the suffix _completed.json.
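The running/completed lifecycle above can be sketched as a small helper that derives the stored file name from the job name. The state_file_name function below is hypothetical; the real StateStore may lay out its S3 keys differently, but the suffix behaviour follows the description above.

```python
# Sketch of the _running.json / _completed.json naming convention.
# state_file_name is an illustrative helper, not part of amorphicutils.
def state_file_name(job_name, completed=False):
    """Return the file name a job's state would be stored under."""
    suffix = "_completed.json" if completed else "_running.json"
    return job_name + suffix


print(state_file_name("test_job"))                  # test_job_running.json
print(state_file_name("test_job", completed=True))  # test_job_completed.json
```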
Implementation
State Data
This is the object which stores the data in key-value format.
class amorphicutils.tools.statemanagement.StateData
Class to store the state data. It stores the data as a dictionary.
__init__()
Initialize the StateData class
>>> state_data = StateData()
add_element(key, value)
Add element to the state data
- Parameters:
- key – key of the state data
- value – value of the state data
- Returns:
>>> state_data = StateData()
>>> state_data.add_element("DatasetId", {"epoch_time": 156234987})
from_bytes(data)
Load a dict of data into the StateData class
- Parameters:data – dict of data to be loaded
- Returns:
>>> state_data = StateData()
>>> state_data.from_bytes({ "DatasetId": { "epoch_time": 156234678}})
remove_element(key)
Removes the element from state data
- Parameters:key – key of the state data
- Returns:
>>> state_data = StateData()
>>> state_data.remove_element("DatasetId")
reset()
Resets the data of the StateData class.
to_bytes()
Returns the state data in bytes format
- Returns: bytes of state data
>>> state_data.to_bytes()
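Putting the pieces together, a minimal sketch of the StateData interface documented above might look like the following. This is an illustrative re-implementation based only on the method descriptions (JSON encoding in to_bytes is an assumption), not the package's actual source.

```python
import json


class StateData:
    """Illustrative sketch of the documented StateData interface."""

    def __init__(self):
        self._data = {}

    def add_element(self, key, value):
        # Store (or overwrite) a key-value pair of state.
        self._data[key] = value

    def remove_element(self, key):
        # Drop a key from the state; missing keys are ignored here.
        self._data.pop(key, None)

    def from_bytes(self, data):
        # Load an existing dict of data into the object.
        self._data = dict(data)

    def to_bytes(self):
        # Serialize the state dict to bytes (JSON encoding assumed).
        return json.dumps(self._data).encode("utf-8")

    def reset(self):
        # Clear all stored state.
        self._data = {}


state = StateData()
state.add_element("DatasetId", {"epoch_time": 156234987})
print(state.to_bytes())
```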
State Store
This object interacts with Amorphic to update the dataset.
class amorphicutils.tools.statemanagement.StateStore(lz_bucket_name, dlz_bucket_name, state_domain, state_dataset_name, username, file_type='others')
Stores the state of Amorphic ETL jobs
__init__(lz_bucket_name, dlz_bucket_name, state_domain, state_dataset_name, username, file_type='others')
Initialize the StateStore class
- Parameters:
- lz_bucket_name – bucket name of lz bucket
- dlz_bucket_name – bucket name of dlz bucket
- state_domain – domain of the state dataset
- state_dataset_name – dataset name of the state dataset
- username – user id of the user who has access to state data
- file_type – file type of the state dataset. Default: “others”
>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
...                          "test_state_dataset", "test_user", "others")
complete(data)
Saves the file with completed tag
- Parameters:data – StateData which will store data into s3
- Returns:
>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
...                          "test_state_dataset", "test_user", "others")
>>> state_data = state_store.get_or_create("test_job")
>>> state_data.add_element("DatasetId", {"epoch_time": 156234987})
>>> state_store.complete(state_data)
get_or_create(job_name)
Gets or create StateData instance
- Parameters:job_name – name of the job for which to maintain state
- Returns: instance of StateData
>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
...                          "test_state_dataset", "test_user", "others")
>>> state_data = state_store.get_or_create("test_job")
update(data)
Saves the file with running tag in s3
- Parameters:data – StateData which will store data into s3
- Returns:
>>> state_store = StateStore("lz_bucket", "dlz_bucket", "test_state_domain",
...                          "test_state_dataset", "test_user", "others")
>>> state_data = state_store.get_or_create("test_job")
>>> state_data.add_element("DatasetId", {"epoch_time": 156234987})
>>> state_store.update(state_data)
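The full get_or_create → add_element → update → complete lifecycle can be exercised end to end with an in-memory stand-in for the S3 backend. The InMemoryStateStore class below is purely illustrative (a dict replaces the bucket, and plain dicts stand in for StateData); only the _running.json / _completed.json suffix behaviour is taken from the documentation above.

```python
import json


class InMemoryStateStore:
    """Illustrative stand-in for StateStore: keeps files in a dict instead
    of S3, but follows the documented running/completed lifecycle."""

    def __init__(self):
        self.backend = {}  # file name -> bytes, in place of the S3 bucket

    def get_or_create(self, job_name):
        # Return the previously saved running state for the job,
        # or a fresh empty state dict.
        raw = self.backend.get(job_name + "_running.json")
        return job_name, (json.loads(raw) if raw else {})

    def update(self, job_name, state):
        # Persist an in-progress copy with the _running.json suffix.
        self.backend[job_name + "_running.json"] = json.dumps(state).encode()

    def complete(self, job_name, state):
        # Persist the final copy with the _completed.json suffix.
        self.backend[job_name + "_completed.json"] = json.dumps(state).encode()


store = InMemoryStateStore()
job, state = store.get_or_create("test_job")
state["DatasetId"] = {"epoch_time": 156234987}  # add_element equivalent
store.update(job, state)
store.complete(job, state)
print(sorted(store.backend))  # ['test_job_completed.json', 'test_job_running.json']
```

Persisting with update before complete mirrors the intended usage: a later run of the same job can call get_or_create and resume from the last running copy.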