
This article provides details on how to prepare a Spark model that executes a Spark scoring job within ModelOp Center.


Spark Model Structure

Any Spark scoring function code to be executed by the spark-runtime-service (for example, as part of a MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB) must comply with the following structure:

```python
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel


# modelop.score
def my_score_function(external_inputs, external_outputs, external_model_assets):
    # Rename my_score_function as needed; this is the name that
    # model_job_metadata.py refers to as method_to_be_executed.
    spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()

    # Read an input file that was uploaded to HDFS by the user
    df = (spark.read
          .format("csv")
          .option("header", "true")
          .load(external_inputs[0]["fileUrl"]))

    # ... prepare df so that it contains the "features" column the model expects ...

    # Read a model asset file/folder that was uploaded to HDFS by the user
    model = RandomForestClassificationModel.load(external_model_assets[0]["fileUrl"])

    predictions = model.transform(df)

    # CSV cannot store vector columns (e.g., "features", "probability"),
    # so keep only scalar columns before writing
    predictions = predictions.select("prediction")

    # Write to an output file that will be created in HDFS
    predictions.write.csv(external_outputs[0]["fileUrl"])
```

As defined in the model template above, the scoring function receives the following three input parameters, each of type list:

  1. external_inputs - generated from the inputData list

  2. external_outputs - generated from the outputData list

  3. external_model_assets - generated from the modelAssets list (contains only assets of type EXTERNAL_FILE)

The external_inputs and external_outputs lists may contain EXTERNAL_FILE assets representing any FILE assets specified as part of the inputData and/or outputData, respectively. For additional information on how these lists of files are handled, please refer to PySpark Job Input and Output Data.

Please note that other types of assets could be included as part of modelAssets; however, this process only includes EXTERNAL_FILE assets (e.g., FILE and SOURCE_CODE assets are not included).
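The mapping described above can be sketched in Python. This is a hypothetical illustration, not the spark-runtime-service's actual code: the function name build_external_lists is invented, and inputData/outputData are passed through unchanged because the conversion of FILE assets into EXTERNAL_FILE assets is covered in PySpark Job Input and Output Data and is not reproduced here. Only the modelAssets filtering follows directly from the rule stated above.

```python
from typing import Any, Dict, List, Tuple

Asset = Dict[str, Any]


def build_external_lists(job: Dict[str, Any]) -> Tuple[List[Asset], List[Asset], List[Asset]]:
    """Hypothetical sketch: derive the three score-function arguments from a job JSON."""
    # Assumption: inputData/outputData already hold EXTERNAL_FILE assets,
    # so they are copied as-is (FILE-asset conversion is not shown).
    external_inputs = list(job.get("inputData", []))
    external_outputs = list(job.get("outputData", []))

    # Only EXTERNAL_FILE assets from modelAssets are kept;
    # FILE and SOURCE_CODE assets are excluded.
    model_assets = job.get("model", {}).get("storedModel", {}).get("modelAssets", [])
    external_model_assets = [
        asset for asset in model_assets if asset.get("assetType") == "EXTERNAL_FILE"
    ]
    return external_inputs, external_outputs, external_model_assets


# Usage with a trimmed-down job JSON (already parsed into Python objects)
job = {
    "model": {"storedModel": {"modelAssets": [
        {"name": "titanic.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
        {"name": "titanic", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/titanic"},
    ]}},
    "inputData": [{"name": "test.csv", "assetType": "EXTERNAL_FILE",
                   "fileUrl": "hdfs:///hadoop/test.csv"}],
    "outputData": [{"name": "titanic_output.csv", "assetType": "EXTERNAL_FILE",
                    "fileUrl": "hdfs:///hadoop/titanic_output.csv"}],
}
inputs, outputs, ext_model_assets = build_external_lists(job)
print([a["name"] for a in ext_model_assets])  # ['titanic']
```

The SOURCE_CODE asset is dropped from ext_model_assets, which is why a model without EXTERNAL_FILE assets receives an empty third list.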

Info

It is assumed that the aforementioned external file assets already exist in HDFS before the job is created, and that the user knows their location.


Each Python list contains entire Asset objects. The external_model_assets list will be empty if the model has no EXTERNAL_FILE assets.
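Because each list holds complete Asset objects, a score function that expects more than one input, or that must cope with an empty external_model_assets list, may prefer to look assets up by name instead of by position. The helper below is a hypothetical convenience (file_url is not part of ModelOp Center), shown only as one way to do this:

```python
from typing import Any, Dict, List, Optional


def file_url(assets: List[Dict[str, Any]], name: Optional[str] = None) -> str:
    """Hypothetical helper: return the fileUrl of the named asset (or of the first one)."""
    if not assets:
        raise ValueError("expected at least one EXTERNAL_FILE asset, but the list is empty")
    if name is None:
        return assets[0]["fileUrl"]
    for asset in assets:
        if asset.get("name") == name:
            return asset["fileUrl"]
    available = [asset.get("name") for asset in assets]
    raise KeyError(f"no asset named {name!r}; available assets: {available}")


external_inputs = [
    {"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"},
]
print(file_url(external_inputs, "test.csv"))  # hdfs:///hadoop/test.csv
```

Failing early with a descriptive error is usually easier to debug than the bare IndexError that external_model_assets[0] raises on an empty list.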


Spark-runtime-service job processing

Once the user creates a MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB, the spark-runtime-service is triggered. Every time a Spark job is about to be executed, the spark-runtime-service receives the job JSON and generates the following files in its working directory:

  1. model_source_code.py

     Contains the primary source code, extracted from model.storedModel.modelAssets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true.

  2. model_job_metadata.py

     Contains the values for all assets involved in the job execution (external_inputs, external_outputs, and external_model_assets), along with the name of the score function to call (method_to_be_executed), in the following format:

```python
external_inputs = [
  {
    "name": "test.csv",
    "assetType": "EXTERNAL_FILE",
    "fileUrl": "hdfs:///hadoop/test.csv",
    "filename": "test.csv",
    "fileFormat": "CSV"
  }]

external_outputs = [
  {
    "name": "titanic_output.csv",
    "assetType": "EXTERNAL_FILE",
    "fileUrl": "hdfs:///hadoop/titanic_output.csv",
    "filename": "titanic_output.csv",
    "fileFormat": "CSV"
  }]

external_model_assets = [
  {
    "name": "titanic",
    "assetType": "EXTERNAL_FILE",
    "fileUrl": "hdfs:///hadoop/titanic",
    "filename": "titanic"
  }]

method_to_be_executed = "<my-score-function-name>"
```
  3. ModelOpPySparkDriver.py

     Contains the code for the default Spark application resource (the Python file passed to spark-submit). This is the file in charge of calling the score function and passing in the values for the model's three parameters.

  4. If modelAssets contains assets where "assetType" = "FILE", the spark-runtime-service creates a file in its working directory for each matching asset. An example of such a file would be a schema file.

  5. If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, the spark-runtime-service creates a file in its working directory for each matching asset.
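The file-generation steps above can be sketched as follows. This is a hypothetical illustration, not the service's implementation: write_job_files is an invented name, method_to_be_executed is assumed to come from the primary asset's scoreFunction field (as in the sample JSON at the end of this article), and neither ModelOpPySparkDriver.py nor FILE assets are generated here, because their contents are not described in this article. The sketch covers model_source_code.py, model_job_metadata.py, and the non-primary SOURCE_CODE files.

```python
import os
import pprint
from typing import Any, Dict, List


def write_job_files(job: Dict[str, Any], workdir: str) -> List[str]:
    """Hypothetical sketch: write the job-specific Python files into workdir."""
    model_assets = job["model"]["storedModel"]["modelAssets"]
    written: List[str] = []

    def write(filename: str, text: str) -> None:
        with open(os.path.join(workdir, filename), "w", encoding="utf-8") as f:
            f.write(text)
        written.append(filename)

    # The primary SOURCE_CODE asset becomes model_source_code.py
    # (raises StopIteration if the job has no primary source asset)
    primary = next(a for a in model_assets
                   if a.get("assetType") == "SOURCE_CODE" and a.get("primaryModelSource"))
    write("model_source_code.py", primary["sourceCode"])

    # model_job_metadata.py holds the lists as Python literals
    external_model_assets = [a for a in model_assets if a.get("assetType") == "EXTERNAL_FILE"]
    metadata = "\n\n".join([
        "external_inputs = " + pprint.pformat(job.get("inputData", [])),
        "external_outputs = " + pprint.pformat(job.get("outputData", [])),
        "external_model_assets = " + pprint.pformat(external_model_assets),
        # Assumption: the function name comes from the asset's "scoreFunction" field
        "method_to_be_executed = " + repr(primary["scoreFunction"]),
    ]) + "\n"
    write("model_job_metadata.py", metadata)

    # Every non-primary SOURCE_CODE asset is written under its own name
    for asset in model_assets:
        if asset.get("assetType") == "SOURCE_CODE" and not asset.get("primaryModelSource"):
            write(asset["name"], asset["sourceCode"])
    return written
```

Writing the metadata as Python literals (rather than JSON) lets the driver simply import model_job_metadata.py as a module.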

Once all required files (1-5) have been created in the local working directory, the spark-runtime-service uploads them to the cluster (HDFS) as part of the spark-submit execution. None of these files are included in external_inputs, external_outputs, or external_model_assets. Because these files are uploaded by spark-submit rather than directly by the user, they are stored in the Spark staging directory, for example:

```
hdfs://<cluster-host>:<cluster-port>/user/<username>/.sparkStaging/<spark-application-id>/ModelOpPySparkDriver.py
```

Therefore, if the model source code uses any of these files, it must read them with plain Python file I/O, by file name, rather than through Spark:

```python
with open("ModelOpPySparkDriver.py", "r") as f:
    contents = f.read()
```

Please note that this differs from how the files included in external_inputs, external_outputs, and external_model_assets are accessed: those are read through Spark using their fileUrl.
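For example, a score function might validate its input against a schema file attached as a FILE asset. The sketch below assumes a hypothetical asset named input_schema.json containing {"columns": [...]}; both the file name and its layout are illustrative assumptions. It uses only the Python standard library and, as described above, expects the file to be available by name in the current working directory.

```python
import json
import os
from typing import Iterable, List


def load_expected_columns(schema_filename: str = "input_schema.json") -> List[str]:
    """Hypothetical helper: read a FILE asset from the current working directory."""
    # FILE assets are not part of external_inputs/external_outputs/external_model_assets,
    # so there is no fileUrl to pass to Spark; read the file with plain Python I/O.
    path = os.path.join(os.getcwd(), schema_filename)
    with open(path, "r", encoding="utf-8") as f:
        schema = json.load(f)
    # Assumed schema layout: {"columns": ["Pclass", "Age", ...]}
    return list(schema["columns"])


def missing_columns(actual_columns: Iterable[str],
                    schema_filename: str = "input_schema.json") -> List[str]:
    """Return the expected columns that are absent from actual_columns."""
    actual = set(actual_columns)
    return [name for name in load_expected_columns(schema_filename) if name not in actual]
```

Inside the score function this could be called as missing_columns(df.columns) right after the input DataFrame is loaded, raising an error if the result is not empty.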

Sample modelBatchJob JSON

The following modelBatchJob JSON was used to generate the example values above:

```json
{
  "jobType": "MODEL_BATCH_JOB",
  "jobStatus": "CREATED",
  "model": {
    "storedModel": {
      "modelAssets": [
        {
          "name": "titanic.py",
          "assetType": "SOURCE_CODE",
          "primaryModelSource": true,
          "scoreFunction": "score_function",
          "sourceCode": "from __future__ import print_function\n\nimport time\nimport sys\nfrom random import random\nfrom operator import add\n\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import isnull, when, count\nfrom pyspark.sql.functions import udf\nfrom pyspark.sql.types import ArrayType, FloatType\nfrom pyspark.ml.feature import StringIndexer\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml.classification import RandomForestClassificationModel\n\ndef score_function(input_files, output_files, model_attachment_files):\n   spark = SparkSession\\\n       .builder\\\n       .appName(\"Titanic\")\\\n       .getOrCreate()\n\n   print('TITANIC FILE URL = ' + str(model_attachment_files[0]['fileUrl']))\n   df = (spark.read\n         .format(\"csv\")\n         .option('header', 'true')\n         .load(input_files[0]['fileUrl']))\n\n   dataset = df.select(col('Pclass').cast('float'),\n                       col('Sex'),\n                       col('Age').cast('float'),\n                       col('Fare').cast('float'),\n                       col('Embarked')\n                       )\n\n   dataset = dataset.replace('?', None)\\\n       .dropna(how='any')\n\n   dataset = StringIndexer(\n       inputCol='Sex',\n       outputCol='Gender',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = StringIndexer(\n       inputCol='Embarked',\n       outputCol='Boarded',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = dataset.drop('Sex')\n   dataset = dataset.drop('Embarked')\n   required_features = ['Pclass',\n                   'Age',\n                   'Fare',\n                   'Gender',\n                   'Boarded'\n                  ]\n\n   assembler = VectorAssembler(inputCols=required_features, outputCol='features')\n   transformed_data = assembler.transform(dataset)\n\n   time.sleep(90)\n\n   model = RandomForestClassificationModel.load(model_attachment_files[0]['fileUrl'])\n\n   predictions = model.transform(transformed_data)\n   get_propensity = udf(lambda x: x[1], ArrayType(FloatType()))\n   print(predictions.head(5))\n   predictions = predictions.select('Pclass',\n                       'Age',\n                       'Gender',\n                       'Fare',\n                       'Boarded',\n                       'prediction'\n                       )\n\n   print(predictions.head(5))\n   predictions.write.csv(output_files[0]['fileUrl'])\n   spark.stop()"
        },
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/titanic",
          "filename": "titanic"
        }
      ],
      "modelMetaData": {
        "type": "UNKNOWN"
      }
    }
  },
  "inputData": [
    {
      "name": "test.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "hdfs:///hadoop/test.csv",
      "filename": "test.csv",
      "fileFormat": "CSV"
    }
  ],
  "outputData": [
    {
      "name": "titanic_output.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "hdfs:///hadoop/titanic_output.csv",
      "filename": "titanic_output.csv",
      "fileFormat": "CSV"
    }
  ],
  "targetEngine": {
    "engineType": "SPARK_RUNTIME",
    "name": "spark-runtime-service",
    "executorMemory": "512MB",
    "sparkConf": {
      "--master": "yarn"
    }
  }
}
```
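Putting these pieces together, the call made by ModelOpPySparkDriver.py can be approximated as below. This is a hypothetical sketch, not the actual driver: run_score_function is an invented name, and it assumes the generated modules can be imported by name because they sit in the driver's working directory. With the sample JSON above, method_to_be_executed would name score_function, assuming it is taken from the scoreFunction field.

```python
import importlib
from typing import Any


def run_score_function(source_module: str = "model_source_code",
                       metadata_module: str = "model_job_metadata") -> Any:
    """Hypothetical driver sketch: call the score function named in the job metadata."""
    # Both modules are expected in a directory on sys.path (e.g., the working directory)
    metadata = importlib.import_module(metadata_module)
    source = importlib.import_module(source_module)

    # Look up the function by name and pass in the three lists
    score_function = getattr(source, metadata.method_to_be_executed)
    return score_function(metadata.external_inputs,
                          metadata.external_outputs,
                          metadata.external_model_assets)
```

Looking the function up by name is what allows the score function to have any name, as long as it matches method_to_be_executed.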