Spark Details: Spark Submit
This article describes how ModelOp Center uses spark-submit to orchestrate the submission and management of Spark jobs within Spark environments.
Overview
The spark-submit execution process was designed with the following principles and features in mind:
When possible, Python and/or PySpark components should be implemented in Python. This principle makes testing and debugging easier when running in local or cluster deploy mode.
The spark-runtime-service monitors jobs in the WAITING and RUNNING states until they reach the COMPLETE, ERROR or CANCELLED state. If any job changes are detected, model manager is updated accordingly.
Before a job is submitted to the Spark cluster for execution, the ModelOpPySparkDriver implementation extracts the inputData, outputData and storedModel.modelAssets from the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB and passes them in as parameters to the function to be executed. These external file assets are passed in as lists of Assets:
input_data – [{"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}]
output_data – [{"filename": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/output.csv"}]
The YarnMonitorService component listens for the stdout and stderr produced by the job running on the Spark cluster and stores them in jobMessage. Whether stdout and stderr are stored in jobMessages is configurable.
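To make the asset lists above concrete, here is a minimal sketch of a model function that could receive them. The function name `score`, its parameter order, and the helper `first_file_url` are assumptions made for illustration; only the `filename`, `assetType` and `fileUrl` keys come from the examples above.

```python
def first_file_url(assets):
    """Return the fileUrl of the first EXTERNAL_FILE asset in a list of asset dicts."""
    for asset in assets:
        if asset.get("assetType") == "EXTERNAL_FILE":
            return asset["fileUrl"]
    raise ValueError("no EXTERNAL_FILE asset found")


def score(input_data, output_data, model_assets):
    """Hypothetical model function: copy the input CSV to the output location."""
    # Imported lazily so the helper above can be used without a Spark install.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(first_file_url(input_data), header=True)
    df.write.csv(first_file_url(output_data), header=True, mode="overwrite")


# The same asset lists shown in the principle above.
input_data = [{"filename": "test.csv", "assetType": "EXTERNAL_FILE",
               "fileUrl": "hdfs:///hadoop/test.csv"}]
output_data = [{"filename": "output.csv", "assetType": "EXTERNAL_FILE",
                "fileUrl": "hdfs:///hadoop/output.csv"}]
```

Resolving the URLs through a small helper keeps the model function independent of how many assets the job carries.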
Architecture Overview
The following diagram shows how the core components interact and gives a high-level overview of the process flow from a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB to a Spark job running on the Spark cluster.
Details
PySpark Dynamic Job Abstraction
ModelOp Center abstracts the process of executing PySpark jobs using a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB as input. The approach uses a main PySpark driver (ModelOpPySparkDriver.py) that imports the job metadata (details about the job input, output and model assets, as well as the name of the method to be invoked) and the model source code to be executed. The driver itself contains only a simple main that calls the desired function.
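The dynamic invocation described above can be sketched roughly as follows. This is not the actual ModelOpPySparkDriver.py; the metadata layout, the stand-in `model_source` module and the `main` signature are assumptions chosen to show the idea of looking a function up by name and calling it with the job assets.

```python
import types

# Hypothetical job metadata; a real driver would import this rather than hard-code it.
JOB_METADATA = {
    "method_name": "score",
    "input_data": [{"filename": "test.csv", "assetType": "EXTERNAL_FILE",
                    "fileUrl": "hdfs:///hadoop/test.csv"}],
    "output_data": [{"filename": "output.csv", "assetType": "EXTERNAL_FILE",
                     "fileUrl": "hdfs:///hadoop/output.csv"}],
    "model_assets": [],
}


def _score(input_data, output_data, model_assets):
    """Placeholder model function that just reports which inputs it was given."""
    return [asset["fileUrl"] for asset in input_data]


# Stand-in for the imported model source code module.
model_source = types.ModuleType("model_source")
model_source.score = _score


def main(metadata, module):
    """Look up the requested function by name and call it with the job assets."""
    func = getattr(module, metadata["method_name"], None)
    if not callable(func):
        raise AttributeError(
            f"function {metadata['method_name']!r} not found in model source"
        )
    return func(metadata["input_data"], metadata["output_data"],
                metadata["model_assets"])


result = main(JOB_METADATA, model_source)
```

Because the function is resolved at run time, the same driver file can serve every job; only the metadata and model source change between submissions.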
PySparkJobManifest
ModelOp Center decouples the transformation process that takes a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB as input and produces a SparkLauncher as output. For this purpose, ModelOp Center uses the intermediate PySparkJobManifest component. PySparkJobManifest contains all the values required to run a spark-submit: the Python source code and metadata, the function name, and the application name (all of which are used inside ModelOpPySparkDriver). This design keeps the spark-submit step decoupled from, and agnostic of, the job it was built from.
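As a rough illustration of the manifest idea, the sketch below models an intermediate object and turns it into spark-submit arguments. The class name `PySparkJobManifestSketch`, its fields and the helper `to_spark_submit_args` are hypothetical and do not reflect the real PySparkJobManifest or SparkLauncher; the `--master`, `--deploy-mode`, `--name` and `--py-files` options are standard spark-submit flags.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PySparkJobManifestSketch:
    """Hypothetical intermediate value object between a job and spark-submit."""
    app_name: str
    function_name: str            # read by the driver, not passed as a flag here
    driver_path: str              # e.g. the ModelOpPySparkDriver.py file
    py_files: List[str] = field(default_factory=list)
    master: str = "yarn"
    deploy_mode: str = "cluster"


def to_spark_submit_args(manifest):
    """Build the spark-submit argument list from the manifest alone."""
    args = [
        "spark-submit",
        "--master", manifest.master,
        "--deploy-mode", manifest.deploy_mode,
        "--name", manifest.app_name,
    ]
    if manifest.py_files:
        args += ["--py-files", ",".join(manifest.py_files)]
    args.append(manifest.driver_path)
    return args


manifest = PySparkJobManifestSketch(
    app_name="demo",
    function_name="score",
    driver_path="ModelOpPySparkDriver.py",
    py_files=["model_source.py", "job_metadata.py"],
)
args = to_spark_submit_args(manifest)
```

The function that builds the arguments never sees the original job, which is the decoupling the manifest is meant to provide.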
The following diagram illustrates the transformation of a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB into a SparkLauncher:
ModelOpJobMonitor
The ModelOpJobMonitor has two main responsibilities:
Submitting jobs to the Spark cluster
Monitoring job statuses for existing (waiting and running) Spark jobs
In order to successfully fulfill these responsibilities, it relies on two key services:
SparkLauncherService – component in charge of translating jobs into SparkLauncher objects
YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster
KerberizedYarnMonitorService – component that is an instance of YarnMonitorService and is in charge of authenticating the principal with Kerberos before delegating control to YarnMonitorService
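The monitoring responsibility can be pictured as a polling pass over the active jobs. The sketch below is a simplified assumption of how such a pass might look, not the actual ModelOpJobMonitor; `fetch_status` and `on_change` are hypothetical callables standing in for the cluster status lookup and the model manager update, while the state names are the ones listed in the Overview.

```python
ACTIVE_STATES = {"WAITING", "RUNNING"}
TERMINAL_STATES = {"COMPLETE", "ERROR", "CANCELLED"}


def poll_once(jobs, fetch_status, on_change):
    """Check every active job once and report any state change.

    jobs: dict mapping job id -> last known state (updated in place).
    fetch_status: hypothetical callable returning the cluster-side state.
    on_change: hypothetical callable standing in for the model manager update.
    """
    for job_id, known_state in list(jobs.items()):
        if known_state not in ACTIVE_STATES:
            continue  # terminal jobs are no longer monitored
        new_state = fetch_status(job_id)
        if new_state != known_state:
            jobs[job_id] = new_state
            on_change(job_id, known_state, new_state)


def all_done(jobs):
    """True once every tracked job has reached a terminal state."""
    return all(state in TERMINAL_STATES for state in jobs.values())


# Simulated cluster view used in place of a real status service.
jobs = {"job-1": "WAITING", "job-2": "RUNNING", "job-3": "COMPLETE"}
cluster = {"job-1": "RUNNING", "job-2": "ERROR", "job-3": "COMPLETE"}
changes = []
poll_once(jobs, cluster.get, lambda j, old, new: changes.append((j, old, new)))
```

Passing the status lookup in as a callable mirrors the delegation described above: a Kerberos-aware variant could authenticate first and then hand off to the same lookup.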
PySpark Job Input and Output Data
The spark-runtime-service supports two types of input and output data for a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB:
HDFS asset(s) by URL a.k.a. External asset(s)
Embedded asset(s) a.k.a. File asset(s)
Input Data by URL
When the user provides HDFS asset(s) by URL as input data for a job, the HDFS asset(s) must be present on the Spark cluster before the job is submitted for execution.
Input Data Embedded
When the user provides embedded input data asset(s), the spark-runtime-service creates a temporary HDFS directory for the job and uploads the file content of each embedded input data asset to HDFS. The newly created HDFS file(s) are represented as HDFS asset(s) by URL and used as the input data when the job is submitted to the Spark cluster for execution.
The change in the input data from embedded asset(s) to HDFS asset(s) by URL is not visible to the user because the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB is never updated to reflect this change. Instead, the change is handled internally in PySparkPreprocessorService.
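The embedded-input handling can be sketched as a pure transformation over the asset list. Everything specific here is an assumption for illustration: the `fileContent` key, the `FILE` asset type for embedded assets, the temporary directory name and the `upload` callable (a real implementation would write to HDFS). The original list is left untouched, which matches the point that the job itself is never updated.

```python
import posixpath


def embedded_to_hdfs_assets(assets, tmp_dir, upload):
    """Return a new list where each embedded asset becomes an HDFS asset by URL.

    upload: hypothetical callable (file_url, content) that writes to HDFS.
    """
    converted = []
    for asset in assets:
        if asset.get("assetType") == "EXTERNAL_FILE":
            converted.append(asset)  # already an HDFS asset by URL
            continue
        file_url = posixpath.join(tmp_dir, asset["filename"])
        upload(file_url, asset["fileContent"])
        converted.append({
            "filename": asset["filename"],
            "assetType": "EXTERNAL_FILE",
            "fileUrl": file_url,
        })
    return converted


# An in-memory dict stands in for HDFS in this example.
fake_hdfs = {}
original = [
    {"filename": "a.csv", "assetType": "FILE", "fileContent": b"x,y\n1,2\n"},
    {"filename": "test.csv", "assetType": "EXTERNAL_FILE",
     "fileUrl": "hdfs:///hadoop/test.csv"},
]
submitted = embedded_to_hdfs_assets(original, "hdfs:///tmp/job-42",
                                    fake_hdfs.__setitem__)
```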
Output Data by URL
When the user provides HDFS asset(s) by URL as output data for a job, the job output is stored at the given HDFS file URL.
Output Data Embedded
When the user provides embedded output data asset(s), the spark-runtime-service creates a temporary HDFS directory for the job, if one does not exist yet. Then, the following actions are performed in the given order:
1. Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset is the temporary HDFS directory and the filename of the new HDFS asset is the filename of the corresponding embedded asset.
2. Update the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB by storing the HDFS asset(s) by URL in its job parameters Map, where the key is TMP_EXTERNAL_OUTPUT.
3. Use the newly created HDFS asset(s) by URL as the output data when the job is submitted to the Spark cluster for execution.
4. Download each HDFS asset by URL and store its content as the file content for the corresponding embedded output data asset in the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB.
5. Remove TMP_EXTERNAL_OUTPUT from the job parameters Map.
The change in the output data from embedded asset(s) to HDFS asset(s) by URL is not visible to the user because the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB is never updated to reflect this change. Instead, the change is handled internally in PySparkPreprocessorService and ModelOpJobMonitor.
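The five actions listed above for embedded output can be sketched as two phases around the job run. The dictionary layout of the job (`outputData`, `parameters`, `fileContent`), the `FILE` asset type and the `download` callable are assumptions for illustration only; the `TMP_EXTERNAL_OUTPUT` key and the choice of the temporary directory as the file URL follow the list as written.

```python
TMP_KEY = "TMP_EXTERNAL_OUTPUT"


def prepare_embedded_outputs(job, tmp_dir):
    """Before submission: build HDFS assets and park them in the job parameters."""
    hdfs_assets = [
        {"filename": asset["filename"], "assetType": "EXTERNAL_FILE",
         "fileUrl": tmp_dir}
        for asset in job["outputData"]
        if asset.get("assetType") != "EXTERNAL_FILE"
    ]
    job["parameters"][TMP_KEY] = hdfs_assets
    return hdfs_assets  # used as the output data for the submitted job


def collect_embedded_outputs(job, download):
    """After the run: copy HDFS content back, then remove the temporary key.

    download: hypothetical callable (directory_url, filename) -> bytes.
    """
    by_name = {asset["filename"]: asset for asset in job["outputData"]}
    for hdfs_asset in job["parameters"].pop(TMP_KEY, []):
        content = download(hdfs_asset["fileUrl"], hdfs_asset["filename"])
        by_name[hdfs_asset["filename"]]["fileContent"] = content


job = {
    "outputData": [{"filename": "out.csv", "assetType": "FILE",
                    "fileContent": None}],
    "parameters": {},
}
staged = prepare_embedded_outputs(job, "hdfs:///tmp/job-42")
staged_snapshot = list(job["parameters"][TMP_KEY])
# A fake download that echoes the location stands in for an HDFS read.
collect_embedded_outputs(job, lambda url, name: f"{url}/{name}".encode())
```

Keeping the temporary assets under a dedicated parameter key lets the monitor find them after the run without altering the user-visible output data definition.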
For input and output data, the user can have any combination of HDFS asset(s) by URL or embedded asset(s).
PySpark - Metrics Jobs
Running jobs of type MODEL_BATCH_TEST_JOB is one of the key features of the spark-runtime-service. These jobs produce a ModelTestResult, whose generation is performed by the MLC.