Azure CosmosDB Vector Search with LlamaIndex

Implementing Vector Searches in Azure with LlamaIndex and CosmosDB for MongoDB vCore

Why vector database in Azure?

If you start working with LLMs you will find very quickly the limitations of the models. One way to break those limitations and increase the amount of information you can query about is to build indices over the information and use those indices to filter and only use the LLM on a subset of all the documents. LLMs and embeddings makes this easy since they give you a numerical vector representation of the information of a query, and you can use operations such as cosine similarity to find which documents are closest.

There are a lot of resources out there about why and how this works, but here we are interested in how we can do this in a managed vector database in Azure. A vector database gives you the ability to run this queries in a very efficient way, that will scale well as you increase your usage of it. As of today, the issue is that most vector databases needs to be either hosted yourself or are third party services, both of these present it’s own risks, so Microsoft has released a version of CosmosDB for Mongo DB that enables the MongoDB capabilities to do this vector searches. Here is the documentation:

Integrated vector store - Azure Cosmos DB for MongoDB vCore

Use the integrated vector store in Azure Cosmos DB for MongoDB vCore to enhance AI-based applications.

This can leverage all the CosmosDB capabilities for scaling, networking and maintanibily, even if the vCore model is less flexible that the other options that CosmosDB offers.

LLamaIndex implementation

LLamaIndex already has a MongoDB Atlas connector, you can find it here in this documentation:

This is how you would use it out of the box:

# Provide URI to constructor, or use environment variable
import pymongo
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch
from llama_index.indices.vector_store.base import VectorStoreIndex
from llama_index.storage.storage_context import StorageContext
from llama_index.readers.file.base import SimpleDirectoryReader

# mongo_uri = os.environ["MONGO_URI"]
mongo_uri = "mongodb+srv://<username>:<password>@<host>?retryWrites=true&w=majority"
mongodb_client = pymongo.MongoClient(mongo_uri)

# construct store
store = MongoDBAtlasVectorSearch(mongodb_client)
storage_context = StorageContext.from_defaults(vector_store=store)
uber_docs = SimpleDirectoryReader(input_files=["../data/10k/uber_2021.pdf"]).load_data()

# construct index
index = VectorStoreIndex.from_documents(uber_docs, storage_context=storage_context)

Microsofts implemetation of the vector operations diverges from MongoDB Atlas, so we can not use it by itself, but we can make use of it as a base and override the key vector operations. Save the bellow code to a module cosmosdb.py

This module overwrites the query() method to do vector queries based on the CosmosDB syntax, as well as adds a create_index() utility to generate the vector index the first time you use it.

"""CosmosDB MongoDB Vector store index.

An index that that is built on top of an existing vector store.
"""

import logging
from typing import Any, Dict, Optional

from llama_index.schema import TextNode
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch
from llama_index.vector_stores.types import VectorStoreQuery, VectorStoreQueryResult
from llama_index.vector_stores.utils import (
    legacy_metadata_dict_to_node,
    metadata_dict_to_node,
)

logger = logging.getLogger(__name__)

from pymongo import IndexModel
from scipy import spatial


class CosmosDBMongoDBVectorSearch(MongoDBAtlasVectorSearch):
    def __init__(
        self,
        mongodb_client: Optional[Any] = None,
        db_name: str = "default_db",
        collection_name: str = "default_collection",
        index_name: str = "default",
        id_key: str = "id",
        embedding_key: str = "embedding",
        text_key: str = "text",
        metadata_key: str = "metadata",
        insert_kwargs: Optional[Dict] = None,
        **kwargs: Any,
    ):
        super().__init__(
            mongodb_client=mongodb_client,
            db_name=db_name,
            collection_name=collection_name,
            index_name=index_name,
            id_key=id_key,
            embedding_key=embedding_key,
            text_key=text_key,
            metadata_key=metadata_key,
            insert_kwargs=insert_kwargs,
            **kwargs,
        )

    def create_index(self, dimensions=1536):  # Dimensions of gpt-3
        idx = IndexModel(
            name=self._index_name,
            keys=[(self._embedding_key, "cosmosSearch")],
            cosmosSearchOptions={
                "kind": "vector-ivf",
                "numLists": 1,
                "similarity": "COS",
                "dimensions": dimensions,
            },
        )
        self._collection.create_indexes([idx])

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes

        """

        cosmos_search: Dict[str, Any] = {
            "vector": query.query_embedding,
            "path": self._embedding_key,
            "k": query.similarity_top_k,
        }

        pipeline = [
            {
                "$search": {
                    # "index": self._index_name,
                    "cosmosSearch": cosmos_search,
                    "returnStoredSource": True,
                }
            },
        ]

        logger.debug("Running query pipeline: %s", pipeline)
        cursor = self._collection.aggregate(pipeline)  # type: ignore
        top_k_nodes = []
        top_k_ids = []
        top_k_scores = []
        for res in cursor:
            text = res.pop(self._text_key)
            score = 1 - spatial.distance.cosine(
                query.query_embedding, res[self._embedding_key]
            )
            object_id = res.pop(self._id_key)
            metadata_dict = res.pop(self._metadata_key)

            try:
                node = metadata_dict_to_node(metadata_dict)
                node.set_content(text)
            except Exception:
                # NOTE: deprecated legacy logic for backward compatibility
                metadata, node_info, relationships = legacy_metadata_dict_to_node(
                    metadata_dict
                )

                node = TextNode(
                    text=text,
                    id_=object_id,
                    metadata=metadata,
                    start_char_idx=node_info.get("start", None),
                    end_char_idx=node_info.get("end", None),
                    relationships=relationships,
                )

            top_k_ids.append(object_id)
            top_k_nodes.append(node)
            top_k_scores.append(score)

        query_result = VectorStoreQueryResult(
            nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
        )
        logger.debug("Result of query: %s", query_result)
        return query_result

And here is how we are using this module as part of LlamaIndex:

# %%
%pip install pymongo
%pip install scipy
%pip install llama_index

# %%
import pymongo

from cosmosdb import CosmosDBMongoDBVectorSearch
from llama_index.indices.vector_store.base import VectorStoreIndex
from llama_index.storage.storage_context import StorageContext
from llama_index.readers.file.base import SimpleDirectoryReader

import os

mongo_uri = os.environ["MONGO_DB_STRING"]
mongodb_client = pymongo.MongoClient(mongo_uri)

# %%
cog_search_content = open("./text.txt").read()

# %%
from typing import Any, Dict, List, Optional

# %%
from llama_index import (
    PromptHelper,
    LLMPredictor,
    ServiceContext,
)

from langchain import OpenAI

from llama_index import VectorStoreIndex, Document

# Prompt Config
max_input_size = 4096
num_output = 250
max_chunk_overlap = 0.1

# OpenAI Config
temperature = 0
model_name = "text-davinci-003"
max_tokens = 512

# %%
# Define OpenAI service
prompt_helper = PromptHelper(max_input_size, num_output, max_chunk_overlap)
llm_predictor = LLMPredictor(
    llm=OpenAI(
        temperature=temperature,
        model_name=model_name,
        max_tokens=num_output,
    )
)
service_context = ServiceContext.from_defaults(
    llm_predictor=llm_predictor, prompt_helper=prompt_helper
)

# construct store
store = CosmosDBMongoDBVectorSearch(
    mongodb_client, collection_name="document_collection"
)
storage_context = StorageContext.from_defaults(vector_store=store)

# %%
# First time to create the index in the mongo collection
if store._index_name not in store._collection.index_information():
    store.create_index()

# %%
# construct index
if store._collection.count_documents({}) == 0:
    print("Creating new index")
    documents = [Document(text=cog_search_content)]
    index = VectorStoreIndex.from_documents(
        documents, storage_context=storage_context, service_context=service_context
    )
else:
    print("Reading existing index")
    index = VectorStoreIndex.from_vector_store(
        vector_store=store, service_context=service_context
    )

# %%
user_queries = [
    "Is there support included in this agreement?",
]

# %%
engine = index.as_query_engine()
results = [engine.query(query) for query in user_queries]

# %%
for query, r in zip(user_queries, results):
    print(f"{query.strip()}: {r}")
    print()
Share: X (Twitter) Facebook LinkedIn