diff --git a/.hydra_config/config.yaml b/.hydra_config/config.yaml index 69d170c6..e6169bed 100644 --- a/.hydra_config/config.yaml +++ b/.hydra_config/config.yaml @@ -51,6 +51,9 @@ reranker: model_name: ${oc.env:RERANKER_MODEL, Alibaba-NLP/gte-multilingual-reranker-base} top_k: ${oc.decode:${oc.env:RERANKER_TOP_K, 5}} # Number of documents to return after reranking. upgrade to 8 for better results if your llm has a wider context window base_url: ${oc.env:RERANKER_BASE_URL, http://reranker:${oc.env:RERANKER_PORT, 7997}} + # Temporal scoring parameters + temporal_weight: ${oc.decode:${oc.env:RERANKER_TEMPORAL_WEIGHT, 0.3}} # Weight for temporal scoring (0.0-1.0), 0.3 means 30% temporal, 70% relevance + temporal_decay_days: ${oc.decode:${oc.env:RERANKER_TEMPORAL_DECAY_DAYS, 365}} # Days for temporal score to decay to near zero map_reduce: # Number of documents to process in the initial mapping phase diff --git a/docker-compose.yaml b/docker-compose.yaml index a69ec3d5..2f473d9f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -4,8 +4,7 @@ include: - extern/infinity.yaml x-openrag: &openrag_template - image: ghcr.io/linagora/openrag:dev-latest - # image: linagoraai/openrag:latest + image: linagoraai/openrag:latest build: context: . dockerfile: Dockerfile @@ -58,7 +57,7 @@ x-vllm: &vllm_template services: # OpenRAG Indexer UI indexer-ui: - image: linagoraai/indexer-ui:v1.1 + image: linagoraai/indexer-ui:latest build: context: ./extern/indexer-ui dockerfile: Dockerfile diff --git a/docs/content/docs/documentation/API.mdx b/docs/content/docs/documentation/API.mdx index 04fd150a..5114da46 100644 --- a/docs/content/docs/documentation/API.mdx +++ b/docs/content/docs/documentation/API.mdx @@ -219,10 +219,45 @@ curl -X POST http://localhost:8080/v1/chat/completions \ } ], "temperature": 0.7, - "stream": false + "stream": false, + "metadata": { + "use_map_reduce": false, + "temporal_filter": { + "created_after": "2024-01-01T00:00:00Z", + "created_before": "2024-12-31T23:59:59Z" + } + } }' ``` +**Temporal Filtering:** + +OpenRAG supports temporal filtering to retrieve documents from specific time periods. You can include a `temporal_filter` object in the `metadata` field: + +- **Automatic extraction**: If no `temporal_filter` is provided, OpenRAG automatically extracts temporal expressions from your query (e.g., "documents from 2024", "last week's updates") +- **Manual filtering**: Explicitly specify date ranges using the following fields: + - `datetime_after` / `datetime_before`: Filter by document content datetime (highest priority) + - `modified_after` / `modified_before`: Filter by document modification date + - `created_after` / `created_before`: Filter by document creation date + - `indexed_after` / `indexed_before`: Filter by document indexing date + +All dates should be in ISO 8601 format (e.g., `2024-01-15T00:00:00Z`). + +**Example with temporal filter:** +```python +# Query documents from 2024 only +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "What are the latest updates?"}], + metadata={ + "temporal_filter": { + "created_after": "2024-01-01T00:00:00Z", + "created_before": "2024-12-31T23:59:59Z" + } + } +) +``` + * Text Completions ```http POST /v1/completions @@ -263,6 +298,51 @@ response = client.chat.completions.create( ) ``` +#### Example with Temporal Filtering + +```python +from openai import OpenAI +from datetime import datetime, timedelta, timezone + +client = OpenAI(api_key='your-auth-token', base_url="http://localhost:8080/v1") + +# Example 1: Query recent documents (last 30 days) +thirty_days_ago = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "What are the latest developments?"}], + extra_body={ + "metadata": { + "temporal_filter": { + "indexed_after": thirty_days_ago + } + } + } +) + +# Example 2: Query documents from a specific year +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "What happened in 2024?"}], + extra_body={ + "metadata": { + "temporal_filter": { + "created_after": "2024-01-01T00:00:00Z", + "created_before": "2024-12-31T23:59:59Z" + } + } + } +) + +# Example 3: Let OpenRAG automatically extract temporal context from query +# Queries like "last week", "documents from 2024", "recent changes" +# will automatically be filtered without explicit temporal_filter +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "Show me documents from last week"}] +) +``` + --- ## ⚠️ Error Handling diff --git a/docs/content/docs/documentation/data_model.md b/docs/content/docs/documentation/data_model.md index 9edb9881..3bbcb8ac 100644 --- a/docs/content/docs/documentation/data_model.md +++ b/docs/content/docs/documentation/data_model.md @@ -106,3 +106,45 @@ Defines the many-to-many relationship between **users** and **partitions**, incl --- +## **Vector Database Schema (Milvus)** + +In addition to the relational database, OpenRAG uses Milvus to store document chunks with embeddings and metadata. + +### Document Chunk Fields + +| Field Name | Type | Description | +|------------|------|-------------| +| `embedding` | FloatVector | Dense vector embedding of the chunk content | +| `sparse_vector` | SparseFloatVector | Sparse BM25 vector for hybrid search | +| `content` | VarChar | The actual text content of the chunk | +| `partition` | VarChar | Partition name this chunk belongs to | +| `filename` | VarChar | Source filename | +| `page` | Int64 | Page number in the source document | +| `_id` | VarChar (PK) | Unique chunk identifier | +| `datetime` | VarChar | Primary timestamp from document content (user-provided) | +| `modified_at` | VarChar | Document modification timestamp (ISO 8601 format) | +| `created_at` | VarChar | Document creation timestamp (ISO 8601 format) | +| `indexed_at` | VarChar | When the chunk was indexed into OpenRAG (ISO 8601 format) | + +### Temporal Fields Priority + +When filtering or scoring documents by time, OpenRAG uses the following priority: +1. **`datetime`** - Highest priority, user-provided timestamp from document content +2. **`modified_at`** - Document modification date +3. **`created_at`** - Document creation date +4. **`indexed_at`** - Fallback, when the document was indexed + +All temporal fields are stored in ISO 8601 format (e.g., `2024-01-15T10:30:00Z`). + +### Temporal Filtering + +Vector database queries support temporal filtering with the following parameters: +- `datetime_after` / `datetime_before` +- `modified_after` / `modified_before` +- `created_after` / `created_before` +- `indexed_after` / `indexed_before` + +These filters use **OR logic** between the different date fields to ensure maximum flexibility in retrieving time-relevant documents. + +--- + diff --git a/docs/content/docs/documentation/features_in_details.md b/docs/content/docs/documentation/features_in_details.md index 9dd70133..363f0c36 100644 --- a/docs/content/docs/documentation/features_in_details.md +++ b/docs/content/docs/documentation/features_in_details.md @@ -84,4 +84,49 @@ See the section on [distributed deployment in a ray cluster](#5-distributed-depl * **Contextual retrieval** - Anthropic's technique for enhanced chunk relevance * **Multilingual reranking** - using `Alibaba-NLP/gte-multilingual-reranker-base` + + +### 🕐 Temporal Awareness +OpenRAG includes intelligent temporal understanding to deliver more relevant, time-aware responses. + +
+ +Temporal Features + +* **Automatic date extraction** - Detects temporal expressions in queries across multiple languages + * ISO dates: `2024-01-15`, `2024-01-15T10:30:00` + * Numeric formats: `15/01/2024`, `01/15/2024`, `15.01.2024` + * Month-year: `01/2024`, `2024/01` + * Year only: `2024`, `2023` + * Relative time: "last 30 days", "últimos 7 días", "derniers 30 jours" + * Keywords: "today", "yesterday", "recent" (and multilingual equivalents) + +* **Document timestamp metadata** - Tracks temporal information for each document + * `datetime` - Primary timestamp from document content (user-provided) + * `modified_at` - Document modification timestamp + * `created_at` - Document creation timestamp + * `indexed_at` - When the document was indexed into OpenRAG + +* **Temporal filtering** - Automatically filters search results based on detected time ranges + * Queries like "documents from 2024" only retrieve relevant documents + * "Last week's updates" focuses on recent content + * Works across all retrieval methods (base, multi-query, HyDE) + +* **Temporal scoring in reranking** - Balances relevance with recency + * Combines semantic relevance score with temporal score + * Configurable temporal weight (default: 30% temporal, 70% relevance) + * Linear decay formula favors more recent documents + * Configurable decay period (default: 365 days) + * Priority hierarchy: `datetime` > `modified_at` > `created_at` > `indexed_at` + +* **Temporal-aware prompts** - LLM receives temporal context + * Current date/time injected into system prompt + * Document timestamps included in retrieved chunks + * LLM instructed to consider recency when answering + * Prioritizes newer information for time-sensitive queries + +* **Configuration options** via environment variables: + * `RERANKER_TEMPORAL_WEIGHT` - Weight for temporal scoring (0.0-1.0, default: 0.3) + * `RERANKER_TEMPORAL_DECAY_DAYS` - Days for temporal score decay (default: 365) +
\ No newline at end of file diff --git a/openrag/components/indexer/chunker/chunker.py b/openrag/components/indexer/chunker/chunker.py index 945cb9e2..d1a27992 100644 --- a/openrag/components/indexer/chunker/chunker.py +++ b/openrag/components/indexer/chunker/chunker.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from pathlib import Path from typing import Optional +from datetime import datetime, timezone from components.prompts import CHUNK_CONTEXTUALIZER from components.utils import get_llm_semaphore, load_config @@ -235,12 +236,12 @@ async def split_document(self, doc: Document, task_id: str = None): start_page = page_info["start_page"] end_page = page_info["end_page"] prev_page_num = end_page - filtered_chunks.append( - Document( - page_content=chunk_w_context, - metadata={**metadata, "page": start_page}, - ) - ) + chunk_meta = { + **metadata, + "page": start_page, + "indexed_at": datetime.now(timezone.utc).isoformat(), + } + filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta)) log.info("Document chunking completed") return filtered_chunks @@ -352,12 +353,12 @@ async def split_document(self, doc: Document, task_id: str = None): start_page = page_info["start_page"] end_page = page_info["end_page"] prev_page_num = end_page - filtered_chunks.append( - Document( - page_content=chunk_w_context, - metadata={**metadata, "page": start_page}, - ) - ) + chunk_meta = { + **metadata, + "page": start_page, + "indexed_at": datetime.now(timezone.utc).isoformat(), + } + filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta)) log.info("Document chunking completed") return filtered_chunks @@ -457,12 +458,12 @@ async def split_document(self, doc: Document, task_id: str = None): start_page = page_info["start_page"] end_page = page_info["end_page"] prev_page_num = end_page - filtered_chunks.append( - Document( - page_content=chunk_w_context, - metadata={**metadata, "page": start_page}, - ) - ) + chunk_meta = { + **metadata, + "page": start_page, + "indexed_at": datetime.now(timezone.utc).isoformat(), + } + filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta)) log.info("Document chunking completed") return filtered_chunks diff --git a/openrag/components/indexer/vectordb/vectordb.py b/openrag/components/indexer/vectordb/vectordb.py index f3f0fe39..ba99c847 100644 --- a/openrag/components/indexer/vectordb/vectordb.py +++ b/openrag/components/indexer/vectordb/vectordb.py @@ -262,6 +262,32 @@ def _create_schema(self): max_length=MAX_LENGTH, ) + # Add temporal fields for temporal awareness (priority: datetime > modified_at > created_at > indexed_at) + # datetime: user-provided primary temporal field (highest priority) + schema.add_field( + field_name="datetime", + datatype=DataType.VARCHAR, + max_length=64, + ) + + schema.add_field( + field_name="created_at", + datatype=DataType.VARCHAR, + max_length=64, + ) + + schema.add_field( + field_name="modified_at", + datatype=DataType.VARCHAR, + max_length=64, + ) + + schema.add_field( + field_name="indexed_at", + datatype=DataType.VARCHAR, + max_length=64, + ) + schema.add_field( field_name="vector", datatype=DataType.FLOAT_VECTOR, @@ -303,6 +329,31 @@ def _create_index(self): field_name="partition", index_type="INVERTED", index_name="partition_idx" ) + # Add indexes for temporal fields to enable efficient filtering + index_params.add_index( + field_name="datetime", + index_type="INVERTED", + index_name="datetime_idx", + ) + + index_params.add_index( + field_name="created_at", + index_type="INVERTED", + index_name="created_at_idx", + ) + + index_params.add_index( + field_name="modified_at", + index_type="INVERTED", + index_name="modified_at_idx", + ) + + index_params.add_index( + field_name="indexed_at", + index_type="INVERTED", + index_name="indexed_at_idx", + ) + # Add index for vector field index_params.add_index( field_name="vector", @@ -430,8 +481,49 @@ async def async_search( expr_parts.append(f"partition in {partition}") if filter: + # Handle temporal filters with OR logic across different timestamp fields + # This allows documents to match if ANY of their timestamp fields (datetime, modified_at, created_at, indexed_at) fall within the range + temporal_keys = [ + "datetime_after", "datetime_before", + "created_after", "created_before", + "modified_after", "modified_before", + "indexed_after", "indexed_before" + ] + + # Build temporal conditions with OR logic + temporal_conditions = [] + + # Extract the generic after/before values from created_after/created_before + after_value = filter.get("created_after") or filter.get("datetime_after") or filter.get("modified_after") or filter.get("indexed_after") + before_value = filter.get("created_before") or filter.get("datetime_before") or filter.get("modified_before") or filter.get("indexed_before") + + if after_value or before_value: + # For each timestamp field, create conditions + field_conditions = [] + for field in ["datetime", "modified_at", "created_at", "indexed_at"]: + field_parts = [] + if after_value: + field_parts.append(f"{field} >= '{after_value}'") + if before_value: + field_parts.append(f"{field} <= '{before_value}'") + + if field_parts: + # Each field condition: (field >= after AND field <= before) + field_condition = " and ".join(field_parts) + field_conditions.append(f"({field_condition})") + + # Combine all field conditions with OR + if field_conditions: + temporal_expr = " or ".join(field_conditions) + temporal_conditions.append(f"({temporal_expr})") + + if temporal_conditions: + expr_parts.extend(temporal_conditions) + + # Handle other filters (exact match) for key, value in filter.items(): - expr_parts.append(f"{key} == '{value}'") + if key not in temporal_keys: + expr_parts.append(f"{key} == '{value}'") # Join all parts with " and " only if there are multiple conditions expr = " and ".join(expr_parts) if expr_parts else "" diff --git a/openrag/components/pipeline.py b/openrag/components/pipeline.py index 48bf6f24..a27bedb6 100644 --- a/openrag/components/pipeline.py +++ b/openrag/components/pipeline.py @@ -1,10 +1,13 @@ import copy +import json +from datetime import datetime, timezone from enum import Enum from components.prompts import QUERY_CONTEXTUALIZER_PROMPT, SYS_PROMPT_TMPLT from langchain_core.documents.base import Document from openai import AsyncOpenAI from utils.logger import get_logger +from utils.temporal import TemporalQueryNormalizer from .llm import LLM from .map_reduce import RAGMapReduce @@ -41,9 +44,11 @@ def __init__(self, config) -> None: self.reranker = Reranker(logger, config) async def retrieve_docs( - self, partition: list[str], query: str, use_map_reduce: bool = False + self, partition: list[str], query: str, use_map_reduce: bool = False, temporal_filter: dict = None ) -> list[Document]: - docs = await self.retriever.retrieve(partition=partition, query=query) + docs = await self.retriever.retrieve( + partition=partition, query=query, temporal_filter=temporal_filter + ) top_k = ( max(self.map_reduce_max_docs, self.reranker_top_k) if use_map_reduce @@ -79,6 +84,9 @@ def __init__(self, config) -> None: # map reduce self.map_reduce: RAGMapReduce = RAGMapReduce(config=config) + + # temporal query normalizer + self.temporal_normalizer = TemporalQueryNormalizer() async def generate_query(self, messages: list[dict]) -> str: match RAGMODE(self.rag_mode): @@ -124,11 +132,29 @@ async def _prepare_for_chat_completion(self, partition: list[str], payload: dict metadata = payload.get("metadata", {}) use_map_reduce = metadata.get("use_map_reduce", False) - logger.info("Metadata parameters", use_map_reduce=use_map_reduce) + + # Extract temporal filter from query if not provided in metadata + temporal_filter = metadata.get("temporal_filter", None) + if not temporal_filter: + temporal_filter = self.temporal_normalizer.extract_temporal_filter(query) + if temporal_filter: + logger.info( + "Extracted temporal filter from query", + created_after=temporal_filter.get("created_after"), + created_before=temporal_filter.get("created_before"), + modified_after=temporal_filter.get("modified_after"), + modified_before=temporal_filter.get("modified_before"), + datetime_after=temporal_filter.get("datetime_after"), + datetime_before=temporal_filter.get("datetime_before"), + ) - # 2. get docs + logger.info( + "Metadata parameters", + use_map_reduce=use_map_reduce, + temporal_filter_present=temporal_filter is not None, + ) # 2. get docs docs = await self.retriever_pipeline.retrieve_docs( - partition=partition, query=query, use_map_reduce=use_map_reduce + partition=partition, query=query, use_map_reduce=use_map_reduce, temporal_filter=temporal_filter ) if use_map_reduce and docs: @@ -140,14 +166,25 @@ async def _prepare_for_chat_completion(self, partition: list[str], payload: dict # 4. prepare the output messages: list = copy.deepcopy(messages) + # Get current datetime in UTC for temporal awareness + current_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + # prepend the messages with the system prompt + system_prompt_content = SYS_PROMPT_TMPLT.format( + context=context, current_datetime=current_datetime + ) messages.insert( 0, { "role": "system", - "content": SYS_PROMPT_TMPLT.format(context=context), + "content": system_prompt_content, }, ) + + # Debug: log the formatted system prompt + logger.debug("System prompt with context", prompt_length=len(system_prompt_content), doc_count=len(docs)) + logger.debug("Full system prompt", content=system_prompt_content) + payload["messages"] = messages return payload, docs diff --git a/openrag/components/reranker.py b/openrag/components/reranker.py index 474dfd57..faffffdc 100644 --- a/openrag/components/reranker.py +++ b/openrag/components/reranker.py @@ -1,9 +1,11 @@ import asyncio +from datetime import datetime, timezone from infinity_client import Client from infinity_client.api.default import rerank from infinity_client.models import RerankInput, ReRankResult from langchain_core.documents.base import Document + class Reranker: def __init__(self, logger, config): self.model_name = config.reranker["model_name"] @@ -12,7 +14,62 @@ def __init__(self, logger, config): self.semaphore = asyncio.Semaphore( 5 ) # Only allow 5 reranking operation at a time - self.logger.debug("Reranker initialized", model_name=self.model_name) + + # Temporal scoring parameters + self.temporal_weight = config.reranker.get("temporal_weight", 0.3) # Weight for temporal scoring + self.temporal_decay_days = config.reranker.get("temporal_decay_days", 365) # Days for full decay + + self.logger.debug( + "Reranker initialized", + model_name=self.model_name, + temporal_weight=self.temporal_weight, + temporal_decay_days=self.temporal_decay_days, + ) + + def _calculate_temporal_score(self, doc: Document) -> float: + """ + Calculate temporal score based on document recency. + More recent documents get higher scores (0.0 to 1.0). + + Uses linear decay based on document age. + Priority: datetime > modified_at > created_at > indexed_at + """ + try: + # Try datetime first (user-provided), then modified_at, then created_at, then indexed_at + date_str = ( + doc.metadata.get("datetime") or + doc.metadata.get("modified_at") or + doc.metadata.get("created_at") or + doc.metadata.get("indexed_at") + ) + + if not date_str: + # No temporal information, return neutral score + return 0.5 + + # Parse the date and ensure it's UTC-aware + if 'T' in date_str: + doc_date = datetime.fromisoformat(date_str.replace('Z', '+00:00')) + else: + doc_date = datetime.fromisoformat(date_str) + + # Ensure timezone awareness - assume UTC if naive + if doc_date.tzinfo is None: + doc_date = doc_date.replace(tzinfo=timezone.utc) + + # Calculate age in days using UTC now + now = datetime.now(timezone.utc) + days_old = (now - doc_date).total_seconds() / 86400 + + # Linear decay formula + # Score decreases linearly from 1.0 (today) to 0.0 (temporal_decay_days ago) + temporal_score = max(0.0, min(1.0, (1.0 - days_old / self.temporal_decay_days))) + + return temporal_score + + except Exception as e: + self.logger.warning(f"Error calculating temporal score: {e}") + return 0.5 # Neutral score on error async def rerank( self, query: str, documents: list[Document], top_k: int @@ -39,8 +96,34 @@ async def rerank( output = [] for rerank_res in rerank_result.results: doc = documents[rerank_res.index] - doc.metadata["relevance_score"] = rerank_res.relevance_score + relevance_score = rerank_res.relevance_score + + # Calculate temporal score + temporal_score = self._calculate_temporal_score(doc) + + # Combine relevance and temporal scores + # Final score = (1 - temporal_weight) * relevance + temporal_weight * temporal + combined_score = ( + (1 - self.temporal_weight) * relevance_score + + self.temporal_weight * temporal_score + ) + + # Store all scores in metadata + doc.metadata["relevance_score"] = relevance_score + doc.metadata["temporal_score"] = temporal_score + doc.metadata["combined_score"] = combined_score + output.append(doc) + + # Re-sort by combined score (descending) + output.sort(key=lambda d: d.metadata.get("combined_score", 0), reverse=True) + + self.logger.debug( + "Reranking complete with temporal scoring", + documents_returned=len(output), + temporal_weight=self.temporal_weight, + ) + return output except Exception as e: diff --git a/openrag/components/retriever.py b/openrag/components/retriever.py index 1b675e50..69ba5ae8 100644 --- a/openrag/components/retriever.py +++ b/openrag/components/retriever.py @@ -1,5 +1,6 @@ # Import necessary modules and classes from abc import ABC, abstractmethod +from typing import Optional from components.prompts import HYDE_PROMPT, MULTI_QUERY_PROMPT from langchain_core.documents.base import Document @@ -26,7 +27,12 @@ def __init__( pass @abstractmethod - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve( + self, + partition: list[str], + query: str, + temporal_filter: Optional[dict] = None, + ) -> list[Document]: pass @@ -41,13 +47,37 @@ async def retrieve( self, partition: list[str], query: str, + temporal_filter: Optional[dict] = None, ) -> list[Document]: db = get_vectordb() + + # Build filter with temporal constraints if provided + # Priority: datetime > modified_at > created_at > indexed_at + filter_dict = {} + if temporal_filter: + if "datetime_after" in temporal_filter: + filter_dict["datetime_after"] = temporal_filter["datetime_after"] + if "datetime_before" in temporal_filter: + filter_dict["datetime_before"] = temporal_filter["datetime_before"] + if "created_after" in temporal_filter: + filter_dict["created_after"] = temporal_filter["created_after"] + if "created_before" in temporal_filter: + filter_dict["created_before"] = temporal_filter["created_before"] + if "modified_after" in temporal_filter: + filter_dict["modified_after"] = temporal_filter["modified_after"] + if "modified_before" in temporal_filter: + filter_dict["modified_before"] = temporal_filter["modified_before"] + if "indexed_after" in temporal_filter: + filter_dict["indexed_after"] = temporal_filter["indexed_after"] + if "indexed_before" in temporal_filter: + filter_dict["indexed_before"] = temporal_filter["indexed_before"] + chunks = await db.async_search.remote( query=query, partition=partition, top_k=self.top_k, similarity_threshold=self.similarity_threshold, + filter=filter_dict if filter_dict else None, ) return chunks @@ -79,7 +109,9 @@ def __init__( prompt | llm | StrOutputParser() | (lambda x: x.split("[SEP]")) ) - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve( + self, partition: list[str], query: str, temporal_filter: Optional[dict] = None + ) -> list[Document]: db = get_vectordb() logger.debug("Generating multiple queries", k_queries=self.k_queries) generated_queries = await self.generate_queries.ainvoke( @@ -88,11 +120,34 @@ async def retrieve(self, partition: list[str], query: str) -> list[Document]: "k_queries": self.k_queries, } ) + + # Build filter with temporal constraints if provided + # Priority: datetime > modified_at > created_at > indexed_at + filter_dict = {} + if temporal_filter: + if "datetime_after" in temporal_filter: + filter_dict["datetime_after"] = temporal_filter["datetime_after"] + if "datetime_before" in temporal_filter: + filter_dict["datetime_before"] = temporal_filter["datetime_before"] + if "created_after" in temporal_filter: + filter_dict["created_after"] = temporal_filter["created_after"] + if "created_before" in temporal_filter: + filter_dict["created_before"] = temporal_filter["created_before"] + if "modified_after" in temporal_filter: + filter_dict["modified_after"] = temporal_filter["modified_after"] + if "modified_before" in temporal_filter: + filter_dict["modified_before"] = temporal_filter["modified_before"] + if "indexed_after" in temporal_filter: + filter_dict["indexed_after"] = temporal_filter["indexed_after"] + if "indexed_before" in temporal_filter: + filter_dict["indexed_before"] = temporal_filter["indexed_before"] + chunks = await db.async_multi_query_search.remote( queries=generated_queries, partition=partition, top_k_per_query=self.top_k, similarity_threshold=self.similarity_threshold, + filter=filter_dict if filter_dict else None, ) return chunks @@ -121,18 +176,40 @@ async def get_hyde(self, query: str): hyde_document = await self.hyde_generator.ainvoke({"query": query}) return hyde_document - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve(self, partition: list[str], query: str, temporal_filter: Optional[dict] = None) -> list[Document]: db = get_vectordb() hyde = await self.get_hyde(query) queries = [hyde] if self.combine: queries.append(query) + # Build filter with temporal constraints if provided + # Priority: datetime > modified_at > created_at > indexed_at + filter_dict = {} + if temporal_filter: + if "datetime_after" in temporal_filter: + filter_dict["datetime_after"] = temporal_filter["datetime_after"] + if "datetime_before" in temporal_filter: + filter_dict["datetime_before"] = temporal_filter["datetime_before"] + if "created_after" in temporal_filter: + filter_dict["created_after"] = temporal_filter["created_after"] + if "created_before" in temporal_filter: + filter_dict["created_before"] = temporal_filter["created_before"] + if "modified_after" in temporal_filter: + filter_dict["modified_after"] = temporal_filter["modified_after"] + if "modified_before" in temporal_filter: + filter_dict["modified_before"] = temporal_filter["modified_before"] + if "indexed_after" in temporal_filter: + filter_dict["indexed_after"] = temporal_filter["indexed_after"] + if "indexed_before" in temporal_filter: + filter_dict["indexed_before"] = temporal_filter["indexed_before"] + return await db.async_multi_query_search.remote( queries=queries, partition=partition, top_k_per_query=self.top_k, similarity_threshold=self.similarity_threshold, + filter=filter_dict if filter_dict else None, ) diff --git a/openrag/components/utils.py b/openrag/components/utils.py index 35510e19..e92c9c7e 100644 --- a/openrag/components/utils.py +++ b/openrag/components/utils.py @@ -122,16 +122,36 @@ def format_context(docs: list[Document]) -> str: if not docs: return "No document found from the database" - context = "Extracted documents:\n" + def format_date(iso_date: str) -> str: + """Convert ISO date to readable format: 2025-11-02 14:30""" + try: + from datetime import datetime + dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00')) + return dt.strftime("%Y-%m-%d %H:%M") + except: + return iso_date + + context = "Extracted documents:\n\n" for i, doc in enumerate(docs, start=1): - # doc_id = f"[doc_{i}]" - # document = f""" - # *source*: {doc_id} - # content: \n{doc.page_content.strip()}\n - # """ - document = f"""content: \n{doc.page_content.strip()}\n""" + # Build temporal metadata in a readable format + temporal_parts = [] + if doc.metadata.get("datetime"): + temporal_parts.append(f"Document date: {format_date(doc.metadata['datetime'])}") + elif doc.metadata.get("modified_at"): + temporal_parts.append(f"Last modified: {format_date(doc.metadata['modified_at'])}") + elif doc.metadata.get("created_at"): + temporal_parts.append(f"Created: {format_date(doc.metadata['created_at'])}") + elif doc.metadata.get("indexed_at"): + temporal_parts.append(f"Indexed: {format_date(doc.metadata['indexed_at'])}") + + # Format document with temporal metadata in a natural way + document = f"Document {i}:\n" + if temporal_parts: + document += f"{temporal_parts[0]}\n" + document += f"{doc.page_content.strip()}\n" + context += document - context += "-" * 40 + "\n\n" + context += "-" * 60 + "\n\n" return context diff --git a/openrag/routers/indexer.py b/openrag/routers/indexer.py index cf60d276..f69f6152 100644 --- a/openrag/routers/indexer.py +++ b/openrag/routers/indexer.py @@ -1,5 +1,5 @@ import json -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional @@ -137,7 +137,14 @@ async def add_file( # Append extra metadata metadata["file_size"] = human_readable_size(file_stat.st_size) - metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat() + + # Use provided created_at if available, otherwise extract from file system + if "created_at" not in metadata or not metadata["created_at"]: + metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc).isoformat() + + # Extract file modification time (always from file system) + metadata["modified_at"] = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc).isoformat() + metadata["file_id"] = file_id # Indexing the file @@ -215,7 +222,14 @@ async def put_file( # Append extra metadata metadata["file_size"] = human_readable_size(file_stat.st_size) - metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat() + + # Use provided created_at if available, otherwise extract from file system + if "created_at" not in metadata or not metadata["created_at"]: + metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc).isoformat() + + # Extract file modification time (always from file system) + metadata["modified_at"] = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc).isoformat() + metadata["file_id"] = file_id # Indexing the file diff --git a/openrag/utils/temporal.py b/openrag/utils/temporal.py new file mode 100644 index 00000000..823052a5 --- /dev/null +++ b/openrag/utils/temporal.py @@ -0,0 +1,286 @@ +""" +Temporal utilities for query normalization and date extraction. +Language-agnostic temporal expression extraction. +""" + +import re +from datetime import datetime, timedelta, timezone +from typing import Optional, Dict, Tuple + + +class TemporalQueryNormalizer: + """ + Extracts temporal expressions from queries and normalizes them to date ranges. + Works across languages by using: + 1. Universal date patterns (ISO dates, numeric formats) + 2. Number-based relative time extraction + 3. Common English patterns as fallback + """ + + def __init__(self): + # Universal date patterns (work across all languages) + # Order matters - more specific patterns first! + self.universal_patterns = { + # ISO date format: 2024-01-15, 2024-01-15T10:30:00 (HIGHEST PRIORITY) + r'(\d{4})-(\d{2})-(\d{2})(?:T[\d:]+)?': self._parse_iso_date, + + # Numeric date formats: 15/01/2024, 01/15/2024, 15.01.2024 (BEFORE year-only!) + r'\b(\d{1,2})[/\.\-](\d{1,2})[/\.\-](\d{4})\b': self._parse_numeric_date, + + # Month-year: 01/2024, 01-2024 + r'\b(\d{1,2})[/\-](\d{4})\b': self._parse_month_year, + + # Year-month: 2024/01, 2024-01 + r'\b(\d{4})[/\-](\d{1,2})\b': self._parse_month_year_reverse, + + # Year only: 2024, 2023 (LOWEST PRIORITY - catches lone years) + r'\b(20\d{2})\b': self._parse_year_only, + } + + # Pattern to extract numbers for relative time (language-agnostic) + # Examples: "7 días", "30 jours", "14 Tage", "últimos 7", "derniers 30" + # Matches: number + optional space + word OR word + optional space + number + self.relative_number_pattern = r'(\d+)\s*\w+|\w+\s+(\d+)' + + # English patterns for backward compatibility + self.english_patterns = { + r'\b(today|aujourd\'hui|heute|hoy|oggi|hoje)\b': lambda: self._get_today(), + r'\b(yesterday|hier|ayer|ieri|ontem)\b': lambda: self._get_yesterday(), + r'\b(last|past|recent)\b': lambda: self._get_last_n_days(30), + } + + def _parse_iso_date(self, match) -> Tuple[datetime, datetime]: + """Parse ISO date format: YYYY-MM-DD or YYYY/MM/DD""" + year, month, day = int(match.group(1)), int(match.group(2)), int(match.group(3)) + return self._get_specific_date(f"{year:04d}-{month:02d}-{day:02d}") + + def _parse_year_only(self, match) -> Tuple[datetime, datetime]: + """Parse year only: 2024""" + year = int(match.group(1)) + return self._get_year_range(year) + + def _parse_numeric_date(self, match) -> Tuple[datetime, datetime]: + """Parse numeric date: DD/MM/YYYY or MM/DD/YYYY""" + # Try DD/MM/YYYY format first (more common internationally) + try: + day, month, year = int(match.group(1)), int(match.group(2)), int(match.group(3)) + if 1 <= month <= 12 and 1 <= day <= 31: + return self._get_specific_date(f"{year:04d}-{month:02d}-{day:02d}") + except: + pass + + # Try MM/DD/YYYY format (US) + try: + month, day, year = int(match.group(1)), int(match.group(2)), int(match.group(3)) + if 1 <= month <= 12 and 1 <= day <= 31: + return self._get_specific_date(f"{year:04d}-{month:02d}-{day:02d}") + except: + pass + + # Default to current month if parsing fails + return self._get_this_month() + + def _parse_month_year(self, match) -> Tuple[datetime, datetime]: + """Parse month-year: MM/YYYY or MM-YYYY""" + month, year = int(match.group(1)), int(match.group(2)) + if 1 <= month <= 12: + return self._get_month_year_range_numeric(month, year) + return self._get_year_range(year) + + def _parse_month_year_reverse(self, match) -> Tuple[datetime, datetime]: + """Parse year-month: YYYY/MM or YYYY-MM""" + year, month = int(match.group(1)), int(match.group(2)) + if 1 <= month <= 12: + return self._get_month_year_range_numeric(month, year) + return self._get_year_range(year) + + def _get_today(self): + """Get date range for today in UTC.""" + now = datetime.now(timezone.utc) + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + end = now.replace(hour=23, minute=59, second=59, microsecond=999999) + return start, end + + def _get_yesterday(self): + """Get date range for yesterday in UTC.""" + yesterday = datetime.now(timezone.utc) - timedelta(days=1) + start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0) + end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999) + return start, end + + def _get_this_week(self): + """Get date range for current week (Monday to Sunday) in UTC.""" + now = datetime.now(timezone.utc) + start = now - timedelta(days=now.weekday()) # Monday + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + end = start + timedelta(days=6, hours=23, minutes=59, seconds=59, microseconds=999999) + return start, end + + def _get_this_month(self): + """Get date range for current month in UTC.""" + now = datetime.now(timezone.utc) + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + # Last day of current month + if now.month == 12: + end = now.replace(day=31, hour=23, minute=59, second=59, microsecond=999999) + else: + end = (now.replace(month=now.month + 1, day=1) - timedelta(days=1)).replace( + hour=23, minute=59, second=59, microsecond=999999 + ) + return start, end + + def _get_this_year(self): + """Get date range for current year in UTC.""" + now = datetime.now(timezone.utc) + start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) + end = now.replace(month=12, day=31, hour=23, minute=59, second=59, microsecond=999999) + return start, end + + def _get_last_n_days(self, n: int): + """Get date range for last N days from now in UTC.""" + end = datetime.now(timezone.utc) + start = end - timedelta(days=n) + return start, end + + def _get_year_range(self, year: int): + """Get date range for a specific year in UTC.""" + start = datetime(year, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end = datetime(year, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) + return start, end + + def _get_month_year_range_numeric(self, month: int, year: int): + """Get date range for a specific month and year (numeric input) in UTC.""" + start = datetime(year, month, 1, 0, 0, 0, tzinfo=timezone.utc) + + # Get last day of month + if month == 12: + end = datetime(year, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) + else: + end = (datetime(year, month + 1, 1, tzinfo=timezone.utc) - timedelta(days=1)).replace( + hour=23, minute=59, second=59, microsecond=999999 + ) + return start, end + + def _get_specific_date(self, date_str: str): + """Get date range for a specific date (full day) in UTC.""" + date = datetime.fromisoformat(date_str) + # Ensure timezone is UTC + if date.tzinfo is None: + date = date.replace(tzinfo=timezone.utc) + start = date.replace(hour=0, minute=0, second=0, microsecond=0) + end = date.replace(hour=23, minute=59, second=59, microsecond=999999) + return start, end + + def _extract_relative_time_from_number(self, query: str) -> Optional[Tuple[datetime, datetime]]: + """ + Extract relative time from numbers in the query (language-agnostic). + Handles both "7 days" and "últimos 7" patterns. + Examples: "7 días", "30 jours", "14 Tage", "últimos 7", "derniers 30" + """ + matches = re.findall(self.relative_number_pattern, query) + + for match in matches: + try: + # Extract number from either position (before or after word) + number = None + if match[0] and match[0].strip(): # Number before word: "7 días" + number = int(match[0]) + elif match[1] and match[1].strip(): # Number after word: "últimos 7" + number = int(match[1]) + + if number is None: + continue + + # If number is reasonable for days/weeks/months + if 1 <= number <= 365: + # Most likely referring to days if small number + if number <= 31: + return self._get_last_n_days(number) + # Larger numbers likely weeks or days + elif number <= 52: + # Could be weeks, treat as days + return self._get_last_n_days(number) + else: + # Large number, likely days + return self._get_last_n_days(number) + + except (ValueError, IndexError): + continue + + return None + + def extract_temporal_filter(self, query: str) -> Optional[Dict[str, str]]: + """ + Extract temporal expressions from a query and convert to filter dict. + Works across multiple languages. + + Returns: + Dict with keys like 'created_after', 'created_before' + or None if no temporal expression is found. + """ + # 1. Try universal date patterns first (ISO dates, numeric dates, years) + for pattern, parse_func in self.universal_patterns.items(): + match = re.search(pattern, query) + if match: + try: + start, end = parse_func(match) + return { + 'created_after': start.isoformat(), + 'created_before': end.isoformat(), + } + except: + continue + + # 2. Try extracting numbers for relative time (language-agnostic) + relative_result = self._extract_relative_time_from_number(query) + if relative_result: + start, end = relative_result + return { + 'created_after': start.isoformat(), + 'created_before': end.isoformat(), + } + + # 3. Try English patterns as fallback + query_lower = query.lower() + for pattern, date_func in self.english_patterns.items(): + match = re.search(pattern, query_lower, re.IGNORECASE) + if match: + try: + start, end = date_func() + return { + 'created_after': start.isoformat(), + 'created_before': end.isoformat(), + } + except: + continue + + return None + + def augment_query(self, query: str, temporal_filter: Optional[Dict[str, str]] = None) -> str: + """ + Augment query with temporal context if temporal filter is detected. + + Args: + query: Original user query + temporal_filter: Extracted temporal filter dict + + Returns: + Augmented query string + """ + if not temporal_filter: + temporal_filter = self.extract_temporal_filter(query) + + if temporal_filter: + # Add temporal context to the query for better retrieval + date_context = [] + if 'created_after' in temporal_filter: + date_after = datetime.fromisoformat(temporal_filter['created_after']) + date_context.append(f"from {date_after.strftime('%B %Y')}") + if 'created_before' in temporal_filter: + date_before = datetime.fromisoformat(temporal_filter['created_before']) + date_context.append(f"until {date_before.strftime('%B %Y')}") + + if date_context: + return f"{query} ({' '.join(date_context)})" + + return query diff --git a/prompts/example1/sys_prompt_tmpl.txt b/prompts/example1/sys_prompt_tmpl.txt index 81d472bb..4becdebe 100644 --- a/prompts/example1/sys_prompt_tmpl.txt +++ b/prompts/example1/sys_prompt_tmpl.txt @@ -2,6 +2,8 @@ You are an AI conversational assistant specialized in **information retrieval an Your goal is to provide **precise, reliable, and well-structured answers** using **only the retrieved documents** (`Context`). Prioritize **clarity, accuracy, and completeness** in your responses. +**Current Date and Time:** {current_datetime} + ## Rules 1. Use only the provided Context @@ -10,10 +12,17 @@ Prioritize **clarity, accuracy, and completeness** in your responses. * If the context is **insufficient**, **invite the user** to clarify their query or provide additional keywords. 2. Language Consistency - * Always respond **in the same language** as the user’s query. + * Always respond **in the same language** as the user's query. 3. Structure and Readability * Use **headings**, **bullet points**, **numbered lists**, or **tables** to organize information clearly. * Ensure responses are **concise yet complete**, avoiding omission of key details. +4. Temporal Awareness + * Pay attention to the **temporal context** of both the query and the retrieved documents. + * Each document includes **creation_date** and **indexed_date** metadata indicating when it was created and indexed. + * When the user asks about **recent events**, **latest updates**, or uses temporal references (e.g., "last week", "yesterday", "this year"), prioritize documents with **more recent dates**. + * If documents contain conflicting information, prefer **newer documents** unless the user explicitly requests historical information. + * When appropriate, mention the **date or timeframe** of the information you're presenting to help users understand its recency. + Here are the retrieved documents: `{context}` \ No newline at end of file