This article provides an overview of how ModelOp Center (MOC) uses spark-submit to execute Spark jobs.

Design principles

The spark-submit process within ModelOp Center is designed for efficient and robust execution of Spark jobs. The following principles and features govern how Spark jobs are integrated and operated in both local and cluster deploy modes:

  1. Code Implementation: Whenever possible, implement job components in Python and/or PySpark. This makes testing and debugging easier in both local and cluster deploy modes.

  2. Job Monitoring: The spark-runtime-service monitors jobs in the WAITING and RUNNING states until they reach the COMPLETE, ERROR, or CANCELLED state. Any detected change in job status triggers an update in the model manager.

  3. Job Submission Process: Before submitting a job to the Spark cluster, the ModelOpPySparkDriver extracts inputData, outputData, and storedModel.modelAssets from the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB and passes them as parameters to the function to be executed. These external file assets are provided as lists of assets:

    • input_data – [{"filename": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}]

    • output_data – [{"filename": "output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/output.csv"}]

  4. Logging: The YarnMonitorService component listens for and stores the stdout and stderr outputs produced by the running job on the Spark cluster. The storage of stdout and stderr in jobMessages is configurable.
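
As a rough illustration of the asset lists described in item 3, the sketch below shows how a model function might pull the HDFS URLs out of the input_data and output_data parameters. The names file_urls and score are hypothetical, and the Spark read and write calls are left as comments because they depend on the individual job.

```python
def file_urls(assets):
    """Return the fileUrl of every EXTERNAL_FILE asset in an asset list."""
    return [a["fileUrl"] for a in assets if a.get("assetType") == "EXTERNAL_FILE"]


def score(input_data, output_data, model_assets):
    """Hypothetical three-parameter function, matching the parameters the driver passes."""
    source_url = file_urls(input_data)[0]
    target_url = file_urls(output_data)[0]
    # In a real job, a SparkSession would read from source_url and write to target_url,
    # for example:
    #   df = spark.read.csv(source_url, header=True)
    #   df.write.csv(target_url, header=True)
    return source_url, target_url


# Asset lists in the shape shown in the list above.
input_data = [{"filename": "test.csv", "assetType": "EXTERNAL_FILE",
               "fileUrl": "hdfs:///hadoop/test.csv"}]
output_data = [{"filename": "output.csv", "assetType": "EXTERNAL_FILE",
                "fileUrl": "hdfs:///hadoop/output.csv"}]

print(score(input_data, output_data, []))
```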

Architecture Overview

The following diagram gives a high-level overview of how the core components interact with each other when a Spark job is submitted to the cluster.

Pre spark submit process flow

The following diagram illustrates the transformation process from a ModelOp Center Batch job into a SparkLauncher:

Monitoring Spark jobs

ModelOp Center has a component called ModelOpJobMonitor in charge of:

  1. Submitting jobs to the Spark cluster

  2. Monitoring job statuses

To fulfill these responsibilities, it relies on three key services:

  1. SparkLauncherService – component in charge of translating jobs into SparkLauncher objects

  2. YarnMonitorService – component in charge of fetching job statuses and output produced from the Spark cluster

  3. KerberizedYarnMonitorService – component that is an instance of YarnMonitorService and in charge of authenticating the principal with Kerberos before delegating control to YarnMonitorService
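
The monitoring responsibility can be pictured as a polling loop that runs until the job reaches a terminal state. The following is a minimal sketch, not the actual ModelOpJobMonitor implementation: monitor_job, fetch_status, and on_change are hypothetical names, with fetch_status standing in for a status lookup against the cluster and on_change standing in for the update sent to the model manager.

```python
import time

# Terminal job states named in the design principles.
TERMINAL_STATES = {"COMPLETE", "ERROR", "CANCELLED"}


def monitor_job(fetch_status, on_change, poll_seconds=0.0, max_polls=100):
    """Poll a job until it reaches a terminal state.

    fetch_status: hypothetical callable returning the current job status.
    on_change:    hypothetical callable invoked once per detected status change.
    Returns the last status observed.
    """
    last_status = None
    for _ in range(max_polls):
        status = fetch_status()
        if status != last_status:
            on_change(status)  # report only changes, not every poll
            last_status = status
        if status in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    return last_status


# Simulated cluster responses, for illustration only.
statuses = iter(["WAITING", "WAITING", "RUNNING", "RUNNING", "COMPLETE"])
seen = []
final = monitor_job(lambda: next(statuses), seen.append)
print(final, seen)
```

Reporting only on changes keeps the number of model manager updates proportional to the number of state transitions rather than to the polling frequency.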


Supported Spark job types

PySpark Job Inputs and Outputs

Spark jobs support the following two types of input and output assets:

  1. HDFS asset(s) by URL also known as External File asset(s)

  2. Embedded asset(s) also known as File asset(s)

The following rules and restrictions apply:

Input

Input Assets by URL

When a job includes input HDFS asset(s) by URL, all of those HDFS assets must be present and available inside the Spark cluster before the job is submitted for execution.

Input Data Embedded

When a job includes embedded input data assets, the process will generate temporary HDFS files for each of these embedded assets. These newly created HDFS files will be represented as HDFS assets by their URLs and will be used as input data when the job is submitted to the Spark cluster for execution.

This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.

Output

Output Data by URL

When a job includes predefined output HDFS asset(s) by URL, the job output will be generated and stored at the predefined HDFS file locations.

Output Data Embedded

When the user provides embedded output data assets, the spark-runtime-service will create a temporary HDFS directory for the job if one does not already exist. The following steps are then performed in sequence:

  1. Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset corresponds to the temporary HDFS directory and the filename matches the original embedded asset.

  2. Update the ModelOp Center job by storing the HDFS assets by URL in its job parameters map, using the key TMP_EXTERNAL_OUTPUT.

  3. Use the newly created HDFS assets by URL as the output data when submitting the job to the Spark cluster for execution.

  4. Download each HDFS asset by URL and store its content as the file content for the corresponding embedded output data asset in the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB.

  5. Remove TMP_EXTERNAL_OUTPUT from the job parameters map.

This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
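
The five steps for embedded output can be sketched as follows. This is an illustration under stated assumptions rather than the spark-runtime-service code: the function names, the submit and download callables, the job dictionary layout (jobParameters, outputData, fileContent), and the "FILE" asset type value are all hypothetical. Only TMP_EXTERNAL_OUTPUT and the EXTERNAL_FILE asset shape come from the text above.

```python
import posixpath


def to_tmp_external_assets(embedded_assets, tmp_dir):
    """Step 1: mirror each embedded asset as an EXTERNAL_FILE asset under tmp_dir."""
    return [
        {
            "filename": asset["filename"],
            "assetType": "EXTERNAL_FILE",
            "fileUrl": posixpath.join(tmp_dir, asset["filename"]),
        }
        for asset in embedded_assets
    ]


def run_with_embedded_output(job, tmp_dir, submit, download):
    """Walk through steps 1-5 using hypothetical submit/download callables."""
    params = job["jobParameters"]
    tmp_assets = to_tmp_external_assets(job["outputData"], tmp_dir)  # step 1
    params["TMP_EXTERNAL_OUTPUT"] = tmp_assets                       # step 2
    submit(tmp_assets)                                               # step 3
    for embedded, tmp in zip(job["outputData"], tmp_assets):         # step 4
        embedded["fileContent"] = download(tmp["fileUrl"])
    del params["TMP_EXTERNAL_OUTPUT"]                                # step 5
    return job


# An in-memory dictionary stands in for HDFS, for illustration only.
fake_hdfs = {}


def fake_submit(assets):
    # Pretend the Spark job wrote its result to every output URL.
    for asset in assets:
        fake_hdfs[asset["fileUrl"]] = "score\n0.9\n"


job = {"jobParameters": {}, "outputData": [{"filename": "output.csv", "assetType": "FILE"}]}
result = run_with_embedded_output(job, "hdfs:///tmp/job-1", fake_submit, fake_hdfs.__getitem__)
print(result["outputData"][0]["fileContent"])
```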

ModelOpPySparkDriver.py

ModelOp Center uses a predefined PySpark driver script called ModelOpPySparkDriver.py to manage execution and job orchestration for Spark jobs. The script abstracts the complexity of these processes by taking ModelOp Center Batch Jobs as input and translating them into the format required for job execution.

########################################################################################
##### dynamic model / resources content.
########################################################################################

# primary source code
import model_source_code as modelop_model_source_code
# job metadata, variables pointing to [input, output, appname,etc]
import modelop_job_metadata as modelop_job_metadata
# for printing stack trace
import traceback
# for checking number of params in function signature
from inspect import signature


########################################################################################
##### static resources content.
########################################################################################

if __name__ == "__main__":
    try:
        print("### - ModelOpPySparkExecution execution")

        ## Check for an init function to be executed; An init function is optional
        init_method = getattr(modelop_model_source_code, modelop_job_metadata.init_method_to_be_executed, None)
        if init_method is None:
            print("### - Optional init '" + modelop_job_metadata.init_method_to_be_executed + "' function was not found in model source code")
        else:
            print("### - Executing function: " + modelop_job_metadata.init_method_to_be_executed)
            init_method()

        ## Check for a scoring or metrics function to be executed; A scoring or metrics function is required
        execution_method = getattr(modelop_model_source_code, modelop_job_metadata.method_to_be_executed, None)
        if execution_method is None:
            print("### - '" + modelop_job_metadata.method_to_be_executed + "' function was not found in model source code")
            raise Exception("### - '" + modelop_job_metadata.method_to_be_executed + "' function was not found in model source code")

        ## Executing requested function
        print("### - Executing function: " + modelop_job_metadata.method_to_be_executed)
        num_params = len(signature(execution_method).parameters)
        if num_params == 0:
            execution_method()
        elif num_params == 3:
            execution_method(modelop_job_metadata.job_input, modelop_job_metadata.job_output, modelop_job_metadata.job_model_assets_input)
        else:
            # No exception is active at this point, so there is no stack trace to print
            raise Exception("Number of function parameters, '" + str(num_params) + "', does not match expected number of parameters (0 or 3)")

    except Exception as e:
        print("### - An exception of type", e.__class__.__name__, "occurred")
        traceback.print_exc()
        print("#################### RUNTIME ERROR ####################")
        # Chain the original exception so its cause is preserved in the re-raised error
        raise Exception("### - An exception occurred in the ModelOpPySparkDriver") from e
    print("### - PySpark code execution completed")
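
To show what the driver expects from model source code, the sketch below defines an optional init function and a three-parameter metrics function, then resolves and calls them by name in the same way the driver does. The function names init and metrics, the dispatch helper, and the SimpleNamespace stand-ins for the model_source_code and modelop_job_metadata modules are assumptions for illustration. The metadata attribute names are taken from the driver above.

```python
from inspect import signature
from types import SimpleNamespace


def init():
    """Optional setup hook; the driver skips it when it is not defined."""
    print("loading model artifacts")


def metrics(input_data, output_data, model_assets):
    """Required function; three parameters is one of the two accepted arities."""
    return len(input_data), len(output_data), len(model_assets)


# Stand-ins for the generated modules (hypothetical values).
model_source_code = SimpleNamespace(init=init, metrics=metrics)
job_metadata = SimpleNamespace(
    init_method_to_be_executed="init",
    method_to_be_executed="metrics",
    job_input=[{"filename": "test.csv", "assetType": "EXTERNAL_FILE",
                "fileUrl": "hdfs:///hadoop/test.csv"}],
    job_output=[{"filename": "output.csv", "assetType": "EXTERNAL_FILE",
                 "fileUrl": "hdfs:///hadoop/output.csv"}],
    job_model_assets_input=[],
)


def dispatch(source, metadata):
    """Resolve functions by name and call them based on their parameter count."""
    init_method = getattr(source, metadata.init_method_to_be_executed, None)
    if init_method is not None:
        init_method()
    method = getattr(source, metadata.method_to_be_executed, None)
    if method is None:
        raise AttributeError(metadata.method_to_be_executed + " was not found")
    num_params = len(signature(method).parameters)
    if num_params == 0:
        return method()
    if num_params == 3:
        return method(metadata.job_input, metadata.job_output,
                      metadata.job_model_assets_input)
    raise TypeError("expected 0 or 3 parameters, got " + str(num_params))


print(dispatch(model_source_code, job_metadata))
```

A function with any other number of parameters is rejected, so model code should either take no arguments or take all three asset lists.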