{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [], "toc_visible": true }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "code", "source": [ "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", "\n", "# Licensed to the Apache Software Foundation (ASF) under one\n", "# or more contributor license agreements. See the NOTICE file\n", "# distributed with this work for additional information\n", "# regarding copyright ownership. The ASF licenses this file\n", "# to you under the Apache License, Version 2.0 (the\n", "# \"License\"); you may not use this file except in compliance\n", "# with the License. You may obtain a copy of the License at\n", "#\n", "# http://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing,\n", "# software distributed under the License is distributed on an\n", "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", "# KIND, either express or implied. See the License for the\n", "# specific language governing permissions and limitations\n", "# under the License" ], "metadata": { "id": "SGjEjVxwudf2" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "# Apache Beam RunInference with Hugging Face\n", "\n", "\n", " \n", " \n", "
\n", " Run in Google Colab\n", " \n", " View source on GitHub\n", "
" ], "metadata": { "id": "BJ0mTOhFucGg" } }, { "cell_type": "markdown", "source": [ "This notebook shows how to use models from [Hugging Face](https://huggingface.co/) and [Hugging Face pipeline](https://huggingface.co/docs/transformers/main_classes/pipelines) in Apache Beam pipelines that uses the [RunInference](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.RunInference) transform.\n", "\n", "Apache Beam has built-in support for Hugging Face model handlers. Hugging Face has three model handlers:\n", "\n", "\n", "\n", "* Use the [`HuggingFacePipelineModelHandler`](https://github.com/apache/beam/blob/926774dd02be5eacbe899ee5eceab23afb30abca/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L567) model handler to run inference with [Hugging Face pipelines](https://huggingface.co/docs/transformers/main_classes/pipelines#pipelines).\n", "* Use the [`HuggingFaceModelHandlerKeyedTensor`](https://github.com/apache/beam/blob/926774dd02be5eacbe899ee5eceab23afb30abca/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L208) model handler to run inference with models that uses keyed tensors as inputs. For example, you might use this model handler with language modeling tasks.\n", "* Use the [`HuggingFaceModelHandlerTensor`](https://github.com/apache/beam/blob/926774dd02be5eacbe899ee5eceab23afb30abca/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L392) model handler to run inference with models that uses tensor inputs, such as `tf.Tensor` or `torch.Tensor`. \n", "\n", "\n", "For more information about using RunInference, see [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation." ], "metadata": { "id": "GBloorZevCXC" } }, { "cell_type": "markdown", "source": [ "## Install dependencies" ], "metadata": { "id": "xpylrB7_2Jzk" } }, { "cell_type": "markdown", "source": [ "Install both Apache Beam and the required dependencies for Hugging Face." ], "metadata": { "id": "IBQLg8on2S40" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "yrqNSBB3qsI1" }, "outputs": [], "source": [ "!pip install torch --quiet\n", "!pip install tensorflow --quiet\n", "!pip install transformers==4.44.2 --quiet\n", "!pip install apache-beam[gcp]>=2.50 --quiet" ] }, { "cell_type": "code", "source": [ "from typing import Dict\n", "from typing import Iterable\n", "from typing import Tuple\n", "\n", "import tensorflow as tf\n", "import torch\n", "from transformers import AutoTokenizer\n", "from transformers import TFAutoModelForMaskedLM\n", "\n", "import apache_beam as beam\n", "from apache_beam.ml.inference.base import KeyedModelHandler\n", "from apache_beam.ml.inference.base import PredictionResult\n", "from apache_beam.ml.inference.base import RunInference\n", "from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler\n", "from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor\n", "from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerTensor\n", "from apache_beam.ml.inference.huggingface_inference import PipelineTask\n" ], "metadata": { "id": "BIDZLGFRrAmF" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Use RunInference with Hugging Face pipelines" ], "metadata": { "id": "OQ1wv6xk3UeV" } }, { "cell_type": "markdown", "source": [ "You can use [Hugging Face pipelines](https://huggingface.co/docs/transformers/main_classes/pipelines#pipelines) with `RunInference` by using the `HuggingFacePipelineModelHandler` model handler. Similar to the Hugging Face pipelines, to instantiate the model handler, the model handler needs either the pipeline `task` or the `model` that defines the task. To pass any optional arguments to load the pipeline, use `load_pipeline_args`. To pass the optional arguments for inference, use `inference_args`.\n", "\n", "\n", "\n", "You can define the pipeline task in one of the following two ways:\n", "\n", "\n", "\n", "* In the form of string, for example `\"translation\"`. This option is similar to how the pipeline task is defined when using Hugging Face.\n", "* In the form of a [`PipelineTask`](https://github.com/apache/beam/blob/ac936b0b89a92d836af59f3fc04f5733ad6819b3/sdks/python/apache_beam/ml/inference/huggingface_inference.py#L75) enum object defined in Apache Beam, such as `PipelineTask.Translation`.\n" ], "metadata": { "id": "hWZQ49Pt3ojg" } }, { "cell_type": "markdown", "source": [ "### Create a model handler\n", "\n", "This example demonstrates a task that translates text from English to Spanish." ], "metadata": { "id": "pVVg9RfET86L" } }, { "cell_type": "code", "source": [ "model_handler = HuggingFacePipelineModelHandler(\n", " task=PipelineTask.Translation_XX_to_YY,\n", " model = \"google/flan-t5-small\",\n", " load_pipeline_args={'framework': 'pt'},\n", " inference_args={'max_length': 200}\n", ")" ], "metadata": { "id": "aF_BDPXk3UG4" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Define the input examples\n", "\n", "Use this code to define the input examples." ], "metadata": { "id": "lxTImFGJUBIw" } }, { "cell_type": "code", "source": [ "text = [\"translate English to Spanish: How are you doing?\",\n", " \"translate English to Spanish: This is the Apache Beam project.\"]" ], "metadata": { "id": "POAuIFS_UDgE" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Postprocess the results\n", "\n", "The output from the `RunInference` transform is a `PredictionResult` object. Use that output to extract inferences, and then format and print the results." ], "metadata": { "id": "Ay6-7DD5TZLn" } }, { "cell_type": "code", "source": [ "class FormatOutput(beam.DoFn):\n", " \"\"\"\n", " Extract the results from PredictionResult and print the results.\n", " \"\"\"\n", " def process(self, element):\n", " example = element.example\n", " translated_text = element.inference[0]['translation_text']\n", " print(f'Example: {example}')\n", " print(f'Translated text: {translated_text}')\n", " print('-' * 80)\n" ], "metadata": { "id": "74I3U1JsrG0R" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Run the pipeline\n", "\n", "Use the following code to run the pipeline." ], "metadata": { "id": "Ve2cpHZ_UH0o" } }, { "cell_type": "code", "source": [ "with beam.Pipeline() as beam_pipeline:\n", " examples = (\n", " beam_pipeline\n", " | \"CreateExamples\" >> beam.Create(text)\n", " )\n", " inferences = (\n", " examples\n", " | \"RunInference\" >> RunInference(model_handler)\n", " | \"Print\" >> beam.ParDo(FormatOutput())\n", " )" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "_xStwO3qubqr", "outputId": "5aeef601-c3e5-4b0f-e982-183ff36dc56e" }, "execution_count": null, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "Example: translate English to Spanish: How are you doing?\n", "Translated text: Cómo está acerca?\n", "--------------------------------------------------------------------------------\n", "Example: translate English to Spanish: This is the Apache Beam project.\n", "Translated text: Esto es el proyecto Apache Beam.\n", "--------------------------------------------------------------------------------\n" ] } ] }, { "cell_type": "markdown", "source": [ "### Try with a different model\n", "One of the best parts of using RunInference is how easy it is to swap in different models. For example, if we wanted to use a larger model like DeepSeek-R1-Distill-Llama-8B outside of Colab (which has very tight memory constraints and limited GPU access), all we need to change is our ModelHandler:\n", "\n", "```\n", "model_handler = HuggingFacePipelineModelHandler(\n", " task=PipelineTask.Translation_XX_to_YY,\n", " model = \"deepseek-ai/DeepSeek-R1-Distill-Llama-8B\",\n", " load_pipeline_args={'framework': 'pt'},\n", " inference_args={'max_length': 400}\n", ")\n", "```\n", "\n", "We can then run the exact same pipeline code and Beam will take care of the rest." ], "metadata": { "id": "LWNM81ivoZcF" } }, { "cell_type": "markdown", "source": [ "## RunInference with a pretrained model from Hugging Face Hub\n" ], "metadata": { "id": "KJEsPkXnUS5y" } }, { "cell_type": "markdown", "source": [ "To use pretrained models directly from [Hugging Face Hub](https://huggingface.co/docs/hub/models), use either the `HuggingFaceModelHandlerTensor` model handler or the `HuggingFaceModelHandlerKeyedTensor` model handler. Which model handler you use depends on your input type:\n", "\n", "\n", "* Use `HuggingFaceModelHandlerKeyedTensor` to run inference with models that uses keyed tensors as inputs.\n", "* Use `HuggingFaceModelHandlerTensor` to run inference with models that uses tensor inputs, such as `tf.Tensor` or `torch.Tensor`.\n", "\n", "When you construct your pipeline, you might also need to use the following items:\n", "\n", "\n", "* Use the `load_model_args` to provide optional arguments to load the model.\n", "* Use the `inference_args` argument to do the inference.\n", "* For TensorFlow models, specify the `framework='tf'`.\n", "* For PyTorch models, specify the `framework='pt'`.\n", "\n", "\n", "\n", "The following language modeling task predicts the masked word in a sentence." ], "metadata": { "id": "mDcpG78tWcBN" } }, { "cell_type": "markdown", "source": [ "### Create a model handler\n", "\n", "This example shows a masked language modeling task. These models take keyed tensors as inputs." ], "metadata": { "id": "dU6NDE4DaRuF" } }, { "cell_type": "code", "source": [ "model_handler = HuggingFaceModelHandlerKeyedTensor(\n", " model_uri=\"stevhliu/my_awesome_eli5_mlm_model\",\n", " model_class=TFAutoModelForMaskedLM,\n", " framework='tf',\n", " load_model_args={'from_pt': True},\n", " max_batch_size=1\n", ")" ], "metadata": { "id": "Zx1ep1UXWYrq" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Define the input examples\n", "\n", "Use this code to define the input examples." ], "metadata": { "id": "D18eQZfgcIM6" } }, { "cell_type": "code", "source": [ "text = ['The capital of France is Paris .',\n", " 'It is raining cats and dogs .',\n", " 'He looked up and saw the sun and stars .',\n", " 'Today is Monday and tomorrow is Tuesday .',\n", " 'There are 5 coconuts on this palm tree .']" ], "metadata": { "id": "vWI_A6VrcH-m" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Preprocess the input\n", "\n", "Edit the given input to replace the last word with a ``. Then, tokenize the input for doing inference." ], "metadata": { "id": "-62nvMSbeNBy" } }, { "cell_type": "code", "source": [ "def add_mask_to_last_word(text: str) -> Tuple[str, str]:\n", " \"\"\"Replace the last word of sentence with and return\n", " the original sentence and the masked sentence.\"\"\"\n", " text_list = text.split()\n", " masked = ' '.join(text_list[:-2] + ['' + text_list[-1]])\n", " return text, masked\n", "\n", "tokenizer = AutoTokenizer.from_pretrained(\"stevhliu/my_awesome_eli5_mlm_model\")\n", "\n", "def tokenize_sentence(\n", " text_and_mask: Tuple[str, str],\n", " tokenizer) -> Tuple[str, Dict[str, tf.Tensor]]:\n", " \"\"\"Convert string examples to tensors.\"\"\"\n", " text, masked_text = text_and_mask\n", " tokenized_sentence = tokenizer.encode_plus(\n", " masked_text, return_tensors=\"tf\")\n", "\n", " # Workaround to manually remove batch dim until we have the feature to\n", " # add optional batching flag.\n", " # TODO(https://github.com/apache/beam/issues/21863): Remove when optional\n", " # batching flag added\n", " return text, {\n", " k: tf.squeeze(v)\n", " for k, v in dict(tokenized_sentence).items()\n", " }" ], "metadata": { "id": "d6TXVfWhWRzz" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Postprocess the results\n", "\n", "Extract the result from the `PredictionResult` object. Then, format the output to print the actual sentence and the word predicted for the last word in the sentence." ], "metadata": { "id": "KnLtuYyTfC-g" } }, { "cell_type": "code", "source": [ "class PostProcessor(beam.DoFn):\n", " \"\"\"Processes the PredictionResult to get the predicted word.\n", "\n", " The logits are the output of the BERT Model. To get the word with the highest\n", " probability of being the masked word, take the argmax.\n", " \"\"\"\n", " def __init__(self, tokenizer):\n", " super().__init__()\n", " self.tokenizer = tokenizer\n", "\n", " def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:\n", " text, prediction_result = element\n", " inputs = prediction_result.example\n", " logits = prediction_result.inference['logits']\n", " mask_token_index = tf.where(inputs[\"input_ids\"] == self.tokenizer.mask_token_id)[0]\n", " predicted_token_id = tf.math.argmax(logits[mask_token_index[0]], axis=-1)\n", " decoded_word = self.tokenizer.decode(predicted_token_id)\n", " print(f\"Actual Sentence: {text}\\nPredicted last word: {decoded_word}\")\n", " print('-' * 80)" ], "metadata": { "id": "DnWlNV1kelnq" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Run the pipeline\n", "\n", "Use the following code to run the pipeline." ], "metadata": { "id": "scepcVUpgD63" } }, { "cell_type": "code", "source": [ "with beam.Pipeline() as beam_pipeline:\n", " tokenized_examples = (\n", " beam_pipeline\n", " | \"CreateExamples\" >> beam.Create(text)\n", " | 'AddMask' >> beam.Map(add_mask_to_last_word)\n", " | 'TokenizeSentence' >>\n", " beam.Map(lambda x: tokenize_sentence(x, tokenizer)))\n", "\n", " result = (\n", " tokenized_examples\n", " | \"RunInference\" >> RunInference(KeyedModelHandler(model_handler))\n", " | \"PostProcess\" >> beam.ParDo(PostProcessor(tokenizer))\n", " )" ], "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "IEPrQGEWgBCo", "outputId": "218cc1f4-2613-4bf1-9666-782df020536b" }, "execution_count": null, "outputs": [ { "output_type": "stream", "name": "stdout", "text": [ "Actual Sentence: The capital of France is Paris .\n", "Predicted last word: Paris\n", "--------------------------------------------------------------------------------\n", "Actual Sentence: It is raining cats and dogs .\n", "Predicted last word: dogs\n", "--------------------------------------------------------------------------------\n", "Actual Sentence: He looked up and saw the sun and stars .\n", "Predicted last word: stars\n", "--------------------------------------------------------------------------------\n", "Actual Sentence: Today is Monday and tomorrow is Tuesday .\n", "Predicted last word: Tuesday\n", "--------------------------------------------------------------------------------\n", "Actual Sentence: There are 5 coconuts on this palm tree .\n", "Predicted last word: tree\n", "--------------------------------------------------------------------------------\n" ] } ] } ] }