Qdrant VectorDB Implementation

This section details the Qdrant implementation of the BaseVectorDB interface.

memora.vector_db.qdrant.QdrantDB

QdrantDB(
    async_client: AsyncQdrantClient = None,
    collection_name: str = "memory_collection_v0_2",
    embed_models_cache_dir: str = "./cache",
    enable_logging: bool = False,
)

Bases: BaseVectorDB

PARAMETER DESCRIPTION
async_client

A pre-initialized Async Qdrant client

TYPE: AsyncQdrantClient DEFAULT: None

collection_name

Name of the Qdrant collection

TYPE: str DEFAULT: 'memory_collection_v0_2'

embed_models_cache_dir

Directory to cache the embedding models

TYPE: str DEFAULT: './cache'

enable_logging

Whether to enable console logging

TYPE: bool DEFAULT: False

Example
from qdrant_client import AsyncQdrantClient
from memora.vector_db.qdrant import QdrantDB

qdrant_db = QdrantDB(
                async_client=AsyncQdrantClient(url="QDRANT_URL", api_key="QDRANT_API_KEY")
            )
Source code in memora/vector_db/qdrant.py
def __init__(
    self,
    async_client: AsyncQdrantClient = None,
    collection_name: str = "memory_collection_v0_2",
    embed_models_cache_dir: str = "./cache",
    enable_logging: bool = False,
):
    """
    Initialize the QdrantDB class.

    Args:
        async_client (AsyncQdrantClient): A pre-initialized Async Qdrant client
        collection_name (str): Name of the Qdrant collection
        embed_models_cache_dir (str): Directory to cache the embedding models
        enable_logging (bool): Whether to enable console logging

    Example:
        ```python
        from qdrant_client import AsyncQdrantClient
        from memora.vector_db.qdrant import QdrantDB

        qdrant_db = QdrantDB(
                        async_client=AsyncQdrantClient(url="QDRANT_URL", api_key="QDRANT_API_KEY")
                    )
        ```
    """

    # Set Qdrant Client.
    self.async_client: AsyncQdrantClient = async_client

    # Set both dense and sparse embedding models to use for hybrid search.
    self.vector_embedding_model: str = (
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )
    self.sparse_vector_embedding_model: str = "prithivida/Splade_PP_en_v1"

    self.async_client.set_model(
        self.vector_embedding_model, cache_dir=embed_models_cache_dir
    )
    self.async_client.set_sparse_model(
        self.sparse_vector_embedding_model, cache_dir=embed_models_cache_dir
    )

    # Set the collection name.
    self.collection_name = collection_name

    # Configure logging
    self.logger = logging.getLogger(__name__)
    if enable_logging:
        logging.basicConfig(level=logging.INFO)

Attributes

async_client instance-attribute

async_client: AsyncQdrantClient = async_client

collection_name instance-attribute

collection_name = collection_name

logger instance-attribute

logger = getLogger(__name__)

sparse_vector_embedding_model instance-attribute

sparse_vector_embedding_model: str = (
    "prithivida/Splade_PP_en_v1"
)

vector_embedding_model instance-attribute

vector_embedding_model: str = (
    "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
)

Functions

add_memories async

add_memories(
    org_id: str,
    user_id: str,
    agent_id: str,
    memory_ids: List[uuid.UUID],
    memories: List[str],
    obtained_at: str,
) -> None

Add memories to the collection, with their org_id, user_id, agent_id, and obtained_at datetime stored as metadata.

PARAMETER DESCRIPTION
org_id

Organization ID for the memories

TYPE: str

user_id

User ID for the memories

TYPE: str

agent_id

Agent ID for the memories

TYPE: str

memory_ids

List of UUIDs for each memory

TYPE: List[UUID]

memories

List of memory strings to add

TYPE: List[str]

obtained_at

ISO format datetime string when the memories were obtained

TYPE: str

Source code in memora/vector_db/qdrant.py
@override
async def add_memories(
    self,
    org_id: str,
    user_id: str,
    agent_id: str,
    memory_ids: List[uuid.UUID],
    memories: List[str],
    obtained_at: str,
) -> None:
    """
    Add memories to the collection, with their org_id, user_id, agent_id, and obtained_at datetime stored as metadata.

    Args:
        org_id (str): Organization ID for the memories
        user_id (str): User ID for the memories
        agent_id (str): Agent ID for the memories
        memory_ids (List[uuid.UUID]): List of UUIDs for each memory
        memories (List[str]): List of memory strings to add
        obtained_at (str): ISO format datetime string when the memories were obtained
    """

    if not memories:
        raise ValueError("At least one memory and its memory id is required")

    if len(memories) != len(memory_ids):
        raise ValueError("Length of memories and memory_ids must match")

    metadata = [
        {
            "org_id": org_id,
            "org_user_id": f"{org_id}:{user_id}",
            "user_id": user_id,
            "agent_id": agent_id,
            "obtained_at": obtained_at,
        }
        for _ in memories
    ]

    await self.async_client.add(
        collection_name=self.collection_name,
        documents=memories,
        metadata=metadata,
        ids=[str(memory_id) for memory_id in memory_ids],
        # parallel=_  # Use all CPU cores
    )
    self.logger.info(
        f"Added {len(memories)} memories to collection: {self.collection_name}"
    )
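
Example

A minimal usage sketch (the IDs and memory text below are hypothetical; assumes qdrant_db was constructed as in the class example and setup() has already been run):

import uuid
from datetime import datetime, timezone

# Inside an async function:
await qdrant_db.add_memories(
    org_id="org_1",
    user_id="user_1",
    agent_id="agent_1",
    memory_ids=[uuid.uuid4(), uuid.uuid4()],
    memories=[
        "User prefers vegetarian meals.",
        "User's favorite color is blue.",
    ],
    obtained_at=datetime.now(timezone.utc).isoformat(),
)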

close async

close() -> None

Closes the Qdrant database connection.

Source code in memora/vector_db/qdrant.py
@override
async def close(self) -> None:
    """Closes the qdrant database connection."""

    await self.async_client.close()
    self.logger.info("QdrantDB connection closed")

delete_all_organization_memories async

delete_all_organization_memories(org_id: str) -> None

Delete all memories associated with an organization.

PARAMETER DESCRIPTION
org_id

ID of the organization whose memories should be deleted

TYPE: str

Source code in memora/vector_db/qdrant.py
@override
async def delete_all_organization_memories(self, org_id: str) -> None:
    """
    Delete all memories associated with an organization.

    Args:
        org_id (str): ID of the organization whose memories should be deleted
    """

    filter_conditions = [
        models.FieldCondition(key="org_id", match=models.MatchValue(value=org_id))
    ]

    await self.async_client.delete(
        collection_name=self.collection_name,
        points_selector=models.Filter(must=filter_conditions),
    )
    self.logger.info(f"Deleted all memories for organization {org_id}")

delete_all_user_memories async

delete_all_user_memories(org_id: str, user_id: str) -> None

Delete all memories associated with a specific user.

PARAMETER DESCRIPTION
org_id

Organization ID the user belongs to

TYPE: str

user_id

ID of the user whose memories should be deleted

TYPE: str

Source code in memora/vector_db/qdrant.py
@override
async def delete_all_user_memories(self, org_id: str, user_id: str) -> None:
    """
    Delete all memories associated with a specific user.

    Args:
        org_id (str): Organization ID the user belongs to
        user_id (str): ID of the user whose memories should be deleted
    """

    await self.async_client.delete(
        collection_name=self.collection_name,
        points_selector=models.Filter(
            must=models.FieldCondition(
                key="org_user_id",
                match=models.MatchValue(value=f"{org_id}:{user_id}"),
            )
        ),
    )
    self.logger.info(
        f"Deleted all memories for user {user_id} in organization {org_id}"
    )
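
Example

A short sketch (hypothetical IDs; assumes an initialized qdrant_db):

# Inside an async function:
await qdrant_db.delete_all_user_memories(org_id="org_1", user_id="user_1")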

delete_memories async

delete_memories(memory_ids: List[str]) -> None

Delete multiple memories by their IDs.

PARAMETER DESCRIPTION
memory_ids

List of memory IDs to delete

TYPE: List[str]

Source code in memora/vector_db/qdrant.py
@override
async def delete_memories(self, memory_ids: List[str]) -> None:
    """
    Delete multiple memories by their IDs.

    Args:
        memory_ids (List[str]): List of memory IDs to delete
    """

    if memory_ids:
        await self.async_client.delete(
            collection_name=self.collection_name,
            points_selector=models.PointIdsList(points=memory_ids),
        )
        self.logger.info(f"Deleted memories with IDs: {memory_ids}")

delete_memory async

delete_memory(memory_id: str) -> None

Delete a memory by its ID.

PARAMETER DESCRIPTION
memory_id

ID of the memory to delete

TYPE: str

Source code in memora/vector_db/qdrant.py
@override
async def delete_memory(self, memory_id: str) -> None:
    """
    Delete a memory by its ID.

    Args:
        memory_id (str): ID of the memory to delete
    """
    if memory_id:
        await self.async_client.delete(
            collection_name=self.collection_name,
            points_selector=models.PointIdsList(points=[memory_id]),
        )
        self.logger.info(f"Deleted memory with ID: {memory_id}")

search_memories async

search_memories(
    queries: List[str],
    memory_search_scope: MemorySearchScope,
    org_id: str,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
) -> List[List[Tuple[schema_models.Memory, float]]]

Batch memory search with optional user/agent filtering.

PARAMETER DESCRIPTION
queries

List of search query strings

TYPE: List[str]

memory_search_scope

Memory search scope (organization or user)

TYPE: MemorySearchScope

org_id

Organization ID for filtering

TYPE: str

user_id

Optional user ID for filtering

TYPE: Optional[str] DEFAULT: None

agent_id

Optional agent ID for filtering

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
List[List[Tuple[Memory, float]]]

A list of search results for each query; each result is a (Memory, float) tuple, where Memory contains:

+ org_id: str
+ agent_id: str
+ user_id: str
+ memory_id: str
+ memory: str
+ obtained_at: datetime

and float is the relevance score of the memory.

Source code in memora/vector_db/qdrant.py
@override
async def search_memories(
    self,
    queries: List[str],
    memory_search_scope: MemorySearchScope,
    org_id: str,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
) -> List[List[Tuple[schema_models.Memory, float]]]:
    """
    Batch memory search with optional user/agent filtering.

    Args:
        queries (List[str]): List of search query strings
        memory_search_scope (MemorySearchScope): Memory search scope (organization or user)
        org_id (str): Organization ID for filtering
        user_id (Optional[str]): Optional user ID for filtering
        agent_id (Optional[str]): Optional agent ID for filtering

    Returns:
        List[List[Tuple[Memory, float]]]: Search results for each query; each is a (Memory, float) tuple containing:
            Memory:

                + org_id: str
                + agent_id: str
                + user_id: str
                + memory_id: str
                + memory: str
                + obtained_at: datetime

            float: Score of the memory
    """

    if not queries:
        raise ValueError("At least one query is required")

    # Build filter conditions
    filter_conditions = []

    if (
        memory_search_scope == MemorySearchScope.ORGANIZATION
    ):  # Search memories across the organization.
        filter_conditions.append(
            models.FieldCondition(
                key="org_id", match=models.MatchValue(value=org_id)
            )
        )
    elif (
        memory_search_scope == MemorySearchScope.USER
    ):  # Search memories for a specific user in an organization.

        if user_id is None:
            raise ValueError(
                "user_id is required in addition to org_id for user-specific search"
            )

        filter_conditions.append(
            models.FieldCondition(
                key="org_user_id",
                match=models.MatchValue(value=f"{org_id}:{user_id}"),
            )
        )

    if (
        agent_id
    ):  # If agent id is provided, filter by agent also regardless of memory search scope.
        filter_conditions.append(
            models.FieldCondition(
                key="agent_id", match=models.MatchValue(value=agent_id)
            )
        )

    # Embed queries
    dense_embeddings = self._dense_embed_queries(queries)
    sparse_embeddings = self._sparse_embed_queries(queries)

    search_results = await self.async_client.query_batch_points(
        collection_name=self.collection_name,
        requests=[
            models.QueryRequest(
                prefetch=[
                    models.Prefetch(
                        query=models.SparseVector(
                            indices=sparse.indices, values=sparse.values
                        ),
                        using=self.async_client.get_sparse_vector_field_name(),
                        limit=12,
                    ),
                    models.Prefetch(
                        query=dense,
                        using=self.async_client.get_vector_field_name(),
                        score_threshold=0.4,
                        limit=12,
                    ),
                ],
                filter=(
                    models.Filter(must=filter_conditions)
                    if filter_conditions
                    else None
                ),
                with_payload=True,
                query=models.FusionQuery(fusion=models.Fusion.RRF),
                params=models.SearchParams(
                    quantization=models.QuantizationSearchParams(rescore=False)
                ),
            )
            for sparse, dense in zip(sparse_embeddings, dense_embeddings)
        ],
    )

    search_results = [
        [
            (
                schema_models.Memory(
                    org_id=point.payload["org_id"],
                    agent_id=point.payload["agent_id"],
                    user_id=point.payload["user_id"],
                    memory_id=point.id,
                    memory=point.payload["document"],
                    obtained_at=datetime.fromisoformat(
                        point.payload["obtained_at"]
                    ),
                ),
                point.score,
            )
            for point in query.points
            if point.score > 0.35  # Filter out low-relevance memories.
        ]
        for query in search_results
    ]
    return search_results
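
Example

A usage sketch (hypothetical queries and IDs; the import path for MemorySearchScope is an assumption, as this page does not show it):

from memora.vector_db.base import MemorySearchScope  # import path assumed

# Inside an async function:
results = await qdrant_db.search_memories(
    queries=[
        "What food does the user like?",
        "What is the user's favorite color?",
    ],
    memory_search_scope=MemorySearchScope.USER,
    org_id="org_1",
    user_id="user_1",
)
for query_results in results:
    for memory, score in query_results:
        print(memory.memory, score)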

search_memory async

search_memory(
    query: str,
    memory_search_scope: MemorySearchScope,
    org_id: str,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
) -> List[Tuple[schema_models.Memory, float]]

Memory search with optional user/agent filtering.

PARAMETER DESCRIPTION
query

Search query string

TYPE: str

memory_search_scope

Memory search scope (organization or user)

TYPE: MemorySearchScope

org_id

Organization ID for filtering

TYPE: str

user_id

Optional user ID for filtering

TYPE: Optional[str] DEFAULT: None

agent_id

Optional agent ID for filtering

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
List[Tuple[Memory, float]]

A list of (Memory, float) tuples, where Memory contains:

+ org_id: str
+ agent_id: str
+ user_id: str
+ memory_id: str
+ memory: str
+ obtained_at: datetime

and float is the relevance score of the memory.

Source code in memora/vector_db/qdrant.py
@override
async def search_memory(
    self,
    query: str,
    memory_search_scope: MemorySearchScope,
    org_id: str,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
) -> List[Tuple[schema_models.Memory, float]]:
    """
    Memory search with optional user/agent filtering.

    Args:
        query (str): Search query string
        memory_search_scope (MemorySearchScope): Memory search scope (organization or user)
        org_id (str): Organization ID for filtering
        user_id (Optional[str]): Optional user ID for filtering
        agent_id (Optional[str]): Optional agent ID for filtering

    Returns:
        List[Tuple[Memory, float]]: Search results; each is a (Memory, float) tuple containing:
            Memory:

                + org_id: str
                + agent_id: str
                + user_id: str
                + memory_id: str
                + memory: str
                + obtained_at: datetime

            float: Score of the memory
    """

    if not query:
        raise ValueError("A query is required")

    results = await self.search_memories(
        queries=[query],
        memory_search_scope=memory_search_scope,
        org_id=org_id,
        user_id=user_id,
        agent_id=agent_id,
    )
    return results[0] if results else []
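
Example

A usage sketch (hypothetical query and IDs; MemorySearchScope import path assumed as in the search_memories example). search_memory is a convenience wrapper over search_memories for a single query:

# Inside an async function:
results = await qdrant_db.search_memory(
    query="What food does the user like?",
    memory_search_scope=MemorySearchScope.USER,
    org_id="org_1",
    user_id="user_1",
)
for memory, score in results:
    print(memory.memory_id, memory.memory, score)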

setup async

setup(*args, **kwargs) -> None

Set up the QdrantDB by creating the collection and payload indices.

Source code in memora/vector_db/qdrant.py
@override
async def setup(self, *args, **kwargs) -> None:
    """Setup the QdrantDB by creating the collection and payload indices."""

    await self._create_collection_if_not_exists()
    await self._create_payload_indices()
    self.logger.info("QdrantDB setup completed")