Spark Model Structure
Any Spark scoring function code to be executed by the spark-runtime-service as part of a MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB needs to comply with the following structure:
    from __future__ import print_function

    from pyspark.sql import SparkSession
    from pyspark.ml.classification import RandomForestClassificationModel


    # modelop.score
    def <my-score-function-name>(external_inputs, external_outputs, external_model_assets):
        spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()

        ...

        # Read an input file that was uploaded to HDFS by the user
        df = (spark.read
              .format("csv")
              .option('header', 'true')
              .load(external_inputs[0]['fileUrl']))

        # Read a model asset file/folder that was uploaded to HDFS by the user
        model = RandomForestClassificationModel.load(external_model_assets[0]['fileUrl'])

        ...

        # Write to an output file that will be created in HDFS
        predictions.write.csv(external_outputs[0]['fileUrl'])
As defined in the model template above, the scoring function receives three input parameters, each of type list:
- external_inputs - generated from the inputData list
- external_outputs - generated from the outputData list
- external_model_assets - generated from the modelAssets list (contains only assets of type EXTERNAL_FILE)
Each Python list contains entire Asset objects. The external_model_assets list will be empty if the model has no assets.
external_inputs and external_outputs may also contain EXTERNAL_FILE assets representing any FILE assets specified as part of the inputData and/or outputData, respectively. For additional information on how these lists of files are handled, please refer to PySpark Job Inputs and Outputs.
Please note that modelAssets may include other types of assets; however, only EXTERNAL_FILE assets are passed to the scoring function (FILE and SOURCE_CODE assets are not included).
Info |
---|
It is assumed that the aforementioned external file assets exist in HDFS before the job is created and that the user knows their location. |
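To illustrate the filtering described above, the following minimal sketch shows how a job's modelAssets list could be reduced to the external_model_assets list. The helper name `select_external_model_assets` and the sample asset dictionaries are hypothetical. Only the assetType values and field names come from this page, and the actual spark-runtime-service logic may differ.

```python
def select_external_model_assets(model_assets):
    """Return only the EXTERNAL_FILE assets, preserving their order."""
    return [a for a in model_assets if a.get("assetType") == "EXTERNAL_FILE"]


# Hypothetical modelAssets list mixing the three asset types named above.
model_assets = [
    {"name": "score.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
    {"name": "schema.avsc", "assetType": "FILE"},
    {"name": "titanic", "assetType": "EXTERNAL_FILE",
     "fileUrl": "hdfs:///hadoop/titanic", "filename": "titanic"},
]

external_model_assets = select_external_model_assets(model_assets)
print([a["name"] for a in external_model_assets])
```

With this input, only the `titanic` asset survives the filter. A model whose assets are all of type FILE or SOURCE_CODE would yield an empty list.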
Spark-runtime-service job processing
Once the user creates a MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB, the spark-runtime-service is triggered. Every time a Spark job is about to be executed, the spark-runtime-service receives the job JSON and generates the following files in its working directory before job execution:
- model_source_code.py: contains the primary source code, extracted from the job JSON (model.storedModel.modelAssets) where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true.
- model_job_metadata.py: contains the values related to all assets involved during job execution (external_inputs, external_outputs, and external_model_assets), together with the name of the scoring function, in the following format:

    external_inputs = [
        {
            "name": "test.csv",
            "assetType": "EXTERNAL_FILE",
            "fileUrl": "hdfs:///hadoop/test.csv",
            "filename": "test.csv",
            "fileFormat": "CSV"
        }
    ]

    external_outputs = [
        {
            "name": "titanic_output.csv",
            "assetType": "EXTERNAL_FILE",
            "fileUrl": "hdfs:///hadoop/titanic_output.csv",
            "filename": "titanic_output.csv",
            "fileFormat": "CSV"
        }
    ]

    external_model_assets = [
        {
            "name": "titanic",
            "assetType": "EXTERNAL_FILE",
            "fileUrl": "hdfs:///hadoop/titanic",
            "filename": "titanic"
        }
    ]

    method_to_be_executed = "<my-score-function-name>"
- ModelOpPySparkDriver.py: contains the default Spark application execution code. This file is the one in charge of calling the score function, passing in the values for its three parameters.
- If modelAssets contains assets where "assetType" = "FILE", a file is created for each of those assets (for example, a schema file).
- If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, a file is created for each of those assets.
Once all required files have been created on the local file system, they are uploaded to the cluster as part of the spark-submit execution.
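The hand-off from the generated files to the scoring function can be pictured with the sketch below. It is not the actual ModelOpPySparkDriver.py: the `run_scoring` helper and the stand-in `score` function are hypothetical, and plain namespace objects stand in for the generated model_source_code.py and model_job_metadata.py modules. It only illustrates looking up the function named in method_to_be_executed and calling it with the three lists.

```python
from types import SimpleNamespace


def run_scoring(source_module, metadata):
    """Call the function named by metadata.method_to_be_executed."""
    score_fn = getattr(source_module, metadata.method_to_be_executed)
    return score_fn(
        metadata.external_inputs,
        metadata.external_outputs,
        metadata.external_model_assets,
    )


# Stand-in for a user scoring function; a real one would use Spark here.
def score(external_inputs, external_outputs, external_model_assets):
    return (external_inputs[0]["fileUrl"], external_outputs[0]["fileUrl"])


# Stand-ins for the generated modules (assumed shapes, for illustration only).
source_module = SimpleNamespace(score=score)
metadata = SimpleNamespace(
    external_inputs=[{"name": "test.csv",
                      "fileUrl": "hdfs:///hadoop/test.csv"}],
    external_outputs=[{"name": "titanic_output.csv",
                       "fileUrl": "hdfs:///hadoop/titanic_output.csv"}],
    external_model_assets=[],
    method_to_be_executed="score",
)

result = run_scoring(source_module, metadata)
print(result)
```

Resolving the function by name keeps the driver generic: the same driver code can serve any model, because the only model-specific values live in the generated metadata.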
PySpark Job Inputs and Outputs
Spark jobs support the following two types of input and output assets:
- HDFS asset(s) by URL, also known as External File asset(s)
- Embedded asset(s), also known as File asset(s)
The following rules and restrictions apply:
Input
Input Assets by URL
When the job includes input HDFS asset(s) by URL, all of those assets must be present and available inside the Spark cluster before the job is submitted to the cluster for execution.
Input Data Embedded
When a job includes embedded input data assets, the process will generate temporary HDFS files for each of these embedded assets. These newly created HDFS files will be represented as HDFS assets by their URLs and will be used as input data when the job is submitted to the Spark cluster for execution.
This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
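As a rough illustration of this transformation, the sketch below maps an embedded (FILE) input asset to an EXTERNAL_FILE asset that points at a temporary HDFS location. The function name, the temporary directory, and the exact fields of the embedded asset are assumptions made for the example. It does not write anything to HDFS and is not the PySparkPreprocessorService implementation.

```python
import posixpath


def to_hdfs_asset(embedded_asset, tmp_dir):
    """Describe an embedded FILE asset as an EXTERNAL_FILE asset by URL."""
    filename = embedded_asset["filename"]
    return {
        "name": embedded_asset["name"],
        "assetType": "EXTERNAL_FILE",
        # The temporary file keeps the original filename (assumed layout).
        "fileUrl": posixpath.join(tmp_dir, filename),
        "filename": filename,
    }


# Hypothetical embedded input asset and temporary HDFS directory.
embedded = {"name": "test.csv", "assetType": "FILE", "filename": "test.csv"}
converted = to_hdfs_asset(embedded, "hdfs:///tmp/job-1234")
print(converted["fileUrl"])
```

The original asset dictionary is left untouched, mirroring the statement above that the job itself is not updated to reflect the transformation.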
Output
Output Data by URL
When a job includes predefined output HDFS asset(s) by URL, the job output will be generated and stored at the predefined HDFS file locations.
Output Data Embedded
When the user provides embedded output data assets, the spark-runtime-service will create a temporary HDFS directory for the job if one does not already exist. The following steps are then performed in sequence:
1. Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset corresponds to the temporary HDFS directory and the filename matches the original embedded asset.
2. Update the ModelOp Center job by storing the HDFS assets by URL in its job parameters map, using the key TMP_EXTERNAL_OUTPUT.
3. Use the newly created HDFS assets by URL as the output data when submitting the job to the Spark cluster for execution.
4. Download each HDFS asset by URL and store its content as the file content for the corresponding embedded output data asset in the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB.
5. Remove TMP_EXTERNAL_OUTPUT from the job parameters map.
This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
Accessing Generated Files from Model Source Code
The files that the spark-runtime-service generates for a job (model_source_code.py, model_job_metadata.py, ModelOpPySparkDriver.py, and the files created for FILE and non-primary SOURCE_CODE model assets, such as a schema file) are uploaded to HDFS when spark-submit is executed. None of these files are included in external_inputs, external_outputs, or external_model_assets. Because they are uploaded by spark-submit rather than directly by the user, they are stored at a location of the following form:

    hdfs://<cluster-host>:<cluster-port>/user/<username>/.sparkStaging/<spark-application-id>/ModelOpPySparkDriver.py

Therefore, if the user's model source code uses any of these files, they must be read in pure Python, for example:

    open("ModelOpPySparkDriver.py", "r")

Please note that this differs from how the files included in external_inputs, external_outputs, and external_model_assets are accessed.
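As a minimal sketch of the plain-Python access pattern for files uploaded by spark-submit, the helper below reads a file by its bare name from the current working directory. The helper name `read_staged_text` and the `schema.json` file are hypothetical. The example creates that file itself in a scratch directory so that it can run outside a cluster.

```python
import json
import os
import tempfile


def read_staged_text(filename):
    """Read a staged file with plain Python, using its bare file name."""
    with open(filename, "r") as handle:
        return handle.read()


# Simulate a staged file by writing it into a scratch working directory.
original_dir = os.getcwd()
with tempfile.TemporaryDirectory() as workdir:
    os.chdir(workdir)
    try:
        with open("schema.json", "w") as handle:
            json.dump({"fields": ["age", "fare"]}, handle)
        schema = json.loads(read_staged_text("schema.json"))
    finally:
        # Restore the working directory before the scratch dir is removed.
        os.chdir(original_dir)

print(schema["fields"])
```

By contrast, the assets in external_inputs, external_outputs, and external_model_assets are addressed through their fileUrl values and read or written with Spark.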
Sample modelBatchJob JSON