{ "cells": [ { "cell_type": "code", "source": [ "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", "\n", "# Licensed to the Apache Software Foundation (ASF) under one\n", "# or more contributor license agreements. See the NOTICE file\n", "# distributed with this work for additional information\n", "# regarding copyright ownership. The ASF licenses this file\n", "# to you under the Apache License, Version 2.0 (the\n", "# \"License\"); you may not use this file except in compliance\n", "# with the License. You may obtain a copy of the License at\n", "#\n", "# http://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing,\n", "# software distributed under the License is distributed on an\n", "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", "# KIND, either express or implied. See the License for the\n", "# specific language governing permissions and limitations\n", "# under the License" ], "metadata": { "id": "P5tEC4_l6bwd" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "markdown", "source": [ "# TensorFlow Model Analysis in Beam\n", "\n", "\n", " \n", " \n", "
\n", "
\n" ], "metadata": { "id": "1m9dEIsQAP_-" } }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "GNbarEZsalS2" }, "source": [ "[TensorFlow Model Analysis (TFMA)](https://www.tensorflow.org/tfx/guide/tfma) is a library for performing model evaluation across different slices of data. TFMA performs its computations in a distributed manner over large quantities of data by using Apache Beam.\n", "\n", "This example notebook shows how you can use TFMA to investigate and visualize the performance of a model as part of your Apache Beam pipeline by creating and comparing two models. This example uses [ExtractEvaluateAndWriteResults](https://www.tensorflow.org/tfx/model_analysis/api_docs/python/tfma/ExtractEvaluateAndWriteResults), which is a `PTransform` that performs extraction and evaluation and writes results all in one step.\n", "\n", "TFMA enables scalable and flexible execution of your evaluation pipeline. For additional information about TFMA, see the [TFMA basic notebook](https://www.tensorflow.org/tfx/tutorials/model_analysis/tfma_basic), which provides an in-depth look at TFMA capabilities." ] }, { "cell_type": "markdown", "source": [ "## Install Jupyter extensions\n", "If you are running this example in a local Jupyter notebook, before running Jupyter, you must install these Jupyter extensions in the environment.\n", "\n", "```bash\n", "jupyter nbextension enable --py widgetsnbextension --sys-prefix \n", "jupyter nbextension install --py --symlink tensorflow_model_analysis --sys-prefix \n", "jupyter nbextension enable --py tensorflow_model_analysis --sys-prefix \n", "```" ], "metadata": { "id": "GKcUZKcTRhW_" } }, { "cell_type": "markdown", "source": [ "## Install TFMA\n", "\n", "Installing TFMA pulls in all of the dependencies. The installation takes about a minute." 
], "metadata": { "id": "-01Hts8eR9OV" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8aLJh4pFasK0" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# Upgrade pip to the latest version, and then install TFMA.\n", "!pip install -U pip\n", "!pip install tensorflow-model-analysis\n", "\n", "# To use the newly installed version of pip, restart the runtime.\n", "exit() " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "SUTczmH2dWk2" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# This configuration was tested in Colab with TensorFlow 2.11, TFMA 0.43, and Apache Beam 2.44.\n", "# The setup is also compatible with the current release.\n", "import sys\n", "\n", "# Confirm that you're using Python 3.\n", "assert sys.version_info.major==3, 'This notebook must be run using Python 3.'\n", "\n", "import tensorflow as tf\n", "print('TF version: {}'.format(tf.__version__))\n", "import apache_beam as beam\n", "print('Beam version: {}'.format(beam.__version__))\n", "import tensorflow_model_analysis as tfma\n", "print('TFMA version: {}'.format(tfma.__version__))\n", "import tensorflow_datasets as tfds\n", "print('TFDS version: {}'.format(tfds.__version__))" ] }, { "cell_type": "markdown", "source": [ "**Note:** Before proceeding, verify that the output does not have errors. If errors occur, re-run the installation, and restart your kernel." ], "metadata": { "id": "KP7V1pIU_9H2" } }, { "cell_type": "markdown", "metadata": { "id": "SHL89whuLqmq" }, "source": [ "## Preprocess data\n", "\n", "This section includes the steps for preprocessing your data." 
] }, { "cell_type": "markdown", "metadata": { "id": "6yapf7rN_lB7" }, "source": [ "### Create a diamond price prediction model\n", "\n", "This example uses the [TFDS diamonds dataset](https://www.tensorflow.org/datasets/catalog/diamonds) to train a regression model that predicts the price of a diamond. The dataset contains 53,940 diamonds and records the price of each one along with physical attributes such as weight (carat), cut quality, color, and clarity. The model's performance is evaluated using metrics such as mean squared error and mean absolute error.\n", "\n", "To simulate a scenario where a model's performance improves over time as new data is added to the dataset, first use 40% of the diamond dataset to train a model called v1. Then, use additional data to train a second model called v2. These steps demonstrate the use of TFMA when comparing the performance of two models for the same task." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hQCHORIG8Ixv" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# Load the data from TFDS and split it: the first 40% for training, and 10% each for testing and validation.\n", "(ds_train_v1, ds_test, ds_val), info = tfds.load('diamonds', split=['train[:40%]', 'train[80%:90%]', 'train[90%:]'], as_supervised=True, with_info=True)" ] }, { "cell_type": "code", "source": [ "import numpy as np\n", "\n", "# Extract the numerical features, which are used to fit the normalization layer.\n", "def extract_numerical_features(item):\n", "  carat = item['carat']\n", "  depth = item['depth']\n", "  table = item['table']\n", "  x = item['x']\n", "  y = item['y']\n", "  z = item['z']\n", "\n", "  return [carat, depth, table, x, y, z]\n", "\n", "# Collect the numerical features of every training example into one array.\n", "def get_train_data(ds_train):\n", "  train_data = []\n", "  for item, label in ds_train:\n", "    features = extract_numerical_features(item)\n", "    train_data.append(features)\n", "\n", "  train_data = np.array(train_data)\n", "\n", "  return train_data" ], "metadata": { "id": "_sNqOzwNGo6V" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "code", "source": [ "train_data_v1 = get_train_data(ds_train_v1)" ], "metadata": { "id": "PVVUZ1LOwQAf" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "code", "source": [ "# Define the length of the feature vector: six numerical features plus the one-hot encoded categorical features.\n", "NUMERICAL_FEATURES = 6\n", "NUM_FEATURES = (NUMERICAL_FEATURES +\n", "                info.features['features']['color'].num_classes +\n", "                info.features['features']['cut'].num_classes +\n", "                info.features['features']['clarity'].num_classes)" ], "metadata": { "id": "__6xLw92aI9a" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ZJt8Wu2LmX9j" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# To transform the input data into a feature vector and label, select the input and output for the model.\n", "def transform_data(item, label):\n", "  numerical_features = extract_numerical_features(item)\n", "\n", "  # Categorical features are encoded using one-hot encoding.\n", "  color = tf.one_hot(item['color'], info.features['features']['color'].num_classes)\n", "  cut = tf.one_hot(item['cut'], info.features['features']['cut'].num_classes)\n", "  clarity = tf.one_hot(item['clarity'], info.features['features']['clarity'].num_classes)\n", "\n", "  # Create the output tensor.\n", "  output = tf.concat([tf.stack(numerical_features, axis=0), color, cut, clarity], 0)\n", "  return output, [label]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "f3ZxhNZDsbkx" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "ds_train_v1 = ds_train_v1.map(transform_data)\n", "ds_test = 
ds_test.map(transform_data)\n", "ds_val = ds_val.map(transform_data)" ] }, { "cell_type": "code", "source": [ "# To prepare the data for training, structure it in batches.\n", "BATCH_SIZE = 32\n", "ds_train_v1 = ds_train_v1.batch(BATCH_SIZE)\n", "ds_test = ds_test.batch(BATCH_SIZE)" ], "metadata": { "id": "sw3udkicwVZE" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "markdown", "source": [ "### Create TFRecords\n", "\n", "TFMA and Apache Beam need to read the dataset used during evaluation from a file. Create a `TFRecord` file that contains the validation dataset." ], "metadata": { "id": "KFd6NwPAacSM" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ipHOHiRqMJOi" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "!mkdir -p data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "TkeG4uw1K9Dt" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# Write the validation records to a file that TFMA reads during evaluation.\n", "tfrecord_file = 'data/val_data.tfrecord'\n", "\n", "with tf.io.TFRecordWriter(tfrecord_file) as file_writer:\n", "  for x, y in ds_val:\n", "    # Serialize each example with the feature vector under \"inputs\" and the price under \"output\".\n", "    record_bytes = tf.train.Example(features=tf.train.Features(feature={\n", "        \"inputs\": tf.train.Feature(float_list=tf.train.FloatList(value=x)),\n", "        \"output\": tf.train.Feature(float_list=tf.train.FloatList(value=[y])),\n", "    })).SerializeToString()\n", "    file_writer.write(record_bytes)" ] }, { "cell_type": "markdown", "metadata": { "id": "zhunMzSOLmdG" }, "source": [ "## Define and train one model" ] }, { "cell_type": "markdown", "source": [ "Train a regression model that predicts the price of a diamond. The model is a neural network with one hidden layer. The model also uses a normalization layer that standardizes each numerical feature to zero mean and unit variance, based on statistics computed from the training data." 
], "metadata": { "id": "Svsic-PSbGu7" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "so7l2WDzd2kg" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "def construct_model(model_name, train_data):\n", "  inputs = tf.keras.Input(shape=(NUM_FEATURES,), name='inputs')\n", "\n", "  # Normalize the numerical features.\n", "  normalization_layer = tf.keras.layers.Normalization()\n", "  # Fit the normalization layer to the training data.\n", "  normalization_layer.adapt(train_data)\n", "  # Split the input between numerical and categorical input.\n", "  input_numerical = tf.gather(inputs, indices=[*range(NUMERICAL_FEATURES)], axis=1)\n", "  input_normalized = normalization_layer(input_numerical)\n", "  input_one_hot = tf.gather(inputs, indices=[*range(NUMERICAL_FEATURES, NUM_FEATURES)], axis=1)\n", "  # Define one hidden layer with 8 neurons.\n", "  x = tf.keras.layers.Dense(8, activation='relu')(tf.concat([input_normalized, input_one_hot], 1))\n", "  outputs = tf.keras.layers.Dense(1, name='output')(x)\n", "  model = tf.keras.Model(inputs=inputs, outputs=outputs, name=model_name)\n", "\n", "  model.compile(\n", "      optimizer=tf.keras.optimizers.Adam(learning_rate=0.1),\n", "      loss='mean_absolute_error')\n", "\n", "  return model" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "buc0NIc4oRv4" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "model_v1 = construct_model('model_v1', train_data_v1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8bvna8ZJAKj5" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# Train the model.\n", "history = model_v1.fit(\n", "    ds_train_v1,\n", "    validation_data=ds_test,\n", "    epochs=5,\n", "    verbose=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pyCfKRY9DUhO" }, "outputs": [{ "output_type": "stream", "name": "stdout", 
"text": [ "\n" ] }], "source": [ "# Save the model to disk.\n", "model_path_v1 = 'saved_model_v1'\n", "model_v1.save(model_path_v1)" ] }, { "cell_type": "markdown", "metadata": { "id": "kwUycnjpLvTX" }, "source": [ "## Evaluate the model" ] }, { "cell_type": "markdown", "source": [ "With the trained model, you can use TFMA to analyze the performance. First, define the evaluation configuration. This example uses two metrics that are common for regression models: mean squared error and mean absolute error. It also reports the example count and the mean prediction. For more information about the supported evaluation parameters, see [TFMA metrics and plots](https://www.tensorflow.org/tfx/model_analysis/metrics). " ], "metadata": { "id": "7gmbEx6wG2Za" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "b2NN9zVr-AE1" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "from google.protobuf import text_format\n", "\n", "# Define the TFMA evaluation configuration.\n", "eval_config = text_format.Parse(\"\"\"\n", "  ## Model information\n", "  model_specs {\n", "    # For Keras and serving models, you need to add a `label_key`.\n", "    label_key: \"output\"\n", "  }\n", "\n", "  ## This post-training metric information is merged with any built-in\n", "  ## metrics from training.\n", "  metrics_specs {\n", "    metrics { class_name: \"ExampleCount\" }\n", "    metrics { class_name: \"MeanAbsoluteError\" }\n", "    metrics { class_name: \"MeanSquaredError\" }\n", "    metrics { class_name: \"MeanPrediction\" }\n", "  }\n", "\n", "  ## An empty slicing spec evaluates the metrics over the whole dataset.\n", "  slicing_specs {}\n", "\"\"\", tfma.EvalConfig())" ] }, { "cell_type": "markdown", "source": [ "Next, use the [ExtractEvaluateAndWriteResults](https://www.tensorflow.org/tfx/model_analysis/api_docs/python/tfma/ExtractEvaluateAndWriteResults) `PTransform`, which performs extraction and evaluation and writes results. 
To use this `PTransform` directly in your Apache Beam pipeline, use [TFXIO](https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio) to read the `TFRecord` file and pass the records to the transform." ], "metadata": { "id": "8JVSVK4iH-d2" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4NDEQAMDhJWL" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "from tfx_bsl.public import tfxio\n", "\n", "output_path = 'evaluation_results'\n", "\n", "eval_shared_model = tfma.default_eval_shared_model(\n", "    eval_saved_model_path=model_path_v1, eval_config=eval_config)\n", "\n", "tfx_io = tfxio.TFExampleRecord(\n", "    file_pattern=tfrecord_file,\n", "    raw_record_column_name=tfma.ARROW_INPUT_COLUMN)\n", "\n", "# Run the evaluation.\n", "with beam.Pipeline() as pipeline:\n", "  _ = (\n", "      pipeline\n", "      | 'ReadData' >> tfx_io.BeamSource()\n", "      | 'EvalModel' >> tfma.ExtractEvaluateAndWriteResults(\n", "          eval_shared_model=eval_shared_model,\n", "          eval_config=eval_config,\n", "          output_path=output_path))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "He97LPB_NbtU" }, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }], "source": [ "# Visualize the results.\n", "result = tfma.load_eval_result(output_path=output_path)\n", "tfma.view.render_slicing_metrics(result)" ] }, { "cell_type": "markdown", "source": [ "The following image shows an example of a visualization when evaluating one model:" ], "metadata": { "id": "DYMNK_8k-eV2" } }, { "cell_type": "markdown", "source": [ "![image.png]()" ], "metadata": { "id": "UhGtaw9897wK" } }, { "cell_type": "markdown", "source": [ "## Compare multiple models" ], "metadata": { "id": "I7ZuqLcdwpCs" } }, { "cell_type": "markdown", "source": [ "You can compare the performance of multiple models to select the best candidate to use in production. 
With Apache Beam, you can evaluate and compare multiple models in one step." ], "metadata": { "id": "ebkJsE3zJ2W2" } }, { "cell_type": "markdown", "source": [ "### Train a second model" ], "metadata": { "id": "ah1xX9jl44O8" } }, { "cell_type": "markdown", "source": [ "For this use case, train a second model on the first 80% of the dataset, which is twice the data used for the first model. The remaining 20% stays reserved for the test and validation sets." ], "metadata": { "id": "6GcGg08YKRhg" } }, { "cell_type": "code", "source": [ "# Preprocess the data.\n", "ds_train_v2 = tfds.load('diamonds', split=['train[:80%]'], as_supervised=True)[0]\n", "train_data_v2 = get_train_data(ds_train_v2)\n", "ds_train_v2 = ds_train_v2.map(transform_data)\n", "ds_train_v2 = ds_train_v2.batch(BATCH_SIZE)" ], "metadata": { "id": "cQYA0cdzwoXr" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "code", "source": [ "# Define and train the model.\n", "model_v2 = construct_model('model_v2', train_data_v2)\n", "history = model_v2.fit(\n", "    ds_train_v2,\n", "    validation_data=ds_test,\n", "    epochs=5,\n", "    verbose=1)" ], "metadata": { "id": "WII_PC5rxzTc" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "code", "source": [ "# Save the model to disk.\n", "model_path_v2 = 'saved_model_v2'\n", "model_v2.save(model_path_v2)" ], "metadata": { "id": "ppVFHy7myhXu" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "markdown", "source": [ "### Evaluate the models\n", "The following code demonstrates how to compare the two models and then visualize the results." 
], "metadata": { "id": "QKO-vh1X48tv" } }, { "cell_type": "code", "source": [ "# Define the TFMA evaluation configuration, including two model specs for the two models being compared.\n", "eval_config_compare = text_format.Parse(\"\"\"\n", " ## Model information\n", " model_specs {\n", " name: \"model_v1\"\n", " # For keras (and serving models), add a `label_key`.\n", " label_key: \"output\"\n", " is_baseline: true\n", " }\n", " model_specs {\n", " name: \"model_v2\"\n", " # For keras (and serving models), add a `label_key`.\n", " label_key: \"output\"\n", " }\n", "\n", " ## This post-training metric information is merged with any built-in\n", " ## metrics from training.\n", " metrics_specs {\n", " metrics { class_name: \"ExampleCount\" }\n", " metrics { class_name: \"MeanAbsoluteError\" }\n", " metrics { class_name: \"MeanSquaredError\" }\n", " metrics { class_name: \"MeanPrediction\" }\n", " }\n", "\n", " slicing_specs {}\n", "\"\"\", tfma.EvalConfig())" ], "metadata": { "id": "pB4aXUo45RAg" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "code", "source": [ "from tfx_bsl.public import tfxio\n", "\n", "output_path_compare = 'evaluation_results_compare'\n", "\n", "eval_shared_models = [\n", " tfma.default_eval_shared_model(\n", " model_name='model_v1',\n", " eval_saved_model_path=model_path_v1,\n", " eval_config=eval_config_compare),\n", " tfma.default_eval_shared_model(\n", " model_name='model_v2',\n", " eval_saved_model_path=model_path_v2,\n", " eval_config=eval_config_compare),\n", "]\n", "\n", "tfx_io = tfxio.TFExampleRecord(\n", " file_pattern=tfrecord_file,\n", " raw_record_column_name=tfma.ARROW_INPUT_COLUMN)\n", "\n", "# Run the evaluation.\n", "with beam.Pipeline() as pipeline:\n", " _ = (\n", " pipeline\n", " | 'ReadData' >> tfx_io.BeamSource()\n", " | 'EvalModel' >> tfma.ExtractEvaluateAndWriteResults(\n", " eval_shared_model=eval_shared_models,\n", " 
eval_config=eval_config_compare,\n", " output_path=output_path_compare))" ], "metadata": { "id": "4eVdpW0aWTah" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "markdown", "source": [ "Use the following code to create a visualization of the results. By default, the visualization displays one time series, which is the number of examples in the validation set for each model.\n", "To add more informative plots, select **add metric series**, and then choose metrics such as the mean absolute error and the mean squared error." ], "metadata": { "id": "oEinNfzv8ZgM" } }, { "cell_type": "code", "source": [ "# Visualize the results.\n", "results = tfma.load_eval_results(output_paths=output_path_compare)\n", "tfma.view.render_time_series(results)" ], "metadata": { "id": "G7B-GGYB6Dmu" }, "execution_count": null, "outputs": [{ "output_type": "stream", "name": "stdout", "text": [ "\n" ] }] }, { "cell_type": "markdown", "source": [ "The following image displays an example of a visualization that compares multiple models:" ], "metadata": { "id": "GgM5vLt8-7cj" } }, { "cell_type": "markdown", "source": [ "![image.png]()" ], "metadata": { "id": "BxsQmjkq-70M" } } ], "metadata": { "colab": { "provenance": [], "toc_visible": true }, "gpuClass": "standard", "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }