# Using LlamaIndex Workflows with Elasticsearch

This notebook demonstrates how to use LlamaIndex Workflows with Elasticsearch. This notebook is based on the article [Using AutoGen with Elasticsearch](https://www.elastic.co/search-labs/blog/using-autogen-with-elasticsearch).

## Installing dependencies and importing packages

In [None]:
%pip install elasticsearch==8.17 llama-index llama-index-llms-groq

In [None]:
import os
import json
from getpass import getpass

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from llama_index.llms.groq import Groq
from llama_index.core.workflow import (
 Event,
 StartEvent,
 StopEvent,
 Workflow,
 step,
)

## Declaring variables

In [None]:
# Ask for each credential interactively (input is hidden by getpass) and
# store it in the process environment under the name the later cells read.
for _env_name, _prompt_text in (
    ("GROQ_API_KEY", "Groq Api key: "),
    ("ELASTIC_ENDPOINT", "Elastic Endpoint: "),
    ("ELASTIC_API_KEY", "Elastic Api key: "),
):
    os.environ[_env_name] = getpass(_prompt_text)

## Elasticsearch client

In [None]:
# Shared Elasticsearch client, configured from the endpoint and API key
# collected into the environment by the previous cell.
_client = Elasticsearch(
    hosts=os.environ["ELASTIC_ENDPOINT"],
    api_key=os.environ["ELASTIC_API_KEY"],
)

## Elasticsearch mappings

In [None]:
# Name of the Elasticsearch index used throughout this notebook: it is created,
# filled with the hotel room documents, searched, and finally deleted.
INDEX_NAME = "hotel-rooms"

In [None]:
# Create the index with an explicit mapping for every hotel-room field.
try:
    _client.indices.create(
        index=INDEX_NAME,
        body={
            "mappings": {
                "properties": {
                    "room_name": {"type": "text"},
                    "description": {"type": "text"},
                    "price_per_night": {"type": "integer"},
                    "beds": {"type": "byte"},
                    "features": {"type": "keyword"},
                }
            }
        },
    )

    print("index created successfully")
except Exception as e:
    # API errors carry a structured `info` body; other failures (for example
    # a connection error) do not, so fall back to the plain exception text
    # instead of raising a second error from inside this handler.
    try:
        reason = getattr(e, "info", None)["error"]["root_cause"][0]["reason"]
    except (TypeError, KeyError, IndexError):
        reason = str(e)

    print(f"Error creating index: {reason}")

In [None]:
# Sample hotel-room records to index. Every record carries the same five keys
# declared in the index mapping: room_name, beds, description, price_per_night
# and features (a list of amenity strings).
documents = [
 {
 "room_name": "Standard Room",
 "beds": 1,
 "description": "A cozy room with a comfortable queen-size bed, ideal for solo travelers or couples.",
 "price_per_night": 80,
 "features": ["air conditioning", "wifi", "flat-screen TV", "mini fridge"],
 },
 {
 "room_name": "Deluxe Room",
 "beds": 1,
 "description": "Spacious room with a king-size bed and modern amenities for a luxurious stay.",
 "price_per_night": 120,
 "features": ["air conditioning", "wifi", "smart TV", "mini bar", "city view"],
 },
 {
 "room_name": "Family Room",
 "beds": 2,
 "description": "A large room with two queen-size beds, perfect for families or small groups.",
 "price_per_night": 150,
 "features": ["air conditioning", "wifi", "flat-screen TV", "sofa", "bath tub"],
 },
 {
 "room_name": "Suite",
 "beds": 1,
 "description": "An elegant suite with a separate living area, offering maximum comfort and luxury.",
 "price_per_night": 200,
 "features": ["air conditioning", "wifi", "smart TV", "jacuzzi", "balcony"],
 },
 {
 "room_name": "Penthouse Suite",
 "beds": 1,
 "description": "The ultimate luxury experience with a panoramic view and top-notch amenities.",
 "price_per_night": 350,
 "features": [
 "air conditioning",
 "wifi",
 "private terrace",
 "jacuzzi",
 "exclusive lounge access",
 ],
 },
 {
 "room_name": "Single Room",
 "beds": 1,
 "description": "A compact and comfortable room designed for solo travelers on a budget.",
 "price_per_night": 60,
 "features": ["wifi", "air conditioning", "desk", "flat-screen TV"],
 },
 {
 "room_name": "Double Room",
 "beds": 1,
 "description": "A well-furnished room with a queen-size bed, ideal for couples or business travelers.",
 "price_per_night": 100,
 "features": ["air conditioning", "wifi", "mini fridge", "work desk"],
 },
 {
 "room_name": "Executive Suite",
 "beds": 1,
 "description": "A high-end suite with premium furnishings and exclusive business amenities.",
 "price_per_night": 250,
 "features": [
 "air conditioning",
 "wifi",
 "smart TV",
 "conference table",
 "city view",
 ],
 },
 {
 "room_name": "Honeymoon Suite",
 "beds": 1,
 "description": "A romantic suite with a king-size bed, perfect for newlyweds and special occasions.",
 "price_per_night": 220,
 "features": [
 "air conditioning",
 "wifi",
 "hot tub",
 "romantic lighting",
 "balcony",
 ],
 },
 {
 "room_name": "Presidential Suite",
 "beds": 2,
 "description": "A luxurious suite with separate bedrooms and a living area, offering first-class comfort.",
 "price_per_night": 500,
 "features": [
 "air conditioning",
 "wifi",
 "private dining area",
 "personal butler service",
 "exclusive lounge access",
 ],
 },
]

In [None]:
def build_data():
    """Yield one bulk action per entry of `documents`, targeting INDEX_NAME."""
    yield from ({"_index": INDEX_NAME, "_source": room} for room in documents)


# Send every action produced by build_data() in a single bulk request and
# report how many documents were indexed, plus any per-document errors.
try:
    success, errors = bulk(_client, build_data())
except Exception as e:
    print(f"Error: {e}")
else:
    print(f"{success} documents indexed successfully")
    if errors:
        print("Errors during indexing:", errors)

## Llama-index workflow

In [None]:
class ElasticsearchRequest:
    """Static helpers that talk to the INDEX_NAME index."""

    @staticmethod
    def get_mappings(_es_client: Elasticsearch):
        """
        Get the mappings of the Elasticsearch index.
        """

        return _es_client.indices.get_mapping(index=INDEX_NAME)

    @staticmethod
    async def do_es_query(query: str, _es_client: Elasticsearch):
        """
        Execute an Elasticsearch query and return the results as a JSON string.

        Args:
            query: JSON text of a search body; it must decode to an object
                with a top-level "query" key.
            _es_client: client used to run the search against INDEX_NAME.

        Returns:
            A JSON string with the `_source` of every hit on success.
            On any failure an `Exception` instance is *returned* (never
            raised) so the caller can feed its message back to the LLM.
        """

        try:
            parsed_query = json.loads(query)

            # The body must be a JSON object with a "query" key. A JSON list
            # or string that merely contains "query" is rejected as well,
            # instead of being sent to Elasticsearch as a malformed body.
            if not isinstance(parsed_query, dict) or "query" not in parsed_query:
                return Exception("Error: Query JSON must contain a 'query' key")

            response = _es_client.search(index=INDEX_NAME, body=parsed_query)
            hits = response["hits"]["hits"]

            # An empty result set is reported as an error so that the caller
            # retries with a different query.
            if not hits:
                return Exception("Query has not found any results")

            return json.dumps([hit["_source"] for hit in hits], indent=2)

        except json.JSONDecodeError:
            # The text was not valid JSON at all.
            return Exception("Error: Query JSON no valid format")
        except Exception as e:
            # Anything else (client/transport errors, unexpected response
            # shape) is flattened to its message.
            return Exception(str(e))

In [None]:
# Prompt template for the first attempt at generating a query.
# Placeholders filled with str.format():
#   {passage} - the user's request
#   {schema}  - the index mappings, so the LLM knows the available fields
EXTRACTION_PROMPT = """
Context information is below:
---------------------
{passage}
---------------------

Given the context information and not prior knowledge, create a Elastic search query from the information in the context.
The query must return the documents that match with query and the context information and the query used for retrieve the results.
{schema}

"""

# Prompt template appended to EXTRACTION_PROMPT when a previous attempt failed.
# Placeholders filled with str.format():
#   {wrong_answer} - the previous LLM output that failed validation
#   {error}        - the error message produced while validating it
REFLECTION_PROMPT = """
You already created this output previously:
---------------------
{wrong_answer}
---------------------

This caused the error: {error}

Try again, the response must contain only valid Elasticsearch queries. Do not add any sentence before or after the JSON object.
Do not repeat the query.
"""

In [None]:
class ExtractionDone(Event):
    """Event emitted once the LLM has produced a candidate query."""

    # Raw text returned by the LLM (expected to be an Elasticsearch query).
    output: str
    # The user request the query was generated from.
    passage: str
    # Name of the LLM that produced `output`. Every construction of this
    # event passes `model=` and consumers read `ev.model`, so it is declared
    # here instead of being carried as an undeclared dynamic attribute.
    model: str


class ValidationErrorEvent(Event):
    """Event emitted when a generated query failed validation."""

    # Message describing why the query was rejected.
    error: str
    # The LLM output that failed.
    wrong_output: str
    # The user request the query was generated from.
    passage: str
    # Name of the LLM to use for the next attempt. Every construction of this
    # event passes `model=` and consumers read `ev.model`, so it is declared
    # here instead of being carried as an undeclared dynamic attribute.
    model: str

In [None]:
class ReflectionWorkflow(Workflow):
    """Generate an Elasticsearch query with an LLM and retry until it validates.

    `extract` asks the LLM for a query; `validate` runs that query against
    Elasticsearch. When validation fails, the error and the rejected output
    are sent back to `extract`, which asks the LLM again.

    NOTE(review): once the fallback model is active there is no retry cap in
    this class; the loop then presumably ends only on success or when the
    workflow's own timeout fires - confirm this is intended.
    """

    # Number of validations performed so far by this workflow instance.
    model_retries: int = 0
    # Validations allowed before switching to `fallback_model`.
    max_retries: int = 3
    # Model used once the initial model has exceeded `max_retries`.
    fallback_model: str = "llama3-70b-8192"

    @step()
    async def extract(
        self, ev: StartEvent | ValidationErrorEvent
    ) -> StopEvent | ExtractionDone:
        """Ask the LLM for an Elasticsearch query matching the passage.

        On a `StartEvent` the model name and passage come from the run
        arguments; on a `ValidationErrorEvent` they come from the event and a
        reflection section describing the previous failure is appended to the
        prompt. Stops the workflow early when no passage was provided.
        """

        print("\n=== EXTRACT STEP ===\n")

        if isinstance(ev, StartEvent):
            model = ev.get("model")
            passage = ev.get("passage")

            if not passage:
                return StopEvent(result="Please provide some text in input")

            reflection_prompt = ""
        elif isinstance(ev, ValidationErrorEvent):
            passage = ev.passage
            model = ev.model

            reflection_prompt = REFLECTION_PROMPT.format(
                wrong_answer=ev.wrong_output, error=ev.error
            )

        llm = Groq(model=model, api_key=os.environ["GROQ_API_KEY"])

        # The index mappings are included so the LLM knows which fields exist.
        prompt = EXTRACTION_PROMPT.format(
            passage=passage, schema=ElasticsearchRequest.get_mappings(_client)
        )
        if reflection_prompt:
            prompt += reflection_prompt

        output = await llm.acomplete(prompt)

        print(f"MODEL: {model}")
        print(f"OUTPUT: {output}")
        print("=================\n")

        return ExtractionDone(output=str(output), passage=passage, model=model)

    @step()
    async def validate(self, ev: ExtractionDone) -> StopEvent | ValidationErrorEvent:
        """Run the generated query and either finish or request a retry.

        Returns a `StopEvent` carrying the generated query text when the query
        runs and finds documents; otherwise a `ValidationErrorEvent` carrying
        the error message and the model to use for the next attempt.
        """

        print("\n=== VALIDATE STEP ===\n")

        # Default to the current model so the except-handler below can always
        # build its event, even if the failure happens before the retry
        # bookkeeping has run.
        model = ev.model

        try:
            results = await ElasticsearchRequest.do_es_query(ev.output, _client)
            self.model_retries += 1

            # After too many attempts with the initial model, switch to the
            # fallback model for all following attempts.
            if (
                self.model_retries > self.max_retries
                and ev.model != self.fallback_model
            ):
                print(f"Max retries for model {ev.model} reached, changing model\n")
                model = self.fallback_model

            print(f"ELASTICSEARCH RESULTS: {results}")

            # do_es_query reports failures by returning an Exception instance.
            if isinstance(results, Exception):
                print("STATUS: Validation failed, retrying...\n")
                print("===================\n")

                return ValidationErrorEvent(
                    error=str(results),
                    wrong_output=ev.output,
                    passage=ev.passage,
                    model=model,
                )

        except Exception as e:
            print("STATUS: Validation failed, retrying...\n")
            print("===================\n")

            return ValidationErrorEvent(
                error=str(e),
                wrong_output=ev.output,
                passage=ev.passage,
                model=model,
            )

        return StopEvent(result=ev.output)

## Execute workflow

In [None]:
w = ReflectionWorkflow(timeout=60, verbose=True)

user_prompt = "Rooms with smart TV, wifi, jacuzzi and price per night less than 300"

result = await w.run(
 passage=f"I need the best possible query for documents that have: {user_prompt}",
 model="mistral-saba-24b",
)

print(result)

## Cleaning environment

Delete the index created in this notebook so that it does not keep consuming cluster resources.

In [None]:
# Cleanup - Delete Index
# 400/404 responses are ignored so this cell can be re-run safely when the
# index is already gone. Per-request options go through `.options()`, since
# passing `ignore=` directly to an API method is deprecated in the 8.x client.
_client.options(ignore_status=[400, 404]).indices.delete(index=INDEX_NAME)