This article describes the requirements for setting up a model that executes a Spark scoring job.

Table of Contents

Spark Model Structure

Any Spark scoring function code to be executed by the Spark-Runtime-Service must comply with the following structure:

Code Block
languagepy
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel

# modelop.score
def <my-score-function-name>(external_inputs, external_outputs, external_model_assets):
    spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()
    
    ... 
    
    # Read an input file that was uploaded to HDFS by the user
    df = (spark.read
            .format("csv")
            .option('header', 'true')
            .load(external_inputs[0]['fileUrl']))
    
    # Read a model asset file/folder that was uploaded to HDFS by the user
    model = RandomForestClassificationModel.load(external_model_assets[0]['fileUrl'])
      
    ...
       
    # Write to an output file that will be created in HDFS
    predictions.write.csv(external_outputs[0]['fileUrl'])

As shown above, the scoring function receives the following three input parameters, each of type list:

  1. external_inputs - generated from the inputData list

  2. external_outputs - generated from the outputData list

  3. external_model_assets - generated from the modelAssets list (contains only assets of type EXTERNAL_FILE)

For additional information on how these lists of files are handled, please refer to PySpark Job Input and Output Data.

Note that modelAssets may contain other types of assets; however, only EXTERNAL_FILE assets are passed to the scoring function (for example, FILE and SOURCE_CODE assets are not included).
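The mapping from the job definition to the three lists can be sketched as follows. This is a minimal illustration only: the helper name build_score_arguments is hypothetical, and the sketch assumes that inputData and outputData are passed through unchanged while modelAssets is filtered by assetType. The actual Spark-runtime-service code may differ.

```python
# Hypothetical helper; the service's real implementation is not shown in this article.
def build_score_arguments(job):
    """Return (external_inputs, external_outputs, external_model_assets) for a job dict."""
    stored_model = job.get("model", {}).get("storedModel", {})
    model_assets = stored_model.get("modelAssets", [])

    # Assumption: inputData and outputData are passed through unchanged.
    external_inputs = list(job.get("inputData", []))
    external_outputs = list(job.get("outputData", []))

    # Only EXTERNAL_FILE model assets are kept; FILE and SOURCE_CODE assets are skipped.
    external_model_assets = [
        asset for asset in model_assets if asset.get("assetType") == "EXTERNAL_FILE"
    ]
    return external_inputs, external_outputs, external_model_assets


sample_job = {
    "model": {"storedModel": {"modelAssets": [
        {"name": "titanic.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
        {"name": "titanic", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/titanic"},
    ]}},
    "inputData": [
        {"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}
    ],
    "outputData": [
        {"name": "titanic_output.csv", "assetType": "EXTERNAL_FILE",
         "fileUrl": "hdfs:///hadoop/titanic_output.csv"}
    ],
}

inputs, outputs, model_assets = build_score_arguments(sample_job)
print([asset["name"] for asset in model_assets])  # ['titanic']
```

In this sketch the SOURCE_CODE asset never reaches the scoring function, which matches the note above: only the HDFS-hosted model folder appears in external_model_assets.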

Info

It is assumed that the aforementioned external file assets already exist in HDFS before the job is created, and that the user knows their location.


Spark-runtime-service job processing

Each time a Spark job is about to be executed, the Spark-runtime-service generates the following files before job execution:

  1. model_source_code.py

    1. Contains the primary source code - extracted from model.storedModel.modelAssets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true

  2. model_job_metadata.py

    1. Contains the values for all assets involved in job execution (external_inputs, external_outputs, and external_model_assets), together with the name of the function to execute, in the following format:

      Code Block
      languagepy
      external_inputs = [
        {
          "name": "test.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/test.csv",
          "filename": "test.csv",
          "fileFormat": "CSV"
        }]
      
      external_outputs = [
         {
          "name": "titanic_output.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/titanic_output.csv",
          "filename": "titanic_output.csv",
          "fileFormat": "CSV"
        }]
      
      
      external_model_assets = [
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/titanic",
          "filename": "titanic"
        }]
      
      method_to_be_executed = "<my-score-function-name>"
  3. ModelOpPySparkDriver.py

    1. Contains the default Spark application execution code (for additional details please see here).

Info

This file is responsible for calling the score function.

  4. If modelAssets contains assets where "assetType" = "FILE", one file is created for each matching asset.

  5. If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, one file is created for each matching asset.
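The file generation and the driver's call into the score function can be sketched as follows. This is a simplified illustration and not the actual service or ModelOpPySparkDriver.py code: the helper names write_job_files and run_score_function are hypothetical, the sketch assumes non-primary SOURCE_CODE assets carry their content in "sourceCode" and are written under their "name", and FILE assets are left out because their content field is not shown in this article. The demo score function avoids Spark so the sketch runs anywhere.

```python
import importlib
import os
import sys
import tempfile


def write_job_files(job, work_dir):
    """Write the per-job Python files into work_dir (hypothetical helper)."""
    assets = job["model"]["storedModel"]["modelAssets"]
    primary = next(
        a for a in assets
        if a.get("assetType") == "SOURCE_CODE" and a.get("primaryModelSource")
    )
    files = {"model_source_code.py": primary["sourceCode"]}

    # repr() is used instead of JSON so that booleans become valid Python literals.
    external_assets = [a for a in assets if a.get("assetType") == "EXTERNAL_FILE"]
    files["model_job_metadata.py"] = "\n".join([
        "external_inputs = " + repr(job.get("inputData", [])),
        "external_outputs = " + repr(job.get("outputData", [])),
        "external_model_assets = " + repr(external_assets),
        "method_to_be_executed = " + repr(primary["scoreFunction"]),
    ]) + "\n"

    # Assumption: secondary source files are written under their asset name.
    for asset in assets:
        if asset.get("assetType") == "SOURCE_CODE" and not asset.get("primaryModelSource"):
            files[asset["name"]] = asset["sourceCode"]

    for filename, content in files.items():
        with open(os.path.join(work_dir, filename), "w") as handle:
            handle.write(content)
    return sorted(files)


def run_score_function(work_dir):
    """Look up the score function by name and call it with the three lists."""
    sys.path.insert(0, work_dir)
    try:
        importlib.invalidate_caches()
        metadata = importlib.import_module("model_job_metadata")
        source = importlib.import_module("model_source_code")
        score = getattr(source, metadata.method_to_be_executed)
        return score(
            metadata.external_inputs,
            metadata.external_outputs,
            metadata.external_model_assets,
        )
    finally:
        sys.path.remove(work_dir)


demo_job = {
    "model": {"storedModel": {"modelAssets": [
        {
            "name": "demo.py",
            "assetType": "SOURCE_CODE",
            "primaryModelSource": True,
            "scoreFunction": "score_function",
            "sourceCode": (
                "def score_function(inputs, outputs, assets):\n"
                "    return inputs[0]['fileUrl'], outputs[0]['fileUrl'], assets[0]['fileUrl']\n"
            ),
        },
        {"name": "titanic", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/titanic"},
    ]}},
    "inputData": [{"fileUrl": "hdfs:///hadoop/test.csv"}],
    "outputData": [{"fileUrl": "hdfs:///hadoop/titanic_output.csv"}],
}

with tempfile.TemporaryDirectory() as work_dir:
    created = write_job_files(demo_job, work_dir)
    result = run_score_function(work_dir)

print(created)
print(result)
```

Looking the function up by the name stored in method_to_be_executed is what allows the score function to have any name, as long as it accepts the three list parameters.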

Once all required files have been created on the local file system, they are uploaded to the cluster as part of the spark-submit execution.
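As a rough illustration of that last step, the sketch below assembles a spark-submit argument list from a targetEngine block. The function build_spark_submit_command is hypothetical, and the mapping is an assumption: sparkConf entries are treated as flag/value pairs, executorMemory is passed as --executor-memory, and the generated files are shipped with --py-files. The article does not state how the Spark-runtime-service actually builds the command.

```python
# Hypothetical helper; the mapping from targetEngine to flags is an assumption.
def build_spark_submit_command(target_engine, py_files, driver="ModelOpPySparkDriver.py"):
    """Return a spark-submit argument list for the given engine settings."""
    command = ["spark-submit"]

    # Assumption: each sparkConf key is already a spark-submit flag such as "--master".
    for flag, value in target_engine.get("sparkConf", {}).items():
        command += [flag, value]

    if "executorMemory" in target_engine:
        command += ["--executor-memory", target_engine["executorMemory"]]

    # Ship the generated Python files alongside the driver script.
    if py_files:
        command += ["--py-files", ",".join(py_files)]

    command.append(driver)
    return command


target_engine = {
    "engineType": "SPARK_RUNTIME",
    "name": "spark-runtime-service",
    "executorMemory": "512MB",
    "sparkConf": {"--master": "yarn"},
}

command = build_spark_submit_command(
    target_engine, ["model_source_code.py", "model_job_metadata.py"]
)
print(" ".join(command))
```

Building the command as a list rather than a single string keeps each argument intact if it is later handed to a process launcher.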


Sample modelBatchJob JSON

Code Block
languagejson
{
  "jobType": "MODEL_BATCH_JOB",
  "jobStatus": "CREATED",
  "model": {
    "storedModel": {
      "modelAssets": [
        {
          "name": "titanic.py",
          "assetType": "SOURCE_CODE",
          "primaryModelSource": true,
          "scoreFunction": "score_function",
          "sourceCode": "from __future__ import print_function\n\nimport time\nimport sys\nfrom random import random\nfrom operator import add\n\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import isnull, when, count\nfrom pyspark.sql.functions import udf\nfrom pyspark.sql.types import ArrayType, FloatType\nfrom pyspark.ml.feature import StringIndexer\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml.classification import RandomForestClassificationModel\n\ndef score_function(input_files, output_files, model_attachment_files):\n   spark = SparkSession\\\n       .builder\\\n       .appName(\"Titanic\")\\\n       .getOrCreate()\n\n   print('TITANIC FILE URL = ' + str(model_attachment_files[0]['fileUrl']))\n   df = (spark.read\n         .format(\"csv\")\n         .option('header', 'true')\n         .load(input_files[0]['fileUrl']))\n\n   dataset = df.select(col('Pclass').cast('float'),\n                       col('Sex'),\n                       col('Age').cast('float'),\n                       col('Fare').cast('float'),\n                       col('Embarked')\n                       )\n\n   dataset = dataset.replace('?', None)\\\n       .dropna(how='any')\n\n   dataset = StringIndexer(\n       inputCol='Sex',\n       outputCol='Gender',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = StringIndexer(\n       inputCol='Embarked',\n       outputCol='Boarded',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = dataset.drop('Sex')\n   dataset = dataset.drop('Embarked')\n   required_features = ['Pclass',\n                   'Age',\n                   'Fare',\n                   'Gender',\n                   'Boarded'\n                  ]\n\n   assembler = VectorAssembler(inputCols=required_features, outputCol='features')\n   transformed_data = assembler.transform(dataset)\n\n   time.sleep(90)\n\n   model = RandomForestClassificationModel.load(model_attachment_files[0]['fileUrl'])\n\n   predictions = model.transform(transformed_data)\n   get_propensity = udf(lambda x: x[1], ArrayType(FloatType()))\n   print(predictions.head(5))\n   predictions = predictions.select('Pclass',\n                       'Age',\n                       'Gender',\n                       'Fare',\n                       'Boarded',\n                       'prediction'\n                       )\n\n   print(predictions.head(5))\n   predictions.write.csv(output_files[0]['fileUrl'])\n   spark.stop()"
        },
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/titanic",
          "filename": "titanic"
        }
      ],
      "modelMetaData": {
        "type": "UNKNOWN"
      }
    }
  },
  "inputData": [
    {
      "name": "test.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "hdfs:///hadoop/test.csv",
      "filename": "test.csv",
      "fileFormat": "CSV"
    }
  ],
  "outputData": [
    {
      "name": "titanic_output.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "hdfs:///hadoop/titanic_output.csv",
      "filename": "titanic_output.csv",
      "fileFormat": "CSV"
    }
  ],
  "targetEngine": {
    "engineType": "SPARK_RUNTIME",
    "name": "spark-runtime-service",
    "executorMemory": "512MB",
    "sparkConf": {
      "--master": "yarn"
    }
  }
}
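A job definition like the sample above could be sanity-checked before submission with a small script. The sketch below is a hypothetical pre-flight check, not part of the Spark-runtime-service: it assumes a job should contain exactly one primary SOURCE_CODE asset and that the function named in "scoreFunction" must be defined at module level with three parameters. It only parses the source text, so pyspark does not need to be installed.

```python
import ast


# Hypothetical pre-flight check; the rules below are assumptions drawn from this article.
def check_score_function(job):
    """Return a list of problems found in the job's primary source code asset."""
    assets = job["model"]["storedModel"]["modelAssets"]
    primaries = [
        a for a in assets
        if a.get("assetType") == "SOURCE_CODE" and a.get("primaryModelSource")
    ]
    if len(primaries) != 1:
        return ["expected exactly one primary SOURCE_CODE asset, found %d" % len(primaries)]

    primary = primaries[0]
    name = primary.get("scoreFunction")

    # Parse without executing, so imports such as pyspark are never resolved.
    tree = ast.parse(primary.get("sourceCode", ""))
    functions = {
        node.name: node for node in tree.body if isinstance(node, ast.FunctionDef)
    }

    problems = []
    if name not in functions:
        problems.append("score function %r is not defined in the source code" % name)
    elif len(functions[name].args.args) != 3:
        problems.append("score function %r must accept exactly three parameters" % name)
    return problems


def make_job(source_code):
    """Build a minimal job dict around the given source code (illustration only)."""
    return {"model": {"storedModel": {"modelAssets": [{
        "name": "titanic.py",
        "assetType": "SOURCE_CODE",
        "primaryModelSource": True,
        "scoreFunction": "score_function",
        "sourceCode": source_code,
    }]}}}


good_job = make_job(
    "from pyspark.sql import SparkSession\n\n"
    "def score_function(input_files, output_files, model_attachment_files):\n"
    "    pass\n"
)
bad_job = make_job("def score(input_files):\n    pass\n")

print(check_score_function(good_job))  # []
print(check_score_function(bad_job))
```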