Design principles
To ensure efficient and robust execution of Spark jobs, the spark-submit process within ModelOp Center follows the principles below, which apply in both local and cluster deploy modes:
Code Implementation: Whenever possible, implement components in Python and/or PySpark. This approach makes testing and debugging easier in both local and cluster deploy modes.
Job Monitoring: The spark-runtime-service monitors jobs in the WAITING and RUNNING states until they reach COMPLETE, ERROR, or CANCELLED states. Any detected changes in job status will trigger updates in the model manager.
Job Submission Process: Before submitting a job to the Spark cluster, the ModelOpPySparkDriver extracts inputData, outputData, and storedModel.modelAssets from the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB and passes them as parameters to the function to be executed. These external file assets are provided as lists of assets:
input_data – [{"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}]
output_data – [{"filename": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/output.csv"}]
Logging: The YarnMonitorService component listens for and stores the stdout and stderr outputs produced by the running job on the Spark cluster. The storage of stdout and stderr in jobMessages is configurable.
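To make the Job Submission Process principle concrete, the following is a minimal sketch of a job function that receives the asset lists as parameters. The function name score, its parameter names, and the helper external_file_urls are assumptions made for illustration; only the asset dictionary shape comes from the examples above. A real job would read and write the resolved URLs with Spark, which is omitted here.

```python
from typing import Dict, List

Asset = Dict[str, str]


def external_file_urls(assets: List[Asset]) -> List[str]:
    """Return the fileUrl of every EXTERNAL_FILE asset, preserving order."""
    return [a["fileUrl"] for a in assets if a.get("assetType") == "EXTERNAL_FILE"]


def score(input_data: List[Asset], output_data: List[Asset],
          model_assets: List[Asset]) -> Dict[str, List[str]]:
    """Hypothetical job function: resolve where to read from and write to."""
    sources = external_file_urls(input_data)
    targets = external_file_urls(output_data)
    # A real job would load `sources` with Spark and save results to `targets`.
    return {"read_from": sources, "write_to": targets}


input_data = [{"filename": "test.csv", "assetType": "EXTERNAL_FILE",
               "fileUrl": "hdfs:///hadoop/test.csv"}]
output_data = [{"filename": "output.csv", "assetType": "EXTERNAL_FILE",
                "fileUrl": "hdfs:///hadoop/output.csv"}]
plan = score(input_data, output_data, [])
```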
Architecture Overview
The following diagram gives a high-level overview of how the core components interact when a Spark job is submitted to the cluster.
From ModelOp Center Batch Job to pre-spark-submit process flow
The following diagram illustrates the transformation process from a ModelOp Center Batch job into a SparkLauncher:
Monitoring Spark jobs
ModelOp Center has a component called ModelOpJobMonitor, which is in charge of:
Submitting jobs to the Spark cluster
Monitoring job statuses
To fulfill these responsibilities, it relies on three key services:
SparkLauncherService – component in charge of translating jobs into SparkLauncher objects
YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster
KerberizedYarnMonitorService – component that is an instance of YarnMonitorService and is in charge of authenticating the principal with Kerberos before delegating control to YarnMonitorService
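The monitoring responsibility can be pictured as a polling pass over the active jobs. The sketch below is an illustration only, not the ModelOpJobMonitor implementation: the function poll_once and its callbacks are hypothetical, and the state names are the ones listed under Design principles. Jobs in WAITING or RUNNING are checked, and any change is reported through a callback that stands in for the update sent to the model manager.

```python
from typing import Callable, Dict

ACTIVE_STATES = {"WAITING", "RUNNING"}
TERMINAL_STATES = {"COMPLETE", "ERROR", "CANCELLED"}


def poll_once(jobs: Dict[str, str],
              fetch_status: Callable[[str], str],
              on_change: Callable[[str, str, str], None]) -> None:
    """Check every active job once and report any status change."""
    for job_id, known in list(jobs.items()):
        if known not in ACTIVE_STATES:
            continue  # jobs in a terminal state are no longer monitored
        current = fetch_status(job_id)
        if current != known:
            jobs[job_id] = current
            on_change(job_id, known, current)


# Usage with an in-memory stand-in for the cluster.
jobs = {"job-1": "WAITING", "job-2": "COMPLETE"}
cluster = {"job-1": "RUNNING", "job-2": "COMPLETE"}
changes = []
poll_once(jobs, cluster.__getitem__,
          lambda job_id, old, new: changes.append((job_id, old, new)))
```

In this sketch a job drops out of monitoring as soon as its recorded state is no longer WAITING or RUNNING, which mirrors the rule that monitoring continues until a terminal state is reached.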
Supported Spark job types
Scoring jobs
Metrics jobs - these jobs produce a ModelTestResult
PySpark Job Inputs and Outputs
Spark jobs support the following two types of input and output assets:
HDFS asset(s) by URL also known as External File asset(s)
Embedded asset(s) also known as File asset(s)
The following rules and restrictions apply:
Input
Input Assets by URL
When the job includes input HDFS asset(s) by URL, all of those HDFS asset(s) must be present and available inside the Spark cluster before the job is submitted for execution.
Input Data Embedded
When a job includes embedded input data assets, the process will generate temporary HDFS files for each of these embedded assets. These newly created HDFS files will be represented as HDFS assets by their URLs and will be used as input data when the job is submitted to the Spark cluster for execution.
This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
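The embedded-input handling described above can be sketched as a pure function that leaves the job untouched and returns the asset list used for submission. This is an assumption-laden illustration rather than the PySparkPreprocessorService code: the function name to_submission_inputs, the asset type "FILE", and the field "fileContent" are hypothetical, and the upload to HDFS is represented by a returned dictionary instead of a real write.

```python
from typing import Dict, List, Tuple

Asset = Dict[str, str]


def to_submission_inputs(input_data: List[Asset],
                         tmp_dir: str) -> Tuple[List[Asset], Dict[str, str]]:
    """Map embedded assets to HDFS assets by URL without mutating the job.

    Returns the assets to submit and a {fileUrl: content} map of files
    that would have to be uploaded to HDFS before submission.
    """
    submitted: List[Asset] = []
    uploads: Dict[str, str] = {}
    for asset in input_data:
        if asset.get("assetType") == "EXTERNAL_FILE":
            submitted.append(dict(asset))  # already an HDFS asset by URL
            continue
        url = f"{tmp_dir.rstrip('/')}/{asset['filename']}"
        uploads[url] = asset.get("fileContent", "")  # hypothetical field name
        submitted.append({"filename": asset["filename"],
                          "assetType": "EXTERNAL_FILE",
                          "fileUrl": url})
    return submitted, uploads


original = [{"filename": "rows.csv", "assetType": "FILE", "fileContent": "a,b\n1,2\n"}]
submitted, uploads = to_submission_inputs(original, "hdfs:///tmp/job-1")
```

Returning a new list instead of editing the job's assets matches the point made above: the job itself is not updated, so the user never sees the transformation.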
Output
Output Data by URL
When a job includes predefined output HDFS asset(s) by URL, the job output will be generated and stored at the predefined HDFS file locations.
Output Data Embedded
When the user provides embedded output data assets, the spark-runtime-service will create a temporary HDFS directory for the job if one does not already exist. The following steps are then performed in sequence:
Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset corresponds to the temporary HDFS directory and the filename matches the original embedded asset.
Update the ModelOp Center job by storing the HDFS assets by URL in its job parameters map, using the key TMP_EXTERNAL_OUTPUT.
Use the newly created HDFS assets by URL as the output data when submitting the job to the Spark cluster for execution.
Download each HDFS asset by URL and store its content as the file content for the corresponding embedded output data asset in the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB.
Remove TMP_EXTERNAL_OUTPUT from the job parameters map.
This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
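The five steps for embedded output data can be followed in the sketch below. It is a simplified, synchronous illustration: in the real service the download happens after the Spark job finishes, and the names used here (run_with_embedded_outputs, the "parameters" key, the "fileContent" field, the "FILE" asset type, and the submit and read_hdfs callbacks) are assumptions. Only the key TMP_EXTERNAL_OUTPUT and the order of the steps come from the description above.

```python
from typing import Callable, Dict, List

TMP_KEY = "TMP_EXTERNAL_OUTPUT"


def run_with_embedded_outputs(job: dict, tmp_dir: str,
                              submit: Callable[[List[dict]], None],
                              read_hdfs: Callable[[str], str]) -> None:
    outputs = job["outputData"]
    embedded = [a for a in outputs if a.get("assetType") != "EXTERNAL_FILE"]
    external = [a for a in outputs if a.get("assetType") == "EXTERNAL_FILE"]
    # Step 1: represent each embedded asset as an HDFS asset by URL.
    staged = [{"filename": a["filename"], "assetType": "EXTERNAL_FILE",
               "fileUrl": f"{tmp_dir.rstrip('/')}/{a['filename']}"}
              for a in embedded]
    # Step 2: record the temporary assets in the job parameters map.
    job["parameters"][TMP_KEY] = staged
    # Step 3: submit the job with the HDFS assets as its output data.
    submit(external + staged)
    # Step 4: copy each HDFS file back into its embedded asset.
    for asset, tmp in zip(embedded, staged):
        asset["fileContent"] = read_hdfs(tmp["fileUrl"])
    # Step 5: remove the temporary entry from the parameters map.
    del job["parameters"][TMP_KEY]


# Usage with an in-memory dictionary standing in for HDFS.
hdfs: Dict[str, str] = {}


def fake_submit(outputs: List[dict]) -> None:
    for a in outputs:
        hdfs[a["fileUrl"]] = "result for " + a["filename"]


job = {"outputData": [{"filename": "out.csv", "assetType": "FILE"}],
       "parameters": {}}
run_with_embedded_outputs(job, "hdfs:///tmp/job-1", fake_submit, hdfs.__getitem__)
```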
ModelOpPySparkDriver.py
ModelOp Center uses a predefined PySpark class called ModelOpPySparkDriver.py to manage execution and job orchestration for Spark jobs. This class abstracts the complexity of these processes by taking ModelOp Center Batch Jobs as input and translating them into the necessary format for job execution.
PySparkJobManifest
ModelOp Center decouples the transformation process that takes a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB as input and produces a SparkLauncher as output. For this purpose, ModelOp Center uses the PySparkJobManifest intermediate component. PySparkJobManifest contains all the values required to run a spark-submit: Python source code and metadata, function name, and application name (all of which are used inside ModelOpPySparkDriver). This design keeps the spark-submit step decoupled from, and agnostic of, the originating job.
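As an illustration of the manifest idea, the sketch below models PySparkJobManifest as an immutable record holding the values listed above and derives a spark-submit command line from it. The field names, the method spark_submit_args, and the convention of passing the function name as an application argument are assumptions; the actual component produces a SparkLauncher rather than a command list. The only spark-submit option used is --name, which sets the application name.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class PySparkJobManifest:
    """Hypothetical intermediate record between a batch job and spark-submit."""
    source_code: str
    function_name: str
    app_name: str
    source_metadata: Dict[str, str] = field(default_factory=dict)

    def spark_submit_args(self, driver_path: str) -> List[str]:
        # The function name is passed as an application argument so that
        # the driver script knows which function to call (an assumption).
        return ["spark-submit", "--name", self.app_name,
                driver_path, self.function_name]


manifest = PySparkJobManifest(source_code="def score(): pass",
                              function_name="score",
                              app_name="demo-scoring-job")
args = manifest.spark_submit_args("ModelOpPySparkDriver.py")
```

Because the record carries no reference to the job it was built from, the code that launches Spark depends only on the manifest, which is the decoupling this section describes.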
The following diagram illustrates the transformation process of a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB into a SparkLauncher:
ModelOpJobMonitor
The ModelOpJobMonitor has two main responsibilities:
Submitting jobs to the Spark cluster
Monitoring job statuses for existing (waiting and running) Spark jobs
To fulfill these responsibilities, it relies on three key services:
SparkLauncherService – component in charge of translating jobs into SparkLauncher objects
YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster
KerberizedYarnMonitorService – component that is an instance of YarnMonitorService and is in charge of authenticating the principal with Kerberos before delegating control to YarnMonitorService
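The relationship between the two monitor services resembles a delegating wrapper: the Kerberized variant exposes the same interface, authenticates first, and then hands the call to a plain monitor. The sketch below shows that pattern under stated assumptions: the method fetch_status, the constructor arguments, and the login callback are hypothetical, and no real Kerberos or YARN call is made.

```python
from typing import Callable, Dict, List


class YarnMonitorService:
    """Stand-in monitor that answers from an in-memory status table."""

    def __init__(self, statuses: Dict[str, str]) -> None:
        self._statuses = statuses

    def fetch_status(self, job_id: str) -> str:
        return self._statuses[job_id]


class KerberizedYarnMonitorService(YarnMonitorService):
    """Authenticates the principal, then delegates to a plain monitor."""

    def __init__(self, delegate: YarnMonitorService,
                 login: Callable[[], None]) -> None:
        self._delegate = delegate
        self._login = login  # hypothetical hook performing the Kerberos login

    def fetch_status(self, job_id: str) -> str:
        self._login()
        return self._delegate.fetch_status(job_id)


calls: List[str] = []
plain = YarnMonitorService({"job-1": "RUNNING"})
secured = KerberizedYarnMonitorService(plain, lambda: calls.append("login"))
status = secured.fetch_status("job-1")
```

Since the wrapper is itself a YarnMonitorService, a caller such as the job monitor can use either one without knowing whether the cluster is Kerberized.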
PySpark Job Input and Output Data
The spark-runtime-service supports two types of input and output data for MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB:
HDFS asset(s) by URL a.k.a. External File asset(s)
Embedded asset(s) a.k.a. File asset(s)
Input Data by URL
When the user provides HDFS asset(s) by URL as input data for a job, the HDFS asset(s) must be present in the Spark cluster before the job is submitted for execution.
Input Data Embedded
When the user provides embedded input data asset(s), the spark-runtime-service will create a temporary HDFS directory for the job and upload the file content of each embedded input data asset to HDFS. The newly created HDFS file(s) will be represented as HDFS asset(s) by URL and used as the input data when the job is submitted to the Spark cluster for execution.
The change in the input data from embedded asset(s) to HDFS asset(s) by URL is not visible to the user because the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB is never updated to reflect this change. Instead, the change is handled internally in PySparkPreprocessorService.
Output Data by URL
When the user provides HDFS asset(s) by URL as output data for a job, the job output will be stored at the given HDFS file URL.
Output Data Embedded
When the user provides embedded output data asset(s), the spark-runtime-service will create a temporary HDFS directory for the job, if one does not exist yet. Then, the following actions are performed in the given order:
Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset is the temporary HDFS directory and the filename of the new HDFS asset is the filename of the corresponding embedded asset
Update MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB by storing the HDFS asset(s) by URL in its job parametersMap, where the key is TMP_EXTERNAL_OUTPUT
Use the newly created HDFS asset(s) by URL as the output data when the job is submitted to the Spark cluster for execution
Download each HDFS asset by URL and store its content as the file content for the corresponding embedded output data asset in MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB
Remove TMP_EXTERNAL_OUTPUT from the job parametersMap
The change in the output data from embedded asset(s) to HDFS asset(s) by URL is not visible to the user because the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB is never updated to reflect this change. Instead, the change is handled internally in PySparkPreprocessorService and ModelOpJobMonitor.
For input and output data, the user can have any combination of HDFS asset(s) by URL or embedded asset(s).
PySpark - Metrics Jobs
Running jobs of type MODEL_BATCH_TEST_JOB is one of the key features of the spark-runtime-service. These jobs produce a ModelTestResult whose generation is performed by the MLC.