Overview
In this tutorial, you’ll learn how to build a Retrieval-Augmented Generation (RAG) application that combines:
- Elasticsearch for document storage and semantic search using the ELSER model
- LangGraph for building conversational agents
- Galileo for end-to-end observability and logging
This tutorial is intended for Python developers who want to build production-ready RAG applications. By the end, you’ll have a working chatbot that can answer questions about your documents with full observability.
What you’ll build
You’ll create a RAG chatbot that:
- Uses the ELSER model for semantic search
- Stores documents in an Elasticsearch vector store
- Uses LangGraph to orchestrate retrieval and generation steps
- Logs traces to Galileo for monitoring
Prerequisites
Before starting, you’ll need:
- Python 3.10+ installed
- An Elasticsearch instance (we’ll use Elastic Cloud Serverless)
- OpenAI API key for the language model
- Galileo account for observability
Step 1: Set up Elastic Cloud Serverless
First, let’s set up your Elasticsearch instance for document storage and retrieval.
Create your Elasticsearch project
- Navigate to cloud.elastic.co and create an account or log in
- Click Create serverless project
- Choose Elasticsearch as the project type
- Select Optimized for Vectors configuration
- Name your project (e.g., “rag-chatbot”) and click Create project
Create your first index
- Once your project is ready, you’ll see the index creation page
- Enter an index name, for example demo
- Click Create my index
- Important: Copy and save your Elasticsearch URL and API key; you won’t be able to see the API key again
Set up the ELSER inference endpoint
ELSER (Elastic Learned Sparse EncodeR) provides semantic search capabilities:
- In your Elasticsearch project, go to Relevance → Inference Endpoints
- If an ELSER endpoint does not exist, click Create endpoint
- Follow the ELSER documentation or the Elastic guide to complete the deployment
- Note the model ID (typically .elser_model_2_linux-x86_64)
Step 2: Set up your Python environment
Create a new project and install the required dependencies in a virtual environment:
# Install dependencies
pip install \
  elasticsearch \
  langchain-elasticsearch \
  langchain-openai \
  langgraph \
  galileo \
  openai \
  python-dotenv  # Optional: loads variables from a .env file
Create a .env file or set these environment variables:
# Elasticsearch
ES_HOST="your-elasticsearch-host-here" # e.g., "https://your-cluster.es.us-central1.gcp.cloud.es.io:443"
ES_API_KEY="your-api-key-here"
ES_INDEX="demo"
ES_INDEX_CHAT_HISTORY="chat-history"
ELSER_MODEL=".elser_model_2_linux-x86_64" # Adjust based on your deployment
# OpenAI
OPENAI_API_KEY="your-openai-api-key-here"
# Galileo
GALILEO_API_KEY="your-galileo-api-key-here"
GALILEO_PROJECT="elasticsearch-rag-demo" # YOUR PROJECT NAME
GALILEO_LOG_STREAM="elastic_rag_stream"
Note: The ELSER model name varies by platform:
- Linux x86_64: .elser_model_2_linux-x86_64
- Check your Elasticsearch ML models for the exact name (see the snippet below)
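If you’re unsure which ELSER variant your deployment has, you can list the trained models with the Python client. This is a minimal sketch; it assumes the ES_HOST and ES_API_KEY values from your .env:

# List trained model IDs to find the exact ELSER name on your deployment
import os
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

load_dotenv()
client = Elasticsearch(hosts=[os.environ["ES_HOST"]], api_key=os.environ["ES_API_KEY"])
for config in client.ml.get_trained_models()["trained_model_configs"]:
    print(config["model_id"])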
Step 3: Build the RAG application
Now, let’s build the RAG application step by step. Create a Python file (e.g., demo.py) and add the following code snippets.
Imports and Configuration
First, we import the necessary libraries and configure our environment variables. This part of the script loads your API keys and sets up the connection details for Elasticsearch, OpenAI, and Galileo.
from dotenv import load_dotenv
import os
import time
from typing import Annotated, Sequence

from elasticsearch import Elasticsearch, NotFoundError
from langchain_core.tools.retriever import create_retriever_tool
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore, SparseVectorStrategy
from langchain_elasticsearch import ElasticsearchChatMessageHistory
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from typing_extensions import TypedDict

# Load environment variables from your .env file
load_dotenv()

# --- 1. Configuration ---
# Set up your connection details and index names.
# It's recommended to use environment variables for sensitive data.
ES_HOST = os.environ["ES_HOST"]
ES_API_KEY = os.environ["ES_API_KEY"]
ES_INDEX = os.environ["ES_INDEX"]  # For example "demo"
ES_INDEX_CHAT_HISTORY = os.environ["ES_INDEX_CHAT_HISTORY"]  # For example "chat-history"
ELSER_MODEL = os.getenv("ELSER_MODEL", ".elser_model_2_linux-x86_64")
1. Elasticsearch Setup
# --- 2. Elasticsearch Setup ---
# Connect to Elasticsearch. If your deployment uses IP filtering,
# make sure your IP address is allowed.
print("Connecting to Elasticsearch...")
elasticsearch_client = Elasticsearch(hosts=[ES_HOST], api_key=ES_API_KEY)
print(elasticsearch_client.info())

def setup_elasticsearch():
    """
    Ensures the ELSER model is deployed and sample documents are indexed.
    """
    # 2a. Deploy the ELSER model (Elastic's NLP model for semantic search)
    try:
        elasticsearch_client.ml.get_trained_models(model_id=ELSER_MODEL)
        print(f'ELSER model "{ELSER_MODEL}" is already available.')
    except NotFoundError:
        print(f'ELSER model "{ELSER_MODEL}" not found, starting deployment...')
        elasticsearch_client.ml.put_trained_model(
            model_id=ELSER_MODEL, input={"field_names": ["text_field"]}
        )
        # Wait until the model definition is fully downloaded
        while True:
            status = elasticsearch_client.ml.get_trained_models(
                model_id=ELSER_MODEL, include="definition_status"
            )
            if status["trained_model_configs"][0]["fully_defined"]:
                break
            time.sleep(1)
        elasticsearch_client.ml.start_trained_model_deployment(
            model_id=ELSER_MODEL, wait_for="fully_allocated"
        )
        print(f'ELSER model "{ELSER_MODEL}" deployed successfully.')

    # 2b. Create the vector store backed by ELSER sparse vectors
    store = ElasticsearchStore(
        es_connection=elasticsearch_client,
        index_name=ES_INDEX,
        strategy=SparseVectorStrategy(model_id=ELSER_MODEL),
    )

    # 2c. Index a few sample documents
    sample_docs = [
        Document(page_content="Our company offers comprehensive health insurance including medical, dental, and vision coverage.", metadata={"source": "employee_handbook"}),
        Document(page_content="Remote work policy allows employees to work from home up to 3 days per week.", metadata={"source": "employee_handbook"}),
        Document(page_content="The company's vacation policy provides 15 days of paid time off for new employees, increasing to 20 days after 3 years of service.", metadata={"source": "employee_handbook"}),
    ]
    store.add_documents(sample_docs)
    time.sleep(2)  # Give Elasticsearch a moment to make the documents searchable
    print(f"{len(sample_docs)} documents indexed successfully.")
    return store
The code automatically:
- Connects to your Elasticsearch instance
- Deploys the ELSER model for semantic search if it isn’t already available
- Creates the index and stores sample documents (you can verify retrieval with the snippet below)
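To confirm retrieval works before wiring up the agent, you can query the store directly. A minimal sketch, assuming setup_elasticsearch() has already run and returned store:

# Sanity-check semantic retrieval against the sample documents
docs = store.as_retriever().invoke("vacation days for new employees")
for doc in docs:
    print(doc.metadata["source"], "->", doc.page_content[:80])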
2. Agent Architecture
# --- 3. Agent Definition ---
# Define the state, tools, and the graph that powers the agent.

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]

def setup_agent_and_graph(store: ElasticsearchStore):
    """
    Sets up the agent, tools, and the LangGraph workflow.
    """
    # 3a. The retriever tool searches Elasticsearch for relevant documents.
    retriever = store.as_retriever()
    retriever_tool = create_retriever_tool(
        retriever,
        "retrieve_workplace_documents",
        "Search and return information about company policies, benefits, and processes.",
    )
    tools = [retriever_tool]

    # 3b. Use a model that is good at tool use
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True, api_key=os.environ["OPENAI_API_KEY"])
    agent_runnable = llm.bind_tools(tools)

    # 3c. Define the Graph
    # The graph defines the flow of control for the agent.
    def run_agent(state: AgentState):
        """Invokes the agent to decide on the next action."""
        return {"messages": [agent_runnable.invoke(state["messages"])]}

    tool_node = ToolNode(tools)

    workflow = StateGraph(AgentState)
    workflow.add_node("agent", run_agent)
    workflow.add_node("tools", tool_node)
    workflow.add_edge(START, "agent")
    workflow.add_conditional_edges("agent", tools_condition)
    workflow.add_edge("tools", "agent")

    graph = workflow.compile()
    return graph
- State Management: Uses AgentState to track conversation messages
- Tool Integration: Creates a retriever tool that searches Elasticsearch
- LangGraph Workflow: Defines the flow between agent reasoning and tool usage; you can smoke-test the compiled graph with the snippet below
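Before adding chat history, it can be useful to invoke the compiled graph directly. A minimal sketch, assuming graph was returned by setup_agent_and_graph (HumanMessage is already imported in demo.py):

# Invoke the graph once without persisting history
from langchain_core.messages import HumanMessage

result = graph.invoke({"messages": [HumanMessage(content="Can I work from home?")]})
print(result["messages"][-1].content)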
3. Conversation Flow
# --- 4. Run the Agent ---
# Now, we can ask questions and get answers.

def ask_question(graph, question: str, session_id: str):
    """
    Asks a question to the RAG agent and returns the answer.
    """
    chat_history = ElasticsearchChatMessageHistory(
        es_connection=elasticsearch_client,
        index=ES_INDEX_CHAT_HISTORY,
        session_id=session_id,
    )

    inputs = {"messages": [HumanMessage(content=question)]}
    final_state = graph.invoke(inputs, config={"recursion_limit": 5})
    response = final_state["messages"][-1].content

    # Save conversation history
    chat_history.add_user_message(question)
    chat_history.add_ai_message(response)
    return response
Each call to ask_question follows this flow:
- The user asks a question
- The agent decides whether to use the retriever tool
- If needed, it searches Elasticsearch for relevant documents
- It generates a response based on the retrieved context
- The conversation is saved to chat history (which you can read back, as shown below)
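Because the history lives in Elasticsearch, you can read a session back later. A minimal sketch, assuming the names from the code above (the session_id value here is hypothetical):

# Inspect a stored conversation by session_id
history = ElasticsearchChatMessageHistory(
    es_connection=elasticsearch_client,
    index=ES_INDEX_CHAT_HISTORY,
    session_id="session-1700000000",  # hypothetical session id
)
for message in history.messages:
    print(f"{message.type}: {message.content}")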
Finally, we put everything together. This block of code initializes the Elasticsearch setup, compiles the agent, and starts a Q&A session. You can see how to call the ask_question function with a sample query.
# Step 1: Set up Elasticsearch index and data
document_store = setup_elasticsearch()
# Step 2: Compile the agent and its workflow
rag_agent_graph = setup_agent_and_graph(document_store)
# Step 3: Start a Q&A session
print("\n--- Starting Q&A Session ---")
session_id = f"session-{int(time.time())}"
# Ask the first question
question1 = "How many vacation days do new hires get?"
print(f"\n❓ Question: {question1}")
answer1 = ask_question(rag_agent_graph, question1, session_id)
print(f"✅ Answer: {answer1}")
# Ask a follow-up question
question2 = "What about health insurance?"
print(f"\n❓ Question: {question2}")
answer2 = ask_question(rag_agent_graph, question2, session_id)
print(f"✅ Answer: {answer2}")
Step 4: Run the application
To run your RAG application, save all the code into a single demo.py file and execute it from your terminal:
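python demo.py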
The script will:
- Connect to Elasticsearch and verify the connection
- Deploy the ELSER model if it isn’t already available
- Index sample documents about company policies
- Create the RAG agent with the LangGraph workflow
- Run the sample questions, display the answers, and log them to Galileo (see Step 5)
Expected output:
Connecting to Elasticsearch...
{'name': 'your-cluster', 'cluster_name': '...', ...}
ELSER model ".elser_model_2_linux-x86_64" is already available.
3 documents indexed successfully.
--- Starting Q&A Session ---
❓ Question: How many vacation days do new hires get?
✅ Answer: New hires get 15 days of paid time off, which increases to 20 days after 3 years of service.
❓ Question: What about health insurance?
✅ Answer: The company offers comprehensive health insurance including medical, dental, and vision coverage.
Step 5: Add Galileo observability
The demo code doesn’t attach a Galileo logger explicitly; once it does (one option is sketched below), you can inspect each run in the Galileo Console:
- Open your Galileo Console
- Navigate to your project (e.g., elasticsearch-rag-demo)
- You’ll see traces for each question, showing:
  - Document retrieval steps
  - LLM generation
  - Full conversation context
  - Performance metrics
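One option is a minimal sketch using the galileo SDK’s LangChain callback; the import path below is an assumption, so check the Galileo docs for your SDK version:

# Hedged sketch: attach Galileo's LangChain callback so each graph run is traced.
# Assumes the `galileo` package is installed and the GALILEO_* variables are set;
# the import path may differ between SDK versions.
from galileo.handlers.langchain import GalileoCallback

galileo_callback = GalileoCallback()

# In ask_question, pass the callback alongside the recursion limit:
final_state = graph.invoke(
    inputs,
    config={"recursion_limit": 5, "callbacks": [galileo_callback]},
)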
Troubleshooting
Connection issues
- Verify your Elasticsearch host URL and API key
- If you use Elastic Cloud (non-serverless) with IP filtering, make sure your IP is on the allowlist
- Check that Elasticsearch is running and accessible (a quick check is shown below)
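A quick way to isolate connection problems; a minimal sketch assuming your .env is configured:

# Ping returns True when the host is reachable and the credentials are accepted
import os
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

load_dotenv()
client = Elasticsearch(hosts=[os.environ["ES_HOST"]], api_key=os.environ["ES_API_KEY"])
print(client.ping())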
ELSER model issues
- Verify the model name matches your platform
- Ensure machine learning features are enabled
- Check that you have sufficient resources for model deployment
Missing documents in search
- Wait a few seconds after indexing for documents to be available
- Verify the index name matches your configuration
- Check Elasticsearch logs for indexing errors
Next steps
Now that you have a working RAG application, you can add evaluation metrics in Galileo to measure qualities such as chunk attribution.