This article explains how to create the Standard Model Definition used to deploy, monitor, and govern a model. It is intended to help data scientists prepare models for deployment and operationalization.

Introduction

ModelOp Center provides a standard framework for defining a model for deployment. In an enterprise setting, data science experiments must be packaged as software assets before they can be deployed. Model Development Notebooks cannot be deployed into Production. ModelOp Center provides the abstractions required to deploy, monitor, and govern a model.

Standard Model Definition in ModelOp Center

Of the several assets that define how a model will perform upon deployment, only a Model Source with a Scoring Function is required to initially register a model. The next section describes the available functions.

The list of available abstractions includes:

  • Model Source - Code that is called as the model is deployed, scored, trained, or validated. The Model Source can be backed with a git repo where it can be versioned and managed. See the next section for details.

  • Model Functions - Special functions within the Model Source which the runtime uses for specific tasks. See the next section for details.

  • Attachment - Consists of external files to be used during prediction or scoring. Include any files or binaries that will be referenced during scoring, including the trained model artifact. The contents of the attachment are extracted into the current working directory of the Model Source. This abstraction allows data scientists to load different versions of the trained model artifact without changing the code used for scoring.

  • Schemas - Define the data structure and features the model uses on its input as well as its output. The schema can reject a record received for scoring in order to prevent the model from failing on it. The schema also serves as the contract between the data pipeline and the model, and between the data scientist and the data engineer.

  • Model Platform - Details the dependencies the model requires once deployed. This metadata is used to determine which runtime will be used during deployment of the model. Note: this information is automatically captured based on the Jupyter Notebook environment if using the Jupyter Notebook plugin (see Integrate with Jupyter).

  • See Model Governance: Standard Model Definition for additional detail.
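
To illustrate how a schema can act as a contract and reject a malformed record before it reaches the scoring code, here is a minimal sketch in plain Python. The schema layout, the field names (id, content), and the validate_record helper are hypothetical and chosen for illustration only; this is not ModelOp Center's schema format or validation logic.

```python
# Hypothetical input schema: field name -> expected Python type.
INPUT_SCHEMA = {"id": int, "content": str}


def validate_record(record, schema):
    """Return a list of problems; an empty list means the record conforms."""
    problems = []
    for field, expected_type in schema.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            problems.append(
                f"field {field!r} should be {expected_type.__name__}, "
                f"got {type(record[field]).__name__}"
            )
    return problems


good = {"id": 1, "content": "Quarterly numbers attached."}
bad = {"id": "one"}  # wrong type for id, and content is missing

print(validate_record(good, INPUT_SCHEMA))
print(validate_record(bad, INPUT_SCHEMA))
```

A record with a non-empty problem list would be rejected instead of being passed to the Scoring Function.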

Functions within Model Source

Within the Source Code, you can designate the specific entry points, or functions, into the model code. The Scoring Function is the only function required for deployment. The other functions define the other steps of a model’s life cycle and operationalization.

These functions are executed at different stages of the Model Life Cycle by Batch Jobs. Batch Jobs, including scoring, test, and training jobs, can be run manually as part of testing or automated within an MLC Process. This provides a flexible and scalable framework for deploying models into the business.

The function examples in the following sections walk through the Model Source for a LASSO-regularized logistic regression model that detects emails forwarded outside the organization. These functions can be mapped from the Model Source using the Command Center or using smart tag comments. The smart tag comment (for example, # modelop.init) appears directly above each function in the following sections.
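
Before looking at the full example, it may help to see the overall shape of a Model Source. The sketch below uses the four smart tag comments that appear in this article, but the function bodies are toy logic and the record fields (id, score, flagged) are hypothetical; it only illustrates how the entry points relate to one another.

```python
# Minimal Model Source skeleton (toy logic, hypothetical field names).

# modelop.init
def begin():
    # Load anything the other functions need; here, a toy threshold.
    global threshold
    threshold = 0.5


# modelop.score
def action(record):
    # Toy rule: flag the record when its score exceeds the threshold.
    yield {"id": record["id"], "prediction": int(record["score"] > threshold)}


# modelop.metrics
def metrics(records):
    # Toy metric: share of labelled records that the rule gets right.
    correct = sum(int(r["score"] > threshold) == r["flagged"] for r in records)
    yield {"accuracy": correct / len(records)}


# modelop.training
def train(records):
    # Toy training: set the threshold to the mean score of the training data.
    global threshold
    threshold = sum(r["score"] for r in records) / len(records)


begin()
print(next(action({"id": 7, "score": 0.9})))
```

The scoring and metrics functions are written as generators (they use yield), which matches the examples in the following sections.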

Init Function

The Init Function is executed when the model is initially deployed into the runtime. It initializes the model and loads any dependencies for scoring. If the model uses an attachment, the Init Function typically loads the trained model artifact for scoring. In this example of a regression model, the model artifacts are loaded from the pickle file with the trained model weights.

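import pickle

import nltk

# modelop.init

def begin():
    # Make the artifacts available to the other model functions.
    global lasso_model_artifacts
    # The pickle file is read from the current working directory.
    with open('lasso_model_artifacts.pkl', 'rb') as f:
        lasso_model_artifacts = pickle.load(f)
    # Download the NLTK part-of-speech tagger data.
    nltk.download('averaged_perceptron_tagger')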

Scoring Function

The Scoring Function yields predictions from input records. It executes when a record is sent to the endpoint of the runtime in which the model is deployed. It can also be called using a Scoring Batch Job in the Command Center or from the CLI. In this example, the action function tokenizes an incoming email, applies a bag-of-words transformation, applies a TF-IDF transformation to the bag of words, and then uses a LASSO-regularized logistic regression to produce a classification.

# modelop.score

def action(x):
    lasso_model = lasso_model_artifacts['lasso_model']
    dictionary = lasso_model_artifacts['dictionary']
    threshold = lasso_model_artifacts['threshold']
    tfidf_model = lasso_model_artifacts['tfidf_model']
    
    x = pd.DataFrame(x, index=[0]) 
    cleaned = preprocess(x.content)
    corpus = cleaned.apply(dictionary.doc2bow)
    corpus_sparse = gensim.matutils.corpus2csc(corpus).transpose()
    corpus_sparse_padded = pad_sparse_matrix(sp_mat = corpus_sparse, 
                                             length=corpus_sparse.shape[0], 
                                             width = len(dictionary))
    tfidf_vectors = tfidf_model.transform(corpus_sparse_padded)

    probabilities = lasso_model.predict_proba(tfidf_vectors)[:,1]

    predictions = pd.Series(probabilities > threshold, index=x.index).astype(int)
    output = pd.concat([x, predictions], axis=1)
    output.columns = ['content', 'id', 'prediction']
    output = output.to_dict(orient='records')
    yield output
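
Because the Scoring Function is a generator, it can be exercised locally by iterating over what it yields. The following sketch shows one way to do that with JSON-lines input. The run_local helper is hypothetical, and the action function here is a trivial stand-in that only mimics the input and output shape of the example above (one record in, a list of output records out), not its model logic.

```python
import json


def action(record):
    # Stand-in for the real scoring function: same shape, trivial rule.
    prediction = int("forward" in record["content"].lower())
    yield [{"content": record["content"], "id": record["id"], "prediction": prediction}]


def run_local(score_fn, json_lines):
    """Feed each JSON line to score_fn and collect everything it yields."""
    results = []
    for line in json_lines:
        record = json.loads(line)
        for output in score_fn(record):
            results.extend(output)
    return results


sample = [
    '{"id": 1, "content": "Please forward this to my personal address"}',
    '{"id": 2, "content": "Lunch at noon?"}',
]
for row in run_local(action, sample):
    print(row)
```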

Metrics Function

The Metrics Function calculates metrics on the model’s performance based on labelled data, including mathematical (back-test), bias, and interpretability metrics. It executes when a Test Batch Job is run on the model. In this example, the Metrics Function calculates a confusion matrix, the ROC curve, the AUC, and the F2 score.

# modelop.metrics

def metrics(x):
    lasso_model = lasso_model_artifacts['lasso_model']
    dictionary = lasso_model_artifacts['dictionary']
    threshold = lasso_model_artifacts['threshold']
    tfidf_model = lasso_model_artifacts['tfidf_model']

    actuals = x.flagged
    
    cleaned = preprocess(x.content)
    corpus = cleaned.apply(dictionary.doc2bow)
    corpus_sparse = gensim.matutils.corpus2csc(corpus).transpose()
    corpus_sparse_padded = pad_sparse_matrix(sp_mat = corpus_sparse, 
                                             length=corpus_sparse.shape[0], 
                                             width = len(dictionary))
    tfidf_vectors = tfidf_model.transform(corpus_sparse_padded)

    probabilities = lasso_model.predict_proba(tfidf_vectors)[:,1]

    predictions = pd.Series(probabilities > threshold, index=x.index).astype(int) 
    
    confusion_matrix = sklearn.metrics.confusion_matrix(actuals, predictions)
    # Use the predicted probabilities so the ROC curve covers every threshold, not a single cut-off.
    fpr,tpr,thres = sklearn.metrics.roc_curve(actuals, probabilities)

    auc_val = sklearn.metrics.auc(fpr, tpr)
    f2_score = sklearn.metrics.fbeta_score(actuals, predictions, beta=2)

    roc_curve = [{'fpr': f, 'tpr': t} for f, t in zip(fpr, tpr)]
    labels = ['Compliant', 'Non-Compliant']
    cm = matrix_to_dicts(confusion_matrix, labels)
    test_results = dict(roc_curve=roc_curve,
                   auc=auc_val,
                   f2_score=f2_score,
                   confusion_matrix=cm)    
    yield test_results
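
The metrics example calls a helper named matrix_to_dicts that is not defined in this article. One plausible shape for such a helper is sketched below; the implementation is an assumption made for illustration and may differ from the helper used in the original Model Source.

```python
def matrix_to_dicts(matrix, labels):
    """Convert a square matrix into one dict per row, keyed by the column labels."""
    return [
        {label: int(value) for label, value in zip(labels, row)}
        for row in matrix
    ]


# Rows are actual classes and columns are predicted classes, following the
# convention used by sklearn.metrics.confusion_matrix.
example = [[50, 3], [4, 12]]
labels = ["Compliant", "Non-Compliant"]
print(matrix_to_dicts(example, labels))
```

Converting the matrix to plain dictionaries with built-in integers keeps the yielded test results easy to serialize.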

Training Function

The Training Function trains and retrains the model. It executes when a Training Batch Job is run on the model. The Training Function also provides context and traceability on the model's origins and supports collaboration between data scientists. Any files written to the directory “outputDir”, as in the example below, will be written to S3 as an External File Asset.

# modelop.training

def train(data):
    y_train = data.flagged
    removed_proper_nouns = data.content.astype(str).apply(remove_proper_nouns)

    CUSTOM_FILTERS = [lambda x: x.lower(),
                      gensim.parsing.preprocessing.strip_tags,
                      gensim.parsing.preprocessing.strip_punctuation]

    removed_punctuation = removed_proper_nouns.apply(functools.partial(gensim.parsing.preprocess_string, filters=CUSTOM_FILTERS))

    stemmer = nltk.stem.porter.PorterStemmer()

    # Remove stop words, words of length less than 2, and words with non-alphabet characters.
    cleaned = removed_punctuation.apply(lambda x: list(map(gensim.parsing.preprocessing.remove_stopwords, x)))
    cleaned = cleaned.apply(lambda x: list(filter(lambda y: len(y) > 1, x)))
    cleaned = cleaned.apply(lambda x: list(filter(lambda y: y.isalpha(), x)))
    cleaned = cleaned.apply(lambda x: list(map(stemmer.stem, x)))

    # Create a dictionary (key-value pairs of ids and words) of the words that appear in the corpus.
    dictionary = gensim.corpora.dictionary.Dictionary(documents=cleaned)
    dictionary.filter_extremes(no_below=5, no_above=0.4)

    # Produce a sparse bag-of-words matrix from the word-document frequency counts
    corpus = cleaned.apply(dictionary.doc2bow).to_list()
    corpus_sparse = gensim.matutils.corpus2csc(corpus).transpose()

    # Train a tf-idf transformer and transform the training data
    tfidf_model = sklearn.feature_extraction.text.TfidfTransformer()
    train_tfidf = tfidf_model.fit_transform(corpus_sparse)

    # Define and fit a logistic regression model.
    # The L1 penalty requires a solver that supports it, such as liblinear.
    logreg = sklearn.linear_model.LogisticRegression(penalty='l1', solver='liblinear', class_weight='balanced', max_iter=2500, random_state=740189)
    logreg_model = logreg.fit(X=train_tfidf, y=y_train)

    # NOTE: thresh (the classification threshold) is not defined in this excerpt;
    # it must be assigned before this point.
    lasso_model_artifacts = dict(lasso_model = logreg_model,
                                 dictionary = dictionary,
                                 tfidf_model = tfidf_model,
                                 threshold = thresh)

    # Files written to outputDir are stored as External File Assets.
    with open('outputDir/lasso_model_artifact.pkl', 'wb') as f:
        pickle.dump(lasso_model_artifacts, f)
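
The hand-off between training and initialization is a pickle round trip: the Training Function writes a dictionary of artifacts, and the Init Function later loads it. The sketch below shows that round trip in isolation. The save_artifacts and load_artifacts helpers are hypothetical, a temporary directory stands in for the job's working directory, and placeholder values stand in for the fitted objects.

```python
import os
import pickle
import tempfile


def save_artifacts(artifacts, output_dir, filename):
    """Pickle the artifacts dict into output_dir, creating it if needed."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    with open(path, "wb") as f:
        pickle.dump(artifacts, f)
    return path


def load_artifacts(path):
    """Load the artifacts dict back, as an Init Function would."""
    with open(path, "rb") as f:
        return pickle.load(f)


# Placeholder values stand in for the fitted model objects.
artifacts = {"lasso_model": None, "dictionary": {}, "tfidf_model": None, "threshold": 0.5}

with tempfile.TemporaryDirectory() as tmp:
    saved = save_artifacts(artifacts, os.path.join(tmp, "outputDir"), "lasso_model_artifacts.pkl")
    restored = load_artifacts(saved)

print(sorted(restored))
```

The file name written by the Training Function has to match the name the Init Function opens, so it is worth checking the two against each other.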

Preparing Production Ready Spark Models

This article provides details for preparing a Spark Model to execute within ModelOp Center.

Format of Spark Model

A Spark model for which a modelBatchJob will be created needs to comply with the following structure:

from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel

# modelop.score
def <my-score-function-name>(external_inputs, external_outputs, external_model_assets):
    spark = SparkSession.builder.appName("<My-App-Name>").getOrCreate()
    
    ... 
    
    # Read an input file that was uploaded to HDFS by the user
    df = (spark.read
            .format("csv")
            .option('header', 'true')
            .load(external_inputs[0]['fileUrl']))
    
    # Read a model asset file/folder that was uploaded to HDFS by the user
    model = RandomForestClassificationModel.load(external_model_assets[0]['fileUrl'])
      
    ...
       
    # Write to an output file that will be created in HDFS
    predictions.write.csv(external_outputs[0]['fileUrl'])

As shown in the model template, the scoring function takes three parameters. Each parameter is a Python list containing the assets whose "assetType" is "EXTERNAL_FILE". These external file assets are assumed to exist in HDFS before the job is created, and the user is assumed to know their locations.

  1. external_inputs are generated from the inputData list

  2. external_outputs are generated from the outputData list

  3. external_model_assets are generated from the modelAssets list

Each list contains the complete Asset objects. The external_model_assets list will be empty if the model has no assets.

inputData, outputData, and modelAssets may contain other types of assets, but only the EXTERNAL_FILE assets are passed to the scoring function. Any FILE and SOURCE_CODE assets are excluded.
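
The filtering described above can be sketched in a few lines of Python. The external_files and build_score_arguments helpers are hypothetical names introduced for this illustration; the field names (inputData, outputData, model.storedModel.modelAssets, assetType) come from the modelBatchJob JSON shown later in this article. This is not the spark-runtime-service implementation.

```python
def external_files(assets):
    """Keep only the assets whose assetType is EXTERNAL_FILE."""
    return [a for a in assets if a.get("assetType") == "EXTERNAL_FILE"]


def build_score_arguments(job):
    """Derive the three lists passed to the scoring function from a job dict."""
    model_assets = job["model"]["storedModel"]["modelAssets"]
    return (
        external_files(job.get("inputData", [])),
        external_files(job.get("outputData", [])),
        external_files(model_assets),
    )


job = {
    "model": {"storedModel": {"modelAssets": [
        {"name": "titanic.py", "assetType": "SOURCE_CODE", "primaryModelSource": True},
        {"name": "titanic", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/titanic"},
    ]}},
    "inputData": [
        {"name": "test.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/test.csv"},
    ],
    "outputData": [
        {"name": "titanic_output.csv", "assetType": "EXTERNAL_FILE", "fileUrl": "/hadoop/titanic_output.csv"},
    ],
}

external_inputs, external_outputs, external_model_assets = build_score_arguments(job)
print(external_model_assets[0]["fileUrl"])
```

Note that the SOURCE_CODE asset is dropped from external_model_assets, so only the model folder remains in that list.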


Execution Flow in spark-runtime-service

Once the user creates their modelBatchJob, the spark-runtime-service will be triggered. When the spark-runtime-service receives the modelBatchJob JSON, it will create the following files in its working directory:

  1. model_source_code.py

    1. Contains the primary source code which is retrieved from the modelBatchJob JSON (model.storedModel.modelAssets) where "assetType" = "SOURCE_CODE" and "primaryModelSource" = true

  2. model_job_metadata.py

    1. Contains the values for external_inputs, external_outputs, and external_model_assets in the following format:

      external_inputs = [
        {
          "name": "test.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/test.csv",
          "filename": "test.csv",
          "fileFormat": "CSV"
        }]
      
      external_outputs = [
         {
          "name": "titanic_output.csv",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/titanic_output.csv",
          "filename": "titanic_output.csv",
          "fileFormat": "CSV"
        }]
      
      
      external_model_assets = [
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/titanic",
          "filename": "titanic"
        }]
      
      method_to_be_executed = "<my-score-function-name>"

  3. ModelOpPySparkDriver.py

    1. Contains the code for Spark’s application resource. The score function is called from this file, which passes in the values for the model’s three parameters.

  4. If modelAssets contains assets where "assetType" = "FILE", then the spark-runtime-service will create a file in its working directory for each asset that matches these criteria. An example of such a file is a schema file.

  5. If modelAssets contains assets where "assetType" = "SOURCE_CODE" and "primaryModelSource" = false, then the spark-runtime-service will create a file in its working directory for each asset that matches the aforementioned criteria.
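
To make the relationship between model_source_code.py, model_job_metadata.py, and the driver more concrete, the sketch below renders a metadata module in the layout shown above and then calls the function it names. Both helpers (render_job_metadata and call_score_function) are hypothetical, and exec is used here only as a self-contained stand-in for importing the generated files. The actual ModelOpPySparkDriver.py is not reproduced in this article and may work differently.

```python
def render_job_metadata(inputs, outputs, model_assets, method_name):
    """Render Python source text in the layout shown for model_job_metadata.py."""
    return "\n".join([
        f"external_inputs = {inputs!r}",
        f"external_outputs = {outputs!r}",
        f"external_model_assets = {model_assets!r}",
        f"method_to_be_executed = {method_name!r}",
    ])


def call_score_function(source_code, metadata_code):
    """Execute both generated sources and call the function named in the metadata."""
    metadata = {}
    exec(metadata_code, metadata)
    model = {}
    exec(source_code, model)
    score_fn = model[metadata["method_to_be_executed"]]
    return score_fn(
        metadata["external_inputs"],
        metadata["external_outputs"],
        metadata["external_model_assets"],
    )


# A toy model source; a real one would create a SparkSession and score data.
source_code = (
    "def score_function(inputs, outputs, assets):\n"
    "    return inputs[0]['fileUrl'], outputs[0]['fileUrl'], len(assets)\n"
)
metadata_code = render_job_metadata(
    [{"fileUrl": "/hadoop/test.csv"}],
    [{"fileUrl": "/hadoop/titanic_output.csv"}],
    [],
    "score_function",
)

result = call_score_function(source_code, metadata_code)
print(result)
```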

Once these files (1-5) have been created, the spark-runtime-service uploads them to HDFS when spark-submit is executed. None of these files are included in external_inputs, external_outputs, or external_model_assets. Because these files are uploaded when spark-submit is executed rather than by the user directly, they are stored in the Spark staging directory, for example:

hdfs://<cluster-host>:<cluster-port>/user/<username>/.sparkStaging/<spark-application-id>/ModelOpPySparkDriver.py 

Therefore, if the user’s model source code uses any of these files, they must be read in pure Python with:

open("ModelOpPySparkDriver.py", "r")

Note that accessing these files in the model source code differs from accessing the files included in external_inputs, external_outputs, and external_model_assets, which are read through their fileUrl values as shown in the model template.


Example modelBatchJob JSON

The following modelBatchJob JSON was used to generate the example values:

{
  "jobType": "MODEL_BATCH_JOB",
  "jobStatus": "CREATED",
  "model": {
    "storedModel": {
      "modelAssets": [
        {
          "name": "titanic.py",
          "assetType": "SOURCE_CODE",
          "primaryModelSource": true,
          "scoreFunction": "score_function",
          "sourceCode": "from __future__ import print_function\n\nimport time\nimport sys\nfrom random import random\nfrom operator import add\n\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import isnull, when, count\nfrom pyspark.sql.functions import udf\nfrom pyspark.sql.types import ArrayType, FloatType\nfrom pyspark.ml.feature import StringIndexer\nfrom pyspark.ml.feature import VectorAssembler\nfrom pyspark.ml.classification import RandomForestClassificationModel\n\ndef score_function(input_files, output_files, model_attachment_files):\n   spark = SparkSession\\\n       .builder\\\n       .appName(\"Titanic\")\\\n       .getOrCreate()\n\n   print('TITANIC FILE URL = ' + str(model_attachment_files[0]['fileUrl']))\n   df = (spark.read\n         .format(\"csv\")\n         .option('header', 'true')\n         .load(input_files[0]['fileUrl']))\n\n   dataset = df.select(col('Pclass').cast('float'),\n                       col('Sex'),\n                       col('Age').cast('float'),\n                       col('Fare').cast('float'),\n                       col('Embarked')\n                       )\n\n   dataset = dataset.replace('?', None)\\\n       .dropna(how='any')\n\n   dataset = StringIndexer(\n       inputCol='Sex',\n       outputCol='Gender',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = StringIndexer(\n       inputCol='Embarked',\n       outputCol='Boarded',\n       handleInvalid='keep').fit(dataset).transform(dataset)\n\n   dataset = dataset.drop('Sex')\n   dataset = dataset.drop('Embarked')\n   required_features = ['Pclass',\n                   'Age',\n                   'Fare',\n                   'Gender',\n                   'Boarded'\n                  ]\n\n   assembler = VectorAssembler(inputCols=required_features, outputCol='features')\n   transformed_data = assembler.transform(dataset)\n\n   time.sleep(90)\n\n   model = RandomForestClassificationModel.load(model_attachment_files[0]['fileUrl'])\n\n   predictions = model.transform(transformed_data)\n   get_propensity = udf(lambda x: x[1], ArrayType(FloatType()))\n   print(predictions.head(5))\n   predictions = predictions.select('Pclass',\n                       'Age',\n                       'Gender',\n                       'Fare',\n                       'Boarded',\n                       'prediction'\n                       )\n\n   print(predictions.head(5))\n   predictions.write.csv(output_files[0]['fileUrl'])\n   spark.stop()"
        },
        {
          "name": "titanic",
          "assetType": "EXTERNAL_FILE",
          "fileUrl": "/hadoop/titanic",
          "filename": "titanic"
        }
      ],
      "modelMetaData": {
        "type": "UNKNOWN"
      }
    }
  },
  "inputData": [
    {
      "name": "test.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "/hadoop/test.csv",
      "filename": "test.csv",
      "fileFormat": "CSV"
    }
  ],
  "outputData": [
    {
      "name": "titanic_output.csv",
      "assetType": "EXTERNAL_FILE",
      "fileUrl": "/hadoop/titanic_output.csv",
      "filename": "titanic_output.csv",
      "fileFormat": "CSV"
    }
  ],
  "targetEngine": {
    "engineType": "SPARK_RUNTIME",
    "name": "spark-runtime-service",
    "executorMemory": "512MB",
    "sparkConf": {
      "--master": "yarn"
    }
  }
}

Next Article: Register a Model >
