Spark Details: Spark Submit

This article provides details around how ModelOp Center leverages Spark Submit to orchestrate the submission and management of Spark jobs within Spark environments.


Overview

The spark-submit execution process was designed with the following principles and features in mind:

  • When possible, components should be implemented in pure Python and/or PySpark. This makes testing and debugging easier when running in either local or cluster deploy mode

  • The spark-runtime-service monitors jobs in the WAITING and RUNNING states until they reach a terminal state (COMPLETE, ERROR, or CANCELLED). Whenever a job change is detected, model-manager is updated accordingly

  • Before a job is submitted to the Spark cluster for execution, the ModelOpPySparkDriver implementation extracts all the external file assets (input, output, and model assets) from the ModelBatchJob and passes them as parameters to the function to be executed. These external file assets are passed in as lists of assets:

    • external_inputs – [{"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/test.csv"}]

    • external_outputs – [{"name": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/output.csv"}]

    • external_model_assets – [{"name": "asset.zip", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/asset.zip"}]

  • The YarnMonitorService component listens for the stdout and stderr produced by a job running on the Spark cluster and stores them in jobMessages. Whether stdout and stderr are stored in jobMessages is configurable.
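The asset lists above are plain lists of dictionaries, so a model function can resolve the file locations directly. The helper below is an illustrative sketch (the function name resolve_asset_paths is not part of the ModelOp API); it only assumes the dictionary shape shown in the examples above.

```python
def resolve_asset_paths(assets):
    """Return the fileUrl of every EXTERNAL_FILE asset in a list
    shaped like the external_inputs/external_outputs examples above."""
    return [a["fileUrl"] for a in assets if a.get("assetType") == "EXTERNAL_FILE"]


external_inputs = [
    {"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/test.csv"}
]
print(resolve_asset_paths(external_inputs))  # → ['/hadoop/test.csv']
```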

 

Architecture

The diagram below shows how the core components interact and gives a high-level overview of the process flow from a ModelBatchJob to a Spark job running on the cluster.

 


Details

PySpark dynamic job abstraction

ModelOp Center abstracts the execution of PySpark jobs, using a ModelBatchJob as input. The approach uses a main PySpark driver (ModelOpPySparkDriver.py) that imports the job metadata (details about the job's input, output, and model assets, as well as the name of the method to be executed) and the model source code, and then uses a simple main block to call the desired function.

 

 

########################################################################################
##### dynamic model / resources content.
########################################################################################

# primary source code
import model_source_code as modelop_model_source_code

# job metadata, variables pointing to [input, output, appname, etc.]
import modelop_job_metadata as modelop_job_metadata

# for printing stack trace
import traceback

# for checking number of params in function signature
from inspect import signature

########################################################################################
##### static resources content.
########################################################################################

if __name__ == "__main__":
    print("### - ModelOpPySparkExecution execution")
    try:
        # Determine if there is an optional init function (# modelop.init) and execute it
        # Determine how many parameters the method to be executed
        # (# modelop.score, # modelop.metrics or # modelop.train) takes
        method = getattr(modelop_model_source_code, modelop_job_metadata.method_to_be_executed)
        num_params = len(signature(method).parameters)

        # Execute the requested function
        print("### - Executing: " + modelop_job_metadata.method_to_be_executed)
        if num_params == 0:
            method()
        elif num_params == 3:
            method(modelop_job_metadata.job_input,
                   modelop_job_metadata.job_output,
                   modelop_job_metadata.job_model_assets_input)
        else:
            raise Exception("Number of function parameters, " + str(num_params)
                            + ", does not match expected number of parameters (0 or 3)")
    except Exception as e:
        print("An exception of ", e.__class__, " occurred")
        traceback.print_exc()

    print("### - PySpark code execution completed")
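The driver above imports a generated modelop_job_metadata module. The sketch below shows what such a module might contain, based only on the attributes the driver references; the concrete values and the method name "score" are illustrative assumptions, not the actual generated output.

```python
# Hypothetical sketch of a generated modelop_job_metadata module.
# Attribute names match what ModelOpPySparkDriver reads; values are examples.

# name of the tagged function in the model source to execute
method_to_be_executed = "score"

# asset lists extracted from the ModelBatchJob
job_input = [
    {"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/test.csv"}
]
job_output = [
    {"name": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/output.csv"}
]
job_model_assets_input = [
    {"name": "asset.zip", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/asset.zip"}
]
```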

 

 


PySparkJobManifest

ModelOp Center decouples the transformation that takes a job as input and produces a SparkLauncher as output. For this purpose, ModelOp Center uses an intermediate component, the PySparkJobManifest. The PySparkJobManifest contains all the values required to run a spark-submit: the Python source code and metadata, the function name, and the application name (all of which are used inside ModelOpPySparkDriver). This design keeps the spark-submit step decoupled and agnostic of the job representation.
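To make the intermediate role of the manifest concrete, here is a minimal sketch of how manifest fields could map onto a spark-submit invocation. The field names and the to_spark_submit_args helper are assumptions for illustration; the real PySparkJobManifest is a Java-side component consumed by SparkLauncher.

```python
from dataclasses import dataclass


@dataclass
class PySparkJobManifest:
    """Illustrative stand-in for the manifest: the values the article
    says are needed to run a spark-submit."""
    application_name: str
    driver_script: str   # e.g. ModelOpPySparkDriver.py
    py_files: list       # model source code + generated job metadata module


def to_spark_submit_args(m):
    """Show how manifest fields could map onto spark-submit flags."""
    return ["spark-submit",
            "--name", m.application_name,
            "--py-files", ",".join(m.py_files),
            m.driver_script]


manifest = PySparkJobManifest(
    application_name="modelop-batch-job",
    driver_script="ModelOpPySparkDriver.py",
    py_files=["model_source_code.py", "modelop_job_metadata.py"],
)
print(" ".join(to_spark_submit_args(manifest)))
```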

 

The next diagram displays the transformation process of a ModelBatchJob into a SparkLauncher:

 

 


ModelOpJobMonitor

The ModelOpJobMonitor has two main responsibilities:

  1. Submitting jobs to Spark cluster.

  2. Monitoring the status of existing PySpark jobs in the WAITING and RUNNING states.

In order to successfully fulfill these responsibilities, it relies on two key services:

  1. SparkLauncherService – component in charge of translating jobs into SparkLauncher objects.

  2. YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster.
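The monitoring responsibility boils down to a poll-until-terminal loop over the job states described in the Overview. The sketch below is a simplified, hypothetical version of that loop (monitor_job, fetch_status, and on_change are illustrative names, not the actual service API).

```python
import time

# Terminal states from the Overview: a job is no longer monitored once
# it reaches COMPLETE, ERROR, or CANCELLED.
TERMINAL_STATES = {"COMPLETE", "ERROR", "CANCELLED"}


def monitor_job(fetch_status, on_change, poll_seconds=5):
    """Poll a job's status until it reaches a terminal state.

    fetch_status: callable returning the current status string.
    on_change:    callback invoked on every status transition
                  (e.g. to update model-manager).
    """
    last = None
    while True:
        status = fetch_status()
        if status != last:
            on_change(status)   # report WAITING -> RUNNING -> COMPLETE, etc.
            last = status
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_seconds)
```

In the real service, fetch_status would query YARN via the YarnMonitorService, and on_change would propagate the update (and optionally the captured stdout/stderr) back to model-manager.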

 

 


PySpark - Metrics Jobs

Running metrics jobs is one of the key features of the spark-runtime-service.

ModelTestResult generation was moved out of the SparkRuntimeService and is now performed by the MLC.

In addition to metrics jobs, the spark-runtime-service can also run scoring and training jobs.
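For reference, a metrics model executed by the driver could look like the sketch below. The # modelop.metrics tag and the three-parameter signature match what ModelOpPySparkDriver expects; the function body and the metric computed are illustrative assumptions.

```python
# Hypothetical sketch of a model source file for a metrics job.

# modelop.metrics
def metrics(external_inputs, external_outputs, external_model_assets):
    """Receive the three asset lists the driver passes in and
    compute metrics (the actual computation is model-specific)."""
    # Illustrative metric only: count the input assets provided.
    results = {"record_count": len(external_inputs)}
    return results
```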