Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Current »

This article provides details for preparing a Spark Model to execute within ModelOp Center.

Table of Contents

Format of Spark Model

A Spark model for which a modelBatchJob will be created needs to comply with the following structure:

from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel

# modelop.score
def <my-score-function-name>(input_files, output_files, model_attachment_files):
    spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()
    
    ... 
    
    # Read an input file that was uploaded to HDFS by the user
    df = (spark.read
            .format("csv")
            .option('header', 'true')
            .load(input_files[0]['fileUrl']))
    
    # Read a model attachment file/folder that was uploaded to HDFS by the user
    model = RandomForestClassificationModel.load(model_attachment_files[0]['fileUrl'])
      
    ...
       
    # Write to an output file that will be created in HDFS
    predictions.write.csv(output_files[0]['fileUrl'])

As defined in the model template, the scoring function takes three parameters. Each parameter is a Python list where its elements match the "assetType" = "EXTERNAL_FILE" condition. It is assumed that these external file assets exist in HDFS before the job is created and the user knows their location.

  1. input_files are generated from the inputData list

  2. output_files are generated from the outputData list

  3. model_attachment_files are generated from the modelAssets list

Each Python list contains entire Asset object, respectively. The model_attachment_files list will be empty if the model has no attachments.

There could be other types of assets in inputData, outputData, and modelAssets, but we are only including the EXTERNAL_FILE assets while excluding the FILE and SOURCE_CODE assets, if any.


Execution Flow in spark-runtime-service

Once the user creates their modelBatchJob, the spark-runtime-service will be triggered. When the spark-runtime-service receives the modelBatchJob JSON, it will create the following files in its working directory:

  1. model_source_code.py

    1. Contains the primary source code which is retrieved from the modelBatchJob JSON (model.storedModel.modelAssets) where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true

  2. model_job_metadata.py

    1. Contains the values for input_files, output_files, and model_attachment_files in the following format:

      job_input = [
        {
          "name": "test.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/test.csv",
          "filename": "test.csv",
          "fileFormat": "CSV"
        }]
      
      job_output = [
         {
          "name": "titanic_output.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/titanic_output.csv",
          "filename": "titanic_output.csv",
          "fileFormat": "CSV"
        }]
      
      
      job_model_assets_input = [
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/titanic",
          "filename": "titanic"
        }
      ]
      
      method_to_be_executed = "<my-score-function-name>"
  3. ModelOpPySparkDriver.py

    1. Contains the code for Spark’s application resource which you can see here. Also, it is from this file that the score function is called while passing in the values for the model’s three parameters

  4. If modelAssets contains assets where "assetType" = "FILE", then the spark-runtime-service will create a file in its working directory for each asset that matches the aforementioned criteria. An example of such file would be a schema file

  5. If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, then the spark-runtime-service will create a file in its working directory for each asset that matches the aforementioned criteria.

Once these files (1-5) have been created, the spark-runtime-service will upload them to HDFS when spark-submit is executed. None of these files will be included in input_files, output_files, or model_attachment_files. Since these files are not uploaded by the user directly, and instead when spark-submit is executed, they will be stored at the following location:

hdfs://<cluster-host>:<cluster-port>/user/<username>/.sparkStaging/<spark-application-id>/ModelOpPySparkDriver.py 

Therefore, if the user’s model source code uses any of these files, they must be read in pure Python with:

open("ModelOpPySparkDriver.py", "r")

Please note that accessing these files in the model source code is different from how we access the files included in input_files,output_files, and model_attachment_files.


Example modelBatchJob JSON

The following modelBatchJob JSON was used to generate the example values:

 modelBatchJob JSON
{
  "jobType": "MODEL_BATCH_JOB",
  "jobStatus": "CREATED",
  "model": {
    "storedModel": {
      "modelAssets": [
        {
          "name": "titanic.py",
          "assetType": "SOURCE_CODE",
          "primaryModelSource": true,
          "scoreFunction": "score_function",
          "sourceCode": "from __future__ import print_function\n\nimport time\nimport sys\nfrom random import random\nfrom operator import add\n\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import isnull, when, count\nfrom pyspark.sql.functions import udf\nfrom pyspark.sql.types import ArrayType, FloatType\nfrom pyspark.ml.feature import StringIndexer\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml.classification import RandomForestClassificationModel\n\ndef score_function(input_files, output_files, model_attachment_files):\n   spark = SparkSession\\\n       .builder\\\n       .appName(\"Titanic\")\\\n       .getOrCreate()\n\n   print('TITANIC FILE URL = ' + str(model_attachment_files[0]['fileUrl']))\n   df = (spark.read\n         .format(\"csv\")\n         .option('header', 'true')\n         .load(input_files[0]['fileUrl']))\n\n   dataset = df.select(col('Pclass').cast('float'),\n                       col('Sex'),\n                       col('Age').cast('float'),\n                       col('Fare').cast('float'),\n                       col('Embarked')\n                       )\n\n   dataset = dataset.replace('?', None)\\\n       .dropna(how='any')\n\n   dataset = StringIndexer(\n       inputCol='Sex',\n       outputCol='Gender',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = StringIndexer(\n       inputCol='Embarked',\n       outputCol='Boarded',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = dataset.drop('Sex')\n   dataset = dataset.drop('Embarked')\n   required_features = ['Pclass',\n                   'Age',\n                   'Fare',\n                   'Gender',\n                   'Boarded'\n                  ]\n\n   assembler = VectorAssembler(inputCols=required_features, outputCol='features')\n   transformed_data = assembler.transform(dataset)\n\n   time.sleep(90)\n\n   model = RandomForestClassificationModel.load(model_attachment_files[0]['fileUrl'])\n\n   predictions = model.transform(transformed_data)\n   get_propensity = udf(lambda x: x[1], ArrayType(FloatType()))\n   print(predictions.head(5))\n   predictions = predictions.select('Pclass',\n                       'Age',\n                       'Gender',\n                       'Fare',\n                       'Boarded',\n                       'prediction'\n                       )\n\n   print(predictions.head(5))\n   predictions.write.csv(output_files[0]['fileUrl'])\n   spark.stop()"
        },
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/titanic",
          "filename": "titanic"
        }
      ],
      "modelMetaData": {
        "type": "UNKNOWN"
      }
    }
  },
  "inputData": [
    {
      "name": "test.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "/hadoop/test.csv",
      "filename": "test.csv",
      "fileFormat": "CSV"
    }
  ],
  "outputData": [
    {
      "name": "titanic_output.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "/hadoop/titanic_output.csv",
      "filename": "titanic_output.csv",
      "fileFormat": "CSV"
    }
  ],
  "targetEngine": {
    "engineType": "SPARK_RUNTIME",
    "name": "spark-runtime-service",
    "executorMemory": "512MB",
    "sparkConf": {
      "--master": "yarn"
    }
  }
}

  • No labels