
This article provides an overview of how ModelOp Center leverages spark-submit to orchestrate the submission and management of Spark jobs within Spark environments.


Design principles

To ensure efficient and robust execution of Spark jobs, the spark-submit process within ModelOp Center was designed with the following principles and features in mind. They optimize the integration and operation of Spark jobs in both local and cluster deploy modes:

  1. Code Implementation: Whenever possible, components are implemented in Python and/or PySpark. This makes testing and debugging easier in both local and cluster deploy modes.

  2. Job Monitoring: The spark-runtime-service monitors jobs in the WAITING and RUNNING states until they reach the COMPLETE, ERROR, or CANCELLED state. Any detected change in job status triggers an update in the model manager.

  3. Job Submission Process: Before a job is submitted to the Spark cluster, the ModelOpPySparkDriver extracts inputData, outputData, and storedModel.modelAssets from the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB and passes them as parameters to the function to be executed. These external file assets are provided as lists of assets, for example:

    • input_data – [{"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}]

    • output_data – [{"filename": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/output.csv"}]

  4. Logging: The YarnMonitorService component listens for and stores the stdout and stderr output produced by the job running on the Spark cluster. Whether stdout and stderr are stored in jobMessages is configurable.
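As a minimal sketch of the Job Submission Process, the following pure-Python function accepts the three asset lists in the shape shown above and resolves their HDFS URLs. The names `resolve_urls` and `score` are hypothetical, and the actual Spark reads and writes are left as comments so that the sketch runs without a cluster.

```python
from typing import Dict, List

# Shape of one asset, as in the input_data/output_data examples above
Asset = Dict[str, str]


def resolve_urls(assets: List[Asset]) -> List[str]:
    """Return the fileUrl of every EXTERNAL_FILE asset in an asset list."""
    return [a["fileUrl"] for a in assets if a.get("assetType") == "EXTERNAL_FILE"]


def score(input_data: List[Asset], output_data: List[Asset], model_assets: List[Asset]) -> Dict[str, List[str]]:
    """Hypothetical 3-parameter model function called with (input, output, model assets).

    A real PySpark model would read each input URL with spark.read.csv(url)
    and write its results to the output URL; this sketch only returns the
    resolved URLs so the wiring can be checked locally.
    """
    return {
        "inputs": resolve_urls(input_data),
        "outputs": resolve_urls(output_data),
        "model_assets": resolve_urls(model_assets),
    }


if __name__ == "__main__":
    input_data = [{"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}]
    output_data = [{"filename": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/output.csv"}]
    print(score(input_data, output_data, []))
```

Because `score` takes exactly three parameters, a driver that dispatches on parameter count would call it with the input, output, and model asset lists in that order.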

Architecture Overview

The following diagram gives a high-level overview of how the core components interact with each other and of the process flow from a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB to a Spark job running on the Spark cluster.

Details

PySpark Dynamic Job Abstraction

ModelOp Center abstracts the process of executing PySpark jobs, using a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB as input. The approach uses a main PySpark driver (ModelOpPySparkDriver.py) that imports the job metadata (details about the job input, output, and model assets, as well as the name of the method to be invoked) and the model source code to be executed, and then uses a simple main block to call the desired function when the Spark job is submitted to the cluster.

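For illustration, the job metadata module imported by the driver might look like the following. The variable names are those read by ModelOpPySparkDriver.py; the values and the `app_name` variable are hypothetical, since this article does not show a generated metadata file.

```python
# modelop_job_metadata.py (hypothetical contents, generated per job)

# Names of the functions in model_source_code that the driver should call
init_method_to_be_executed = "init"  # optional: the driver skips it if absent
method_to_be_executed = "score"      # required

# Asset lists passed as the three positional arguments of the execution function
job_input = [
    {"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"},
]
job_output = [
    {"filename": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/output.csv"},
]
job_model_assets_input = []

# Hypothetical: application name used when the job is submitted
app_name = "modelop-batch-job"
```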

Pre-spark-submit process flow

The following diagram illustrates the transformation process from a ModelOp Center Batch job into a SparkLauncher:

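The first part of that transformation, pulling the asset lists out of the batch job, can be sketched as follows. The JSON layout assumed here (a `jobType` field plus top-level `inputData`, `outputData`, and `storedModel.modelAssets`) is inferred from the field names in this article and may not match the actual job schema; `extract_job_assets` is a hypothetical helper.

```python
from typing import Any, Dict, List, Tuple

BATCH_JOB_TYPES = {"MODEL_BATCH_JOB", "MODEL_BATCH_TEST_JOB", "MODEL_BATCH_TRAINING_JOB"}


def extract_job_assets(job: Dict[str, Any]) -> Tuple[List[dict], List[dict], List[dict]]:
    """Return (input_data, output_data, model_assets) from a batch job (assumed layout)."""
    job_type = job.get("jobType")  # hypothetical field name
    if job_type not in BATCH_JOB_TYPES:
        raise ValueError(f"Unsupported job type: {job_type!r}")
    input_data = job.get("inputData", [])
    output_data = job.get("outputData", [])
    model_assets = job.get("storedModel", {}).get("modelAssets", [])
    return input_data, output_data, model_assets
```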

Monitoring Spark jobs

ModelOp Center has a component called ModelOpJobMonitor in charge of:

  1. Submitting jobs to the Spark cluster

  2. Monitoring job statuses

In order to successfully fulfill these responsibilities, it relies on three key services:

  1. SparkLauncherService – component in charge of translating jobs into SparkLauncher objects

  2. YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster

  3. KerberizedYarnMonitorService – a specialization of YarnMonitorService that authenticates the principal with Kerberos before delegating control to YarnMonitorService

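The monitoring behavior described in the design principles (poll jobs in WAITING or RUNNING until they reach COMPLETE, ERROR, or CANCELLED, and propagate every change) can be sketched as a polling loop. `fetch_status` and `on_change` are hypothetical callables standing in for YarnMonitorService and the model manager update; the real service interfaces are not described in this article.

```python
import time
from typing import Callable

ACTIVE_STATES = {"WAITING", "RUNNING"}
TERMINAL_STATES = {"COMPLETE", "ERROR", "CANCELLED"}


def monitor_job(
    fetch_status: Callable[[], str],
    on_change: Callable[[str, str], None],
    initial_state: str = "WAITING",
    poll_interval_s: float = 0.0,
) -> str:
    """Poll a job while it is WAITING or RUNNING and report every status transition."""
    state = initial_state
    while state in ACTIVE_STATES:
        time.sleep(poll_interval_s)  # a real monitor would use a positive interval
        new_state = fetch_status()
        if new_state != state:
            on_change(state, new_state)  # e.g. update the job in the model manager
            state = new_state
    if state not in TERMINAL_STATES:
        raise ValueError(f"Unexpected job state: {state!r}")
    return state
```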


Supported Spark job types


ModelOpPySparkDriver.py

ModelOp Center uses a predefined PySpark driver script, ModelOpPySparkDriver.py, to manage execution and job orchestration for Spark jobs. The script abstracts the complexity of these processes: the job metadata derived from a ModelOp Center batch job and the model source code are imported as modules, and the driver invokes the requested function with the job's input, output, and model assets.

```python
########################################################################################
##### dynamic model / resources content.
########################################################################################

# primary source code
import model_source_code as modelop_model_source_code
# job metadata, variables pointing to [input, output, appname, etc.]
import modelop_job_metadata as modelop_job_metadata
# for printing stack trace
import traceback
# for checking number of params in function signature
from inspect import signature


########################################################################################
##### static resources content.
########################################################################################

if __name__ == "__main__":
    try:
        print("### - ModelOpPySparkExecution execution")

        ## Check for an init function to be executed; An init function is optional
        init_method = getattr(modelop_model_source_code, modelop_job_metadata.init_method_to_be_executed, None)
        if init_method is None:
            print("### - Optional init '" + modelop_job_metadata.init_method_to_be_executed + "' function was not found in model source code")
        else:
            print("### - Executing function: " + modelop_job_metadata.init_method_to_be_executed)
            init_method()

        ## Check for a scoring or metrics function to be executed; A scoring or metrics function is required
        execution_method = getattr(modelop_model_source_code, modelop_job_metadata.method_to_be_executed, None)
        if execution_method is None:
            print("### - '" + modelop_job_metadata.method_to_be_executed + "' function was not found in model source code")
            raise Exception("### - '" + modelop_job_metadata.method_to_be_executed + "' function was not found in model source code")

        ## Executing requested function: either no parameters, or (input, output, model assets)
        print("### - Executing function: " + modelop_job_metadata.method_to_be_executed)
        num_params = len(signature(execution_method).parameters)
        if num_params == 0:
            execution_method()
        elif num_params == 3:
            execution_method(modelop_job_metadata.job_input, modelop_job_metadata.job_output, modelop_job_metadata.job_model_assets_input)
        else:
            # No exception is active here, so raise directly; the except block below prints the traceback
            raise Exception("Number of function parameters, '" + str(num_params) + "', does not match expected number of parameters (0 or 3)")

    except Exception as e:
        print("### - An exception of ", e.__class__, " occurred")
        traceback.print_exc()
        print("#################### RUNTIME ERROR ####################")
        # Chain the original exception so its cause is kept in the driver's stack trace
        raise Exception("### - An exception occurred in the ModelOpPySparkDriver") from e
    print("### - PySpark code execution completed")
```
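Because the driver resolves functions by name and dispatches on parameter count, its logic can be exercised locally without a cluster. The following sketch reproduces that dispatch with in-memory modules built via `types.ModuleType`; `dispatch` is a hypothetical helper, not part of ModelOpPySparkDriver.py, and unlike the driver it returns the function's result so it can be inspected.

```python
import types
from inspect import signature


def dispatch(source: types.ModuleType, metadata: types.ModuleType):
    """Mirror the driver: optional init, then a 0- or 3-parameter execution function."""
    init = getattr(source, metadata.init_method_to_be_executed, None)
    if init is not None:
        init()
    method = getattr(source, metadata.method_to_be_executed, None)
    if method is None:
        raise Exception(f"'{metadata.method_to_be_executed}' function was not found in model source code")
    num_params = len(signature(method).parameters)
    if num_params == 0:
        return method()
    if num_params == 3:
        return method(metadata.job_input, metadata.job_output, metadata.job_model_assets_input)
    raise Exception(f"Number of function parameters, '{num_params}', does not match expected number of parameters (0 or 3)")


# In-memory stand-ins for model_source_code and modelop_job_metadata
source = types.ModuleType("model_source_code")
calls = []
source.init = lambda: calls.append("init")
source.score = lambda inputs, outputs, assets: len(inputs)

metadata = types.ModuleType("modelop_job_metadata")
metadata.init_method_to_be_executed = "init"
metadata.method_to_be_executed = "score"
metadata.job_input = [{"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}]
metadata.job_output = []
metadata.job_model_assets_input = []

result = dispatch(source, metadata)
```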

PySparkJobManifest

ModelOp Center decouples the transformation process that takes a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB as input and produces a SparkLauncher as output. For this purpose, ModelOp Center uses an intermediate component, PySparkJobManifest, which contains all the values required to run spark-submit: the Python source code and metadata, the function name, and the application name (all of which are used inside ModelOpPySparkDriver). This design keeps the spark-submit step decoupled from, and agnostic of, the ModelOp Center job types.

The following diagram illustrates the transformation process of a MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB into a SparkLauncher:

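A manifest of this kind can be sketched as a small dataclass that renders a spark-submit argument list. The field names and the `to_spark_submit_args` method are hypothetical; the options used (`--master`, `--deploy-mode`, `--name`, `--py-files`) are standard spark-submit flags, and `--py-files` places the listed .py files on the PYTHONPATH so the driver's imports resolve. In ModelOp Center the equivalent values are handed to a SparkLauncher rather than rendered as a command line.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PySparkJobManifest:
    """Hypothetical Python mirror of the manifest's contents."""
    app_name: str
    driver_path: str        # e.g. ModelOpPySparkDriver.py
    model_source_path: str  # e.g. model_source_code.py
    job_metadata_path: str  # e.g. modelop_job_metadata.py
    master: str = "yarn"
    deploy_mode: str = "cluster"
    extra_py_files: List[str] = field(default_factory=list)

    def to_spark_submit_args(self) -> List[str]:
        """Render the manifest as a spark-submit argument list."""
        py_files = [self.model_source_path, self.job_metadata_path, *self.extra_py_files]
        return [
            "spark-submit",
            "--master", self.master,
            "--deploy-mode", self.deploy_mode,
            "--name", self.app_name,
            "--py-files", ",".join(py_files),
            self.driver_path,
        ]
```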

ModelOpJobMonitor

The ModelOpJobMonitor has two main responsibilities:

  1. Submitting jobs to the Spark cluster

  2. Monitoring the statuses of existing Spark jobs that are waiting or running.

In order to successfully fulfill these responsibilities, it relies on three key services:

  • SparkLauncherService – component in charge of translating jobs into SparkLauncher objects

  • YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster

  • KerberizedYarnMonitorService – a specialization of YarnMonitorService that authenticates the principal with Kerberos before delegating control to YarnMonitorService
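The relationship between KerberizedYarnMonitorService and YarnMonitorService (authenticate first, then delegate) can be sketched as a subclass that overrides the status lookup. The classes below are hypothetical Python stand-ins: the real services, their method names, and how the principal and keytab are configured are not shown in this article, and `authenticate` would in practice wrap something like a `kinit` call.

```python
from typing import Callable, Dict


class YarnMonitorService:
    """Hypothetical stand-in: looks up an application's status from a status source."""

    def __init__(self, status_source: Dict[str, str]):
        self._status_source = status_source

    def fetch_status(self, application_id: str) -> str:
        return self._status_source[application_id]


class KerberizedYarnMonitorService(YarnMonitorService):
    """Authenticates the principal, then delegates to YarnMonitorService."""

    def __init__(self, status_source: Dict[str, str], principal: str, authenticate: Callable[[str], None]):
        super().__init__(status_source)
        self._principal = principal
        self._authenticate = authenticate  # assumption: e.g. a kinit wrapper using a keytab

    def fetch_status(self, application_id: str) -> str:
        # For simplicity this authenticates on every call; a real service would reuse a valid ticket
        self._authenticate(self._principal)
        return super().fetch_status(application_id)
```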

PySpark Job Input and Output Data

The spark-runtime-service supports two types of input and output data for MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB:

  1. HDFS asset(s) by URL, a.k.a. external asset(s)

  2. Embedded asset(s), a.k.a. file asset(s)

Input Data by URL

When the user provides HDFS asset(s) by URL as input data for a job, the HDFS asset(s) must already be present on the Spark cluster before the job is submitted to the Spark cluster for execution.

Input Data Embedded

When the user provides embedded input data asset(s), the spark-runtime-service creates a temporary HDFS directory for the job and uploads the file content of each embedded input data asset to HDFS. The newly created HDFS file(s) are represented as HDFS asset(s) by URL and used as the input data when the job is submitted to the Spark cluster for execution.

The change in the input data from embedded asset(s) to HDFS asset(s) by URL is not visible to the user because the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB is never updated to reflect this change. Instead, the change is handled internally in PySparkPreprocessorService.
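The embedded-input handling can be sketched as a function that uploads each embedded asset into the job's temporary HDFS directory and returns a new asset list, leaving the job's own assets untouched. The `fileContent` field, treating any non-EXTERNAL_FILE asset as embedded, and the `write_file` upload callable are assumptions; the actual PySparkPreprocessorService logic is not shown in this article.

```python
from typing import Callable, Dict, List

Asset = Dict[str, str]


def stage_embedded_inputs(
    assets: List[Asset],
    tmp_dir: str,
    write_file: Callable[[str, str], None],
) -> List[Asset]:
    """Replace embedded assets with EXTERNAL_FILE assets pointing at uploaded HDFS copies.

    Assets that are already EXTERNAL_FILE pass through unchanged, so any mix
    of URL and embedded assets is supported. The input list is not modified.
    """
    staged = []
    for asset in assets:
        if asset.get("assetType") == "EXTERNAL_FILE":
            staged.append(asset)
            continue
        url = f"{tmp_dir.rstrip('/')}/{asset['filename']}"
        write_file(url, asset["fileContent"])  # upload the embedded content to HDFS
        staged.append({"filename": asset["filename"], "assetType": "EXTERNAL_FILE", "fileUrl": url})
    return staged
```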

Output Data by URL

When the user provides HDFS asset(s) by URL as output data for a job, the job output is stored at the given HDFS file URL.

Output Data Embedded

When the user provides embedded output data asset(s), the spark-runtime-service creates a temporary HDFS directory for the job, if one does not exist yet. Then, the following actions are performed in order:

  1. Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset is the temporary HDFS directory and the filename of the new HDFS asset is the filename of the corresponding embedded asset.

  2. Update the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB by storing the HDFS asset(s) by URL in its job parameters Map under the key TMP_EXTERNAL_OUTPUT.

  3. Use the newly created HDFS asset(s) by URL as the output data when the job is submitted to the Spark cluster for execution.

  4. Download each HDFS asset by URL and store its content as the file content of the corresponding embedded output data asset in the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB.

  5. Remove TMP_EXTERNAL_OUTPUT from the job parameters Map.

The change in the output data from embedded asset(s) to HDFS asset(s) by URL is not visible to the user: the embedded output assets of the MODEL_BATCH_JOB/MODEL_BATCH_TEST_JOB/MODEL_BATCH_TRAINING_JOB are never replaced, and the temporary HDFS asset(s) are kept only in the TMP_EXTERNAL_OUTPUT job parameter, which is removed once the output has been copied back (steps 2 and 5). This is handled internally in PySparkPreprocessorService and ModelOpJobMonitor.

For input and output data, the user can use any combination of HDFS asset(s) by URL and embedded asset(s).
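The five output steps can be sketched as two functions around the Spark run: one that records temporary HDFS assets under TMP_EXTERNAL_OUTPUT before submission, and one that copies the results back into the embedded assets and removes the key afterwards. The job layout (`outputData`, `jobParameters`, `fileContent`) and the `read_file` callable are assumptions for illustration, not the actual ModelOp Center schema or API.

```python
from typing import Any, Callable, Dict, List

TMP_KEY = "TMP_EXTERNAL_OUTPUT"


def prepare_embedded_outputs(job: Dict[str, Any], tmp_dir: str) -> List[Dict[str, str]]:
    """Steps 1-3: map embedded outputs to temporary HDFS assets and record them on the job."""
    outputs_for_spark = []
    tmp_assets = []
    for asset in job["outputData"]:
        if asset.get("assetType") == "EXTERNAL_FILE":
            outputs_for_spark.append(asset)  # URL outputs are used as-is
            continue
        tmp = {
            "filename": asset["filename"],
            "assetType": "EXTERNAL_FILE",
            "fileUrl": f"{tmp_dir.rstrip('/')}/{asset['filename']}",
        }
        tmp_assets.append(tmp)
        outputs_for_spark.append(tmp)
    job.setdefault("jobParameters", {})[TMP_KEY] = tmp_assets  # step 2
    return outputs_for_spark  # step 3: output data for the Spark job


def collect_embedded_outputs(job: Dict[str, Any], read_file: Callable[[str], str]) -> None:
    """Steps 4-5: copy HDFS results into the embedded assets, then drop the temporary key."""
    params = job.get("jobParameters", {})
    urls = {a["filename"]: a["fileUrl"] for a in params.get(TMP_KEY, [])}
    for asset in job["outputData"]:
        if asset.get("assetType") != "EXTERNAL_FILE" and asset["filename"] in urls:
            asset["fileContent"] = read_file(urls[asset["filename"]])  # step 4
    params.pop(TMP_KEY, None)  # step 5
```

The user-facing `outputData` list keeps its original embedded assets throughout; only their file content is filled in after the run.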

PySpark - Metrics Jobs

Running jobs of type MODEL_BATCH_TEST_JOB is one of the key features of the spark-runtime-service. These jobs produce a ModelTestResult, which is generated by the MLC.