This article describes how ModelOp Center enables ongoing Statistical Monitoring.
Table of Contents
...
Code Block | ||
---|---|---|
| ||
import pandas as pd
import numpy as np
from scipy.stats import ks_2samp

# NOTE: the unused `binom_test` import was dropped. SciPy removed that
# function in release 1.12, so importing it breaks the script on current
# SciPy versions.


# modelop.init
def begin():
    """Load the training baseline and record its numerical feature names.

    Reads ``training_data.csv`` from the current working directory and
    stores two module globals that ``metrics`` relies on:

    * ``train`` -- the training data as a DataFrame.
    * ``numerical_features`` -- the names of its ``int64``/``float64``
      columns.
    """
    global train, numerical_features
    train = pd.read_csv('training_data.csv')
    numerical_features = train.select_dtypes(['int64', 'float64']).columns


# modelop.score
def action(datum):
    """Yield the incoming record unchanged; this model performs no scoring."""
    yield datum


# modelop.metrics
def metrics(data):
    """Yield the two-sample KS-test p-value of every numerical feature.

    ``begin`` must have been called first so that ``train`` and
    ``numerical_features`` exist.

    Args:
        data: DataFrame holding the incoming batch. It must contain every
            column listed in ``numerical_features``.

    Yields:
        A single dict of the form ``{"pvalues": {feature_name: p_value}}``
        where each p-value comes from ``scipy.stats.ks_2samp`` comparing the
        training column with the batch column.

    Raises:
        KeyError: If ``data`` lacks one of the numerical training columns.
    """
    ks_pvalues = {
        # Plain floats keep the result free of NumPy scalar types.
        feat: float(ks_2samp(train[feat], data[feat]).pvalue)
        for feat in numerical_features
    }
    yield dict(pvalues=ks_pvalues)
This drift model runs a two-sample Kolmogorov-Smirnov test on each numerical feature, comparing the training data with the incoming batch, and reports the resulting p-values. If a p-value is above your chosen significance level (commonly 0.01 or 0.05), there is no evidence that the feature's distribution has changed, and you can treat the two samples as similar. If a p-value falls below that level, the two samples likely come from different distributions, and you can generate an alert.
...
Code Block |
---|
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import isnull, when, count
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# modelop.init
def begin():
    """Create the Spark session and load the trained random-forest model.

    Stores the session in the global ``SPARK`` and the model, loaded from
    ``/hadoop/demo/titanic-spark/titanic``, in the global ``MODEL``.
    """
    global SPARK, MODEL
    print("Begin function...")
    SPARK = SparkSession.builder.appName("DriftTest").getOrCreate()
    MODEL = RandomForestClassificationModel.load("/hadoop/demo/titanic-spark/titanic")


# modelop.score
def action(datum):
    """Yield the incoming record unchanged."""
    yield datum


# modelop.metrics
def metrics(external_inputs, external_outputs, external_model_assets):
    """Compute prediction accuracy for one input asset and write it as JSON.

    Args:
        external_inputs: Sequence of asset descriptors; the ``"fileUrl"`` of
            the first entry is read as a CSV file with a header row.
        external_outputs: Sequence of asset descriptors; the ``"fileUrl"`` of
            the first entry is where the JSON result is written (overwriting
            any existing output).
        external_model_assets: Not used by this function.

    The Spark session is stopped when the function finishes, whether or not
    the evaluation succeeded.
    """
    # Grab single input asset and single output asset file paths
    input_asset_path = external_inputs[0]["fileUrl"]
    output_asset_path = external_outputs[0]["fileUrl"]

    try:
        input_df = (
            SPARK.read.format("csv").option("header", "true").load(input_asset_path)
        )
        predictions = predict(input_df)

        # Select (prediction, true label) and compute test error
        evaluator = MulticlassClassificationEvaluator(
            labelCol="Survived", predictionCol="prediction", metricName="accuracy"
        )
        accuracy = evaluator.evaluate(predictions)

        output_df = SPARK.createDataFrame([{"accuracy": accuracy}])
        print("Metrics output:")
        output_df.show()

        # coalesce(1) collapses the result to a single partition before writing.
        output_df.coalesce(1).write.mode("overwrite").option("header", "true").format(
            "json"
        ).save(output_asset_path)
    finally:
        # Release the session even when reading, scoring, or writing fails.
        SPARK.stop()


def predict(input_df):
    """Prepare the input columns and return the model's predictions.

    Args:
        input_df: Spark DataFrame containing the columns ``Survived``,
            ``Pclass``, ``Sex``, ``Age``, ``Fare`` and ``Embarked``.

    Returns:
        The DataFrame produced by ``MODEL.transform`` on the assembled
        feature vector.
    """
    dataset = input_df.select(
        col("Survived").cast("float"),
        col("Pclass").cast("float"),
        col("Sex"),
        col("Age").cast("float"),
        col("Fare").cast("float"),
        col("Embarked"),
    )

    # Treat "?" as missing, then drop every row with a missing value.
    dataset = dataset.replace("?", None).dropna(how="any")

    # NOTE(review): both indexers are fit on the batch being scored, so the
    # category-to-index mapping depends on that batch -- confirm it matches
    # the encoding used when the model was trained.
    dataset = (
        StringIndexer(inputCol="Sex", outputCol="Gender", handleInvalid="keep")
        .fit(dataset)
        .transform(dataset)
    )
    dataset = (
        StringIndexer(inputCol="Embarked", outputCol="Boarded", handleInvalid="keep")
        .fit(dataset)
        .transform(dataset)
    )

    # The raw string columns are replaced by their indexed counterparts.
    dataset = dataset.drop("Sex")
    dataset = dataset.drop("Embarked")

    required_features = ["Pclass", "Age", "Fare", "Gender", "Boarded"]
    assembler = VectorAssembler(inputCols=required_features, outputCol="features")
    transformed_data = assembler.transform(dataset)

    predictions = MODEL.transform(transformed_data)
    return predictions
This model uses a Spark `MulticlassClassificationEvaluator` to determine the accuracy of the predictions generated by the Titanic model.
...