Format of Spark Model
A Spark model for which a modelBatchJob will be created needs to comply with the following structure:
    from __future__ import print_function

    from pyspark.sql import SparkSession
    from pyspark.ml.classification import RandomForestClassificationModel


    # modelop.score
    def <my-score-function-name>(external_inputs, external_outputs, external_model_assets):
        spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()
        ...
        # Read an input file that was uploaded to HDFS by the user
        df = (spark.read
              .format("csv")
              .option('header', 'true')
              .load(external_inputs[0]['fileUrl']))

        # Read a model asset file/folder that was uploaded to HDFS by the user
        model = RandomForestClassificationModel.load(external_model_assets[0]['fileUrl'])
        ...
        # Write to an output file that will be created in HDFS
        predictions.write.csv(external_outputs[0]['fileUrl'])
As defined in the model template, the scoring function takes three parameters. Each parameter is a Python list whose elements are the assets that satisfy the "assetType" = "EXTERNAL_FILE" condition. These external file assets are assumed to exist in HDFS before the job is created, and the user is assumed to know their locations.
external_inputs is generated from the inputData list
external_outputs is generated from the outputData list
external_model_assets is generated from the modelAssets list
Each Python list contains the complete Asset objects, not just their file locations. The external_model_assets list will be empty if the model has no EXTERNAL_FILE assets.
There could be other types of assets in inputData, outputData, and modelAssets, but only the EXTERNAL_FILE assets are included in these lists. FILE and SOURCE_CODE assets, if any, are excluded.
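The filtering rule described above can be sketched in plain Python. This is only an illustration of the rule, not the spark-runtime-service implementation; the helper name select_external_files and the sample asset entries are hypothetical.

```python
def select_external_files(assets):
    """Return the complete asset dicts whose assetType is EXTERNAL_FILE."""
    return [asset for asset in assets if asset.get("assetType") == "EXTERNAL_FILE"]


# Hypothetical modelAssets list mixing the three asset types named above.
model_assets = [
    {"name": "score.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
    {"name": "schema.avsc", "assetType": "FILE"},
    {"name": "titanic", "assetType": "EXTERNAL_FILE",
     "fileUrl": "/hadoop/titanic", "filename": "titanic"},
]

# Only the EXTERNAL_FILE entry survives; FILE and SOURCE_CODE are excluded.
external_model_assets = select_external_files(model_assets)
print(external_model_assets[0]["fileUrl"])
```

The same helper applied to inputData and outputData would give external_inputs and external_outputs. A list with no EXTERNAL_FILE entries yields an empty list.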
Execution Flow in spark-runtime-service
Once the user creates their modelBatchJob, the spark-runtime-service will be triggered. When the spark-runtime-service receives the modelBatchJob JSON, it will create the following files in its working directory:
1. model_source_code.py
   Contains the primary source code, which is retrieved from the modelBatchJob JSON (model.storedModel.modelAssets) as the asset where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true.

2. model_job_metadata.py
   Contains the values for external_inputs, external_outputs, and external_model_assets, along with the name of the score function, in the following format:

       external_inputs = [
           {
               "name": "test.csv",
               "assetType": "EXTERNAL_FILE",
               "fileUrl": "/hadoop/test.csv",
               "filename": "test.csv",
               "fileFormat": "CSV"
           }
       ]
       external_outputs = [
           {
               "name": "titanic_output.csv",
               "assetType": "EXTERNAL_FILE",
               "fileUrl": "/hadoop/titanic_output.csv",
               "filename": "titanic_output.csv",
               "fileFormat": "CSV"
           }
       ]
       external_model_assets = [
           {
               "name": "titanic",
               "assetType": "EXTERNAL_FILE",
               "fileUrl": "/hadoop/titanic",
               "filename": "titanic"
           }
       ]
       method_to_be_executed = "<my-score-function-name>"

3. ModelOpPySparkDriver.py
   Contains the code for Spark's application resource. The score function is called from this file, which passes in the values for the function's three parameters.

4. If modelAssets contains assets where "assetType" = "FILE", the spark-runtime-service will create a file in its working directory for each such asset. An example of such a file is a schema file.

5. If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, the spark-runtime-service will create a file in its working directory for each such asset.
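To make the relationship between the first three files concrete, here is a minimal sketch of how a driver could look up the score function by the name stored in method_to_be_executed and call it with the three lists. It is an assumption about the general mechanism, not the contents of ModelOpPySparkDriver.py; the helper run_score_function, the function name "score", and the in-memory stand-in modules are all hypothetical.

```python
import types


def run_score_function(source_module, metadata_module):
    """Look up the score function by name and call it with the three asset lists."""
    score_fn = getattr(source_module, metadata_module.method_to_be_executed)
    return score_fn(
        metadata_module.external_inputs,
        metadata_module.external_outputs,
        metadata_module.external_model_assets,
    )


# Stand-in for model_job_metadata.py, built in memory so the sketch runs
# without Spark or a working directory.
metadata = types.ModuleType("model_job_metadata")
metadata.external_inputs = [{"fileUrl": "/hadoop/test.csv"}]
metadata.external_outputs = [{"fileUrl": "/hadoop/titanic_output.csv"}]
metadata.external_model_assets = [{"fileUrl": "/hadoop/titanic"}]
metadata.method_to_be_executed = "score"  # hypothetical function name


def score(external_inputs, external_outputs, external_model_assets):
    # A real score function would read, predict, and write with Spark here.
    return (external_inputs[0]["fileUrl"], external_outputs[0]["fileUrl"])


# Stand-in for model_source_code.py.
source = types.ModuleType("model_source_code")
source.score = score

result = run_score_function(source, metadata)
print(result)
```

Looking the function up by name means the model source code can use any function name, as long as model_job_metadata.py records it.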
Once these files (1-5) have been created, the spark-runtime-service uploads them to HDFS when spark-submit is executed. None of these files is included in external_inputs, external_outputs, or external_model_assets. Because they are uploaded when spark-submit is executed rather than by the user directly, they are stored under the application's staging directory, as in the following location for ModelOpPySparkDriver.py:
hdfs://<cluster-host>:<cluster-port>/user/<username>/.sparkStaging/<spark-application-id>/ModelOpPySparkDriver.py
Therefore, if the user's model source code uses any of these files, it must read them with plain Python file I/O rather than through Spark, for example:
open("ModelOpPySparkDriver.py", "r")
Note that these files are accessed differently in the model source code from the files listed in external_inputs, external_outputs, and external_model_assets, which are read or written through Spark using their fileUrl.
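As an illustration of the plain-Python access pattern, the sketch below reads a staged file with open() instead of a Spark reader. It assumes the staged file is JSON; the helper read_staged_json and the file name input_schema.json are hypothetical, and the demo writes its own stand-in file so that it runs outside a job.

```python
import json
import os
import tempfile


def read_staged_json(filename):
    """Read a staged file with plain Python file I/O (no Spark reader)."""
    with open(filename, "r") as handle:
        return json.load(handle)


# Demo only: create a stand-in for a staged schema file so the sketch runs anywhere.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "input_schema.json")  # hypothetical file name
with open(demo_path, "w") as handle:
    json.dump({"fields": ["age", "fare"]}, handle)

schema = read_staged_json(demo_path)
print(schema["fields"])
```

Inside a score function the call would use the bare file name, for example read_staged_json("input_schema.json"), in the same way as the open("ModelOpPySparkDriver.py", "r") example.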
Example modelBatchJob JSON
The following modelBatchJob JSON was used to generate the example values: