Spark Model Structure
Any Spark scoring function code to be executed by the Spark-Runtime-Service needs to comply with the following structure:
```python
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel


# modelop.score
def <my-score-function-name>(external_inputs, external_outputs, external_model_assets):
    spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()
    ...
    # Read an input file that was uploaded to HDFS by the user
    df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load(external_inputs[0]['fileUrl']))
    # Read a model asset file/folder that was uploaded to HDFS by the user
    model = RandomForestClassificationModel.load(external_model_assets[0]['fileUrl'])
    ...
    # Write to an output file that will be created in HDFS
    predictions.write.csv(external_outputs[0]['fileUrl'])
```
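The template above indexes each list positionally (for example, `external_inputs[0]`). When a job has several inputs or outputs, selecting an asset by its `name` field can make the score function less sensitive to list order. The helper below is a hypothetical sketch, not part of any ModelOp API; it assumes each entry carries the `name` and `fileUrl` keys shown in the sample `model_job_metadata.py` contents later in this section.

```python
from typing import Any, Dict, List


def file_url_by_name(assets: List[Dict[str, Any]], name: str) -> str:
    """Return the fileUrl of the asset whose "name" matches.

    Hypothetical helper: assumes each asset dict has "name" and "fileUrl"
    keys, as in the sample job metadata.
    """
    for asset in assets:
        if asset.get("name") == name:
            return asset["fileUrl"]
    raise KeyError(f"no asset named {name!r}")


# Sample input list, mirroring the job metadata format
external_inputs = [
    {"name": "test.csv", "assetType": "EXTERNAL_FILE",
     "fileUrl": "hdfs:///hadoop/test.csv", "filename": "test.csv", "fileFormat": "CSV"},
]
print(file_url_by_name(external_inputs, "test.csv"))  # hdfs:///hadoop/test.csv
```

Inside the score function, the read call could then use `.load(file_url_by_name(external_inputs, "test.csv"))` instead of a positional index.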
As shown above, the scoring function receives the following three input parameters, each of type list:
- external_inputs: generated from the inputData list
- external_outputs: generated from the outputData list
- external_model_assets: generated from the modelAssets list (contains only assets of type EXTERNAL_FILE)
For additional information on how these lists of files are handled, please refer to PySpark Job Input and Output Data.
Note that modelAssets may also include other types of assets; however, only EXTERNAL_FILE assets are included in external_model_assets (for example, FILE and SOURCE_CODE assets are not included).
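The filtering rule above can be sketched as follows. The function name `build_score_arguments` is hypothetical, and the sketch assumes that inputData, outputData, and modelAssets are available as lists of dicts with an `assetType` key. Only the model assets are filtered, because the text describes filtering only for modelAssets.

```python
from typing import Any, Dict, List, Tuple

Asset = Dict[str, Any]


def build_score_arguments(
    input_data: List[Asset],
    output_data: List[Asset],
    model_assets: List[Asset],
) -> Tuple[List[Asset], List[Asset], List[Asset]]:
    """Hypothetical sketch: derive the three score-function arguments."""
    external_inputs = list(input_data)
    external_outputs = list(output_data)
    # Only EXTERNAL_FILE assets reach the score function;
    # FILE and SOURCE_CODE assets are handled separately.
    external_model_assets = [
        a for a in model_assets if a.get("assetType") == "EXTERNAL_FILE"
    ]
    return external_inputs, external_outputs, external_model_assets


model_assets = [
    {"name": "titanic", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/titanic"},
    {"name": "score.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
    {"name": "schema.json", "assetType": "FILE"},
]
_, _, ema = build_score_arguments([], [], model_assets)
print([a["name"] for a in ema])  # ['titanic']
```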
It is assumed that the aforementioned external file assets already exist in HDFS before the job is created, and that the user knows their locations.
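Because the external files are expected to exist in HDFS already, a caller may want a cheap sanity check before creating a job. The hypothetical sketch below uses only the standard library and verifies that each `fileUrl` uses the `hdfs` scheme; it does not contact HDFS, so it cannot confirm that a file actually exists.

```python
from typing import Any, Dict, List
from urllib.parse import urlparse


def non_hdfs_urls(assets: List[Dict[str, Any]]) -> List[str]:
    """Return fileUrl values whose scheme is not 'hdfs'.

    Only the URL syntax is checked; existence in HDFS is not verified.
    """
    bad = []
    for asset in assets:
        url = asset.get("fileUrl", "")
        if urlparse(url).scheme != "hdfs":
            bad.append(url)
    return bad


assets = [
    {"name": "titanic", "fileUrl": "hdfs:///hadoop/titanic"},
    {"name": "local", "fileUrl": "/tmp/titanic"},
]
print(non_hdfs_urls(assets))  # ['/tmp/titanic']
```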
Spark-runtime-service job processing
Every time a Spark job is about to be executed, the Spark-runtime-service first generates the following files:
model_source_code.py
Contains the primary source code, extracted from model.storedModel.modelAssets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true.
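The selection rule for model_source_code.py can be expressed as a filter over model.storedModel.modelAssets. This is a hypothetical sketch: it assumes the assets are dicts with `assetType` and `primaryModelSource` keys and that exactly one primary source asset is expected. How the actual service handles zero or multiple matches is not described here.

```python
from typing import Any, Dict, List

Asset = Dict[str, Any]


def primary_source_asset(model_assets: List[Asset]) -> Asset:
    """Return the single SOURCE_CODE asset flagged as primaryModelSource."""
    matches = [
        a for a in model_assets
        if a.get("assetType") == "SOURCE_CODE" and a.get("primaryModelSource") is True
    ]
    if len(matches) != 1:
        # Assumption: this sketch treats anything other than one match as an error.
        raise ValueError(f"expected one primary source asset, found {len(matches)}")
    return matches[0]


model_assets = [
    {"name": "score.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
    {"name": "helpers.py", "assetType": "SOURCE_CODE", "primaryModelSource": False},
]
print(primary_source_asset(model_assets)["name"])  # score.py
```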
model_job_metadata.py
Contains the values of all assets involved in job execution (external_inputs, external_outputs, and external_model_assets), plus the name of the score function to call, in the following format:
```python
external_inputs = [
    {
        "name": "test.csv",
        "assetType": "EXTERNAL_FILE",
        "fileUrl": "hdfs:///hadoop/test.csv",
        "filename": "test.csv",
        "fileFormat": "CSV"
    }
]
external_outputs = [
    {
        "name": "titanic_output.csv",
        "assetType": "EXTERNAL_FILE",
        "fileUrl": "hdfs:///hadoop/titanic_output.csv",
        "filename": "titanic_output.csv",
        "fileFormat": "CSV"
    }
]
external_model_assets = [
    {
        "name": "titanic",
        "assetType": "EXTERNAL_FILE",
        "fileUrl": "hdfs:///hadoop/titanic",
        "filename": "titanic"
    }
]
method_to_be_executed = "<my-score-function-name>"
```
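One way to produce a file in this format is to render each list as a Python literal with `pprint.pformat`, which (unlike `json.dumps`) emits `True`/`False`/`None` rather than `true`/`false`/`null`, so the result is importable Python. The function name `render_job_metadata` is hypothetical, and the actual service may generate the file differently.

```python
import pprint
from typing import Any, Dict, List

Asset = Dict[str, Any]


def render_job_metadata(
    external_inputs: List[Asset],
    external_outputs: List[Asset],
    external_model_assets: List[Asset],
    method_to_be_executed: str,
) -> str:
    """Render the text of a model_job_metadata.py-style module."""
    lines = [
        f"external_inputs = {pprint.pformat(external_inputs)}",
        f"external_outputs = {pprint.pformat(external_outputs)}",
        f"external_model_assets = {pprint.pformat(external_model_assets)}",
        f"method_to_be_executed = {method_to_be_executed!r}",
    ]
    return "\n".join(lines) + "\n"


source = render_job_metadata(
    [{"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"}],
    [],
    [],
    "score",
)
namespace: Dict[str, Any] = {}
exec(source, namespace)  # the rendered text is valid Python
print(namespace["external_inputs"][0]["fileUrl"])  # hdfs:///hadoop/test.csv
```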
ModelOpPySparkDriver.py
Contains the default Spark application execution code (for additional details, please see here). This file is responsible for calling the score function.
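The dispatch step can be sketched as a lookup of `method_to_be_executed` on the loaded source module. This is a hypothetical illustration of the idea, not the contents of ModelOpPySparkDriver.py; it assumes the two generated files are importable as modules named `model_job_metadata` and `model_source_code`. To stay runnable without Spark, the example builds stand-in modules in memory with `types.ModuleType`.

```python
import types
from typing import Any


def call_score_function(metadata: types.ModuleType, source: types.ModuleType) -> Any:
    """Look up the configured score function and call it with the three lists."""
    score_fn = getattr(source, metadata.method_to_be_executed)
    return score_fn(
        metadata.external_inputs,
        metadata.external_outputs,
        metadata.external_model_assets,
    )


# In-memory stand-ins for the generated modules; a real driver would
# import model_job_metadata and model_source_code instead.
metadata = types.ModuleType("model_job_metadata")
metadata.external_inputs = [{"fileUrl": "hdfs:///hadoop/test.csv"}]
metadata.external_outputs = [{"fileUrl": "hdfs:///hadoop/titanic_output.csv"}]
metadata.external_model_assets = [{"fileUrl": "hdfs:///hadoop/titanic"}]
metadata.method_to_be_executed = "score"

source = types.ModuleType("model_source_code")


def score(external_inputs, external_outputs, external_model_assets):
    # Stand-in score function: report which input and output URLs it received.
    return (external_inputs[0]["fileUrl"], external_outputs[0]["fileUrl"])


source.score = score

print(call_score_function(metadata, source))
# ('hdfs:///hadoop/test.csv', 'hdfs:///hadoop/titanic_output.csv')
```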
In addition, if modelAssets contains assets where "assetType" = "FILE", the Spark-runtime-service creates one file for each matching asset. Likewise, if modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, it creates one file for each matching asset.
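The two rules above select which additional assets are written to local files. A hypothetical sketch, assuming each asset is a dict with `assetType` and (for source code) `primaryModelSource` keys; EXTERNAL_FILE assets are excluded because they already live in HDFS.

```python
from typing import Any, Dict, List

Asset = Dict[str, Any]


def assets_to_materialize(model_assets: List[Asset]) -> List[Asset]:
    """Select FILE assets and non-primary SOURCE_CODE assets."""
    selected = []
    for asset in model_assets:
        kind = asset.get("assetType")
        if kind == "FILE":
            selected.append(asset)
        elif kind == "SOURCE_CODE" and asset.get("primaryModelSource") is False:
            selected.append(asset)
    return selected


model_assets = [
    {"name": "score.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
    {"name": "helpers.py", "assetType": "SOURCE_CODE", "primaryModelSource": False},
    {"name": "schema.json", "assetType": "FILE"},
    {"name": "titanic", "assetType": "EXTERNAL_FILE"},
]
print([a["name"] for a in assets_to_materialize(model_assets)])
# ['helpers.py', 'schema.json']
```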
Once all required files have been created on the local file system, they are uploaded to the cluster as part of the spark-submit execution.
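The upload relies on spark-submit shipping local files to the cluster. One plausible command shape is sketched below: `--py-files` (Python files placed on the job's PYTHONPATH) and `--files` (files placed in each executor's working directory) are standard spark-submit options, but whether the Spark-runtime-service uses exactly these flags is an assumption. The file names `helpers.py` and `schema.json` are hypothetical.

```python
from typing import List


def build_spark_submit_args(driver: str, local_files: List[str]) -> List[str]:
    """Build a spark-submit argument list (not executed here)."""
    py_files = [f for f in local_files if f.endswith(".py")]
    other_files = [f for f in local_files if not f.endswith(".py")]
    args = ["spark-submit"]
    if py_files:
        args += ["--py-files", ",".join(py_files)]
    if other_files:
        args += ["--files", ",".join(other_files)]
    args.append(driver)  # the application file comes after the options
    return args


args = build_spark_submit_args(
    "ModelOpPySparkDriver.py",
    ["model_source_code.py", "model_job_metadata.py", "helpers.py", "schema.json"],
)
print(" ".join(args))
# spark-submit --py-files model_source_code.py,model_job_metadata.py,helpers.py --files schema.json ModelOpPySparkDriver.py
```

The resulting list could be passed to `subprocess.run(args, check=True)` on a machine where spark-submit is installed.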