Skip to content

Neo4j GraphDB Implementation

This section details the Neo4j implementation of the BaseGraphDB interface. The implementation is structured into several subclasses, each handling a specific group of methods from the BaseGraphDB interface. These subclasses are then inherited by the Neo4jGraphInterface, which serves as a unified interface for interacting with the Neo4j implementation.

Graph Interface

memora.graph_db.neo4j.Neo4jGraphInterface

Neo4jGraphInterface(
    uri: str,
    username: str,
    password: str,
    database: str,
    associated_vector_db: Optional[BaseVectorDB] = None,
    enable_logging: bool = False,
)

Bases: Neo4jOrganization, Neo4jAgent, Neo4jUser, Neo4jInteraction, Neo4jMemory

PARAMETER DESCRIPTION
uri

The URI of the Neo4j database.

TYPE: str

username

The username for authentication.

TYPE: str

password

The password for authentication.

TYPE: str

database

The name of the Neo4j database.

TYPE: str

associated_vector_db

The vector database to be associated with the graph for data consistency (e.g adding / deleting memories across both.)

TYPE: Optional[BaseVectorDB] DEFAULT: None

enable_logging

Whether to enable console logging

TYPE: bool DEFAULT: False

Example
from memora.graph_db.neo4j import Neo4jGraphInterface
from qdrant_client import AsyncQdrantClient
from memora.vector_db.qdrant import QdrantDB

neo4j_interface = Neo4jGraphInterface(
    uri="Neo4jURI",
    username="Neo4jUsername",
    password="Neo4jPassword",
    database="Neo4jDatabaseName",
    # Optional Association
    associated_vector_db=QdrantDB(async_client=AsyncQdrantClient(url="QDRANT_URL", api_key="QDRANT_API_KEY"))
)
Source code in memora/graph_db/neo4j/interface.py
def __init__(
    self,
    uri: str,
    username: str,
    password: str,
    database: str,
    associated_vector_db: Optional[BaseVectorDB] = None,
    enable_logging: bool = False,
):
    """
    A unified interface for interacting with the Neo4j graph database.

    Args:
        uri (str): The URI of the Neo4j database.
        username (str): The username for authentication.
        password (str): The password for authentication.
        database (str): The name of the Neo4j database.
        associated_vector_db (Optional[BaseVectorDB]): The vector database to be associated with the graph for data consistency (e.g adding / deleting memories across both.)
        enable_logging (bool): Whether to enable console logging

    Example:
        ```python
        from memora.graph_db.neo4j import Neo4jGraphInterface
        from qdrant_client import AsyncQdrantClient
        from memora.vector_db.qdrant import QdrantDB

        neo4j_interface = Neo4jGraphInterface(
            uri="Neo4jURI",
            username="Neo4jUsername",
            password="Neo4jPassword",
            database="Neo4jDatabaseName",
            # Optional Association
            associated_vector_db=QdrantDB(async_client=AsyncQdrantClient(url="QDRANT_URL", api_key="QDRANT_API_KEY"))
        )
        ```
    """

    self.driver = AsyncGraphDatabase.driver(uri=uri, auth=(username, password))
    self.database = database
    self.associated_vector_db = associated_vector_db

    # Configure logging
    self.logger = logging.getLogger(__name__)
    if enable_logging:
        logging.basicConfig(level=logging.INFO)

Attributes

associated_vector_db instance-attribute

associated_vector_db = associated_vector_db

database instance-attribute

database = database

driver instance-attribute

driver = driver(uri=uri, auth=(username, password))

logger instance-attribute

logger = getLogger(__name__)

Functions

close async

close()
Source code in memora/graph_db/neo4j/interface.py
@override
async def close(self):
    self.logger.info("Closing Neo4j driver")
    await self.driver.close()

get_associated_vector_db

get_associated_vector_db() -> Optional[BaseVectorDB]

The vector database associated with the graph database, these is used inside the graph transactional blocks to ensure data consistency when handling memories across both stores (e.g., saving memories to the vector store and creating corresponding nodes in the graph db).

Source code in memora/graph_db/neo4j/interface.py
@override
def get_associated_vector_db(self) -> Optional[BaseVectorDB]:
    """
    The vector database associated with the graph database, these is used inside the graph transactional blocks
    to ensure data consistency when handling memories across both stores (e.g., saving memories to the vector
    store and creating corresponding nodes in the graph db).
    """
    return self.associated_vector_db

migrate_to_schema_for_memora_v0_3_x async

migrate_to_schema_for_memora_v0_3_x(
    *args, **kwargs
) -> None

Migrate the Neo4j graph database schema to the version that works with Memora v0.3.x

This migration involves establishing :DATE_OBTAINED relationships between Memory nodes and Date nodes based on their org_id, user_id, obtained_at attributes and dropping the index :Interaction (updated_at) -> interaction_updated_timestamp (because Neo4j does not utilize it for index-backed sorting)

Source code in memora/graph_db/neo4j/interface.py
async def migrate_to_schema_for_memora_v0_3_x(self, *args, **kwargs) -> None:
    """
    Migrate the Neo4j graph database schema to the version that works with Memora v0.3.x

    This migration involves establishing :DATE_OBTAINED relationships between `Memory` nodes and `Date` nodes based on
    their `org_id, user_id, obtained_at` attributes and dropping the index `:Interaction (updated_at) -> interaction_updated_timestamp` (because
    Neo4j does not utilize it for index-backed sorting)
    """

    async def link_memories_to_dates(tx):
        self.logger.info(
            "Linking Memory nodes to Date nodes based on org_id, user_id, obtained_at"
        )
        await tx.run(
            """
            MATCH (memory:Memory)
            MERGE (date:Date {org_id: memory.org_id, user_id: memory.user_id, date: date(memory.obtained_at)})
            MERGE (memory)-[:DATE_OBTAINED]->(date)
            """
        )
        self.logger.info("Memory nodes successfully linked to their Date nodes")

    async def drop_index(tx):
        self.logger.info(
            "Dropping interaction_updated_timestamp_index index if it exists"
        )
        await tx.run(
            """
            DROP INDEX interaction_updated_timestamp_index IF EXISTS
            """
        )
        self.logger.info("Index dropped (if it existed)")

    self.logger.info(
        "Starting migration to graph schema that works with Memora v0.3.x"
    )
    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(link_memories_to_dates)
        await session.execute_write(drop_index)
    self.logger.info("Migration of graph schema completed")

setup async

setup(*args, **kwargs) -> None

Sets up Neo4j database constraints and indices for the graph schema.

Source code in memora/graph_db/neo4j/interface.py
@override
async def setup(self, *args, **kwargs) -> None:
    """Sets up Neo4j database constraints and indices for the graph schema."""

    async def create_constraints_and_indexes(tx):
        self.logger.info("Creating constraints and indexes")
        # Organization node key
        await tx.run(
            """
            CREATE CONSTRAINT unique_org_id IF NOT EXISTS 
            FOR (o:Org) REQUIRE o.org_id IS NODE KEY
        """
        )

        # User node key
        await tx.run(
            """
            CREATE CONSTRAINT unique_org_user IF NOT EXISTS
            FOR (u:User) REQUIRE (u.org_id, u.user_id) IS NODE KEY
        """
        )

        # Agent node key
        await tx.run(
            """
            CREATE CONSTRAINT unique_org_agent IF NOT EXISTS 
            FOR (a:Agent) REQUIRE (a.org_id, a.agent_id) IS NODE KEY
        """
        )

        # Memory node key
        await tx.run(
            """
            CREATE CONSTRAINT unique_user_memory IF NOT EXISTS
            FOR (m:Memory) REQUIRE (m.org_id, m.user_id, m.memory_id) IS NODE KEY
        """
        )

        # Interaction node key
        await tx.run(
            """
            CREATE CONSTRAINT unique_user_interaction IF NOT EXISTS
            FOR (i:Interaction) REQUIRE (i.org_id, i.user_id, i.interaction_id) IS NODE KEY
        """
        )

        # Date node key
        await tx.run(
            """
            CREATE CONSTRAINT unique_user_date IF NOT EXISTS
            FOR (d:Date) REQUIRE (d.org_id, d.user_id, d.date) IS NODE KEY
        """
        )

    self.logger.info("Setting up Neo4j database constraints and indices")
    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(create_constraints_and_indexes)
    self.logger.info("Setup complete")

Component Classes

memora.graph_db.neo4j.Neo4jOrganization

Bases: BaseGraphDB

Functions

create_organization async
create_organization(org_name: str) -> models.Organization

Creates a new organization in the Neo4j graph database.

PARAMETER DESCRIPTION
org_name

The name of the organization to create.

TYPE: str

RETURNS DESCRIPTION
Organization

Organization object containing:

  • org_id: Short UUID string
  • org_name: Organization name
  • created_at: DateTime object of when the organization was created
Source code in memora/graph_db/neo4j/organization.py
@override
async def create_organization(self, org_name: str) -> models.Organization:
    """
    Creates a new organization in the Neo4j graph database.

    Args:
        org_name (str): The name of the organization to create.

    Returns:
        Organization object containing:

            + org_id: Short UUID string
            + org_name: Organization name
            + created_at: DateTime object of when the organization was created
    """

    if not isinstance(org_name, str) or not org_name:
        raise TypeError("`org_name` must be a string and have a value.")

    org_id = shortuuid.uuid()
    self.logger.info(f"Creating organization with ID {org_id}")

    async def create_org_tx(tx):
        result = await tx.run(
            """
            CREATE (o:Org {
                org_id: $org_id,
                org_name: $org_name,
                created_at: datetime()
            })
            RETURN o{.org_id, .org_name, .created_at} as org
        """,
            org_id=org_id,
            org_name=org_name,
        )

        record = await result.single()
        return record["org"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:

        org_data = await session.execute_write(create_org_tx)

        if org_data is None:
            self.logger.info(f"Failed to create organization {org_id}")
            raise neo4j.exceptions.Neo4jError("Failed to create organization.")

        self.logger.info(f"Successfully created organization {org_id}")
        return models.Organization(
            org_id=org_data["org_id"],
            org_name=org_data["org_name"],
            created_at=(org_data["created_at"]).to_native(),
        )
delete_organization async
delete_organization(org_id: str) -> None

Deletes an organization from the Neo4j graph database.

Warning

This operation will delete all nodes and relationships from this organization including users, agents, memories, interactions etc.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization to delete.

TYPE: str

Source code in memora/graph_db/neo4j/organization.py
@override
async def delete_organization(self, org_id: str) -> None:
    """
    Deletes an organization from the Neo4j graph database.

    Warning:
        This operation will delete all nodes and relationships from this organization
        including users, agents, memories, interactions etc.

    Args:
        org_id (str): Short UUID string identifying the organization to delete.
    """

    if not isinstance(org_id, str) or not org_id:
        raise TypeError("`org_id` must be a string and have a value.")

    self.logger.info(f"Deleting organization {org_id} and all associated data")

    async def delete_org_tx(tx):
        # Delete all nodes and relationships associated with the org
        await tx.run(
            """
            CALL apoc.periodic.iterate("
            MATCH (o:Org {org_id: $org_id})
            CALL apoc.path.subgraphNodes(o, {}) YIELD node
            RETURN node",
            "DETACH DELETE node",
            {batchSize: 1000, parallel: true, params: {org_id: $org_id}})
        """,
            org_id=org_id,
        )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_org_tx)
        self.logger.info(f"Successfully deleted organization {org_id}")
get_all_organizations async
get_all_organizations() -> List[models.Organization]

Gets all organizations from the graph database.

RETURNS DESCRIPTION
List[Organization]

List[Organization] each containing:

  • org_id: Short UUID string
  • org_name: Organization name
  • created_at: DateTime object of when the organization was created
Source code in memora/graph_db/neo4j/organization.py
@override
async def get_all_organizations(self) -> List[models.Organization]:
    """
    Gets all organizations from the graph database.

    Returns:
        List[Organization] each containing:

            + org_id: Short UUID string
            + org_name: Organization name
            + created_at: DateTime object of when the organization was created
    """

    self.logger.info("Getting all organizations")

    async def get_all_org_tx(tx):
        result = await tx.run(
            """
            MATCH (o:Org)
            RETURN o{.org_id, .org_name, .created_at} as org
        """
        )
        records = await result.value("org", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:

        all_org_data = await session.execute_read(get_all_org_tx)

        return [
            models.Organization(
                org_id=org_data["org_id"],
                org_name=org_data["org_name"],
                created_at=(org_data["created_at"]).to_native(),
            )
            for org_data in all_org_data
        ]
get_organization async
get_organization(org_id: str) -> models.Organization

Gets a specific organization from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization to retrieve.

TYPE: str

RETURNS DESCRIPTION
Organization

Organization object containing:

  • org_id: Short UUID string
  • org_name: Organization name
  • created_at: DateTime object of when the organization was created
Source code in memora/graph_db/neo4j/organization.py
@override
async def get_organization(self, org_id: str) -> models.Organization:
    """
    Gets a specific organization from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization to retrieve.

    Returns:
        Organization object containing:

            + org_id: Short UUID string
            + org_name: Organization name
            + created_at: DateTime object of when the organization was created
    """

    if not isinstance(org_id, str) or not org_id:
        raise TypeError("`org_id` must be a string and have a value.")

    async def get_org_tx(tx):
        result = await tx.run(
            """
            MATCH (o:Org {org_id: $org_id})
            RETURN o{.org_id, .org_name, .created_at} as org
        """,
            org_id=org_id,
        )
        record = await result.single()
        return record["org"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        org_data = await session.execute_read(get_org_tx)

        if org_data is None:
            self.logger.info(f"Organization {org_id} not found")
            raise neo4j.exceptions.Neo4jError(
                "Organization (`org_id`) does not exist."
            )

        return models.Organization(
            org_id=org_data["org_id"],
            org_name=org_data["org_name"],
            created_at=(org_data["created_at"]).to_native(),
        )
update_organization async
update_organization(
    org_id: str, new_org_name: str
) -> models.Organization

Updates an existing organization in the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

The Short UUID of the organization to update.

TYPE: str

new_org_name

The new name for the organization.

TYPE: str

RETURNS DESCRIPTION
Organization

Organization object containing:

  • org_id: Short UUID string
  • org_name: Organization name
  • created_at: DateTime object of when the organization was created
Source code in memora/graph_db/neo4j/organization.py
@override
async def update_organization(
    self, org_id: str, new_org_name: str
) -> models.Organization:
    """
    Updates an existing organization in the Neo4j graph database.

    Args:
        org_id (str): The Short UUID of the organization to update.
        new_org_name (str): The new name for the organization.

    Returns:
        Organization object containing:

            + org_id: Short UUID string
            + org_name: Organization name
            + created_at: DateTime object of when the organization was created
    """

    if not all(
        param and isinstance(param, str) for param in (org_id, new_org_name)
    ):
        raise ValueError(
            "Both `org_id` and `new_org_name` must be a string and have a value."
        )

    self.logger.info(f"Updating organization {org_id}")

    async def update_org_tx(tx):
        result = await tx.run(
            """
            MATCH (o:Org {org_id: $org_id})
            SET o.org_name = $new_org_name
            RETURN o{.org_id, .org_name, .created_at} as org
        """,
            org_id=org_id,
            new_org_name=new_org_name,
        )

        record = await result.single()
        return record["org"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:

        org_data = await session.execute_write(update_org_tx)

        if org_data is None:
            self.logger.info(f"Organization {org_id} not found")
            raise neo4j.exceptions.Neo4jError(
                "Organization (`org_id`) does not exist."
            )

        self.logger.info(f"Successfully updated organization {org_id}")
        return models.Organization(
            org_id=org_data["org_id"],
            org_name=org_data["org_name"],
            created_at=(org_data["created_at"]).to_native(),
        )

memora.graph_db.neo4j.Neo4jAgent

Bases: BaseGraphDB

Functions

create_agent async
create_agent(
    org_id: str,
    agent_label: str,
    user_id: Optional[str] = None,
) -> models.Agent

Creates a new agent in the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

agent_label

Label/name for the agent.

TYPE: str

user_id

Optional Short UUID of the user. This is used when the agent is created specifically for a user, indicating that both the organization and the user will have this agent.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
Agent

Agent containing:

  • org_id: Short UUID string
  • user_id: Optional Short UUID string
  • agent_id: Short UUID string
  • agent_label: Agent label/name
  • created_at: DateTime object of when the agent was created
Source code in memora/graph_db/neo4j/agent.py
@override
async def create_agent(
    self, org_id: str, agent_label: str, user_id: Optional[str] = None
) -> models.Agent:
    """
    Creates a new agent in the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        agent_label (str): Label/name for the agent.
        user_id (Optional[str]): Optional Short UUID of the user. This is used when the agent is created
            specifically for a user, indicating that both the organization and the
            user will have this agent.

    Returns:
        Agent containing:

            + org_id: Short UUID string
            + user_id: Optional Short UUID string
            + agent_id: Short UUID string
            + agent_label: Agent label/name
            + created_at: DateTime object of when the agent was created
    """

    if not all(param and isinstance(param, str) for param in (org_id, agent_label)):
        raise ValueError(
            "Both `org_id` and `agent_label` must be a string and have a value."
        )

    if user_id:
        if not isinstance(user_id, str):
            raise ValueError("`user_id` must be a string.")

    agent_id = shortuuid.uuid()
    self.logger.info(f"Creating new agent with ID {agent_id}")

    async def create_agent_tx(tx):
        if user_id:
            result = await tx.run(
                """
                MATCH (o:Org {org_id: $org_id}), (u:User {org_id: $org_id, user_id: $user_id})
                CREATE (a:Agent {
                    org_id: $org_id,
                    user_id: $user_id,
                    agent_id: $agent_id,
                    agent_label: $agent_label,
                    created_at: datetime()
                })
                CREATE (o)-[:HAS_AGENT]->(a)
                CREATE (u)-[:HAS_AGENT]->(a)
                RETURN a{.org_id, .user_id, .agent_id, .agent_label, .created_at} as agent
            """,
                org_id=org_id,
                user_id=user_id,
                agent_id=agent_id,
                agent_label=agent_label,
            )
        else:
            result = await tx.run(
                """
                MATCH (o:Org {org_id: $org_id})
                CREATE (a:Agent {
                    org_id: $org_id,
                    agent_id: $agent_id,
                    agent_label: $agent_label,
                    created_at: datetime()
                })
                CREATE (o)-[:HAS_AGENT]->(a)
                RETURN a{.org_id, .agent_id, .agent_label, .created_at} as agent
            """,
                org_id=org_id,
                agent_id=agent_id,
                agent_label=agent_label,
            )

        record = await result.single()
        return record["agent"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        agent_data = await session.execute_write(create_agent_tx)

        if agent_data is None:
            self.logger.info(f"Failed to create agent {agent_id}")
            raise neo4j.exceptions.Neo4jError("Failed to create agent.")

        self.logger.info(f"Successfully created agent {agent_id}")
        return models.Agent(
            org_id=agent_data["org_id"],
            agent_id=agent_data["agent_id"],
            user_id=agent_data.get("user_id"),
            agent_label=agent_data["agent_label"],
            created_at=(agent_data["created_at"]).to_native(),
        )
delete_agent async
delete_agent(org_id: str, agent_id: str) -> None

Deletes an agent from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

agent_id

Short UUID string identifying the agent to delete.

TYPE: str

Source code in memora/graph_db/neo4j/agent.py
@override
async def delete_agent(self, org_id: str, agent_id: str) -> None:
    """
    Deletes an agent from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        agent_id (str): Short UUID string identifying the agent to delete.
    """

    if not all(param and isinstance(param, str) for param in (org_id, agent_id)):
        raise ValueError(
            "`org_id` and `agent_id` must be strings and have a value."
        )

    self.logger.info(f"Deleting agent {agent_id}")

    async def delete_agent_tx(tx):
        # Using node key (org_id, agent_id) for faster lookup
        await tx.run(
            """
            MATCH (a:Agent {org_id: $org_id, agent_id: $agent_id})
            DETACH DELETE a
        """,
            org_id=org_id,
            agent_id=agent_id,
        )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_agent_tx)
        self.logger.info(f"Successfully deleted agent {agent_id}")
get_agent async
get_agent(org_id: str, agent_id: str) -> models.Agent

Gets a specific agent belonging to the specified organization from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

agent_id

Short UUID string identifying the agent to retrieve.

TYPE: str

RETURNS DESCRIPTION
Agent

Agent containing:

  • org_id: Short UUID string
  • user_id: Optional Short UUID string
  • agent_id: Short UUID string
  • agent_label: Agent label/name
  • created_at: DateTime object of when the agent was created
Source code in memora/graph_db/neo4j/agent.py
@override
async def get_agent(self, org_id: str, agent_id: str) -> models.Agent:
    """
    Gets a specific agent belonging to the specified organization from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        agent_id (str): Short UUID string identifying the agent to retrieve.

    Returns:
        Agent containing:

            + org_id: Short UUID string
            + user_id: Optional Short UUID string
            + agent_id: Short UUID string
            + agent_label: Agent label/name
            + created_at: DateTime object of when the agent was created
    """

    async def get_agent_tx(tx):
        result = await tx.run(
            """
            MATCH (a:Agent {org_id: $org_id, agent_id: $agent_id})
            RETURN a{.org_id, .user_id, .agent_id, .agent_label, .created_at} as agent
        """,
            org_id=org_id,
            agent_id=agent_id,
        )
        record = await result.single()
        return record["agent"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        agent_data = await session.execute_read(get_agent_tx)

        if agent_data is None:
            self.logger.info(
                f"Failed to get agent {agent_id}: Agent does not exist"
            )
            raise neo4j.exceptions.Neo4jError(
                "Agent (`org_id`, `agent_id`) does not exist."
            )

        return models.Agent(
            org_id=agent_data["org_id"],
            agent_id=agent_data["agent_id"],
            user_id=agent_data.get("user_id"),
            agent_label=agent_data["agent_label"],
            created_at=(agent_data["created_at"]).to_native(),
        )
get_all_org_agents async
get_all_org_agents(org_id: str) -> List[models.Agent]

Gets all agents belonging to the specified organization from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

RETURNS DESCRIPTION
List[Agent]

A List[Agent], each containing:

  • org_id: Short UUID string
  • user_id: Optional Short UUID string
  • agent_id: Short UUID string
  • agent_label: Agent label/name
  • created_at: DateTime object of when the agent was created
Source code in memora/graph_db/neo4j/agent.py
@override
async def get_all_org_agents(self, org_id: str) -> List[models.Agent]:
    """
    Gets all agents belonging to the specified organization from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.

    Returns:
        A List[Agent], each containing:

            + org_id: Short UUID string
            + user_id: Optional Short UUID string
            + agent_id: Short UUID string
            + agent_label: Agent label/name
            + created_at: DateTime object of when the agent was created
    """

    if not isinstance(org_id, str) or not org_id:
        raise ValueError("`org_id` must be a string and have a value.")

    self.logger.info(f"Getting all agents for organization {org_id}")

    async def get_org_agents_tx(tx):
        result = await tx.run(
            """
            MATCH (o:Org {org_id: $org_id})-[:HAS_AGENT]->(a:Agent)
            RETURN a{.org_id, .user_id, .agent_id, .agent_label, .created_at} as agent
        """,
            org_id=org_id,
        )
        records = await result.value("agent", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        all_agents_data = await session.execute_read(get_org_agents_tx)

        return [
            models.Agent(
                org_id=agent_data["org_id"],
                agent_id=agent_data["agent_id"],
                user_id=agent_data.get("user_id"),
                agent_label=agent_data["agent_label"],
                created_at=(agent_data["created_at"]).to_native(),
            )
            for agent_data in all_agents_data
        ]
get_all_user_agents async
get_all_user_agents(
    org_id: str, user_id: str
) -> List[models.Agent]

Gets all agents for a user within an organization from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user.

TYPE: str

RETURNS DESCRIPTION
List[Agent]

A List[Agent], each containing:

  • org_id: Short UUID string
  • user_id: Optional Short UUID string
  • agent_id: Short UUID string
  • agent_label: Agent label/name
  • created_at: DateTime object of when the agent was created
Source code in memora/graph_db/neo4j/agent.py
@override
async def get_all_user_agents(
    self, org_id: str, user_id: str
) -> List[models.Agent]:
    """
    Gets all agents for a user within an organization from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user.

    Returns:
        A List[Agent], each containing:

            + org_id: Short UUID string
            + user_id: Optional Short UUID string
            + agent_id: Short UUID string
            + agent_label: Agent label/name
            + created_at: DateTime object of when the agent was created
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    self.logger.info(
        f"Getting all agents for user {user_id} in organization {org_id}"
    )

    async def get_user_agents_tx(tx):
        result = await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})-[:HAS_AGENT]->(a:Agent)
            RETURN a{.org_id, .user_id, .agent_id, .agent_label, .created_at} as agent
        """,
            org_id=org_id,
            user_id=user_id,
        )
        records = await result.value("agent", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        all_agents_data = await session.execute_read(get_user_agents_tx)

        return [
            models.Agent(
                org_id=agent_data["org_id"],
                agent_id=agent_data["agent_id"],
                user_id=agent_data.get("user_id"),
                agent_label=agent_data["agent_label"],
                created_at=(agent_data["created_at"]).to_native(),
            )
            for agent_data in all_agents_data
        ]
update_agent async
update_agent(
    org_id: str, agent_id: str, new_agent_label: str
) -> models.Agent

Updates an existing agent in the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

agent_id

Short UUID string identifying the agent to update.

TYPE: str

new_agent_label

New label/name for the agent.

TYPE: str

RETURNS DESCRIPTION
Agent

Agent containing:

  • org_id: Short UUID string
  • user_id: Optional Short UUID string
  • agent_id: Short UUID string
  • agent_label: Agent label/name
  • created_at: DateTime object of when the agent was created
Source code in memora/graph_db/neo4j/agent.py
@override
async def update_agent(
    self, org_id: str, agent_id: str, new_agent_label: str
) -> models.Agent:
    """
    Updates an existing agent in the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        agent_id (str): Short UUID string identifying the agent to update.
        new_agent_label (str): New label/name for the agent.

    Returns:
        Agent containing:

            + org_id: Short UUID string
            + user_id: Optional Short UUID string
            + agent_id: Short UUID string
            + agent_label: Agent label/name
            + created_at: DateTime object of when the agent was created
    """

    if not all(
        param and isinstance(param, str)
        for param in (org_id, agent_id, new_agent_label)
    ):
        raise ValueError(
            "`org_id`, `agent_id` and `new_agent_name` must be strings and have a value."
        )

    self.logger.info(f"Updating agent {agent_id}")

    async def update_agent_tx(tx):
        result = await tx.run(
            """
            MATCH (a:Agent {org_id: $org_id, agent_id: $agent_id})
            SET a.agent_label = $new_agent_label
            RETURN a{.org_id, .user_id, .agent_id, .agent_label, .created_at} as agent
        """,
            org_id=org_id,
            agent_id=agent_id,
            new_agent_label=new_agent_label,
        )

        record = await result.single()
        return record["agent"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        agent_data = await session.execute_write(update_agent_tx)

        if agent_data is None:
            self.logger.info(
                f"Failed to update agent {agent_id}: Agent does not exist"
            )
            raise neo4j.exceptions.Neo4jError(
                "Agent (`org_id`, `agent_id`) does not exist."
            )

        self.logger.info(f"Successfully updated agent {agent_id}")
        return models.Agent(
            org_id=agent_data["org_id"],
            agent_id=agent_data["agent_id"],
            user_id=agent_data.get("user_id"),
            agent_label=agent_data["agent_label"],
            created_at=(agent_data["created_at"]).to_native(),
        )

memora.graph_db.neo4j.Neo4jUser

Bases: BaseGraphDB

Functions

create_user async
create_user(org_id: str, user_name: str) -> models.User

Creates a new user in the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_name

Name for the user.

TYPE: str

RETURNS DESCRIPTION
User

User containing:

  • org_id: Short UUID string
  • user_id: Short UUID string
  • user_name: User's name
  • created_at: DateTime object of when the user was created
Source code in memora/graph_db/neo4j/user.py
@override
async def create_user(self, org_id: str, user_name: str) -> models.User:
    """
    Creates a new user in the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_name (str): Name for the user.

    Returns:
        User containing:

            + org_id: Short UUID string
            + user_id: Short UUID string
            + user_name: User's name
            + created_at: DateTime object of when the user was created
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_name)):
        raise ValueError(
            "Both `org_id` and `user_name` must be a string and have a value."
        )

    user_id = shortuuid.uuid()
    self.logger.info(f"Creating new user with ID {user_id}")

    async def create_user_tx(tx):
        result = await tx.run(
            """
            MATCH (o:Org {org_id: $org_id})
            CREATE (u:User {
                org_id: $org_id,
                user_id: $user_id,
                user_name: $user_name,
                created_at: datetime()
            })
            CREATE (u)-[:BELONGS_TO]->(o)
            CREATE (ic:InteractionCollection {
                org_id: $org_id,
                user_id: $user_id
            })
            CREATE (mc:MemoryCollection {
                org_id: $org_id,
                user_id: $user_id
            })
            CREATE (u)-[:INTERACTIONS_IN]->(ic)
            CREATE (u)-[:HAS_MEMORIES]->(mc)
            RETURN u{.org_id, .user_id, .user_name, .created_at} as user
        """,
            org_id=org_id,
            user_id=user_id,
            user_name=user_name,
        )
        record = await result.single()
        return record["user"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        user_data = await session.execute_write(create_user_tx)

        if user_data is None:
            self.logger.info(f"Failed to create user {user_id}")
            raise neo4j.exceptions.Neo4jError("Failed to create user.")

        return models.User(
            org_id=user_data["org_id"],
            user_id=user_data["user_id"],
            user_name=user_data["user_name"],
            created_at=(user_data["created_at"]).to_native(),
        )
delete_user async
delete_user(org_id: str, user_id: str) -> None

Deletes a user from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user to delete.

TYPE: str

Source code in memora/graph_db/neo4j/user.py
@override
async def delete_user(self, org_id: str, user_id: str) -> None:
    """
    Deletes a user from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user to delete.
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    self.logger.info(f"Deleting user {user_id}")

    async def delete_user_tx(tx):
        await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})
            OPTIONAL MATCH (u)-[:INTERACTIONS_IN]->(interactioncollection)
            OPTIONAL MATCH (interactioncollection)-[:HAD_INTERACTION]->(interaction)
            OPTIONAL MATCH (interaction)-[:FIRST_MESSAGE|IS_NEXT*]->(message)
            OPTIONAL MATCH (u)-[:HAS_MEMORIES]->(memcollection)
            OPTIONAL MATCH (memcollection)-[:INCLUDES]->(memory)
            OPTIONAL MATCH (interaction)-[:HAS_OCCURRENCE_ON]->(date)
            DETACH DELETE u, interactioncollection, interaction, message, memcollection, memory, date
        """,
            org_id=org_id,
            user_id=user_id,
        )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_user_tx)
get_all_org_users async
get_all_org_users(org_id: str) -> List[models.User]

Gets all users belonging to the specified organization from the graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

RETURNS DESCRIPTION
List[User]

List[User], each containing:

  • org_id: Short UUID string
  • user_id: Short UUID string
  • user_name: User's name
  • created_at: DateTime object of when the user was created.
Source code in memora/graph_db/neo4j/user.py
@override
async def get_all_org_users(self, org_id: str) -> List[models.User]:
    """
    Gets all users belonging to the specified organization from the graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.

    Returns:
        List[User], each containing:

            + org_id: Short UUID string
            + user_id: Short UUID string
            + user_name: User's name
            + created_at: DateTime object of when the user was created.
    """

    if not isinstance(org_id, str) or not org_id:
        raise ValueError("`org_id` must be a string and have a value.")

    self.logger.info(f"Getting all users for organization {org_id}")

    async def get_users_tx(tx):
        result = await tx.run(
            """
            MATCH (o:Org {org_id: $org_id})<-[:BELONGS_TO]-(u:User)
            RETURN u{.org_id, .user_id, .user_name, .created_at} as user
        """,
            org_id=org_id,
        )
        records = await result.value("user", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:

        all_users_data = await session.execute_read(get_users_tx)

        return [
            models.User(
                org_id=user_data["org_id"],
                user_id=user_data["user_id"],
                user_name=user_data["user_name"],
                created_at=(user_data["created_at"]).to_native(),
            )
            for user_data in all_users_data
        ]
get_user async
get_user(org_id: str, user_id: str) -> models.User

Gets a specific user belonging to the specified organization from the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user to retrieve.

TYPE: str

RETURNS DESCRIPTION
User

User containing:

  • org_id: Short UUID string
  • user_id: Short UUID string
  • user_name: User's name
  • created_at: DateTime object of when the user was created.
Source code in memora/graph_db/neo4j/user.py
@override
async def get_user(self, org_id: str, user_id: str) -> models.User:
    """
    Gets a specific user belonging to the specified organization from the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user to retrieve.

    Returns:
        User containing:

            + org_id: Short UUID string
            + user_id: Short UUID string
            + user_name: User's name
            + created_at: DateTime object of when the user was created.
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    async def get_user_tx(tx):
        result = await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})
            RETURN u{.org_id, .user_id, .user_name, .created_at} as user
        """,
            org_id=org_id,
            user_id=user_id,
        )
        record = await result.single()
        return record["user"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        user_data = await session.execute_read(get_user_tx)

        if user_data is None:
            self.logger.info(f"Failed to get user {user_id}: User does not exist")
            raise neo4j.exceptions.Neo4jError(
                "User (`org_id`, `user_id`) does not exist."
            )

        return models.User(
            org_id=user_data["org_id"],
            user_id=user_data["user_id"],
            user_name=user_data["user_name"],
            created_at=(user_data["created_at"]).to_native(),
        )
update_user async
update_user(
    org_id: str, user_id: str, new_user_name: str
) -> models.User

Updates an existing user in the Neo4j graph database.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user to update.

TYPE: str

new_user_name

The new name for the user.

TYPE: str

RETURNS DESCRIPTION
User

User containing:

  • org_id: Short UUID string
  • user_id: Short UUID string
  • user_name: User's name
  • created_at: DateTime object of when the user was created.
Source code in memora/graph_db/neo4j/user.py
@override
async def update_user(
    self, org_id: str, user_id: str, new_user_name: str
) -> models.User:
    """
    Updates an existing user in the Neo4j graph database.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user to update.
        new_user_name (str): The new name for the user.

    Returns:
        User containing:

            + org_id: Short UUID string
            + user_id: Short UUID string
            + user_name: User's name
            + created_at: DateTime object of when the user was created.
    """

    if not all(
        param and isinstance(param, str)
        for param in (org_id, user_id, new_user_name)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `new_user_name` must be strings and have a value."
        )

    self.logger.info(f"Updating user {user_id}")

    async def update_user_tx(tx):
        result = await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})
            SET u.user_name = $new_user_name
            RETURN u{.org_id, .user_id, .user_name, .created_at} as user
        """,
            org_id=org_id,
            user_id=user_id,
            new_user_name=new_user_name,
        )

        record = await result.single()
        return record["user"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        user_data = await session.execute_write(update_user_tx)

        if user_data is None:
            self.logger.info(
                f"Failed to update user {user_id}: User does not exist"
            )
            raise neo4j.exceptions.Neo4jError(
                "User (`org_id`, `user_id`) does not exist."
            )

        return models.User(
            org_id=user_data["org_id"],
            user_id=user_data["user_id"],
            user_name=user_data["user_name"],
            created_at=(user_data["created_at"]).to_native(),
        )

memora.graph_db.neo4j.Neo4jInteraction

Bases: BaseGraphDB

Functions

delete_all_user_interactions_and_their_memories async
delete_all_user_interactions_and_their_memories(
    org_id: str, user_id: str
) -> None

Deletes all interactions and their associated memories for a specific user in an organization.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization

TYPE: str

user_id

Short UUID string identifying the user whose interactions should be deleted

TYPE: str

Note

If the graph database is associated with a vector database, the memories are also deleted there for data consistency.

Source code in memora/graph_db/neo4j/interaction.py
@override
async def delete_all_user_interactions_and_their_memories(
    self,
    org_id: str,
    user_id: str,
) -> None:
    """
    Deletes all interactions and their associated memories for a specific user in an organization.

    Args:
        org_id (str): Short UUID string identifying the organization
        user_id (str): Short UUID string identifying the user whose interactions should be deleted

    Note:
        If the graph database is associated with a vector database, the memories are also deleted there for data consistency.
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    self.logger.info(f"Deleting all interactions and memories for user {user_id}")

    async def delete_all_tx(tx):
        await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})-[:INTERACTIONS_IN]->(ic)-[:HAD_INTERACTION]->(interaction:Interaction)

            OPTIONAL MATCH (interaction)<-[:INTERACTION_SOURCE]-(memory:Memory)
            OPTIONAL MATCH (interaction)-[:HAS_OCCURRENCE_ON]->(date:Date)
            OPTIONAL MATCH (interaction)-[:FIRST_MESSAGE|IS_NEXT*]->(messages:MessageBlock)

            DETACH DELETE interaction, memory, date, messages
        """,
            org_id=org_id,
            user_id=user_id,
        )

        if (
            self.associated_vector_db
        ):  # If the graph database is associated with a vector database
            self.logger.info(
                f"Deleting all memories from vector database for user {user_id}"
            )
            await self.associated_vector_db.delete_all_user_memories(
                org_id, user_id
            )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_all_tx)
        self.logger.info(
            f"Successfully deleted all interactions and memories for user {user_id}"
        )
delete_user_interaction_and_its_memories async
delete_user_interaction_and_its_memories(
    org_id: str, user_id: str, interaction_id: str
) -> None

Deletes an interaction record and its associated memories.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user.

TYPE: str

interaction_id

Short UUID string identifying the interaction to delete.

TYPE: str

Note

If the graph database is associated with a vector database, the memories are also deleted there for data consistency.

Source code in memora/graph_db/neo4j/interaction.py
@override
async def delete_user_interaction_and_its_memories(
    self,
    org_id: str,
    user_id: str,
    interaction_id: str,
) -> None:
    """
    Deletes an interaction record and its associated memories.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user.
        interaction_id (str): Short UUID string identifying the interaction to delete.

    Note:
        If the graph database is associated with a vector database, the memories are also deleted there for data consistency.
    """

    if not all(
        param and isinstance(param, str)
        for param in (org_id, user_id, interaction_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `interaction_id` must be strings and have a value."
        )

    self.logger.info(
        f"Deleting interaction {interaction_id} and its memories for user {user_id}"
    )

    interaction_memories = (
        await self.get_interaction(
            org_id, user_id, interaction_id, with_messages=False, with_memories=True
        )
    ).memories

    interaction_memories_ids = [memory.memory_id for memory in interaction_memories]

    async def delete_tx(tx):
        # Delete the interaction, its messages and memories.
        await tx.run(
            """
            MATCH (interaction: Interaction {
                org_id: $org_id, 
                user_id: $user_id, 
                interaction_id: $interaction_id
                })-[r:FIRST_MESSAGE|IS_NEXT*]->(message:MessageBlock)

            OPTIONAL MATCH (interaction)<-[:INTERACTION_SOURCE]-(memory)
            OPTIONAL MATCH (interaction)-[:HAS_OCCURRENCE_ON]->(date:Date) WHERE NOT (date)<-[:HAS_OCCURRENCE_ON]-()

            DETACH DELETE interaction, message, memory, date
        """,
            org_id=org_id,
            user_id=user_id,
            interaction_id=interaction_id,
        )

        if (
            self.associated_vector_db and interaction_memories_ids
        ):  # If the graph database is associated with a vector database
            # Delete memories from vector DB.
            await self.associated_vector_db.delete_memories(
                interaction_memories_ids
            )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_tx)
get_all_user_interactions async
get_all_user_interactions(
    org_id: str,
    user_id: str,
    with_their_messages: bool = True,
    with_their_memories: bool = True,
    skip: int = 0,
    limit: int = 100,
) -> List[models.Interaction]

Retrieves all interactions for a specific user in an organization.

Note

Interactions are sorted in descending order by their updated at datetime. (So most recent interactions are first).

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user.

TYPE: str

with_their_messages

Whether to also retrieve messages of an interaction.

TYPE: bool DEFAULT: True

with_their_memories

Whether to also retrieve memories gotten across all occurrences of an interaction.

TYPE: bool DEFAULT: True

skip

Number of interactions to skip. (Useful for pagination)

TYPE: int DEFAULT: 0

limit

Maximum number of interactions to retrieve. (Useful for pagination)

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
List[Interaction]

List[Interaction], each containing an Interaction with:

  • org_id: Short UUID string identifying the organization.
  • user_id: Short UUID string identifying the user.
  • agent_id: Short UUID string identifying the agent.
  • interaction_id: Short UUID string identifying the interaction.
  • created_at: DateTime object of when the interaction was created.
  • updated_at: DateTime object of when the interaction was last updated.
  • messages (if with_their_messages = True): List of messages in the interaction.
  • memories (if with_their_memories = True): List of memories gotten from all occurrences of this interaction.
Note

A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories

Source code in memora/graph_db/neo4j/interaction.py
@override
async def get_all_user_interactions(
    self,
    org_id: str,
    user_id: str,
    with_their_messages: bool = True,
    with_their_memories: bool = True,
    skip: int = 0,
    limit: int = 100,
) -> List[models.Interaction]:
    """
    Retrieves all interactions for a specific user in an organization.

    Note:
        Interactions are sorted in descending order by their updated at datetime. (So most recent interactions are first).

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user.
        with_their_messages (bool): Whether to also retrieve messages of an interaction.
        with_their_memories (bool): Whether to also retrieve memories gotten across all occurrences of an interaction.
        skip (int): Number of interactions to skip. (Useful for pagination)
        limit (int): Maximum number of interactions to retrieve. (Useful for pagination)

    Returns:
        List[Interaction], each containing an Interaction with:

            + org_id: Short UUID string identifying the organization.
            + user_id: Short UUID string identifying the user.
            + agent_id: Short UUID string identifying the agent.
            + interaction_id: Short UUID string identifying the interaction.
            + created_at: DateTime object of when the interaction was created.
            + updated_at: DateTime object of when the interaction was last updated.
            + messages (if `with_their_messages` = True): List of messages in the interaction.
            + memories (if `with_their_memories` = True): List of memories gotten from all occurrences of this interaction.

    Note:
        A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    if not all(isinstance(param, int) for param in (skip, limit)):
        raise ValueError("`skip` and `limit` must be integers.")

    self.logger.info(
        f"Retrieving all interactions for user {user_id} with messages={with_their_messages} and memories={with_their_memories}"
    )

    async def get_interactions_tx(tx):

        query = """
            // Cleverly transverse through dates to get interactions sorted, avoiding having to sort all user interaction nodes.
            MATCH (d:Date {org_id: $org_id, user_id: $user_id})
            WITH d ORDER BY d.date DESC
            CALL (d) {
                MATCH (d)<-[:HAS_OCCURRENCE_ON]-(interaction)
                RETURN interaction ORDER BY interaction.updated_at DESC
            }
            WITH DISTINCT interaction SKIP $skip LIMIT $limit

            // Initialize messages/memories upfront
            WITH interaction, [] AS messages, [] AS memories 
        """

        if with_their_messages:
            query += """
            OPTIONAL MATCH (interaction)-[:FIRST_MESSAGE|IS_NEXT*]->(m:MessageBlock)
            WITH interaction, collect(m{.*}) as messages, memories
            """

        if with_their_memories:
            query += """
            OPTIONAL MATCH (interaction)<-[:INTERACTION_SOURCE]-(mem:Memory)
            OPTIONAL MATCH (mem)-[:MESSAGE_SOURCE]->(msg)

            WITH interaction, messages, mem, collect(msg{.*}) AS msg_sources

            OPTIONAL MATCH (user:User {org_id: mem.org_id, user_id: mem.user_id})              
            OPTIONAL MATCH (agent:Agent {org_id: mem.org_id, agent_id: mem.agent_id})

            WITH interaction, messages, collect(mem{
                                                    .*, 
                                                    memory: apoc.text.replace(
                                                        apoc.text.replace(mem.memory, '(?i)user_[a-z0-9\\-]+(?:\\'s)?', user.user_name), 
                                                        '(?i)agent_[a-z0-9\\-]+(?:\\'s)?',  agent.agent_label
                                                    ),
                                                    message_sources: msg_sources
                                                    }) as memories
            """

        query += """
            RETURN interaction{
                .org_id,
                .user_id,
                .agent_id,
                .interaction_id,
                .created_at,
                .updated_at,
                messages: messages,
                memories: memories
            } as interaction
        """

        result = await tx.run(
            query, org_id=org_id, user_id=user_id, skip=skip, limit=limit
        )

        records = await result.value("interaction", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        all_interactions_data = await session.execute_read(get_interactions_tx)

        return [
            models.Interaction(
                org_id=interaction_data["org_id"],
                user_id=interaction_data["user_id"],
                agent_id=interaction_data["agent_id"],
                interaction_id=interaction_data["interaction_id"],
                created_at=(interaction_data["created_at"]).to_native(),
                updated_at=(interaction_data["updated_at"]).to_native(),
                messages=[
                    models.MessageBlock(
                        role=message.get("role"),
                        content=message.get("content"),
                        msg_position=message["msg_position"],
                    )
                    for message in (interaction_data.get("messages") or [])
                ],
                memories=[
                    models.Memory(
                        org_id=memory["org_id"],
                        agent_id=memory["agent_id"],
                        user_id=memory["user_id"],
                        interaction_id=memory["interaction_id"],
                        memory_id=memory["memory_id"],
                        memory=memory["memory"],
                        obtained_at=(memory["obtained_at"]).to_native(),
                        message_sources=[
                            models.MessageBlock(
                                role=msg.get("role"),
                                content=msg.get("content"),
                                msg_position=msg["msg_position"],
                            )
                            for msg in (memory.get("message_sources") or [])
                        ],
                    )
                    for memory in (interaction_data.get("memories") or [])
                ],
            )
            for interaction_data in all_interactions_data
        ]
get_interaction async
get_interaction(
    org_id: str,
    user_id: str,
    interaction_id: str,
    with_messages: bool = True,
    with_memories: bool = True,
) -> models.Interaction

Retrieves all messages associated with a specific interaction.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

user_id

Short UUID string identifying the user.

TYPE: str

interaction_id

Short UUID string identifying the interaction.

TYPE: str

with_messages

Whether to retrieve messages along with the interaction.

TYPE: bool DEFAULT: True

with_memories

Whether to also retrieve memories gotten across all occurrences of this interaction.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Interaction

Interaction containing:

  • org_id: Short UUID string identifying the organization.
  • user_id: Short UUID string identifying the user.
  • agent_id: Short UUID string identifying the agent.
  • interaction_id: Short UUID string identifying the interaction.
  • created_at: DateTime object of when the interaction was created.
  • updated_at: DateTime object of when the interaction was last updated.
  • messages (if with_messages = True): List of messages in the interaction.
  • memories (if with_memories = True): List of memories gotten from all occurrences of this interaction.
Note

A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories

Source code in memora/graph_db/neo4j/interaction.py
@override
async def get_interaction(
    self,
    org_id: str,
    user_id: str,
    interaction_id: str,
    with_messages: bool = True,
    with_memories: bool = True,
) -> models.Interaction:
    """
    Retrieves all messages associated with a specific interaction.

    Args:
        org_id (str): Short UUID string identifying the organization.
        user_id (str): Short UUID string identifying the user.
        interaction_id (str): Short UUID string identifying the interaction.
        with_messages (bool): Whether to retrieve messages along with the interaction.
        with_memories (bool): Whether to also retrieve memories gotten across all occurrences of this interaction.

    Returns:
        Interaction containing:

            + org_id: Short UUID string identifying the organization.
            + user_id: Short UUID string identifying the user.
            + agent_id: Short UUID string identifying the agent.
            + interaction_id: Short UUID string identifying the interaction.
            + created_at: DateTime object of when the interaction was created.
            + updated_at: DateTime object of when the interaction was last updated.
            + messages (if `with_messages` = True): List of messages in the interaction.
            + memories (if `with_memories` = True): List of memories gotten from all occurrences of this interaction.

    Note:
        A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    if not all(
        param and isinstance(param, str)
        for param in (org_id, user_id, interaction_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `interaction_id` must be strings and have a value."
        )

    self.logger.info(
        f"Retrieving interaction {interaction_id} for user {user_id} with messages={with_messages} and memories={with_memories}"
    )

    async def get_interaction_tx(tx):

        query = """
            MATCH (interaction: Interaction {
                org_id: $org_id, 
                user_id: $user_id, 
                interaction_id: $interaction_id
            })

            // Initialize messages/memories upfront
            WITH interaction, [] AS messages, [] AS memories
        """

        if with_messages:
            query += """
            OPTIONAL MATCH (interaction)-[:FIRST_MESSAGE|IS_NEXT*]->(m:MessageBlock)
            WITH interaction, collect(m{.*}) as messages, memories
            """

        if with_memories:
            query += """
            OPTIONAL MATCH (interaction)<-[:INTERACTION_SOURCE]-(mem:Memory)
            OPTIONAL MATCH (mem)-[:MESSAGE_SOURCE]->(msg)

            WITH interaction, messages, mem, collect(msg{.*}) AS msg_sources

            OPTIONAL MATCH (user:User {org_id: mem.org_id, user_id: mem.user_id})              
            OPTIONAL MATCH (agent:Agent {org_id: mem.org_id, agent_id: mem.agent_id})

            WITH interaction, messages, collect(mem{
                                                    .*, 
                                                    memory: apoc.text.replace(
                                                        apoc.text.replace(mem.memory, '(?i)user_[a-z0-9\\-]+(?:\\'s)?', user.user_name), 
                                                        '(?i)agent_[a-z0-9\\-]+(?:\\'s)?',  agent.agent_label
                                                    ),
                                                    message_sources: msg_sources
                                                    }) as memories
            """

        query += """
            RETURN interaction{
                .org_id,
                .user_id,
                .agent_id,
                .interaction_id,
                .created_at,
                .updated_at,
                messages: messages,
                memories: memories
            } as interaction
        """

        result = await tx.run(
            query,
            org_id=org_id,
            user_id=user_id,
            interaction_id=interaction_id,
        )

        record = await result.single()
        return record["interaction"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        interaction_data = await session.execute_read(get_interaction_tx)

        if interaction_data is None:
            self.logger.info(
                f"Interaction {interaction_id} not found for user {user_id}"
            )
            raise neo4j.exceptions.Neo4jError(
                "Interaction (`org_id`, `user_id`, `interaction_id`) does not exist."
            )

        return models.Interaction(
            org_id=interaction_data["org_id"],
            user_id=interaction_data["user_id"],
            agent_id=interaction_data["agent_id"],
            interaction_id=interaction_data["interaction_id"],
            created_at=(interaction_data["created_at"]).to_native(),
            updated_at=(interaction_data["updated_at"]).to_native(),
            messages=[
                models.MessageBlock(
                    role=message.get("role"),
                    content=message.get("content"),
                    msg_position=message["msg_position"],
                )
                for message in (interaction_data.get("messages") or [])
            ],
            memories=[
                models.Memory(
                    org_id=memory["org_id"],
                    agent_id=memory["agent_id"],
                    user_id=memory["user_id"],
                    interaction_id=memory["interaction_id"],
                    memory_id=memory["memory_id"],
                    memory=memory["memory"],
                    obtained_at=(memory["obtained_at"]).to_native(),
                    message_sources=[
                        models.MessageBlock(
                            role=msg.get("role"),
                            content=msg.get("content"),
                            msg_position=msg["msg_position"],
                        )
                        for msg in (memory.get("message_sources") or [])
                    ],
                )
                for memory in (interaction_data.get("memories") or [])
            ],
        )
save_interaction_with_memories async
save_interaction_with_memories(
    org_id: str,
    agent_id: str,
    user_id: str,
    memories_and_interaction: MemoriesAndInteraction,
) -> Tuple[str, datetime]

Creates a new interaction record with associated memories.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

agent_id

Short UUID string identifying the agent.

TYPE: str

user_id

Short UUID string identifying the user.

TYPE: str

memories_and_interaction

Contains both the interaction and the associated memories.

TYPE: MemoriesAndInteraction

Note

If the graph database is associated with a vector database, the memories are also stored there for data consistency.

RETURNS DESCRIPTION
Tuple[str, datetime]

Tuple[str, datetime] containing:

  • interaction_id: Short UUID string identifying the created interaction
  • created_at: DateTime object of when the interaction was created.
Source code in memora/graph_db/neo4j/interaction.py
@override
async def save_interaction_with_memories(
    self,
    org_id: str,
    agent_id: str,
    user_id: str,
    memories_and_interaction: MemoriesAndInteraction,
) -> Tuple[str, datetime]:
    """
    Creates a new interaction record with associated memories.

    Args:
        org_id (str): Short UUID string identifying the organization.
        agent_id (str): Short UUID string identifying the agent.
        user_id (str): Short UUID string identifying the user.
        memories_and_interaction (MemoriesAndInteraction): Contains both the interaction and the associated memories.

    Note:
        If the graph database is associated with a vector database, the memories are also stored there for data consistency.

    Returns:
        Tuple[str, datetime] containing:

            + interaction_id: Short UUID string identifying the created interaction
            + created_at: DateTime object of when the interaction was created.
    """

    if not all(
        param and isinstance(param, str) for param in (org_id, user_id, agent_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `agent_id` must be strings and have a value."
        )

    interaction_id = shortuuid.uuid()
    new_memory_ids = [
        str(uuid.uuid4()) for _ in range(len(memories_and_interaction.memories))
    ]
    new_contrary_memory_ids = [
        str(uuid.uuid4())
        for _ in range(len(memories_and_interaction.contrary_memories))
    ]

    self.logger.info(
        f"Saving interaction {interaction_id} for user {user_id} with agent {agent_id}"
    )

    async def save_tx(tx):

        # Create interaction and connect to date of occurance.
        await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})-[:INTERACTIONS_IN]->(ic)
            CREATE (interaction:Interaction {
                org_id: $org_id,
                user_id: $user_id,
                agent_id: $agent_id,
                interaction_id: $interaction_id,
                created_at: datetime($interaction_date),
                updated_at: datetime($interaction_date)
            })
            CREATE (ic)-[:HAD_INTERACTION]->(interaction)

            WITH interaction, u
            MERGE (d:Date {
                org_id: $org_id,
                user_id: $user_id,
                date: date(datetime($interaction_date))
            })
            CREATE (interaction)-[:HAS_OCCURRENCE_ON]->(d)

        """,
            org_id=org_id,
            user_id=user_id,
            agent_id=agent_id,
            interaction_id=interaction_id,
            interaction_date=memories_and_interaction.interaction_date.isoformat(),
        )

        if not memories_and_interaction.interaction:
            self.logger.info(
                f"No messages to save for interaction {interaction_id}"
            )
            return (
                interaction_id,
                memories_and_interaction.interaction_date.isoformat(),
            )

        # Add the messages to the interaction.
        self.logger.info(f"Adding messages to interaction {interaction_id}")
        await self._add_messages_to_interaction_from_top(
            tx,
            org_id,
            user_id,
            interaction_id,
            memories_and_interaction.interaction,
        )

        if new_memory_ids or new_contrary_memory_ids:
            # Add the all memories (new & new contrary) and connect to their interaction message source.
            self.logger.info("Adding memories and linking to their message source")
            await self._add_memories_with_their_source_links(
                tx,
                org_id,
                user_id,
                agent_id,
                interaction_id,
                memories_and_interaction,
                new_memory_ids,
                new_contrary_memory_ids,
            )

        if new_contrary_memory_ids:
            # Link the new contary memories as updates to the old memory they contradicted.
            self.logger.info(
                "Linking contrary memories to existing memories they contradicted"
            )
            await self._link_update_contrary_memories_to_existing_memories(
                tx,
                org_id,
                user_id,
                new_contrary_memory_ids,
                memories_and_interaction,
            )

        if new_memory_ids or new_contrary_memory_ids:
            if (
                self.associated_vector_db
            ):  # If the graph database is associated with a vector database
                # Add memories to vector DB within this transcation function to ensure data consistency (They succeed or fail together).
                await self.associated_vector_db.add_memories(
                    org_id=org_id,
                    user_id=user_id,
                    agent_id=agent_id,
                    memory_ids=(
                        new_memory_ids + new_contrary_memory_ids
                    ),  # All memory ids
                    memories=[
                        memory_obj.memory
                        for memory_obj in (
                            memories_and_interaction.memories
                            + memories_and_interaction.contrary_memories
                        )
                    ],  # All memories
                    obtained_at=memories_and_interaction.interaction_date.isoformat(),
                )

        return interaction_id, memories_and_interaction.interaction_date

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        result = await session.execute_write(save_tx)
        self.logger.info(
            f"Successfully saved interaction {interaction_id} for user {user_id}"
        )
        return result
update_interaction_and_memories async
update_interaction_and_memories(
    org_id: str,
    agent_id: str,
    user_id: str,
    interaction_id: str,
    updated_memories_and_interaction: MemoriesAndInteraction,
) -> Tuple[str, datetime]

Update an existing interaction record and add new memories.

Compares updated interaction with existing one
  • If differences are found, truncates existing record from that point and replaces with updated version. Old memories from truncated message(s) remain but become standalone (no longer linked to truncated messages).
  • If no differences, appends new messages from the update.

New memories are always added, regardless of interaction changes.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization.

TYPE: str

agent_id

Short UUID string identifying the agent in the updated interaction.

TYPE: str

user_id

Short UUID string identifying the user.

TYPE: str

interaction_id

Short UUID string identifying the interaction to update.

TYPE: str

updated_memories_and_interaction

Contains both the updated interaction and the associated new memories.

TYPE: MemoriesAndInteraction

Note

If the graph database is associated with a vector database, the memories are also stored there for data consistency.

RETURNS DESCRIPTION
Tuple[str, datetime]

Tuple[str, datetime] containing:

  • interaction_id: Short UUID string identifying the updated interaction
  • updated_at: DateTime object of when the interaction was last updated.
Source code in memora/graph_db/neo4j/interaction.py
@override
async def update_interaction_and_memories(
    self,
    org_id: str,
    agent_id: str,
    user_id: str,
    interaction_id: str,
    updated_memories_and_interaction: MemoriesAndInteraction,
) -> Tuple[str, datetime]:
    """
    Update an existing interaction record and add new memories.

    Compares updated interaction with existing one:
        - If differences are found, truncates existing record from that point and
        replaces with updated version. Old memories from truncated message(s)
        remain but become standalone (no longer linked to truncated messages).
        - If no differences, appends new messages from the update.

    New memories are always added, regardless of interaction changes.

    Args:
        org_id (str): Short UUID string identifying the organization.
        agent_id (str): Short UUID string identifying the agent in the updated interaction.
        user_id (str): Short UUID string identifying the user.
        interaction_id (str): Short UUID string identifying the interaction to update.
        updated_memories_and_interaction (MemoriesAndInteraction): Contains both the updated interaction and the associated new memories.

    Note:
        If the graph database is associated with a vector database, the memories are also stored there for data consistency.

    Returns:
        Tuple[str, datetime] containing:

            + interaction_id: Short UUID string identifying the updated interaction
            + updated_at: DateTime object of when the interaction was last updated.
    """

    if not all(
        param and isinstance(param, str) for param in (org_id, user_id, agent_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `agent_id` must be strings and have a value."
        )

    self.logger.info(
        f"Updating interaction {interaction_id} for user {user_id} with agent {agent_id}"
    )

    new_memory_ids = [
        str(uuid.uuid4())
        for _ in range(len(updated_memories_and_interaction.memories))
    ]
    new_contrary_memory_ids = [
        str(uuid.uuid4())
        for _ in range(len(updated_memories_and_interaction.contrary_memories))
    ]

    # First get the existing messages.
    existing_messages: List[models.MessageBlock] = (
        await self.get_interaction(
            org_id, user_id, interaction_id, with_messages=True, with_memories=False
        )
    ).messages

    updated_interaction_length = len(updated_memories_and_interaction.interaction)
    existing_interaction_length = len(existing_messages)

    async def update_tx(tx):

        # Case 1: Empty updated interaction - delete all existing messages
        if updated_interaction_length == 0:
            self.logger.info(
                f"Truncating all messages from interaction {interaction_id} as updated interaction is empty"
            )
            await self._truncate_interaction_message_below_point(
                tx, org_id, user_id, interaction_id, truncation_point_inclusive=0
            )

        # Case 2: Empty existing interaction - add all new messages from the top
        elif existing_interaction_length == 0:
            self.logger.info(f"Adding all messages to interaction {interaction_id}")
            await self._add_messages_to_interaction_from_top(
                tx,
                org_id,
                user_id,
                interaction_id,
                updated_memories_and_interaction.interaction,
            )

        # Case 3: Both interactions have messages - compare and update
        else:
            # Find first point of difference
            truncate_from = -1
            for i in range(
                min(existing_interaction_length, updated_interaction_length)
            ):
                if (
                    existing_messages[i].role
                    != updated_memories_and_interaction.interaction[i].get("role")
                ) or (
                    existing_messages[i].content
                    != updated_memories_and_interaction.interaction[i].get(
                        "content"
                    )
                ):
                    truncate_from = i
                    break

            # If no differences found in prefix messages, but updated interaction is shorter
            if (
                truncate_from == -1
                and updated_interaction_length < existing_interaction_length
            ):
                truncate_from = updated_interaction_length

            # Handle different cases based on where the difference was found
            if truncate_from == -1:
                # Append the new messages at the bottom.
                self.logger.info(
                    f"Appending new messages to interaction {interaction_id}"
                )
                await self._append_messages_to_interaction(
                    tx,
                    org_id,
                    user_id,
                    interaction_id,
                    updated_memories_and_interaction.interaction,
                )

            elif truncate_from == 0:
                # Complete replacement needed
                self.logger.info(
                    f"Storing latest interaction {interaction_id} messages"
                )
                await self._truncate_interaction_message_below_point(
                    tx,
                    org_id,
                    user_id,
                    interaction_id,
                    truncation_point_inclusive=0,
                )
                await self._add_messages_to_interaction_from_top(
                    tx,
                    org_id,
                    user_id,
                    interaction_id,
                    updated_memories_and_interaction.interaction,
                )

            elif truncate_from > 0:
                # Partial replacement needed
                self.logger.info(
                    f"Updating messages in interaction {interaction_id} from position {truncate_from}"
                )
                await self._truncate_interaction_message_below_point(
                    tx, org_id, user_id, interaction_id, truncate_from
                )
                await self._append_messages_to_interaction(
                    tx,
                    org_id,
                    user_id,
                    interaction_id,
                    updated_memories_and_interaction.interaction,
                )

        if new_memory_ids or new_contrary_memory_ids:
            self.logger.info("Adding memories and linking to their source messages")
            await self._add_memories_with_their_source_links(
                tx,
                org_id,
                user_id,
                agent_id,
                interaction_id,
                updated_memories_and_interaction,
                new_memory_ids,
                new_contrary_memory_ids,
            )

        if new_contrary_memory_ids:
            self.logger.info(
                "Linking contrary memories to existing memories they contradicted"
            )
            await self._link_update_contrary_memories_to_existing_memories(
                tx,
                org_id,
                user_id,
                new_contrary_memory_ids,
                updated_memories_and_interaction,
            )

        # Update the interaction agent, updated_at datetime, and connect occurance to the particular date.
        await tx.run(
            """
            MATCH (i:Interaction {
                org_id: $org_id,
                user_id: $user_id,
                interaction_id: $interaction_id
            })
            SET i.updated_at = datetime($updated_date), i.agent_id = $agent_id
            MERGE (d:Date {
                org_id: $org_id,
                user_id: $user_id,
                date: date(datetime($updated_date))
            })
            MERGE (i)-[:HAS_OCCURRENCE_ON]->(d)
        """,
            org_id=org_id,
            user_id=user_id,
            agent_id=agent_id,
            interaction_id=interaction_id,
            updated_date=updated_memories_and_interaction.interaction_date.isoformat(),
        )

        if new_memory_ids or new_contrary_memory_ids:
            if self.associated_vector_db:
                # If the graph database is associated with a vector database
                # Add memories to vector DB within this transcation function to ensure data consistency (They succeed or fail together).
                await self.associated_vector_db.add_memories(
                    org_id=org_id,
                    user_id=user_id,
                    agent_id=agent_id,
                    memory_ids=(
                        new_memory_ids + new_contrary_memory_ids
                    ),  # All memory ids
                    memories=[
                        memory_obj.memory
                        for memory_obj in (
                            updated_memories_and_interaction.memories
                            + updated_memories_and_interaction.contrary_memories
                        )
                    ],  # All memories
                    obtained_at=updated_memories_and_interaction.interaction_date.isoformat(),
                )

        return (
            interaction_id,
            updated_memories_and_interaction.interaction_date,
        )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        result = await session.execute_write(update_tx)
        self.logger.info(f"Successfully updated interaction {interaction_id}")
        return result

memora.graph_db.neo4j.Neo4jMemory

Bases: BaseGraphDB

Functions

delete_all_user_memories async
delete_all_user_memories(org_id: str, user_id: str) -> None

Deletes all memories of a specific user.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization

TYPE: str

user_id

Short UUID string identifying the user

TYPE: str

Note

If the graph database is associated with a vector database, the memories are also deleted there for data consistency.

Source code in memora/graph_db/neo4j/memory.py
@override
async def delete_all_user_memories(
    self,
    org_id: str,
    user_id: str,
) -> None:
    """
    Deletes all memories of a specific user.

    Args:
        org_id (str): Short UUID string identifying the organization
        user_id (str): Short UUID string identifying the user

    Note:
        If the graph database is associated with a vector database, the memories are also deleted there for data consistency.
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    self.logger.info(f"Deleting all memories for user {user_id}")

    async def delete_all_memories_tx(tx):
        await tx.run(
            """
            MATCH (u:User {org_id: $org_id, user_id: $user_id})-[:HAS_MEMORIES]->(mc:MemoryCollection)
            MATCH (mc)-[:INCLUDES]->(memory:Memory)
            DETACH DELETE memory
        """,
            org_id=org_id,
            user_id=user_id,
        )

        if (
            self.associated_vector_db
        ):  # If the graph database is associated with a vector database
            # Delete all memories from vector DB.
            await self.associated_vector_db.delete_all_user_memories(
                org_id, user_id
            )

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_all_memories_tx)
        self.logger.info(f"Successfully deleted all memories for user {user_id}")
delete_user_memory async
delete_user_memory(
    org_id: str, user_id: str, memory_id: str
) -> None

Deletes a specific memory.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization

TYPE: str

user_id

Short UUID string identifying the user

TYPE: str

memory_id

UUID string identifying the memory to delete

TYPE: str

Note

If the graph database is associated with a vector database, the memory is also deleted there for data consistency.

Source code in memora/graph_db/neo4j/memory.py
@override
async def delete_user_memory(
    self,
    org_id: str,
    user_id: str,
    memory_id: str,
) -> None:
    """
    Deletes a specific memory.

    Args:
        org_id (str): Short UUID string identifying the organization
        user_id (str): Short UUID string identifying the user
        memory_id (str): UUID string identifying the memory to delete

    Note:
        If the graph database is associated with a vector database, the memory is also deleted there for data consistency.
    """

    if not all(
        param and isinstance(param, str) for param in (org_id, user_id, memory_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `memory_id` must be strings and have a value."
        )

    self.logger.info(f"Deleting memory {memory_id}")

    async def delete_memory_tx(tx):
        await tx.run(
            """
            MATCH (m:Memory {org_id: $org_id, user_id: $user_id, memory_id: $memory_id})
            DETACH DELETE m
        """,
            org_id=org_id,
            user_id=user_id,
            memory_id=memory_id,
        )

        if (
            self.associated_vector_db and memory_id
        ):  # If the graph database is associated with a vector database
            # Delete memory from vector DB.
            await self.associated_vector_db.delete_memory(memory_id)

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        await session.execute_write(delete_memory_tx)
        self.logger.info(f"Successfully deleted memory {memory_id}")
fetch_user_memories_resolved async
fetch_user_memories_resolved(
    org_user_mem_ids: List[Dict[str, str]]
) -> List[models.Memory]

Fetches memories from the Neo4j GraphDB by their IDs, resolves any contrary updates, and replaces user/agent placeholders with actual names.

This method performs several operations
  1. Retrieves memories using (org_id, user_id, memory_ids)
  2. If a memory has a CONTRARY_UPDATE relationship, uses the newer memory version
  3. Replaces user_id & agent_id placeholders (e.g 'user_abc123' or 'agent_xyz789') in memories with actual user names / agent labels
PARAMETER DESCRIPTION
org_user_mem_ids

List of Dicts containing org, user, and memory ids of the memories to fetch and process

TYPE: List[Dict[str, str]]

RETURNS DESCRIPTION
List[Memory]

List[Memory] containing memory details:

  • org_id: Short UUID string identifying the organization
  • agent_id: Short UUID string identifying the agent
  • user_id: Short UUID string identifying the user
  • interaction_id: Short UUID string identifying the interaction the memory was sourced from
  • memory_id: Full UUID string identifying the memory
  • memory: The resolved memory
  • obtained_at: DateTime object of when the memory was obtained
  • message_sources: List of messages in the interaction that triggered the memory
Example
>>> org_user_mem_ids = [{'memory_id': '443ac3a8-fe87-49a4-93d2-05d3eb58ddeb', 'org_id': 'gmDr4sUiWMNqbGAiV8ijbU', 'user_id': 'CcyKXxhi2skEcDpRzNZim7'}, ...]
>>> memories = graphInstance.fetch_memories_resolved(org_user_mem_ids)
>>> print([memoryObj.memory for memoryObj in memories])
["John asked for help with a wedding ring", "Sarah is allergic to peanuts"]
Note
  • Org, user, and memory IDs are typically retrieved from a vector database before being passed to this method.
  • A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories
Source code in memora/graph_db/neo4j/memory.py
@override
async def fetch_user_memories_resolved(
    self, org_user_mem_ids: List[Dict[str, str]]
) -> List[models.Memory]:
    """
    Fetches memories from the Neo4j GraphDB by their IDs, resolves any contrary updates, and replaces user/agent placeholders with actual names.

    This method performs several operations:
      1. Retrieves memories using (org_id, user_id, memory_ids)
      2. If a memory has a CONTRARY_UPDATE relationship, uses the newer memory version
      3. Replaces user_id & agent_id placeholders (e.g 'user_abc123' or 'agent_xyz789') in memories with actual user names / agent labels

    Args:
        org_user_mem_ids (List[Dict[str, str]]): List of Dicts containing org, user, and memory ids of the memories to fetch and process

    Returns:
        List[Memory] containing memory details:

            + org_id: Short UUID string identifying the organization
            + agent_id: Short UUID string identifying the agent
            + user_id: Short UUID string identifying the user
            + interaction_id: Short UUID string identifying the interaction the memory was sourced from
            + memory_id: Full UUID string identifying the memory
            + memory: The resolved memory
            + obtained_at: DateTime object of when the memory was obtained
            + message_sources: List of messages in the interaction that triggered the memory

    Example:
        ```python
        >>> org_user_mem_ids = [{'memory_id': '443ac3a8-fe87-49a4-93d2-05d3eb58ddeb', 'org_id': 'gmDr4sUiWMNqbGAiV8ijbU', 'user_id': 'CcyKXxhi2skEcDpRzNZim7'}, ...]
        >>> memories = graphInstance.fetch_memories_resolved(org_user_mem_ids)
        >>> print([memoryObj.memory for memoryObj in memories])
        ["John asked for help with a wedding ring", "Sarah is allergic to peanuts"]
        ```

    Note:
        - Org, user, and memory IDs are typically retrieved from a vector database before being passed to this method.
        - A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    results = await self.fetch_user_memories_resolved_batch([org_user_mem_ids])
    return results[0]
fetch_user_memories_resolved_batch async
fetch_user_memories_resolved_batch(
    batch_org_user_mem_ids: List[List[Dict[str, str]]]
) -> List[List[models.Memory]]

Fetches memories from the Neo4j GraphDB by their IDs, resolves any contrary updates, and replaces user/agent placeholders with actual names.

This method performs several operations
  1. Retrieves memories using (org_id, user_id, memory_ids)
  2. If a memory has a CONTRARY_UPDATE relationship, uses the newer memory version
  3. Replaces user_id & agent_id placeholders (e.g 'user_abc123' or 'agent_xyz789') in memories with actual user names / agent labels
PARAMETER DESCRIPTION
batch_org_user_mem_ids

List of lists containing Dicts with org, user, and memory ids of the memories to fetch and process

TYPE: List[List[Dict[str, str]]]

RETURNS DESCRIPTION
List[List[Memory]]

List[List[Memory]] with memory details:

  • org_id: Short UUID string identifying the organization
  • agent_id: Short UUID string identifying the agent
  • user_id: Short UUID string identifying the user
  • interaction_id: Short UUID string identifying the interaction the memory was sourced from
  • memory_id: Full UUID string identifying the memory
  • memory: The resolved memory
  • obtained_at: DateTime object of when the memory was obtained
  • message_sources: List of messages in the interaction that triggered the memory
Example
>>> batch_org_user_mem_ids = [[{"memory_id": "413ac3a8-fe87-49a4-93d2-05d3eb58ddeb", "org_id": "gmDr4sUiWMNqbGAiV8ijbU", "user_id": "CcyKXxhi2skEcDpRzNZim7"}, ...], [{...}, ...]]
>>> batch_memories = graphInstance.fetch_memories_resolved_batch(batch_org_user_mem_ids)
>>> print([[memoryObj.memory for memoryObj in memories] for memories in batch_memories])
[["John asked for help with a wedding ring", "Sarah is allergic to peanuts"], ["John is about to propose to Sarah"]]
Note
  • Batch org, user, and memory IDs are typically retrieved from a vector database before being passed to this method.
  • A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories
Source code in memora/graph_db/neo4j/memory.py
@override
async def fetch_user_memories_resolved_batch(
    self, batch_org_user_mem_ids: List[List[Dict[str, str]]]
) -> List[List[models.Memory]]:
    """
    Fetches memories from the Neo4j GraphDB by their IDs, resolves any contrary updates, and replaces user/agent placeholders with actual names.

    This method performs several operations:
      1. Retrieves memories using (org_id, user_id, memory_ids)
      2. If a memory has a CONTRARY_UPDATE relationship, uses the newer memory version
      3. Replaces user_id & agent_id placeholders (e.g 'user_abc123' or 'agent_xyz789') in memories with actual user names / agent labels

    Args:
        batch_org_user_mem_ids (List[List[Dict[str, str]]]): List of lists containing Dicts with org, user, and memory ids of the memories to fetch and process

    Returns:
        List[List[Memory]] with memory details:

            + org_id: Short UUID string identifying the organization
            + agent_id: Short UUID string identifying the agent
            + user_id: Short UUID string identifying the user
            + interaction_id: Short UUID string identifying the interaction the memory was sourced from
            + memory_id: Full UUID string identifying the memory
            + memory: The resolved memory
            + obtained_at: DateTime object of when the memory was obtained
            + message_sources: List of messages in the interaction that triggered the memory

    Example:
        ```python
        >>> batch_org_user_mem_ids = [[{"memory_id": "413ac3a8-fe87-49a4-93d2-05d3eb58ddeb", "org_id": "gmDr4sUiWMNqbGAiV8ijbU", "user_id": "CcyKXxhi2skEcDpRzNZim7"}, ...], [{...}, ...]]
        >>> batch_memories = graphInstance.fetch_memories_resolved_batch(batch_org_user_mem_ids)
        >>> print([[memoryObj.memory for memoryObj in memories] for memories in batch_memories])
        [["John asked for help with a wedding ring", "Sarah is allergic to peanuts"], ["John is about to propose to Sarah"]]
        ```

    Note:
        - Batch org, user, and memory IDs are typically retrieved from a vector database before being passed to this method.
        - A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    async def fetch_resolved_batch_tx(tx):
        result = await tx.run(
            """
            UNWIND $batch_ids AS ids_dict_list

            CALL (ids_dict_list) {

                UNWIND ids_dict_list AS ids_dict
                MATCH (memory:Memory {org_id: ids_dict.org_id, user_id: ids_dict.user_id, memory_id: ids_dict.memory_id})

                // Use the most up to date contrary update memory if it exists
                OPTIONAL MATCH (memory)-[:CONTRARY_UPDATE*]->(contraryMemory:Memory) WHERE NOT (contraryMemory)-[:CONTRARY_UPDATE]->()
                WITH coalesce(contraryMemory, memory) AS memoryToReturn

                OPTIONAL MATCH (memoryToReturn)-[:MESSAGE_SOURCE]->(msgSource)
                WITH memoryToReturn, collect(msgSource{.*}) as msgSources

                MATCH (user:User {org_id: memoryToReturn.org_id, user_id: memoryToReturn.user_id})              
                MATCH (agent:Agent {org_id: memoryToReturn.org_id, agent_id: memoryToReturn.agent_id})

                // Case-insensitive 'user_' or 'agent_' followed by UUID and optional ('s) placeholders are replaced with actual names
                RETURN collect(DISTINCT memoryToReturn{
                                            .org_id,
                                            .agent_id,
                                            .user_id,
                                            .interaction_id,
                                            .memory_id, 
                                            .obtained_at,
                                            memory: apoc.text.replace(
                                                apoc.text.replace(memoryToReturn.memory, '(?i)user_[a-z0-9\\-]+(?:\\'s)?', user.user_name), 
                                                '(?i)agent_[a-z0-9\\-]+(?:\\'s)?',  agent.agent_label
                                            ),
                                            message_sources: msgSources
                                        }) as resolved_memories
            }
            RETURN resolved_memories

        """,
            batch_ids=batch_org_user_mem_ids,
        )

        records = await result.value("resolved_memories", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:

        all_resolved_memories = await session.execute_read(fetch_resolved_batch_tx)

        return [
            [
                models.Memory(
                    org_id=resolved_memory["org_id"],
                    agent_id=resolved_memory["agent_id"],
                    user_id=resolved_memory["user_id"],
                    interaction_id=resolved_memory["interaction_id"],
                    memory_id=resolved_memory["memory_id"],
                    memory=resolved_memory["memory"],
                    obtained_at=(resolved_memory["obtained_at"]).to_native(),
                    message_sources=[
                        models.MessageBlock(
                            role=msg_source["role"],
                            content=msg_source["content"],
                            msg_position=msg_source["msg_position"],
                        )
                        for msg_source in (resolved_memory["message_sources"] or [])
                    ],
                )
                for resolved_memory in resolved_memories
            ]
            for resolved_memories in all_resolved_memories
        ]
get_all_user_memories async
get_all_user_memories(
    org_id: str,
    user_id: str,
    agent_id: Optional[str] = None,
    skip: int = 0,
    limit: int = 1000,
) -> List[models.Memory]

Retrieves all memories associated with a specific user.

Note

Memories are sorted in descending order by their obtained at datetime. (So most recent memories are first).

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization

TYPE: str

user_id

Short UUID string identifying the user

TYPE: str

agent_id

Optional short UUID string identifying the agent. If provided, only memories obtained from interactions with this agent are returned. Otherwise, all memories associated with the user are returned.

TYPE: Optional[str] DEFAULT: None

skip

Number of interactions to skip. (Useful for pagination)

TYPE: int DEFAULT: 0

limit

Maximum number of interactions to retrieve. (Useful for pagination)

TYPE: int DEFAULT: 1000

RETURNS DESCRIPTION
List[Memory]

List[Memory] containing memory details:

  • org_id: Short UUID string identifying the organization
  • agent_id: Short UUID string identifying the agent
  • user_id: Short UUID string identifying the user
  • interaction_id: Short UUID string identifying the interaction the memory was sourced from
  • memory_id: Full UUID string identifying the memory
  • memory: The resolved memory
  • obtained_at: DateTime object of when the memory was obtained
  • message_sources: List of messages in the interaction that triggered the memory
Note
  • A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories
Source code in memora/graph_db/neo4j/memory.py
@override
async def get_all_user_memories(
    self,
    org_id: str,
    user_id: str,
    agent_id: Optional[str] = None,
    skip: int = 0,
    limit: int = 1000,
) -> List[models.Memory]:
    """
    Retrieves all memories associated with a specific user.

    Note:
        Memories are sorted in descending order by their obtained at datetime. (So most recent memories are first).

    Args:
        org_id (str): Short UUID string identifying the organization
        user_id (str): Short UUID string identifying the user
        agent_id (Optional[str]): Optional short UUID string identifying the agent. If provided, only memories obtained from
            interactions with this agent are returned.
            Otherwise, all memories associated with the user are returned.
        skip (int): Number of interactions to skip. (Useful for pagination)
        limit (int): Maximum number of interactions to retrieve. (Useful for pagination)

    Returns:
        List[Memory] containing memory details:

            + org_id: Short UUID string identifying the organization
            + agent_id: Short UUID string identifying the agent
            + user_id: Short UUID string identifying the user
            + interaction_id: Short UUID string identifying the interaction the memory was sourced from
            + memory_id: Full UUID string identifying the memory
            + memory: The resolved memory
            + obtained_at: DateTime object of when the memory was obtained
            + message_sources: List of messages in the interaction that triggered the memory

    Note:
        - A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    if not all(param and isinstance(param, str) for param in (org_id, user_id)):
        raise ValueError("`org_id` and `user_id` must be strings and have a value.")

    if agent_id:
        if not isinstance(agent_id, str):
            raise ValueError("`agent_id` must be a string.")

    self.logger.info(f" and agent {agent_id}" if agent_id else "")

    async def get_all_memories_tx(tx):
        query = """
            // Cleverly transverse through dates to get memories sorted, avoiding having to sort all user memory nodes.
            MATCH (d:Date {{org_id: $org_id, user_id: $user_id}})
            WITH d ORDER BY d.date DESC
            CALL (d) {{
                MATCH (d)<-[:DATE_OBTAINED]-(memory)
                {agent_filter}
                RETURN memory ORDER BY memory.obtained_at DESC
            }}

            WITH memory AS m SKIP $skip LIMIT $limit

            OPTIONAL MATCH (m)-[:MESSAGE_SOURCE]->(msgSource)
            WITH m, collect(msgSource{{.*}}) as msgSources

            MATCH (user:User {{org_id: m.org_id, user_id: m.user_id}})
            MATCH (agent:Agent {{org_id: m.org_id, agent_id: m.agent_id}})

            RETURN m{{
                .org_id,
                .agent_id,
                .user_id,
                .interaction_id,
                .memory_id, 
                .obtained_at,
                memory: apoc.text.replace(
                    apoc.text.replace(m.memory, '(?i)user_[a-z0-9\\-]+(?:\\'s)?', user.user_name), 
                    '(?i)agent_[a-z0-9\\-]+(?:\\'s)?',  agent.agent_label
                ),
                message_sources: msgSources
            }} as memory
        """
        agent_filter = "WHERE memory.agent_id = $agent_id" if agent_id else ""
        result = await tx.run(
            query.format(agent_filter=agent_filter),
            org_id=org_id,
            user_id=user_id,
            agent_id=agent_id,
            skip=skip,
            limit=limit,
        )

        records = await result.value("memory", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        memories = await session.execute_read(get_all_memories_tx)
        return [
            models.Memory(
                org_id=memory["org_id"],
                agent_id=memory["agent_id"],
                user_id=memory["user_id"],
                interaction_id=memory["interaction_id"],
                memory_id=memory["memory_id"],
                memory=memory["memory"],
                obtained_at=(memory["obtained_at"]).to_native(),
                message_sources=[
                    models.MessageBlock(
                        role=msg_source["role"],
                        content=msg_source["content"],
                        msg_position=msg_source["msg_position"],
                    )
                    for msg_source in (memory["message_sources"] or [])
                ],
            )
            for memory in memories
        ]
get_user_memory async
get_user_memory(
    org_id: str, user_id: str, memory_id: str
) -> models.Memory

Retrieves a specific memory.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization

TYPE: str

user_id

Short UUID string identifying the user

TYPE: str

memory_id

UUID string identifying the memory

TYPE: str

RETURNS DESCRIPTION
Memory

Memory containing memory details:

  • org_id: Short UUID string identifying the organization
  • agent_id: Short UUID string identifying the agent
  • user_id: Short UUID string identifying the user
  • interaction_id: Short UUID string identifying the interaction the memory was sourced from
  • memory_id: Full UUID string identifying the memory
  • memory: The resolved memory
  • obtained_at: DateTime object of when the memory was obtained
  • message_sources: List of messages in the interaction that triggered the memory
Note
  • The memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories
Source code in memora/graph_db/neo4j/memory.py
@override
async def get_user_memory(
    self, org_id: str, user_id: str, memory_id: str
) -> models.Memory:
    """
    Retrieves a specific memory.

    Args:
        org_id (str): Short UUID string identifying the organization
        user_id (str): Short UUID string identifying the user
        memory_id (str): UUID string identifying the memory

    Returns:
        Memory containing memory details:

            + org_id: Short UUID string identifying the organization
            + agent_id: Short UUID string identifying the agent
            + user_id: Short UUID string identifying the user
            + interaction_id: Short UUID string identifying the interaction the memory was sourced from
            + memory_id: Full UUID string identifying the memory
            + memory: The resolved memory
            + obtained_at: DateTime object of when the memory was obtained
            + message_sources: List of messages in the interaction that triggered the memory

    Note:
        - The memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    if not all(
        param and isinstance(param, str) for param in (org_id, user_id, memory_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `memory_id` must be strings and have a value."
        )

    self.logger.info(f"Getting memory {memory_id} for user {user_id}")

    async def get_memory_tx(tx):
        result = await tx.run(
            """
            MATCH (m:Memory {org_id: $org_id, user_id: $user_id, memory_id: $memory_id})

            MATCH (m)-[:MESSAGE_SOURCE]->(msgSource)
            WITH m, collect(msgSource{.*}) as msgSources

            MATCH (user:User {org_id: m.org_id, user_id: m.user_id})              
            MATCH (agent:Agent {org_id: m.org_id, agent_id: m.agent_id})
            RETURN m{
                    .org_id,
                    .agent_id,
                    .user_id,
                    .interaction_id,
                    .memory_id, 
                    .obtained_at,
                    memory: apoc.text.replace(
                        apoc.text.replace(m.memory, '(?i)user_[a-z0-9\\-]+(?:\\'s)?', user.user_name), 
                        '(?i)agent_[a-z0-9\\-]+(?:\\'s)?',  agent.agent_label
                    ),
                    message_sources: msgSources
                } as memory
        """,
            org_id=org_id,
            user_id=user_id,
            memory_id=memory_id,
        )
        record = await result.single()
        return record["memory"] if record else None

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        memory = await session.execute_read(get_memory_tx)

        if not memory:
            self.logger.info(
                f"Failed to get memory {memory_id}: Memory does not exist"
            )
            raise neo4j.exceptions.Neo4jError(
                "Memory (`org_id`, `user_id`, `memory_id`) does not exist."
            )

        return models.Memory(
            org_id=memory["org_id"],
            agent_id=memory["agent_id"],
            user_id=memory["user_id"],
            interaction_id=memory["interaction_id"],
            memory_id=memory["memory_id"],
            memory=memory["memory"],
            obtained_at=(memory["obtained_at"]).to_native(),
            message_sources=[
                models.MessageBlock(
                    role=msg_source["role"],
                    content=msg_source["content"],
                    msg_position=msg_source["msg_position"],
                )
                for msg_source in (memory["message_sources"] or [])
            ],
        )
get_user_memory_history async
get_user_memory_history(
    org_id: str, user_id: str, memory_id: str
) -> List[models.Memory]

Retrieves the history of a specific memory.

PARAMETER DESCRIPTION
org_id

Short UUID string identifying the organization

TYPE: str

user_id

Short UUID string identifying the user

TYPE: str

memory_id

UUID string identifying the memory

TYPE: str

RETURNS DESCRIPTION
List[Memory]

List[Memory] containing the history of memory details in descending order (starting with the current version, to the oldest version):

  • org_id: Short UUID string identifying the organization
  • agent_id: Short UUID string identifying the agent
  • user_id: Short UUID string identifying the user
  • interaction_id: Short UUID string identifying the interaction the memory was sourced from
  • memory_id: Full UUID string identifying the memory
  • memory: The resolved memory
  • obtained_at: DateTime object of when the memory was obtained
  • message_sources: List of messages in the interaction that triggered the memory
Note
  • A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See graph.update_interaction_and_memories
Source code in memora/graph_db/neo4j/memory.py
@override
async def get_user_memory_history(
    self, org_id: str, user_id: str, memory_id: str
) -> List[models.Memory]:
    """
    Retrieves the history of a specific memory.

    Args:
        org_id (str): Short UUID string identifying the organization
        user_id (str): Short UUID string identifying the user
        memory_id (str): UUID string identifying the memory

    Returns:
        List[Memory] containing the history of memory details in descending order (starting with the current version, to the oldest version):

            + org_id: Short UUID string identifying the organization
            + agent_id: Short UUID string identifying the agent
            + user_id: Short UUID string identifying the user
            + interaction_id: Short UUID string identifying the interaction the memory was sourced from
            + memory_id: Full UUID string identifying the memory
            + memory: The resolved memory
            + obtained_at: DateTime object of when the memory was obtained
            + message_sources: List of messages in the interaction that triggered the memory

    Note:
        - A memory won't have a message source, if its interaction was updated with a conflicting conversation thread that lead to truncation of the former thread. See `graph.update_interaction_and_memories`
    """

    if not all(
        param and isinstance(param, str) for param in (org_id, user_id, memory_id)
    ):
        raise ValueError(
            "`org_id`, `user_id` and `memory_id` must be strings and have a value."
        )

    self.logger.info(f"Getting memory history for memory {memory_id}")

    async def get_memory_history_tx(tx):
        result = await tx.run(
            """
            MATCH path=(m:Memory {org_id: $org_id, user_id: $user_id, memory_id: $memory_id})<-[:CONTRARY_UPDATE*0..]-(olderMemory:Memory)
            WHERE NOT (olderMemory)<-[:CONTRARY_UPDATE]-()
            WITH nodes(path) AS memory_history
            UNWIND memory_history AS memory

            OPTIONAL MATCH (memory)-[:MESSAGE_SOURCE]->(msgSource)
            WITH memory, collect(msgSource{.*}) as msgSources

            MATCH (user:User {org_id: memory.org_id, user_id: memory.user_id})              
            MATCH (agent:Agent {org_id: memory.org_id, agent_id: memory.agent_id})
            RETURN memory{
                    .org_id,
                    .agent_id,
                    .user_id,
                    .interaction_id,
                    .memory_id, 
                    .obtained_at,
                    memory: apoc.text.replace(
                        apoc.text.replace(memory.memory, '(?i)user_[a-z0-9\\-]+(?:\\'s)?', user.user_name), 
                        '(?i)agent_[a-z0-9\\-]+(?:\\'s)?',  agent.agent_label
                    ),
                    message_sources: msgSources
                } as memory
        """,
            org_id=org_id,
            user_id=user_id,
            memory_id=memory_id,
        )
        records = await result.value("memory", [])
        return records

    async with self.driver.session(
        database=self.database, default_access_mode=neo4j.READ_ACCESS
    ) as session:
        memory_history = await session.execute_read(get_memory_history_tx)
        return [
            models.Memory(
                org_id=memory["org_id"],
                agent_id=memory["agent_id"],
                user_id=memory["user_id"],
                interaction_id=memory["interaction_id"],
                memory_id=memory["memory_id"],
                memory=memory["memory"],
                obtained_at=(memory["obtained_at"]).to_native(),
                message_sources=[
                    models.MessageBlock(
                        role=msg_source["role"],
                        content=msg_source["content"],
                        msg_position=msg_source["msg_position"],
                    )
                    for msg_source in (memory["message_sources"] or [])
                ],
            )
            for memory in memory_history
        ]