...
If the training data is too large to fit in memory, you can compute summary statistics about the training data, save them as, e.g., a pickle file, and read those statistics in during the init function of the drift model. The metrics function can contain other statistical tests to compare those statistics to the statistics of the incoming batch.
Spark Models
A similar drift detection method may be used for PySpark models with HDFS assets by parsing the HDFS asset URLs from the parameters of the metrics function. The following is a simple example:
Code Block:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import isnull, when, count
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#modelop.init
def begin():
    """Start a Spark session and load the pre-trained Titanic model.

    Both objects are stored in module-level globals so that the other
    ModelOp functions (scoring and metrics) can reuse them.
    """
    print("Begin function...")
    global SPARK, MODEL
    SPARK = SparkSession.builder.appName("DriftTest").getOrCreate()
    # NOTE(review): model path is an HDFS location baked into the example.
    MODEL = RandomForestClassificationModel.load("/hadoop/demo/titanic-spark/titanic")
#modelop.score
def action(datum):
    """Pass-through scoring function: emit each incoming record unchanged."""
    yield datum
def metrics(external_inputs, external_outputs, external_model_assets):
    """Score the batch referenced by the first input asset and write the
    resulting accuracy metric as JSON to the first output asset's location.

    This example assumes exactly one input asset and one output asset.
    """
    in_url = external_inputs[0]["fileUrl"]
    out_url = external_outputs[0]["fileUrl"]

    batch = SPARK.read.format("csv").option("header", "true").load(in_url)
    scored = predict(batch)

    # Compare predicted labels against the true "Survived" column.
    evaluator = MulticlassClassificationEvaluator(
        labelCol="Survived", predictionCol="prediction", metricName="accuracy"
    )
    result = SPARK.createDataFrame([{"accuracy": evaluator.evaluate(scored)}])

    print("Metrics output:")
    result.show()

    # Emit a single JSON part file, replacing any output from a prior run.
    result.coalesce(1).write.mode("overwrite").option("header", "true").format(
        "json"
    ).save(out_url)
    SPARK.stop()
def predict(input_df):
    """Replicate the training-time feature engineering on *input_df* and
    score it with the loaded RandomForest model.

    Returns the input DataFrame with the model's prediction columns added.
    """
    # Cast numeric columns to float; keep the raw categorical columns.
    df = input_df.select(
        col("Survived").cast("float"),
        col("Pclass").cast("float"),
        col("Sex"),
        col("Age").cast("float"),
        col("Fare").cast("float"),
        col("Embarked"),
    )
    # Treat "?" as missing, then drop any incomplete rows.
    df = df.replace("?", None).dropna(how="any")

    # Index each categorical column into a numeric feature column.
    for raw_col, indexed_col in (("Sex", "Gender"), ("Embarked", "Boarded")):
        indexer = StringIndexer(
            inputCol=raw_col, outputCol=indexed_col, handleInvalid="keep"
        )
        df = indexer.fit(df).transform(df)
    df = df.drop("Sex", "Embarked")

    # Assemble the feature vector the model expects and score.
    assembler = VectorAssembler(
        inputCols=["Pclass", "Age", "Fare", "Gender", "Boarded"],
        outputCol="features",
    )
    return MODEL.transform(assembler.transform(df))
This model uses a Spark MulticlassClassificationEvaluator
to determine the accuracy of the predictions generated by the titanic model.
Next Article: Model Governance: Standard Model Definition >