Spark Job execution requirements

This article describes the requirements for setting up a model that executes a Spark scoring job.


Spark Model Structure

Any Spark scoring function code to be executed by the Spark-Runtime-Service must comply with the following structure:

    from __future__ import print_function

    from pyspark.sql import SparkSession
    from pyspark.ml.classification import RandomForestClassificationModel

    # modelop.score
    def <my-score-function-name>(external_inputs, external_outputs, external_model_assets):
        spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()
        ...
        # Read an input file that was uploaded to HDFS by the user
        df = (spark.read
              .format("csv")
              .option('header', 'true')
              .load(external_inputs[0]['fileUrl']))

        # Read a model asset file/folder that was uploaded to HDFS by the user
        model = RandomForestClassificationModel.load(external_model_assets[0]['fileUrl'])
        ...
        # Write to an output file that will be created in HDFS
        predictions.write.csv(external_outputs[0]['fileUrl'])

 

As defined above, the scoring function receives the following three input parameters, each of type list:

  1. external_inputs - generated from the inputData list

  2. external_outputs - generated from the outputData list

  3. external_model_assets - generated from the modelAssets list (contains only assets of type EXTERNAL_FILE)

For additional information on how these lists of files are handled, please refer to PySpark Job Input and Output Data.

Please note that modelAssets may include other types of assets. However, this process only includes EXTERNAL_FILE assets (e.g., FILE and SOURCE_CODE assets are not included).

It is assumed that the aforementioned external file assets exist in HDFS before the job is created, and that the user knows their location.
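As a rough illustration of how the three lists relate to a job definition, the sketch below derives them from a job dictionary. The helper name build_external_lists is hypothetical, and passing inputData and outputData through unchanged is an assumption; only the EXTERNAL_FILE filter on modelAssets comes from the description above.

```python
def build_external_lists(job):
    """Derive the three scoring-function arguments from a job dictionary.

    Hypothetical helper for illustration; not the service's actual code.
    """
    model_assets = job["model"]["storedModel"]["modelAssets"]
    # Assumption: input and output lists are passed through as they are.
    external_inputs = list(job.get("inputData", []))
    external_outputs = list(job.get("outputData", []))
    # Only EXTERNAL_FILE assets are kept; FILE and SOURCE_CODE are skipped.
    external_model_assets = [
        asset for asset in model_assets
        if asset.get("assetType") == "EXTERNAL_FILE"
    ]
    return external_inputs, external_outputs, external_model_assets


job = {
    "model": {"storedModel": {"modelAssets": [
        {"name": "titanic.py", "assetType": "SOURCE_CODE",
         "primaryModelSource": True},
        {"name": "titanic", "assetType": "EXTERNAL_FILE",
         "fileUrl": "hdfs:///hadoop/titanic"},
    ]}},
    "inputData": [{"name": "test.csv", "assetType": "EXTERNAL_FILE",
                   "fileUrl": "hdfs:///hadoop/test.csv"}],
    "outputData": [{"name": "titanic_output.csv", "assetType": "EXTERNAL_FILE",
                    "fileUrl": "hdfs:///hadoop/titanic_output.csv"}],
}

inputs, outputs, assets = build_external_lists(job)
print([asset["name"] for asset in assets])
```

The SOURCE_CODE asset is dropped from the third list, so the scoring function only ever sees assets it can load by fileUrl.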


Spark-runtime-service job processing

Every time a Spark job is about to be executed, the Spark-runtime-service generates the following files before job execution:

  1. model_source_code.py

    1. Contains the primary source code - extracted from model.storedModel.modelAssets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true

  2. model_job_metadata.py

    1. Contains the values for all assets involved in job execution (external_inputs, external_outputs, and external_model_assets), along with the name of the function to execute, in the following format:

      external_inputs = [
          {
              "name": "test.csv",
              "assetType": "EXTERNAL_FILE",
              "fileUrl": "hdfs:///hadoop/test.csv",
              "filename": "test.csv",
              "fileFormat": "CSV"
          }
      ]

      external_outputs = [
          {
              "name": "titanic_output.csv",
              "assetType": "EXTERNAL_FILE",
              "fileUrl": "hdfs:///hadoop/titanic_output.csv",
              "filename": "titanic_output.csv",
              "fileFormat": "CSV"
          }
      ]

      external_model_assets = [
          {
              "name": "titanic",
              "assetType": "EXTERNAL_FILE",
              "fileUrl": "hdfs:///hadoop/titanic",
              "filename": "titanic"
          }
      ]

      method_to_be_executed = "<my-score-function-name>"

  3. ModelOpPySparkDriver.py

    1. Contains the default Spark application execution code (for additional details please see here). This file is the one in charge of calling the score function.

  4. If modelAssets contains assets where "assetType" = "FILE", a file is created for each of those assets.

  5. If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, a file is created for each of those assets.

Once all required files have been created on the local file system, they are uploaded to the cluster as part of the spark-submit execution.
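The file generation described in this section can be sketched as a function that maps model assets to the local files created before submission. This is a minimal illustration, not the service's implementation: the function name plan_job_files, the use of the asset "name" as the local filename, and the "fileContent" key for FILE assets are all assumptions. ModelOpPySparkDriver.py is left out because its content is not shown in this article.

```python
def plan_job_files(model_assets, external_inputs, external_outputs,
                   external_model_assets):
    """Return {filename: content} for files created before spark-submit.

    Hypothetical sketch; "fileContent" and the use of "name" as the
    local filename are assumptions.
    """
    files = {}
    score_function = None
    for asset in model_assets:
        kind = asset.get("assetType")
        if kind == "SOURCE_CODE" and asset.get("primaryModelSource"):
            # The primary source always lands in model_source_code.py.
            files["model_source_code.py"] = asset["sourceCode"]
            score_function = asset.get("scoreFunction")
        elif kind == "SOURCE_CODE":
            files[asset["name"]] = asset["sourceCode"]
        elif kind == "FILE":
            files[asset["name"]] = asset.get("fileContent", "")
        # EXTERNAL_FILE assets already live in HDFS; nothing is created.

    # repr() of plain lists, dicts, and strings is a valid Python literal,
    # so the metadata file can be imported as a module.
    files["model_job_metadata.py"] = "\n".join([
        f"external_inputs = {external_inputs!r}",
        f"external_outputs = {external_outputs!r}",
        f"external_model_assets = {external_model_assets!r}",
        f"method_to_be_executed = {score_function!r}",
    ]) + "\n"
    return files


model_assets = [
    {"name": "titanic.py", "assetType": "SOURCE_CODE",
     "primaryModelSource": True, "scoreFunction": "score_function",
     "sourceCode": "def score_function(i, o, m):\n    pass\n"},
    {"name": "helpers.py", "assetType": "SOURCE_CODE",
     "primaryModelSource": False, "sourceCode": "X = 1\n"},
    {"name": "labels.txt", "assetType": "FILE", "fileContent": "a,b\n"},
    {"name": "titanic", "assetType": "EXTERNAL_FILE",
     "fileUrl": "hdfs:///hadoop/titanic"},
]
files = plan_job_files(
    model_assets,
    external_inputs=[{"fileUrl": "hdfs:///hadoop/test.csv"}],
    external_outputs=[{"fileUrl": "hdfs:///hadoop/titanic_output.csv"}],
    external_model_assets=[model_assets[3]],
)
print(sorted(files))
```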


PySpark Job Inputs and Outputs

Spark jobs support the following two types of input and output assets:

  1. HDFS asset(s) by URL, also known as External File asset(s)

  2. Embedded asset(s), also known as File asset(s)

The following rules and restrictions apply:

Input

Input Assets by URL

When the job includes input HDFS asset(s) by URL, all of those assets must be present and available inside the Spark cluster before the job is submitted to the Spark cluster for execution.
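One way to verify this precondition before creating a job is a small pre-flight check. The sketch below relies on the Hadoop shell command hdfs dfs -test -e, which exits with status 0 when a path exists. The helper names are hypothetical, and this check is not something the article says the service performs; it is only an optional client-side safeguard.

```python
import subprocess


def hdfs_path_exists(url, run=subprocess.run):
    """Return True if `hdfs dfs -test -e <url>` exits with status 0."""
    result = run(["hdfs", "dfs", "-test", "-e", url])
    return result.returncode == 0


def missing_inputs(input_assets, exists=hdfs_path_exists):
    """List the fileUrl of each EXTERNAL_FILE input not found in HDFS."""
    return [
        asset["fileUrl"]
        for asset in input_assets
        if asset.get("assetType") == "EXTERNAL_FILE"
        and not exists(asset["fileUrl"])
    ]


# Stand-in for a real cluster so the example runs anywhere.
in_cluster = {"hdfs:///hadoop/test.csv"}
input_assets = [
    {"assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/test.csv"},
    {"assetType": "EXTERNAL_FILE", "fileUrl": "hdfs:///hadoop/other.csv"},
]
print(missing_inputs(input_assets, exists=in_cluster.__contains__))
```

Against a real cluster, calling missing_inputs(input_assets) without the exists argument would shell out to the hdfs client, which has to be installed and configured on the machine running the check.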

Input Data Embedded

When a job includes embedded input data assets, the process will generate temporary HDFS files for each of these embedded assets. These newly created HDFS files will be represented as HDFS assets by their URLs and will be used as input data when the job is submitted to the Spark cluster for execution.

This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
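The conversion from embedded assets to HDFS assets by URL might look roughly like the sketch below. It only computes the new asset descriptions and the list of files that would still need to be written to HDFS; it does not touch a cluster. The function name, the temporary directory value, and the "fileContent" key are assumptions made for illustration and do not describe PySparkPreprocessorService internals.

```python
import posixpath


def embedded_to_hdfs_assets(assets, tmp_dir):
    """Map embedded FILE assets to EXTERNAL_FILE assets under tmp_dir.

    Returns (converted_assets, uploads); uploads is a list of
    (hdfs_url, content) pairs that would still have to be written to HDFS.
    """
    converted, uploads = [], []
    for asset in assets:
        if asset.get("assetType") != "FILE":
            converted.append(asset)  # already referenced by URL
            continue
        url = posixpath.join(tmp_dir, asset["filename"])
        uploads.append((url, asset.get("fileContent", "")))
        converted.append({
            "name": asset["name"],
            "assetType": "EXTERNAL_FILE",
            "fileUrl": url,
            "filename": asset["filename"],
        })
    return converted, uploads


embedded_inputs = [
    {"name": "a.csv", "assetType": "FILE", "filename": "a.csv",
     "fileContent": "x,y\n1,2\n"},
    {"name": "test.csv", "assetType": "EXTERNAL_FILE",
     "fileUrl": "hdfs:///hadoop/test.csv", "filename": "test.csv"},
]
converted, uploads = embedded_to_hdfs_assets(embedded_inputs,
                                             "hdfs:///tmp/job-123")
print(converted[0]["fileUrl"])
```

Because the function returns new dictionaries instead of modifying the originals, the job's own inputData stays unchanged, which mirrors the point that the transformation is not visible to the user.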

Output

Output Data by URL

When a job includes predefined output HDFS asset(s) by URL, the job output will be generated and stored at the predefined HDFS file locations.

Output Data Embedded

When the user provides embedded output data assets, the spark-runtime-service will create a temporary HDFS directory for the job if one does not already exist. The following steps are then performed in sequence:

  1. Represent each embedded output data asset as an HDFS asset by URL, where the file URL of the new HDFS asset corresponds to the temporary HDFS directory and the filename matches the original embedded asset.

  2. Update the ModelOp Center job by storing the HDFS assets by URL in its job parameters map, using the key TMP_EXTERNAL_OUTPUT.

  3. Use the newly created HDFS assets by URL as the output data when submitting the job to the Spark cluster for execution.

  4. Download each HDFS asset by URL and store its content as the file content for the corresponding embedded output data asset in the MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, or MODEL_BATCH_TRAINING_JOB.

  5. Remove TMP_EXTERNAL_OUTPUT from the job parameters map.

This change from embedded assets to HDFS assets by URL is not visible to the user, as the ModelOp Center Job is not updated to reflect this transformation. Instead, this change is managed internally by the PySparkPreprocessorService.
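The five steps for embedded output data can be sketched as a single function. This is an illustration under stated assumptions, not the service's code: submit and download are hypothetical callables standing in for the spark-submit execution and an HDFS read, and the "jobParameters" and "fileContent" keys are assumed names. Only the TMP_EXTERNAL_OUTPUT key comes from the steps above.

```python
import posixpath

TMP_KEY = "TMP_EXTERNAL_OUTPUT"


def run_with_embedded_outputs(job, tmp_dir, submit, download):
    """Walk through the five steps for embedded (FILE) output assets."""
    params = job.setdefault("jobParameters", {})  # assumed key name
    outputs, pairs = [], []
    # Step 1: represent each embedded asset as an HDFS asset by URL.
    for asset in job["outputData"]:
        if asset.get("assetType") == "FILE":
            hdfs_asset = {
                "name": asset["name"],
                "assetType": "EXTERNAL_FILE",
                "fileUrl": posixpath.join(tmp_dir, asset["filename"]),
                "filename": asset["filename"],
            }
            pairs.append((asset, hdfs_asset))
            outputs.append(hdfs_asset)
        else:
            outputs.append(asset)
    # Step 2: store the HDFS assets in the job parameters map.
    params[TMP_KEY] = [hdfs_asset for _, hdfs_asset in pairs]
    # Step 3: submit the job with the HDFS assets as its outputs.
    submit(job, outputs)
    # Step 4: copy each HDFS file back into its embedded asset.
    for original, hdfs_asset in pairs:
        original["fileContent"] = download(hdfs_asset["fileUrl"])
    # Step 5: remove the temporary entry from the job parameters map.
    del params[TMP_KEY]
    return job


# In-memory stand-in for HDFS so the sketch runs without a cluster.
fake_hdfs = {}


def fake_submit(job, outputs):
    for out in outputs:
        fake_hdfs[out["fileUrl"]] = "prediction\n1.0\n"


job = {
    "jobType": "MODEL_BATCH_JOB",
    "outputData": [{"name": "out.csv", "assetType": "FILE",
                    "filename": "out.csv"}],
}
run_with_embedded_outputs(job, "hdfs:///tmp/job-123", fake_submit,
                          fake_hdfs.__getitem__)
print(job["outputData"][0]["fileContent"])
```

The sketch removes TMP_EXTERNAL_OUTPUT only after a successful run; how the real service cleans up after a failed job is not covered in this article.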


Sample modelBatchJob JSON

    {
      "jobType": "MODEL_BATCH_JOB",
      "jobStatus": "CREATED",
      "model": {
        "storedModel": {
          "modelAssets": [
            {
              "name": "titanic.py",
              "assetType": "SOURCE_CODE",
              "primaryModelSource": true,
              "scoreFunction": "score_function",
              "sourceCode": "from __future__ import print_function\n\nimport time\nimport sys\nfrom random import random\nfrom operator import add\n\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import isnull, when, count\nfrom pyspark.sql.functions import udf\nfrom pyspark.sql.types import ArrayType, FloatType\nfrom pyspark.ml.feature import StringIndexer\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml.classification import RandomForestClassificationModel\n\ndef score_function(input_files, output_files, model_attachment_files):\n    spark = SparkSession\\\n        .builder\\\n        .appName(\"Titanic\")\\\n        .getOrCreate()\n\n    print('TITANIC FILE URL = ' + str(model_attachment_files[0]['fileUrl']))\n    df = (spark.read\n        .format(\"csv\")\n        .option('header', 'true')\n        .load(input_files[0]['fileUrl']))\n\n    dataset = df.select(col('Pclass').cast('float'),\n        col('Sex'),\n        col('Age').cast('float'),\n        col('Fare').cast('float'),\n        col('Embarked')\n        )\n\n    dataset = dataset.replace('?', None)\\\n        .dropna(how='any')\n\n    dataset = StringIndexer(\n        inputCol='Sex',\n        outputCol='Gender',\n        handleInvalid='keep').fit(dataset).transform(dataset)\n\n    dataset = StringIndexer(\n        inputCol='Embarked',\n        outputCol='Boarded',\n        handleInvalid='keep').fit(dataset).transform(dataset)\n\n    dataset = dataset.drop('Sex')\n    dataset = dataset.drop('Embarked')\n    required_features = ['Pclass',\n        'Age',\n        'Fare',\n        'Gender',\n        'Boarded'\n        ]\n\n    assembler = VectorAssembler(inputCols=required_features, outputCol='features')\n    transformed_data = assembler.transform(dataset)\n\n    time.sleep(90)\n\n    model = RandomForestClassificationModel.load(model_attachment_files[0]['fileUrl'])\n\n    predictions = model.transform(transformed_data)\n    get_propensity = udf(lambda x: x[1], ArrayType(FloatType()))\n    print(predictions.head(5))\n    predictions = predictions.select('Pclass',\n        'Age',\n        'Gender',\n        'Fare',\n        'Boarded',\n        'prediction'\n        )\n\n    print(predictions.head(5))\n    predictions.write.csv(output_files[0]['fileUrl'])\n    spark.stop()"
            },
            {
              "name": "titanic",
              "assetType": "EXTERNAL_FILE",
              "fileUrl": "hdfs:///hadoop/titanic",
              "filename": "titanic"
            }
          ],
          "modelMetaData": {
            "type": "UNKNOWN"
          }
        }
      },
      "inputData": [
        {
          "name": "test.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/test.csv",
          "filename": "test.csv",
          "fileFormat": "CSV"
        }
      ],
      "outputData": [
        {
          "name": "titanic_output.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "hdfs:///hadoop/titanic_output.csv",
          "filename": "titanic_output.csv",
          "fileFormat": "CSV"
        }
      ],
      "targetEngine": {
        "engineType": "SPARK_RUNTIME",
        "name": "spark-runtime-service",
        "executorMemory": "512MB",
        "sparkConf": {
          "--master": "yarn"
        }
      }
    }
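To make the targetEngine block more concrete, the sketch below shows one plausible way its fields could be turned into a spark-submit command line. The mapping is an assumption and not documented behavior of the spark-runtime-service: it treats each sparkConf key as a spark-submit flag, maps executorMemory to --executor-memory, and ships the generated Python files with --py-files. The function name build_spark_submit_args is hypothetical.

```python
def build_spark_submit_args(target_engine,
                            driver="ModelOpPySparkDriver.py",
                            py_files=("model_source_code.py",
                                      "model_job_metadata.py")):
    """Assemble a spark-submit argument list from a targetEngine dict.

    Hypothetical mapping for illustration only.
    """
    args = ["spark-submit"]
    # Assumption: sparkConf keys are already spark-submit flags.
    for flag, value in target_engine.get("sparkConf", {}).items():
        args += [flag, str(value)]
    memory = target_engine.get("executorMemory")
    if memory:
        args += ["--executor-memory", memory]
    if py_files:
        # --py-files takes a comma-separated list of Python files.
        args += ["--py-files", ",".join(py_files)]
    args.append(driver)
    return args


target_engine = {
    "engineType": "SPARK_RUNTIME",
    "name": "spark-runtime-service",
    "executorMemory": "512MB",
    "sparkConf": {"--master": "yarn"},
}
print(" ".join(build_spark_submit_args(target_engine)))
```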
