Getting Started
To start developing on Amorphic, you need an entry point that matches the backend, which can be either Python or PySpark.
Python
The following sample main file can be used as a reference for the app entry point.
import sys
import argparse
import logging
# Set up logging with the library defaults, then create the module-level
# logger used by main() and parse_arguments() below.
logging.basicConfig()
logger = logging.getLogger(__name__)
# Emit everything from DEBUG upwards for this module's logger.
logger.setLevel(logging.DEBUG)
def main(app_name=None):
    """Entry point for the Python application.

    Logs the application name and then runs the application logic placed
    in the marked section below.

    :param app_name: Name of the app; written to the start-up log line.
    :raises Exception: Any error raised inside the body is logged and then
        re-raised unchanged.
    """
    try:
        logger.info("App Name: {0}".format(app_name))
        ####################################
        ############ Your Logic ############
        ####################################
    except Exception as ex:
        logger.error("Failed to execute main with error: {0}".format(str(ex)))
        # Re-raise after logging so the failure reaches the caller instead of
        # the run ending as if it had succeeded (same as the PySpark sample).
        raise
def parse_arguments():
    """Parse the command-line options understood by this entry point.

    Options that are not declared here are ignored rather than rejected.
    When neither ``--scriptLocation`` nor ``--app_name`` is supplied, an
    error is logged and the process exits with status 1.

    :return: dict with the keys ``scriptLocation`` and ``app_name``
        (``None`` for any option that was not given).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--scriptLocation", help="Script location in aws glue, this is internal arguments given to glue execution")
    parser.add_argument("--app_name", help="Name of the app when launched locally")

    # parse_known_args() returns (namespace, leftovers); leftovers are unused.
    namespace, _ = parser.parse_known_args()
    options = vars(namespace)

    # Without a script location the app name must be given explicitly.
    has_script_location = bool(options.get("scriptLocation"))
    if not has_script_location and not options.get("app_name"):
        logger.error("--app_name is required as arguments when running locally.")
        sys.exit(1)
    return options
if __name__ == '__main__':
    # Script entry: resolve the app name, show the arguments, run main().
    args = parse_arguments()
    if not args.get('app_name'):
        # No explicit name: fall back to the last path segment of the
        # script location.
        args['app_name'] = args['scriptLocation'].rsplit('/', 1)[-1]
    print(args)
    main(args.get('app_name'))
Pyspark
A Spark job can be executed locally or in the AWS Glue environment. When Spark runs in AWS Glue, a GlueContext is used. The following can be used as a reference entry point.
from amorphicutils.pyspark.infra.spark import get_spark
from amorphicutils.pyspark.infra.gluespark import GlueSpark
from amorphicutils.amorphiclogging import Log4j
import sys
import argparse
def main(app_master=None, app_name=None):
    """Entry point for the PySpark application.

    Without ``app_master`` the Spark session and logger are obtained from
    ``GlueSpark``; otherwise a session is created with ``get_spark`` and the
    logger comes from ``Log4j``. The application logic goes in the marked
    section below.

    :param app_master: Falsy selects the Glue path, truthy the local path.
        It only selects the branch here; it is not passed to ``get_spark``.
    :param app_name: Application name handed to ``get_spark`` on the local
        path.
    :raises Exception: Any error raised during set-up or in the body is
        reported and then re-raised with its original type.
    """
    # Bind both names up front so the except/finally clauses can run even
    # when initialisation fails before they are assigned.
    glue_spark = None
    logger = None
    try:
        if not app_master:
            glue_spark = GlueSpark()
            spark = glue_spark.get_spark()
            logger = glue_spark.get_logger()
            logger.info("Initialized Glue Context")
        else:
            spark = get_spark(app_name)
            spark_logger = Log4j(spark)
            spark_logger.set_level("INFO")
            logger = spark_logger.get_logger()
            logger.info("Initialized Local Spark Context")
        logger.info("Spark App Name: {0}".format(spark.conf.get("spark.app.name")))
        ####################################
        ############ Your Logic ############
        ####################################
    except Exception as ex:
        message = "Failed to execute main with error: {0}".format(str(ex))
        if logger is not None:
            logger.error(message)
        else:
            # The logger was never created; report on stderr rather than
            # hiding the real error behind an unbound-variable error.
            sys.stderr.write(message + "\n")
        # Bare raise keeps the original exception type and traceback.
        raise
    finally:
        # Only set on the Glue path, and only once GlueSpark() succeeded.
        # NOTE(review): commit_job() also runs after a failure, as it did
        # before - confirm that is intended.
        if glue_spark is not None:
            glue_spark.commit_job()
def parse_arguments():
    """Parse the command-line options understood by this entry point.

    Options that are not declared here are ignored rather than rejected.
    When ``--master`` contains ``local`` and ``--app_name`` is missing, a
    message is printed and the process exits with status 1.

    :return: dict with the keys ``master`` and ``app_name`` (``None`` for
        any option that was not given).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--master", help="User local[*] or local[<required cores>] to execute locally")
    parser.add_argument("--app_name", help="Name of the app when launched locally")

    # parse_known_args() returns (namespace, leftovers); leftovers are unused.
    namespace, _ = parser.parse_known_args()
    options = vars(namespace)

    master = options.get("master")
    runs_locally = bool(master) and "local" in master
    if runs_locally and not options.get("app_name"):
        print("--app_name argument is required when running locally")
        sys.exit(1)
    return options
if __name__ == '__main__':
    # Script entry: parse the options, show them, then start the job.
    args = parse_arguments()
    print(args)
    main(app_master=args.get("master"), app_name=args.get("app_name"))