Module

Distributed tracing middleware for Starlette-based applications. This middleware automatically extracts distributed tracing headers from incoming HTTP requests and makes them available to the Galileo logger within request handlers. Works with any ASGI framework built on Starlette:
  • FastAPI
  • Starlette
  • Any other Starlette-based framework
Example usage with FastAPI:
from fastapi import FastAPI
from galileo.middleware import TracingMiddleware, get_request_logger

app = FastAPI()
app.add_middleware(TracingMiddleware)

@app.post("/process")
async def process_request(data: dict):
    # Logger automatically continues the distributed trace
    logger = get_request_logger()
    logger.add_workflow_span(input=str(data), name="process_workflow")
    # ... process request ...
    logger.conclude(output="done")
    return {"status": "success"}
Example usage with Starlette:
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.responses import JSONResponse
from starlette.routing import Route
from galileo.middleware import TracingMiddleware, get_request_logger

async def homepage(request):
    logger = get_request_logger()
    logger.add_workflow_span(input="homepage", name="homepage_handler")
    logger.conclude(output="success")
    # Starlette handlers must return a Response object, not a plain dict
    return JSONResponse({"status": "ok"})

app = Starlette(
    routes=[Route("/", homepage)],
    # Starlette expects middleware wrapped in Middleware(...)
    middleware=[Middleware(TracingMiddleware)],
)

TracingMiddleware

Middleware that extracts distributed tracing headers from incoming requests. This middleware looks for the following headers in incoming HTTP requests:
  • X-Galileo-Trace-ID: The root trace ID
  • X-Galileo-Parent-ID: The parent span/trace ID to attach to
These values are stored in context variables, making them available to request handlers via the get_request_logger() function. The middleware is compatible with FastAPI and any other Starlette-based framework.

Note: Project and log_stream are configured per service via environment variables (GALILEO_PROJECT and GALILEO_LOG_STREAM). They are not propagated via headers, following standard distributed tracing patterns.
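For instance, an upstream service would forward these two headers when calling a downstream endpoint behind this middleware. A minimal sketch of the caller's side, where the helper function and the httpx call are illustrative assumptions and only the header names come from this module:

```python
# Sketch of how an upstream service might forward Galileo trace headers.
# Only the header names (X-Galileo-Trace-ID, X-Galileo-Parent-ID) are
# defined by this module; everything else here is illustrative.
def galileo_trace_headers(trace_id: str, parent_id: str) -> dict:
    """Build the headers a calling service forwards downstream."""
    return {
        "X-Galileo-Trace-ID": trace_id,    # root trace ID
        "X-Galileo-Parent-ID": parent_id,  # span the downstream work attaches to
    }

# Usage from the caller's side (hypothetical endpoint and payload):
#   import httpx
#   httpx.post("http://downstream/process",
#              json={"text": "hello"},
#              headers=galileo_trace_headers(trace_id, parent_span_id))
```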

dispatch

async def dispatch(self,
                   request: Request,
                   call_next: RequestResponseEndpoint) -> Response
Process the request and extract tracing headers.

Arguments
  • request (Request): The incoming HTTP request
  • call_next (RequestResponseEndpoint): The next middleware or route handler
Returns
  • Response: The HTTP response
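Conceptually, the dispatch step boils down to reading the two tracing headers and stashing them in context variables before calling the next handler. A minimal sketch of that pattern follows; the variable and function names are illustrative, not galileo's actual internals:

```python
from contextvars import ContextVar
from typing import Optional

# Illustrative context variables; galileo's real internals may differ.
trace_id_var: ContextVar[Optional[str]] = ContextVar("trace_id", default=None)
parent_id_var: ContextVar[Optional[str]] = ContextVar("parent_id", default=None)

def extract_trace_context(headers: dict) -> None:
    """Store tracing headers (if present) for the current request context."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {k.lower(): v for k, v in headers.items()}
    trace_id_var.set(normalized.get("x-galileo-trace-id"))
    parent_id_var.set(normalized.get("x-galileo-parent-id"))
```

Because ContextVar values are isolated per task, concurrent requests never see each other's trace context.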

get_request_logger

def get_request_logger() -> GalileoLogger
Get a request-scoped GalileoLogger configured for distributed mode.

Distributed mode enables distributed tracing across services by propagating trace context and sending updates to the backend immediately. Call this function within a request handler after the TracingMiddleware has been registered. It creates a new GalileoLogger instance per request that automatically continues the distributed trace from the upstream service. The logger is configured using trace context extracted by the middleware:
  • X-Galileo-Trace-ID: Root trace ID
  • X-Galileo-Parent-ID: Parent span/trace ID to attach to
Project and log_stream are configured per service via environment variables (GALILEO_PROJECT and GALILEO_LOG_STREAM) and are not propagated via headers, following standard distributed tracing patterns. If no tracing headers were present in the request, a regular logger is returned (using the GALILEO_PROJECT and GALILEO_LOG_STREAM env vars).

Note: This creates a new logger per request, unlike the decorator's get_logger_instance(), which uses a singleton pattern.

Returns
  • GalileoLogger: A logger instance configured for the current request’s trace context
Examples
@app.post("/process")
async def process_request(data: dict):
    logger = get_request_logger()

    # This span will be attached to the distributed trace
    logger.add_workflow_span(input=str(data), name="process_workflow")
    result = await process(data)
    logger.conclude(output=str(result))

    logger.flush()
    logger.terminate()

    return {"result": result}
@app.post("/retrieve")
async def retrieve_endpoint(query: str):
    # Get logger with trace context from upstream service
    logger = get_request_logger()

    # If trace context exists, this creates a workflow span
    # Otherwise, it starts a new trace
    if logger.trace_id:
        logger.add_workflow_span(input=query, name="retrieval_service")
    else:
        logger.start_trace(input=query, name="retrieval_service")

    results = retrieve(query, logger)

    logger.conclude(output=str(results))
    logger.flush()
    logger.terminate()

    return {"results": results}