This article describes how ModelOp Center enables ongoing Statistical Monitoring.

Table of Contents

...

Code Block
import pandas as pd
import numpy as np
from scipy.stats import ks_2samp
from scipy.stats import binom_test

# modelop.init
def begin():
    
    global train, numerical_features
    # Load the training (baseline) data and identify its numerical features
    train = pd.read_csv('training_data.csv')
    numerical_features = train.select_dtypes(['int64', 'float64']).columns

# modelop.score
def action(datum):
    yield datum

# modelop.metrics
def metrics(data):

    # Run a two-sample KS test per numerical feature: training data vs. incoming batch
    ks_tests = [ks_2samp(train.loc[:, feat], data.loc[:, feat])
                for feat in numerical_features]
    pvalues = [x[1] for x in ks_tests]
    ks_pvalues = dict(zip(numerical_features, pvalues))
    
    yield dict(pvalues=ks_pvalues)

This drift model runs a two-sample Kolmogorov-Smirnov test on each numerical feature, comparing the training data against the incoming batch, and reports the resulting p-values. If a p-value is large (above a chosen significance level, commonly 0.01 or 0.05), there is no evidence that the two samples differ. If a p-value is small, the feature's distribution has likely shifted, and an alert can be generated.
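
For example, a downstream step could flag drifted features by thresholding the reported p-values. The following is a minimal sketch; both the sample dict (which mimics what the metrics function above yields) and the 0.05 threshold are illustrative assumptions, not part of ModelOp Center's API.

Code Block
# Minimal sketch: flag features whose KS p-value falls below a chosen
# significance level. "metrics_output" mimics the dict yielded by the
# metrics function above; the 0.05 threshold is an illustrative choice.
SIGNIFICANCE_LEVEL = 0.05

metrics_output = {"pvalues": {"Age": 0.82, "Fare": 0.003}}

drifted_features = [feat for feat, p in metrics_output["pvalues"].items()
                    if p < SIGNIFICANCE_LEVEL]

if drifted_features:
    print(f"Drift alert: distribution shift detected in {drifted_features}")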

...

Code Block
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import isnull, when, count
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# modelop.init
def begin():
    print("Begin function...")

    # Create (or reuse) a Spark session for this monitoring job
    global SPARK
    SPARK = SparkSession.builder.appName("DriftTest").getOrCreate()

    # Load the trained random forest model from HDFS
    global MODEL
    MODEL = RandomForestClassificationModel.load("/hadoop/demo/titanic-spark/titanic")

# modelop.score
def action(datum):
    yield datum

# modelop.metrics
def metrics(external_inputs, external_outputs, external_model_assets):
    # Grab single input asset and single output asset file paths
    input_asset_path = external_inputs[0]["fileUrl"]
    output_asset_path = external_outputs[0]["fileUrl"]
    
    input_df = SPARK.read.format("csv").option("header", "true").load(input_asset_path)
    predictions = predict(input_df)

    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(
        labelCol="Survived", predictionCol="prediction", metricName="accuracy"
    )
    accuracy = evaluator.evaluate(predictions)

    output_df = SPARK.createDataFrame([{"accuracy": accuracy}])
    print("Metrics output:")
    output_df.show()

    output_df.coalesce(1).write.mode("overwrite").option("header", "true").format(
        "json"
    ).save(output_asset_path)

    SPARK.stop()
    
def predict(input_df):
    # Cast raw columns to the numeric types the feature pipeline expects
    dataset = input_df.select(
        col("Survived").cast("float"),
        col("Pclass").cast("float"),
        col("Sex"),
        col("Age").cast("float"),
        col("Fare").cast("float"),
        col("Embarked"),
    )

    # Treat "?" as missing and drop incomplete rows
    dataset = dataset.replace("?", None).dropna(how="any")

    # Encode the categorical columns as numeric indices
    dataset = (
        StringIndexer(inputCol="Sex", outputCol="Gender", handleInvalid="keep")
        .fit(dataset)
        .transform(dataset)
    )

    dataset = (
        StringIndexer(inputCol="Embarked", outputCol="Boarded", handleInvalid="keep")
        .fit(dataset)
        .transform(dataset)
    )

    dataset = dataset.drop("Sex")
    dataset = dataset.drop("Embarked")

    required_features = ["Pclass", "Age", "Fare", "Gender", "Boarded"]

    assembler = VectorAssembler(inputCols=required_features, outputCol="features")
    transformed_data = assembler.transform(dataset)

    predictions = MODEL.transform(transformed_data)
    
    return predictions

This model uses Spark's MulticlassClassificationEvaluator to compute the accuracy of the predictions generated by the Titanic model on the incoming batch, then writes the result to the output asset as JSON.
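
If additional metrics are desired, the same evaluator can be reused by switching its metric name. The snippet below is an illustrative extension rather than part of the monitor above; it assumes the predictions DataFrame returned by predict().

Code Block
# Illustrative extension: reuse one evaluator to compute several metrics
# on the predictions DataFrame produced by predict().
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction"
)

scores = {name: evaluator.setMetricName(name).evaluate(predictions)
          for name in ["accuracy", "f1", "weightedPrecision", "weightedRecall"]}
print(scores)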

...