diff --git a/.github/workflows/email_test.yaml b/.github/workflows/email_test.yaml new file mode 100644 index 00000000..517ac761 --- /dev/null +++ b/.github/workflows/email_test.yaml @@ -0,0 +1,102 @@ +name: E-mail test + +on: + push: + workflow_dispatch: + +jobs: + email-test: + runs-on: ubuntu-latest + steps: + - name: Update apt and install packages + run: | + df -h + echo ${PWD} + sudo apt update + sudo apt install -y curl git jq + sudo apt clean + sudo apt autoremove --purge -y + + - + name: Set up Docker Compose + uses: docker/setup-compose-action@v1 + with: + version: v2.34.0 + + - + name: Free up space and move Docker storage to /mnt + run: | + echo "Stopping Docker..." + sudo systemctl stop docker + + echo "Creating new Docker root at /mnt/docker..." + sudo mkdir -p /mnt/docker + sudo rsync -aqxP /var/lib/docker/ /mnt/docker + + echo "Updating Docker daemon config..." + echo '{"data-root": "/mnt/docker"}' | sudo tee /etc/docker/daemon.json + + cat /etc/docker/daemon.json + + echo "Restarting Docker..." + sudo systemctl start docker + + echo "Verifying Docker root directory:" + docker info | grep "Docker Root Dir" + + - + name: Checkout current branch + uses: actions/checkout@v5 + with: + submodules: true + + - + name: Set up env + run: | + cp .github/workflows/email_test/.env ./ + cp .github/workflows/email_test/*.yaml ./ + + - + name: Patch vllm to v0.9.2 + run: sed -Ei 's/checkout v[0-9\.]+/checkout v0.9.2/' extern/vllm/Dockerfile.cpu + + - + name: Build + run: docker compose --profile cpu build + + - + name: Run + run: | + docker compose --profile cpu up -d || docker logs openrag-vllm-cpu-1 + .github/workflows/smoke_test/wait_for_healthy.sh openrag-vllm-cpu-1 + + - + name: Cleanup + run: | + docker container prune -f + docker image prune -f + docker builder prune -f + df -h + + - + name: List containers + run: docker container ls + + - + name: Install Python venv + run: | + python3 -m venv venv + source venv/bin/activate + pip3 install -r utility/requirements.txt + + - + name: Wait for OpenRag to start + run: | + .github/workflows/related_docs_test/wait_for_services.sh + + - + name: Execute test + run: | + .github/workflows/email_test/execute_test.sh emails + + diff --git a/.github/workflows/email_test/.env b/.github/workflows/email_test/.env new file mode 100644 index 00000000..98eb8de3 --- /dev/null +++ b/.github/workflows/email_test/.env @@ -0,0 +1,85 @@ +# LLM +BASE_URL=http://mock-llm:8080/v1/ +API_KEY=sk- +MODEL=mock-model +LLM_SEMAPHORE=10 + +# VLM +VLM_BASE_URL=http://mock-llm:8080/v1/ +VLM_API_KEY=sk- +VLM_MODEL=mock-model +VLM_SEMAPHORE=10 + +# LLM for benchmark purpose, using LLM to judge the output +# You can use the same endpoint as the one used for OpenRAG +JUDGE_BASE_URL= +JUDGE_MODEL= +JUDGE_API_KEY= + +# App +APP_PORT=8080 # this is the forwarded port +API_NUM_WORKERS=1 # Number of uvicorn workers for the FastAPI app +# ENABLE_RAY_SERVE=true # Use Ray Serve for the API instead of Uvicorn + +RAGMODE=SimpleRag + +# # VLLM +# VLLM_PORT=8000 + +# RETRIEVER +RETRIEVER_TYPE=email +CONTEXTUAL_RETRIEVAL=false +RETRIEVER_TOP_K=20 # Number of documents to return before reranking + +# EMBEDDER +EMBEDDER_MODEL_NAME=Qwen/Qwen3-Embedding-0.6B +EMBEDDER_BASE_URL=http://vllm:8000/v1 +EMBEDDER_API_KEY=EMPTY + +# RERANKER +RERANKER_ENABLED=false +RERANKER_MODEL=Alibaba-NLP/gte-multilingual-reranker-base # or jinaai/jina-reranker-v2-base-multilingual or jinaai/jina-colbert-v2 if you want +RERANKER_TOP_K=20 # Number of documents to return after reranking. 
Increase it for better results if your LLM has a wider context window.
+
+# Prompts
+PROMPTS_DIR=../prompts/example1 # you can change this
+
+# Loaders
+PDFLoader=MarkerLoader
+XDG_CACHE_HOME=/app/model_weights
+# If using MarkerLoader
+MARKER_MAX_TASKS_PER_CHILD=100
+MARKER_MAX_PROCESSES=1
+MARKER_MIN_PROCESSES=1
+# MARKER_POOL_SIZE=1 # Value to increment if you have a cluster of machines
+MARKER_NUM_GPUS=0.01
+
+# set to true if you want to deploy chainlit ui
+WITH_CHAINLIT_UI=true
+
+# Ray
+RAY_DEDUP_LOGS=0
+RAY_DASHBOARD_PORT=8265
+
+RAY_NUM_GPUS=0.1
+RAY_POOL_SIZE=1 # Number of serializer actor instances
+RAY_MAX_TASKS_PER_WORKER=2 # Number of tasks per serializer instance
+RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1
+RAY_task_retry_delay_ms=3000
+RAY_ENABLE_UV_RUN_RUNTIME_ENV=0
+RAY_memory_monitor_refresh_ms=0
+
+# SAVE UPLOADED FILES
+SAVE_UPLOADED_FILES=true # useful for chainlit source viewing
+
+
+## Variables to add to activate indexer-ui
+INDEXERUI_COMPOSE_FILE=extern/indexer-ui/docker-compose.yaml # Required path to the docker-compose file
+INDEXERUI_PORT=8067 # Port to expose the Indexer UI (default is 3042)
+INDEXERUI_URL='http://10.3.0.139:8067' # Base URL of the Indexer UI (required to prevent CORS issues)
+VITE_API_BASE_URL='http://10.3.0.139:8080' # Base URL of your FastAPI backend. Used by the frontend
+VITE_INCLUDE_CREDENTIALS=true
+
+SUPER_ADMIN_MODE=true
+
diff --git a/.github/workflows/email_test/docker-compose.yaml b/.github/workflows/email_test/docker-compose.yaml
new file mode 100644
index 00000000..3ea6d3b2
--- /dev/null
+++ b/.github/workflows/email_test/docker-compose.yaml
@@ -0,0 +1,144 @@
+include:
+  - vdb/milvus.yaml
+
+x-openrag: &openrag_template
+  build:
+    context: .
+    dockerfile: Dockerfile
+  volumes:
+    - ${CONFIG_VOLUME:-./.hydra_config}:/app/.hydra_config # For dev mode
+    - ${DATA_VOLUME:-./data}:/app/data
+    - ${MODEL_WEIGHTS_VOLUME:-~/.cache/huggingface}:/app/model_weights # Model weights for RAG
+    - ./openrag:/app/openrag # For dev mode
+    - /$SHARED_ENV:/ray_mount/.env # Shared environment variables
+    - ./logs:/app/logs # For dev mode
+  ports:
+    - ${APP_PORT:-8080}:${APP_iPORT:-8080}
+    - ${RAY_DASHBOARD_PORT:-8265}:8265 # Disable when in cluster mode
+  networks:
+    default:
+      aliases:
+        - openrag
+  env_file:
+    - ${SHARED_ENV:-.env}
+  shm_size: 10.24gb
+
+x-vllm: &vllm_template
+  networks:
+    default:
+      aliases:
+        - vllm
+  restart: on-failure
+  environment:
+    - HUGGING_FACE_HUB_TOKEN
+  ipc: "host"
+  volumes:
+    - ${VLLM_CACHE:-/root/.cache/huggingface}:/root/.cache/huggingface # put ./vllm_cache if you want to have the weights on the vllm_cache folder in your project
+  command: >
+    --model ${EMBEDDER_MODEL_NAME:-jinaai/jina-embeddings-v3}
+    --trust-remote-code
+    --task embed
+    --gpu_memory_utilization 0.3
+    --max-model-len ${MAX_MODEL_LEN:-8194}
+    # --max-num-seqs 1
+    # gpu_memory_utilization, max-num-seqs and max-model-len can be tuned depending on your GPU memory
+
+  healthcheck:
+    test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
+    interval: 20s
+    timeout: 5s
+    retries: 4
+    start_period: 90s
+  # ports:
+  #   - ${VLLM_PORT:-8000}:8000
+services:
+  # GPU - default
+  openrag:
+    <<: *openrag_template
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: all
+              capabilities: [gpu]
+    profiles:
+      - ""
+    depends_on:
+      rdb:
+        condition: service_started
+      milvus:
+        condition: service_healthy
+      vllm-gpu:
+        condition: service_healthy
+
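+  # Profile selection sketch (assumption: run from the repo root, after the workflow
+  # has copied this compose file and the .env there):
+  #   docker compose --profile cpu build    # builds openrag-cpu, vllm-cpu and mock-llm
+  #   docker compose --profile cpu up -d    # empty-profile (GPU) services stay down
+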
+  # No GPU
+  openrag-cpu:
+    <<: *openrag_template
+    deploy: {}
+    profiles:
+      - "cpu"
+    depends_on:
+      rdb:
+        condition: service_started
+      milvus:
+        condition: service_healthy
+#      vllm-cpu:
+#        condition: service_healthy
+
+  rdb:
+    image: postgres:15
+    environment:
+      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-root_password}
+      - POSTGRES_USER=${POSTGRES_USER:-root}
+    volumes:
+      - ${DB_VOLUME:-./db}:/var/lib/postgresql/data
+    expose:
+      - 5432
+
+  vllm-gpu:
+    <<: *vllm_template
+    image: vllm/vllm-openai:v0.9.2
+    runtime: nvidia
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: all
+              capabilities: [gpu]
+    profiles:
+      - "" # Empty string gives default behavior (but does not run when cpu requested)
+
+  vllm-cpu:
+    <<: *vllm_template
+    build:
+      context: extern/vllm
+      dockerfile: Dockerfile.cpu
+      target: vllm-openai
+    image: openrag-vllm-openai-cpu
+    deploy: {}
+    environment:
+      - VLLM_CPU_KVCACHE_SPACE=8
+      # Default value isn't sufficient for full context length
+    command: >
+      --model ${EMBEDDER_MODEL_NAME:-jinaai/jina-embeddings-v3}
+      --trust-remote-code
+      --dtype float32
+      --max-model-len ${MAX_MODEL_LEN:-8194}
+      # dtype is required for aarch64 (https://github.com/vllm-project/vllm/issues/11327) and improves speed on amd64.
+      # max-num-batched-tokens is required for aarch64 because chunked prefill isn't supported by V1 vllm backend
+      # for aarch64 yet. On aarch64 max-num-batched-tokens must be equal max-model-len for now (without chunked prefill).
+      # For details see https://github.com/vllm-project/vllm/issues/21179
+    profiles:
+      - "cpu"
+
+  mock-llm:
+    build:
+      context: .github/workflows/related_docs_test/mock-llm
+      dockerfile: Dockerfile.mock-llm
+    ports:
+      - 8001:8080
+    profiles:
+      - 'cpu'
+
diff --git a/.github/workflows/email_test/execute_test.sh b/.github/workflows/email_test/execute_test.sh
new file mode 100755
index 00000000..0e0be85a
--- /dev/null
+++ b/.github/workflows/email_test/execute_test.sh
@@ -0,0 +1,133 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+PARTITION_NAME=$1
+
+# Thread 1
+echo "Thread 1: root e-mail" \
+    | .github/workflows/email_test/index_root_email.sh http://localhost:8080 ${PARTITION_NAME} thread1_root.txt thread1_root.txt
+
+sleep 2s
+
+echo "Thread 1 child 1 ( 1 . 1 ) Penguinone is an organic compound with the molecular formula C10H14O. Its name comes from the fact that its 2-dimensional molecular structure resembles a penguin." \
+    | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+        thread1_child1.txt thread1_root.txt thread1_root.txt
+
+sleep 2s
+
+for k in `seq 1 3`;
+do
+    echo "Thread 1 leaf e-mail $k for child 1 ( 1 . 1 . $k )" \
+        | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+            thread1_child1_leaf${k}.txt thread1_child1.txt thread1_root.txt
+    echo
+    echo thread1_child1_leaf${k}.txt
+    sleep 2s
+done
+
+for j in `seq 2 3`;
+do
+    echo "Thread 1 child $j ( 1 . $j )" \
+        | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+            thread1_child${j}.txt thread1_root.txt thread1_root.txt
+
+    sleep 2s
+
+    for k in `seq 1 3`;
+    do
+        echo "Thread 1 leaf e-mail $k for child $j ( 1 . $j . $k )" \
+            | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+                thread1_child${j}_leaf${k}.txt thread1_child${j}.txt thread1_root.txt
+
+        sleep 2s
+    done
+done
+
+
+# Thread 2
+echo "Thread 2: root e-mail" \
+    | .github/workflows/email_test/index_root_email.sh http://localhost:8080 ${PARTITION_NAME} thread2_root.txt thread2_root.txt
+
+sleep 2s
+
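+# Both threads indexed by this script form the same tree (sketch):
+#   threadN_root
+#   ├── threadN_child1 ── leaf1..leaf3
+#   ├── threadN_child2 ── leaf1..leaf3
+#   └── threadN_child3 ── leaf1..leaf3
+# so a hit on any node should let the e-mail retriever pull in the rest of that
+# branch (checked by the verification sections below).
+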
+echo "Thread 2 child 1 ( 2 . 1 ) Administratively, Paris is divided into twenty arrondissements (districts), each having their own cultural identity." \
+    | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+        thread2_child1.txt thread2_root.txt thread2_root.txt
+
+sleep 2s
+
+for k in `seq 1 3`;
+do
+    echo "Thread 2 leaf e-mail $k for child 1 ( 2 . 1 . $k )" \
+        | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+            thread2_child1_leaf${k}.txt thread2_child1.txt thread2_root.txt
+
+    sleep 2s
+done
+
+for j in `seq 2 3`;
+do
+    echo "Thread 2 child $j ( 2 . $j )" \
+        | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+            thread2_child${j}.txt thread2_root.txt thread2_root.txt
+
+    sleep 2s
+
+    for k in `seq 1 3`;
+    do
+        echo "Thread 2 leaf e-mail $k for child $j ( 2 . $j . $k )" \
+            | .github/workflows/email_test/index_child_email.sh http://localhost:8080 ${PARTITION_NAME} \
+                thread2_child${j}_leaf${k}.txt thread2_child${j}.txt thread2_root.txt
+
+        sleep 2s
+    done
+done
+
+# Verification: thread 1
+
+# Positive tests
+for target in thread1_root thread1_child1 thread1_child1_leaf1 thread1_child1_leaf2 thread1_child1_leaf3
+do
+    if echo "Penguinone" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 ${PARTITION_NAME} | grep file_id | grep $target
+    then
+        echo "Found $target"
+    else
+        echo "Not found: $target"
+        exit 1
+    fi
+done
+
+# Negative tests
+for target in thread2 child2 child3
+do
+    if echo "Penguinone" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 ${PARTITION_NAME} | grep file_id | grep $target
+    then
+        echo "False positive: $target in the output"
+        exit 1
+    fi
+done
+
+# Verification: thread 2
+
+# Positive tests
+for target in thread2_root thread2_child1 thread2_child1_leaf1 thread2_child1_leaf2 thread2_child1_leaf3
+do
+    if echo "Paris" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 ${PARTITION_NAME} | grep file_id | grep $target
+    then
+        echo "Found $target"
+    else
+        echo "Not found: $target"
+        exit 1
+    fi
+done
+
+# Negative tests
+for target in thread1 child2 child3
+do
+    if echo "Paris" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 ${PARTITION_NAME} | grep file_id | grep $target
+    then
+        echo "False positive: $target in the output"
+        exit 1
+    fi
+done
+
diff --git a/.github/workflows/email_test/index_child_email.sh b/.github/workflows/email_test/index_child_email.sh
new file mode 100755
index 00000000..3af38932
--- /dev/null
+++ b/.github/workflows/email_test/index_child_email.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+ENDPOINT_URL=$1
+PARTITION_NAME=$2
+FILE_NAME=$3
+PARENT_FILE_NAME=$4
+THREAD_ID=$5
+CONTENT=$(cat)
+
+metadata=$(jq -nc \
+    --arg parent_file_name "${PARENT_FILE_NAME}" \
+    --arg thread_id "${THREAD_ID}" \
+    '{
+        mimetype: "text/plain",
+        "email.subject": "subject",
+        datetime: "datetime",
+        parent_id: $parent_file_name,
+        relationship_id: $thread_id,
+        doctype: "com.linagora.email",
+        "email.preview": "e-mail preview"
+    }')
+
+echo ${metadata}
+
+curl -X 'POST' \
+    ${ENDPOINT_URL}/indexer/partition/${PARTITION_NAME}/file/${FILE_NAME} \
+    -H 'accept: application/json' \
+    -H 'Content-Type: multipart/form-data' \
+    -F "file=@-;filename=${FILE_NAME};type=text/plain" \
+    -F "metadata=$metadata" <<< "$CONTENT"
+
diff --git a/.github/workflows/email_test/index_root_email.sh b/.github/workflows/email_test/index_root_email.sh
new
file mode 100755 index 00000000..8993172f --- /dev/null +++ b/.github/workflows/email_test/index_root_email.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +ENDPOINT_URL=$1 +PARTITION_NAME=$2 +FILE_NAME=$3 +THREAD_ID=$4 +CONTENT=$(cat) + +metadata=$(jq -nc \ + --arg thread_id "${THREAD_ID}" \ + '{ + mimetype: "text/plain", + "email.subject": "subject", + datetime: "datetime", + parent_id: "", + relationship_id: $thread_id, + doctype: "com.linagora.email", + "email.preview": "e-mail preview" + }') + +echo ${metadata} + +curl -X 'POST' \ + ${ENDPOINT_URL}/indexer/partition/${PARTITION_NAME}/file/${FILE_NAME} \ + -H 'accept: application/json' \ + -H 'Content-Type: multipart/form-data' \ + -F "file=@-;filename=${FILE_NAME};type=text/plain" \ + -F "metadata=$metadata" <<< "$CONTENT" + diff --git a/.github/workflows/related_docs_test.yaml b/.github/workflows/related_docs_test.yaml new file mode 100644 index 00000000..41df992f --- /dev/null +++ b/.github/workflows/related_docs_test.yaml @@ -0,0 +1,117 @@ +name: Related docs test + +on: + push: + workflow_dispatch: + +jobs: + related-docs-test: + runs-on: ubuntu-latest + steps: + - name: Update apt and install packages + run: | + df -h + echo ${PWD} + sudo apt update + sudo apt install -y curl git jq + sudo apt clean + sudo apt autoremove --purge -y + + - + name: Set up Docker Compose + uses: docker/setup-compose-action@v1 + with: + version: v2.34.0 + + - + name: Free up space and move Docker storage to /mnt + run: | + echo "Stopping Docker..." + sudo systemctl stop docker + + echo "Creating new Docker root at /mnt/docker..." + sudo mkdir -p /mnt/docker + sudo rsync -aqxP /var/lib/docker/ /mnt/docker + + echo "Updating Docker daemon config..." + echo '{"data-root": "/mnt/docker"}' | sudo tee /etc/docker/daemon.json + + cat /etc/docker/daemon.json + + echo "Restarting Docker..." 
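+          # daemon.json now points data-root at /mnt/docker, so the images and build
+          # layers created below land on the larger /mnt volume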
+ sudo systemctl start docker + + echo "Verifying Docker root directory:" + docker info | grep "Docker Root Dir" + + - + name: Checkout current branch + uses: actions/checkout@v5 + with: + submodules: true + + - + name: Set up env + run: | + cp .github/workflows/related_docs_test/.env ./ + cp .github/workflows/related_docs_test/*.yaml ./ + + - + name: Patch vllm to v0.9.2 + run: sed -Ei 's/checkout v[0-9\.]+/checkout v0.9.2/' extern/vllm/Dockerfile.cpu + + - + name: Build + run: docker compose --profile cpu build + + - + name: Run + run: | + docker compose --profile cpu up -d || docker logs openrag-vllm-cpu-1 + .github/workflows/smoke_test/wait_for_healthy.sh openrag-vllm-cpu-1 + + - + name: Cleanup + run: | + docker container prune -f + docker image prune -f + docker builder prune -f + df -h + + - + name: List containers + run: docker container ls + + - + name: Install Python venv + run: | + python3 -m venv venv + source venv/bin/activate + pip3 install -r utility/requirements.txt + + - + name: Wait for OpenRag to start + run: | + .github/workflows/related_docs_test/wait_for_services.sh + + - + name: Index small documents (relations test) + run: | + echo "Sun is shining" | .github/workflows/related_docs_test/index_file.sh http://localhost:8080 rel_test root.txt + sleep 10s + echo "Cats meow" | .github/workflows/related_docs_test/index_child_file.sh http://localhost:8080 rel_test child_a.txt root.txt + sleep 10s + echo "There are letters in the text" | .github/workflows/related_docs_test/index_child_file.sh http://localhost:8080 rel_test leaf_a.txt child_a.txt + sleep 30s + + - + name: Query small documents (relations test) + run: | + echo "Sun Is Shining" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "root.txt" + echo "Sun Is Shining" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "child_a.txt" + echo "Cats meow" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "root.txt" + echo "Cats meow" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "child_a.txt" + echo "Cats meow" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "leaf_a.txt" + echo "There are letters in the text" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "child_a.txt" + echo "There are letters in the text" | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test | grep -v '{\\"' | grep file_id | grep "leaf_a.txt" + diff --git a/.github/workflows/related_docs_test/.env b/.github/workflows/related_docs_test/.env new file mode 100644 index 00000000..e37d7c91 --- /dev/null +++ b/.github/workflows/related_docs_test/.env @@ -0,0 +1,67 @@ +# LLM +BASE_URL=http://mock-llm:8080/v1/ +API_KEY=sk- +MODEL=mock-model + +# VLM (Visual Language Model) you can set it to the same as LLM if your LLM supports images +VLM_BASE_URL=http://mock-llm:8080/v1/ +VLM_API_KEY=sk- +VLM_MODEL=mock-model + +RAGMODE=SimpleRag +## FastAPI App (no need to change it) +# APP_PORT=8080 # this is the forwarded port +# API_NUM_WORKERS=1 # Number of uvicorn workers for the FastAPI app + +## To enable API HTTP authentication via HTTPBearer +# AUTH_TOKEN=sk-openrag-1234 + 
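+## Example (sketch; HTTPBearer means a standard Authorization header):
+##   curl -H "Authorization: Bearer sk-openrag-1234" http://localhost:8080/health_check
+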
+# SAVE_UPLOADED_FILES=true # useful for chainlit source viewing
+
+# Set to true to mount the chainlit chat ui on the fastapi app (Default: true)
+## WITH_CHAINLIT_UI=true
+
+# RETRIEVER
+CONTEXTUAL_RETRIEVAL=false
+
+# EMBEDDER
+EMBEDDER_MODEL_NAME=Qwen/Qwen3-Embedding-0.6B
+EMBEDDER_BASE_URL=http://vllm:8000/v1
+# EMBEDDER_API_KEY=EMPTY
+
+# RERANKER
+RERANKER_ENABLED=false
+RERANKER_MODEL=Alibaba-NLP/gte-multilingual-reranker-base # or jinaai/jina-reranker-v2-base-multilingual
+
+# Prompts
+PROMPTS_DIR=../prompts/example1
+
+# Loaders
+PDFLoader=MarkerLoader
+XDG_CACHE_HOME=/app/model_weights
+# If using MarkerLoader
+MARKER_MAX_TASKS_PER_CHILD=1
+MARKER_MAX_PROCESSES=1
+MARKER_MIN_PROCESSES=1
+MARKER_POOL_SIZE=1 # Value to increment if you have a cluster of machines
+MARKER_NUM_GPUS=0.01
+
+# Ray
+RAY_POOL_SIZE=1 # Number of serializer actor instances
+RAY_MAX_TASKS_PER_WORKER=2 # Number of tasks per serializer
+RAY_DEDUP_LOGS=0 # turns off Ray deduplication of logs that appear across multiple processes
+RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING=1 # to enable logs at task level in ray dashboard
+RAY_task_retry_delay_ms=3000
+RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 # critical with the newest version of UV
+RAY_memory_monitor_refresh_ms=0
+
+# Indexer UI
+## 1. Replace X.X.X.X with localhost if launching locally, or with your server IP
+## 2. Replace APP_PORT with your FastAPI port (8080 by default)
+## 3. INDEXERUI_URL is the base URL of the Indexer UI (required to prevent CORS issues). Replace INDEXERUI_PORT with its value
+## 4. VITE_API_BASE_URL is the base URL of your FastAPI backend, used by the frontend. Replace APP_PORT with the actual port number of your FastAPI backend
+
+VITE_INCLUDE_CREDENTIALS=false # set true if fastapi authentication is enabled
+INDEXERUI_PORT=8060 # Port to expose the Indexer UI (default is 3042)
+INDEXERUI_URL='http://X.X.X.X:INDEXERUI_PORT'
+VITE_API_BASE_URL='http://X.X.X.X:APP_PORT'
diff --git a/.github/workflows/related_docs_test/chat_completion.sh b/.github/workflows/related_docs_test/chat_completion.sh
new file mode 100755
index 00000000..58e5fdbe
--- /dev/null
+++ b/.github/workflows/related_docs_test/chat_completion.sh
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+ENDPOINT_URL=$1
+PARTITION_NAME=$2
+QUERY=$(cat)
+
+payload=$(jq -nc \
+    --arg model "openrag-${PARTITION_NAME}" \
+    --arg query "${QUERY}" \
+    '{
+        model: $model,
+        messages: [{role: "user", content: $query}],
+        temperature: 0.3,
+        top_p: 1,
+        stream: false,
+        max_tokens: 1024,
+        logprobs: 0,
+        metadata: {use_map_reduce: false}
+    }')
+
+#echo ${payload}
+
+response=`curl --connect-timeout 600 -X POST "${ENDPOINT_URL}/v1/chat/completions" \
+    -H "accept: application/json" \
+    -H "Content-Type: application/json" \
+    -d "$payload"`
+
+#echo $response | jq .
+
+extra=`echo $response | jq '.extra | fromjson'`
+
+echo $extra | jq .
+
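+# Usage sketch (this is exactly how the workflows invoke it):
+#   echo "Sun Is Shining" \
+#       | .github/workflows/related_docs_test/chat_completion.sh http://localhost:8080 rel_test \
+#       | grep file_id
+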
diff --git a/.github/workflows/related_docs_test/docker-compose.yaml b/.github/workflows/related_docs_test/docker-compose.yaml
new file mode 100644
index 00000000..ca0dc465
--- /dev/null
+++ b/.github/workflows/related_docs_test/docker-compose.yaml
@@ -0,0 +1,141 @@
+include:
+  - vdb/milvus.yaml
+#  - extern/infinity.yaml
+
+x-openrag: &openrag_template
+  #image: ghcr.io/linagora/openrag:dev-latest
+  build:
+    context: .
+    dockerfile: Dockerfile
+  volumes:
+    - ${CONFIG_VOLUME:-./.hydra_config}:/app/.hydra_config
+    - ${DATA_VOLUME:-./data}:/app/data
+    - ${MODEL_WEIGHTS_VOLUME:-~/.cache/huggingface}:/app/model_weights # Model weights for RAG
+    - ./openrag:/app/openrag # For dev mode
+    - /$SHARED_ENV:/ray_mount/.env # Shared environment variables
+    - ./ray_mount/logs:/app/logs
+  ports:
+    - ${APP_PORT:-8080}:${APP_iPORT:-8080}
+    - ${RAY_DASHBOARD_PORT:-8265}:8265 # Disable when in cluster mode
+  networks:
+    default:
+      aliases:
+        - openrag
+  env_file:
+    - ${SHARED_ENV:-.env}
+  shm_size: 10.24gb
+
+x-vllm: &vllm_template
+  networks:
+    default:
+      aliases:
+        - vllm
+  restart: "no" # Better to fail in the CI context ("no" is the valid Compose value)
+  environment:
+    - HUGGING_FACE_HUB_TOKEN
+  ipc: "host"
+  volumes:
+    - ${VLLM_CACHE:-/root/.cache/huggingface}:/root/.cache/huggingface # put ./vllm_cache if you want to have the weights on the vllm_cache folder in your project
+  command: >
+    --model ${EMBEDDER_MODEL_NAME:-jinaai/jina-embeddings-v3}
+    --trust-remote-code
+    --task embed
+    --gpu_memory_utilization 0.3
+    # --max-num-seqs 1
+    # --max-model-len ${MAX_MODEL_LEN:-2048}
+    # gpu_memory_utilization, max-num-seqs and max-model-len can be tuned depending on your GPU memory
+
+  healthcheck:
+    test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
+    interval: 30s
+    timeout: 10s
+    retries: 3
+    start_period: 360s
+  # ports:
+  #   - ${VLLM_PORT:-8000}:8000
+services:
+  # GPU - default
+  openrag:
+    <<: *openrag_template
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: all
+              capabilities: [gpu]
+    profiles:
+      - ''
+    depends_on:
+      milvus:
+        condition: service_healthy
+      vllm-gpu:
+        condition: service_healthy
+
+  # No GPU
+  openrag-cpu:
+    <<: *openrag_template
+    deploy: {}
+    profiles:
+      - 'cpu'
+    depends_on:
+      milvus:
+        condition: service_healthy
+      vllm-cpu:
+        condition: service_healthy
+
+  rdb:
+    image: postgres:15
+    environment:
+      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-root_password}
+      - POSTGRES_USER=${POSTGRES_USER:-root}
+    volumes:
+      - ${DB_VOLUME:-./db}:/var/lib/postgresql/data
+
+  vllm-gpu:
+    <<: *vllm_template
+    image: vllm/vllm-openai:v0.9.2
+    runtime: nvidia
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: all
+              capabilities: [gpu]
+    profiles:
+      - '' # Empty string gives default behavior (but does not run when cpu requested)
+
+  vllm-cpu:
+    <<: *vllm_template
+    build:
+      context: extern/vllm
+      dockerfile: Dockerfile.cpu
+      target: vllm-openai
+    image: openrag-vllm-openai-cpu
+    deploy: {}
+    environment:
+      # - VLLM_CPU_KVCACHE_SPACE=8 # Default value isn't sufficient for full context length
+      - VLLM_USE_V1=0 # for ibm-granite/granite-embedding-small-english-r2
+    command: >
+      --model ${EMBEDDER_MODEL_NAME:-jinaai/jina-embeddings-v3}
+      --trust-remote-code
+      --dtype float32
+      --max-num-batched-tokens 32768
+      # dtype is required for aarch64 (https://github.com/vllm-project/vllm/issues/11327) and improves speed on amd64.
+      # max-num-batched-tokens is required for aarch64 because chunked prefill isn't supported by V1 vllm backend
+      # for aarch64 yet. On aarch64 max-num-batched-tokens must be equal max-model-len for now (without chunked prefill).
+ # For details see https://github.com/vllm-project/vllm/issues/21179 + + profiles: + - 'cpu' + + mock-llm: + build: + context: .github/workflows/related_docs_test/mock-llm + dockerfile: Dockerfile.mock-llm + ports: + - 8001:8080 + profiles: + - 'cpu' + diff --git a/.github/workflows/related_docs_test/index_child_file.sh b/.github/workflows/related_docs_test/index_child_file.sh new file mode 100755 index 00000000..4acc9db5 --- /dev/null +++ b/.github/workflows/related_docs_test/index_child_file.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +ENDPOINT_URL=$1 +PARTITION_NAME=$2 +FILE_NAME=$3 +PARENT_FILE_NAME=$4 +CONTENT=$(cat) + +metadata=$(jq -nc \ + --arg parent_file_name "${PARENT_FILE_NAME}" \ + '{ + mimetype: "text/plain", + rels: [ + { + target: $parent_file_name, + type: "parent" + } + ] + }') + +echo ${metadata} + +curl -X 'POST' \ + ${ENDPOINT_URL}/indexer/partition/${PARTITION_NAME}/file/${FILE_NAME} \ + -H 'accept: application/json' \ + -H 'Content-Type: multipart/form-data' \ + -F "file=@-;filename=${FILE_NAME};type=text/plain" \ + -F "metadata=$metadata" <<< "$CONTENT" + diff --git a/.github/workflows/related_docs_test/index_file.sh b/.github/workflows/related_docs_test/index_file.sh new file mode 100755 index 00000000..f39d7d32 --- /dev/null +++ b/.github/workflows/related_docs_test/index_file.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash +set -euo pipefail + +ENDPOINT_URL=$1 +PARTITION_NAME=$2 +FILE_NAME=$3 +CONTENT=$(cat) + +curl -X 'POST' \ + ${ENDPOINT_URL}/indexer/partition/${PARTITION_NAME}/file/${FILE_NAME} \ + -H 'accept: application/json' \ + -H 'Content-Type: multipart/form-data' \ + -F "file=@-;filename=${FILE_NAME};type=text/plain" \ + -F 'metadata={"mimetype":"text/plain"}' <<< "$CONTENT" + diff --git a/.github/workflows/related_docs_test/mock-llm/Dockerfile.mock-llm b/.github/workflows/related_docs_test/mock-llm/Dockerfile.mock-llm new file mode 100644 index 00000000..9191e9f2 --- /dev/null +++ b/.github/workflows/related_docs_test/mock-llm/Dockerfile.mock-llm @@ -0,0 +1,20 @@ +# Dockerfile +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y curl vim && rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache-dir flask + +COPY mock_openai_server.py . + +RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app +USER appuser + +HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8080/health || exit 1 + +EXPOSE 8080 + +CMD ["python", "mock_openai_server.py"] diff --git a/.github/workflows/related_docs_test/mock-llm/mock_openai_server.py b/.github/workflows/related_docs_test/mock-llm/mock_openai_server.py new file mode 100644 index 00000000..a1d0aae7 --- /dev/null +++ b/.github/workflows/related_docs_test/mock-llm/mock_openai_server.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 + +from flask import Flask, request, jsonify +import json +import time +import uuid +from typing import Dict, Any, List +import threading + +app = Flask(__name__) + +HOST = "0.0.0.0" +PORT = 8080 + +# Constant response for chat completions +CONSTANT_RESPONSE = "This is a mock AI response. 
The user said: "
+
+# Model information
+AVAILABLE_MODELS = {
+    "mock-model": {
+        "id": "mock-model",
+        "object": "model",
+        "created": 1687882411,
+        "owned_by": "mock-company"
+    }
+}
+
+def create_chat_completion_response(
+    messages: List[Dict[str, str]],
+    model: str = "mock-model"
+) -> Dict[str, Any]:
+    """Create a mock chat completion response."""
+
+    # Extract user messages for the constant response
+    user_messages = [
+        msg["content"] for msg in messages
+        if msg["role"] == "user"
+    ]
+    last_user_message = user_messages[-1] if user_messages else ""
+
+    # Generate response content
+    response_content = CONSTANT_RESPONSE + last_user_message
+
+    # Create response ID
+    response_id = f"chatcmpl-{uuid.uuid4().hex}"
+
+    # Current timestamp
+    created = int(time.time())
+
+    return {
+        "id": response_id,
+        "object": "chat.completion",
+        "created": created,
+        "model": model,
+        "choices": [
+            {
+                "index": 0,
+                "message": {
+                    "role": "assistant",
+                    "content": response_content,
+                },
+                "finish_reason": "stop",
+                "logprobs": None,
+            }
+        ],
+        "usage": {
+            "prompt_tokens": 10,
+            "completion_tokens": len(response_content.split()),
+            "total_tokens": 10 + len(response_content.split()),
+        }
+    }
+
+@app.route('/v1/chat/completions', methods=['POST'])
+def chat_completions():
+    """Handle chat completion requests."""
+    try:
+        data = request.get_json()
+
+        # Extract parameters with defaults
+        messages = data.get('messages', [])
+        model = data.get('model', 'mock-model')
+        stream = data.get('stream', False)
+
+        # Validate required fields
+        if not messages:
+            return jsonify({
+                "error": {
+                    "message": "Missing required field: messages",
+                    "type": "invalid_request_error",
+                    "code": "missing_messages"
+                }
+            }), 400
+
+        # Validate model
+        if model not in AVAILABLE_MODELS:
+            return jsonify({
+                "error": {
+                    "message": f"The model '{model}' does not exist",
+                    "type": "invalid_request_error",
+                    "code": "model_not_found"
+                }
+            }), 404
+
+        # Handle streaming response
+        if stream:
+            def generate():
+                response_content = CONSTANT_RESPONSE
+                if messages:
+                    user_messages = [
+                        msg["content"] for msg in messages
+                        if msg["role"] == "user"
+                    ]
+                    if user_messages:
+                        response_content += user_messages[-1]
+
+                # Split response into chunks for streaming
+                words = response_content.split()
+                for i, word in enumerate(words):
+                    chunk = {
+                        # uuid4().hex is an attribute, not a method
+                        "id": f"chatcmpl-{uuid.uuid4().hex}",
+                        "object": "chat.completion.chunk",
+                        "created": int(time.time()),
+                        "model": model,
+                        "choices": [
+                            {
+                                "index": 0,
+                                "delta": {
+                                    "content": word + (" " if i < len(words) - 1 else "")
+                                },
+                                "finish_reason": None if i < len(words) - 1 else "stop",
+                            }
+                        ]
+                    }
+                    yield f"data: {json.dumps(chunk)}\n\n"
+                yield "data: [DONE]\n\n"
+
+            return app.response_class(generate(), mimetype='text/event-stream')
+
+        # Regular non-streaming response
+        response = create_chat_completion_response(messages, model)
+        return jsonify(response)
+
+    except Exception as e:
+        return jsonify({
+            "error": {
+                "message": str(e),
+                "type": "server_error",
+                "code": "internal_error"
+            }
+        }), 500
+
+@app.route('/v1/models', methods=['GET'])
+def list_models():
+    """List available models."""
+    return jsonify({
+        "object": "list",
+        "data": list(AVAILABLE_MODELS.values())
+    })
+
+@app.route('/v1/models/<model_id>', methods=['GET'])
+def retrieve_model(model_id):
+    """Retrieve a specific model."""
+    if model_id in AVAILABLE_MODELS:
+        return jsonify(AVAILABLE_MODELS[model_id])
+    return jsonify({
+        "error": {
+            "message": f"The model '{model_id}' does not exist",
+            "type": "invalid_request_error",
"code": "model_not_found" + } + }), 404 + +@app.route('/health', methods=['GET']) +def health_check(): + """Health check endpoint.""" + return jsonify({"status": "healthy", "timestamp": time.time()}) + +@app.route('/v1/engines', methods=['GET']) +def list_engines(): + """Legacy endpoint for listing models (engines).""" + return list_models() + +def run_server(host=HOST, port=PORT, debug=False): + """Run the mock server.""" + print(f"Starting mock OpenAI compatible server at http://{host}:{port}") + print(f"Available endpoints:") + print(f" POST /v1/chat/completions") + print(f" GET /v1/models") + print(f" GET /v1/models/{{model_id}}") + print(f" GET /health") + print(f"\nExample usage:") + print(f" curl -X POST http://{host}:{port}/v1/chat/completions \\") + print(f" -H 'Content-Type: application/json' \\") + print(f" -d '{{\"model\": \"mock-model\", \"messages\": [{{\"role\": \"user\", \"content\": \"Hello!\"}}]}}'") + app.run(host=host, port=port, debug=debug) + +if __name__ == '__main__': + # Run server in a separate thread for testing + server_thread = threading.Thread( + target=run_server, + kwargs={'host': HOST, 'port': PORT, 'debug': False} + ) + server_thread.daemon = True + server_thread.start() + + # Keep main thread alive + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nShutting down server...") + diff --git a/.github/workflows/related_docs_test/wait_for_services.sh b/.github/workflows/related_docs_test/wait_for_services.sh new file mode 100755 index 00000000..796ce6a7 --- /dev/null +++ b/.github/workflows/related_docs_test/wait_for_services.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +source venv/bin/activate + +SERVICE_NAME=openrag-openrag-cpu-1 +docker container ls +OPENRAG_ADDR=`docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' ${SERVICE_NAME}` +docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' ${SERVICE_NAME} + +docker logs ${SERVICE_NAME} + +while true; do + STATUS_CODE=$(curl -s -o /dev/null -w "%{http_code}" "${OPENRAG_ADDR}:8080/health_check") + if [ "$STATUS_CODE" -eq 200 ]; then + echo "$(date): API is up and running" + break + else + echo "$(date): ${SERVICE_NAME} Health check failed with status $STATUS_CODE, retrying..." + docker logs ${SERVICE_NAME} + sleep 10 +fi +done + diff --git a/.github/workflows/smoke_test/index_docs.sh b/.github/workflows/smoke_test/index_docs.sh index 379b4a51..1b215b8b 100755 --- a/.github/workflows/smoke_test/index_docs.sh +++ b/.github/workflows/smoke_test/index_docs.sh @@ -2,11 +2,12 @@ source venv/bin/activate +SERVICE_NAME=openrag-openrag-cpu-1 docker container ls -OPENRAG_ADDR=`docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' openrag-openrag-cpu-1` -docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' openrag-openrag-cpu-1 +OPENRAG_ADDR=`docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' ${SERVICE_NAME}` +docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' ${SERVICE_NAME} -docker logs openrag-openrag-cpu-1 +docker logs ${SERVICE_NAME} while true; do STATUS_CODE=$(curl -s -o /dev/null -w "%{http_code}" "${OPENRAG_ADDR}:8080/health_check") @@ -14,7 +15,8 @@ while true; do echo "$(date): API is up and running" break else - echo "$(date): Health check failed with status $STATUS_CODE, retrying..." + echo "$(date): ${SERVICE_NAME} : Health check failed with status $STATUS_CODE, retrying..." 
+ docker logs ${SERVICE_NAME} sleep 10 fi done @@ -26,7 +28,7 @@ python3 utility/data_indexer.py \ -d .github/workflows/data/simplewiki-500/ \ -p simplewiki-500 -docker logs openrag-openrag-cpu-1 +docker logs ${SERVICE_NAME} -.github/workflows/smoke_test/wait_for_tasks_completed.sh openrag-openrag-cpu-1 8080 500 +.github/workflows/smoke_test/wait_for_tasks_completed.sh ${SERVICE_NAME} 8080 500 diff --git a/.hydra_config/config.yaml b/.hydra_config/config.yaml index 8ec8bb25..e4a9365a 100644 --- a/.hydra_config/config.yaml +++ b/.hydra_config/config.yaml @@ -1,8 +1,8 @@ defaults: - _self_ # TODO: Silences the hydra version migration warning (PLEASE REVIEW FOR BREAKING CHANGES) - - chunker: ${oc.env:CHUNKER, recursive_splitter} # recursive_splitter - - retriever: ${oc.env:RETRIEVER_TYPE, single} # single # multiQuery # hyde - - rag: ChatBotRag + - chunker: ${oc.env:CHUNKER, recursive_splitter} # recursive_splitter, semantic_splitter, markdown_splitter + - retriever: ${oc.env:RETRIEVER_TYPE, single} # single # multiQuery # hyde # email + - rag: ${oc.env:RAGMODE, ChatBotRag} llm_params: &llm_params temperature: 0.1 diff --git a/.hydra_config/retriever/email.yaml b/.hydra_config/retriever/email.yaml new file mode 100644 index 00000000..e8270bb9 --- /dev/null +++ b/.hydra_config/retriever/email.yaml @@ -0,0 +1,5 @@ +defaults: + - base + +type: email + diff --git a/openrag/components/indexer/vectordb/utils.py b/openrag/components/indexer/vectordb/utils.py index 99c63c40..52785c9a 100644 --- a/openrag/components/indexer/vectordb/utils.py +++ b/openrag/components/indexer/vectordb/utils.py @@ -1,6 +1,7 @@ import hashlib import os import secrets +import enum from datetime import datetime from typing import Dict, Optional @@ -10,12 +11,13 @@ CheckConstraint, Column, DateTime, + Enum, ForeignKey, Index, Integer, String, UniqueConstraint, - create_engine, + create_engine ) from sqlalchemy.orm import ( declarative_base, @@ -57,6 +59,21 @@ class File(Base): Index("ix_partition_file", "partition_name", "file_id"), ) + # Directed many-to-many relationships + outgoing_relationships = relationship( + "FileRelationship", + foreign_keys="FileRelationship.source_file_id", + back_populates="source_file", + cascade="all, delete-orphan", + ) + + incoming_relationships = relationship( + "FileRelationship", + foreign_keys="FileRelationship.target_file_id", + back_populates="target_file", + cascade="all, delete-orphan", + ) + def to_dict(self): metadata = self.file_metadata or {} d = {"partition": self.partition_name, "file_id": self.file_id, **metadata} @@ -66,6 +83,49 @@ def __repr__(self): return f"" +class FileRelationshipType(enum.Enum): + parent = "parent" + email_thread = "email_thread" + other = "other" + + +class FileRelationship(Base): + __tablename__ = "file_relationships" + + id = Column(Integer, primary_key=True) + + # Directed edge: source → target + source_file_id = Column(Integer, ForeignKey("files.id", ondelete="CASCADE"), nullable=False) + target_file_id = Column(Integer, ForeignKey("files.id", ondelete="CASCADE"), nullable=False) + + relationship_type = Column( + Enum(FileRelationshipType, name="file_relationship_type"), + nullable=False, + ) + + source_file = relationship( + "File", + foreign_keys=[source_file_id], + back_populates="outgoing_relationships", + ) + target_file = relationship( + "File", + foreign_keys=[target_file_id], + back_populates="incoming_relationships", + ) + + __table_args__ = ( + # Prevent duplicate edges of the same type + UniqueConstraint( + "source_file_id", 
"target_file_id", "relationship_type", + name="uix_file_relationship_unique" + ), + # Index for efficient traversal queries + Index("ix_file_rel_source_target", "source_file_id", "target_file_id"), + Index("ix_file_rel_type", "relationship_type"), +) + + # In the Partition model class Partition(Base): __tablename__ = "partitions" @@ -239,6 +299,11 @@ def add_file_to_partition( ) session.add(membership) + target_files = {} + + for k in file_metadata: + log.info(f'file_metadata {k} = ...') + # Add file to partition file = File( file_id=file_id, @@ -247,12 +312,135 @@ def add_file_to_partition( ) session.add(file) + session.flush() + + if 'rels' in file_metadata: + for rel in file_metadata['rels']: + if rel['target'] in target_files and rel['type'] in target_files[rel['target']]: + continue + log.info(f"new rel : {rel['target']} {rel['type']}") + file_object = session.query(File).filter_by( + file_id=rel['target'], + partition_name=partition).one_or_none() + if file_object is None: + raise Exception(f'Can\'t find {rel["target"]} referenced from {file_id}') + if rel['target'] not in target_files: + target_files[rel['target']] = {} + target_files[rel['target']][rel['type']] = file_object + + objects = [] + + for target_id in target_files: + for rel_type in target_files[target_id]: + log.info(f"before rel: {file_id} {rel_type}") + r = FileRelationship( + source_file=file, + target_file=target_files[target_id][rel_type], + relationship_type=rel_type + ) + log.info("after rels") + objects.append(r) + + session.add_all(objects) + session.commit() log.info("Added file successfully") return True - except Exception: + except Exception as e: + session.rollback() + log.exception(f"Error adding file to partition: {e}") + raise + + + def get_all_rels(self, file_id, file_ids: list, relationship_type: str, partition: str): + """ Finds all files that have `file_ids` as target with `relationship_type` + """ + log = self.logger.bind(file_id=file_id) + with self.Session() as session: + try: + ids = ( + session.query(File.id) + .filter(File.file_id.in_(file_ids), File.partition_name.in_(partition)) + .all() + ) + log.info(f"len(ids) == {len(ids)}") + if 0 == len(ids): + log.warning(f"Files from the list {file_ids} in {partition} don't have any links.") + return [] + + ids = [ item.id for item in ids ] + + rels = ( + session.query(FileRelationship, File.file_id) + .join(File, File.id == FileRelationship.source_file_id) + .filter( + FileRelationship.target_file_id.in_(ids), + FileRelationship.relationship_type==relationship_type + ) + .all() + ) + + source_file_ids = [ file_id for rel, file_id in rels ] + + for rel, fle in rels: + log.opt(raw=True).info(f"{rel.id}: {rel.source_file_id} -> {rel.target_file_id} | {fle}\n") + + return source_file_ids + + except Exception as e: + session.rollback() + log.exception(f"Error: {e}") + raise + + + def get_related_file_ids(self, file_id, relationship_type: str, partition: str): + log = self.logger.bind(file_id=file_id) + with self.Session() as session: + try: + current_file = ( + session.query(File.id) + .filter(File.file_id == file_id, File.partition_name.in_(partition)) + .first() + ) + if not current_file: + log.warning(f"File {file_id} doesn't exist in partition {partition}") + return False + + file_pk = current_file.id + log.info(f'current file {file_id} id is {file_pk}') + + outgoing_rels = ( + session.query(FileRelationship, File.file_id) + .join(File, File.id == FileRelationship.target_file_id) + .filter(FileRelationship.source_file_id==file_pk, + 
+    def get_related_file_ids(self, file_id, relationship_type: str, partition: list):
+        log = self.logger.bind(file_id=file_id)
+        with self.Session() as session:
+            try:
+                current_file = (
+                    session.query(File.id)
+                    .filter(File.file_id == file_id, File.partition_name.in_(partition))
+                    .first()
+                )
+                if not current_file:
+                    log.warning(f"File {file_id} doesn't exist in partition {partition}")
+                    # keep the return type consistent for callers that index into it
+                    return {'outgoing': [], 'incoming': []}
+
+                file_pk = current_file.id
+                log.info(f'current file {file_id} id is {file_pk}')
+
+                outgoing_rels = (
+                    session.query(FileRelationship, File.file_id)
+                    .join(File, File.id == FileRelationship.target_file_id)
+                    .filter(FileRelationship.source_file_id==file_pk,
+                            FileRelationship.relationship_type==relationship_type)
+                    .all()
+                )
+                log.info(f"{len(outgoing_rels)} out rels")
+                for rel, fle in outgoing_rels:
+                    log.opt(raw=True).info(f"{rel.id}: {fle}\n")
+
+                incoming_rels = (
+                    session.query(FileRelationship, File.file_id)
+                    .join(File, File.id == FileRelationship.source_file_id)
+                    .filter(FileRelationship.target_file_id==file_pk,
+                            FileRelationship.relationship_type==relationship_type)
+                    .all()
+                )
+                log.info(f"{len(incoming_rels)} in rels")
+                for rel, fle in incoming_rels:
+                    log.opt(raw=True).info(f"{rel.id}: {fle}\n")
+
+                ids = {
+                    'outgoing': list({file_id for rel, file_id in outgoing_rels}),
+                    'incoming': list({file_id for rel, file_id in incoming_rels})
+                }
+
+                return ids
+
+            except Exception as e:
                 session.rollback()
-                log.exception("Error adding file to partition")
+                log.exception(f"Error: {e}")
                 raise
 
     def remove_file_from_partition(self, file_id: str, partition: str):
diff --git a/openrag/components/indexer/vectordb/vectordb.py b/openrag/components/indexer/vectordb/vectordb.py
index 2d22541b..026885be 100644
--- a/openrag/components/indexer/vectordb/vectordb.py
+++ b/openrag/components/indexer/vectordb/vectordb.py
@@ -355,6 +355,24 @@ async def async_add_documents(self, chunks: list[Document], user: dict) -> None:
         """Asynchronously add documents to the vector store."""
         try:
+            # Special handling for e-mails in Twake Mail
+            for ch in chunks:
+                # .get() keeps non-email documents (no doctype key) working
+                if ch.metadata.get('doctype') == "com.linagora.email":
+                    if 'rels' not in ch.metadata:
+                        ch.metadata['rels'] = []
+
+                    if ch.metadata.get('parent_id') not in {None, ''}:
+                        ch.metadata['rels'].append({
+                            'target': ch.metadata['parent_id'],
+                            'type': 'parent'
+                        })
+
+                    if ch.metadata.get('relationship_id') not in {None, ''}:
+                        ch.metadata['rels'].append({
+                            'target': ch.metadata['relationship_id'],
+                            'type': 'email_thread'
+                        })
+
             file_metadata = dict(chunks[0].metadata)
             file_metadata.pop("page")
             file_id, partition = (
@@ -601,6 +619,50 @@ async def get_surrounding_chunks(self, docs: list[Document]) -> list[Document]:
 
         return output_docs
 
+    async def get_related_files(self, file_id, relationship_type: str, partition: list, same_group=False):
+        log = self.logger.bind(file_id=str(file_id))
+        try:
+            related_ids = self.partition_file_manager.get_related_file_ids(
+                file_id=file_id,
+                relationship_type=relationship_type,
+                partition=partition
+            )
+
+            # merge the two groups of related_ids -> ids
+            ids = related_ids['outgoing'] + related_ids['incoming']
+
+            if same_group:
+                all_rels = self.partition_file_manager.get_all_rels(
+                    file_id=file_id,
+                    file_ids=ids,
+                    relationship_type=relationship_type,
+                    partition=partition
+                )
+
+                ids += all_rels
+
+            self.logger.opt(raw=True).info(f'Query {ids} from {self.collection_name}\n')
+            response = await self._async_client.query(
+                collection_name=self.collection_name,
+                filter=f"partition in {partition} and file_id in {ids}"
+            )
+
+            self.logger.info(f'Got {len(response)} responses')
+
+            return _parse_documents_from_search_results([ [ { 'entity': r } for r in response ] ])
+        except MilvusException as e:
+            self.logger.exception("Search failed in Milvus", error=str(e))
+            raise VDBSearchError(
+                f"Search failed in Milvus: {str(e)}",
+                collection_name=self.collection_name,
+            )
+        except Exception as e:
+            self.logger.exception("Unexpected error occurred", error=str(e))
+            raise UnexpectedVDBError(
+                f"Unexpected error occurred: {str(e)}",
+                collection_name=self.collection_name,
+            )
+
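+    # Sketch of the Milvus boolean expression built above, for partition=["rel_test"]
+    # and ids=["root.txt", "child_a.txt"] (Python list repr; Milvus filter expressions
+    # accept single-quoted string literals):
+    #   partition in ['rel_test'] and file_id in ['root.txt', 'child_a.txt']
+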
     async def delete_file(self, file_id: str, partition: str):
         log = self.logger.bind(file_id=file_id,
                                partition=partition)
         try:
diff --git a/openrag/components/pipeline.py b/openrag/components/pipeline.py
index bbfc3c1f..9c3f9d7a 100644
--- a/openrag/components/pipeline.py
+++ b/openrag/components/pipeline.py
@@ -82,7 +82,9 @@ def __init__(self, config) -> None:
         self.contextualizer = AsyncOpenAI(
             base_url=config.llm["base_url"], api_key=config.llm["api_key"]
         )
-        self.max_contextualized_query_len = config.rag["max_contextualized_query_len"]
+
+        if self.rag_mode in [ RAGMODE.CHATBOTRAG ]:
+            self.max_contextualized_query_len = config.rag["max_contextualized_query_len"]
 
         # map reduce
         self.map_reduce: RAGMapReduce = RAGMapReduce(config=config)
diff --git a/openrag/components/retriever.py b/openrag/components/retriever.py
index 43d4deea..97394d36 100644
--- a/openrag/components/retriever.py
+++ b/openrag/components/retriever.py
@@ -45,6 +45,8 @@ async def retrieve(
         partition: list[str],
         query: str,
     ) -> list[Document]:
+        log = logger.bind(query=query, partition=partition)
+
         db = get_vectordb()
         chunks = await db.async_search.remote(
             query=query,
@@ -53,6 +55,118 @@
             similarity_threshold=self.similarity_threshold,
             with_surrounding_chunks=self.with_surrounding_chunks,
         )
+
+        extra_documents = []
+        file_ids = set()
+        for chunk in chunks:
+            file_ids.add(chunk.metadata['file_id'])
+
+        for file_id in file_ids:
+            linked_documents = await db.get_related_files.remote(file_id, 'parent', partition)
+            log.info(f'Got {len(linked_documents)} chunks from documents linked with {file_id}')
+            for linked_file_id in set([ d.metadata['file_id'] for d in linked_documents ]):
+                log.info("Linked : " + linked_file_id)
+
+            extra_documents.extend(linked_documents)
+
+        chunks.extend(extra_documents)
+
+        for file_id in set([ d.metadata['file_id'] for d in chunks ]):
+            log.info(f'Found: {file_id}')
+
+        return chunks
+
+
+class EmailRetriever(BaseRetriever):
+    def __init__(self, top_k=6, similarity_threshold=0.95, **kwargs):
+        super().__init__(top_k, similarity_threshold, **kwargs)
+
+    def get_parent_id(self, doc):
+        log = logger.bind(query="get_parent_id", partition="get_parent_id")
+
+        try:
+            for rel in doc.metadata['rels']:
+                if rel['type'] in ['parent']:
+                    return rel['target']
+        except Exception as e:
+            log.error(f'get_parent_id failed: {e}')
+            raise
+
+        log.info(f'No parent found: {doc.metadata["file_id"]}')
+        return None
+
+    def get_branch(self, docs, curr_chunk):
+        log = logger.bind(query="get_branch", partition="get_branch")
+
+        id2doc = {}
+        id2parent_id = {}
+        id2child_ids = {}
+
+        all_docs = docs + [ curr_chunk ]
+
+        for doc in all_docs:
+            id2doc[doc.metadata['file_id']] = doc
+            parent_id = self.get_parent_id(doc)
+            id2parent_id[doc.metadata['file_id']] = parent_id
+            if parent_id is not None and parent_id != doc.metadata['file_id']:
+                if parent_id not in id2child_ids:
+                    id2child_ids[parent_id] = []
+                id2child_ids[parent_id].append(doc.metadata['file_id'])
+
+        # From current to root
+        curr_id = curr_chunk.metadata['file_id']
+        subtree = []
+        while curr_id != id2parent_id[curr_id]:
+            curr_id = id2parent_id[curr_id]
+            if curr_id is None:
+                break
+            subtree.append(curr_id)
+
+        # From current to leaves (leaf e-mails have no children, hence .get)
+        q = []
+        q.extend(id2child_ids.get(curr_chunk.metadata['file_id'], []))
+        while len(q) > 0:
+            curr_id = q.pop(0)
+            subtree.append(curr_id)
+            if curr_id in id2child_ids:
+                q.extend(id2child_ids[curr_id])
+
+        return [ id2doc[doc_id] for doc_id in subtree ]
+
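+    # Worked sketch on thread 1 from the e-mail test (root -> child1..3, each with
+    # three leaves): with curr_chunk = thread1_child2, the upward walk collects the
+    # root and the downward BFS collects child2's leaves, so get_branch returns
+    # [root, child2_leaf1, child2_leaf2, child2_leaf3]; the other children and
+    # their leaves are excluded.
+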
+    async def retrieve(
+        self,
+        partition: list[str],
+        query: str,
+    ) -> list[Document]:
+        log = logger.bind(query=query, partition=partition)
+
+        db = get_vectordb()
+        chunks = await db.async_search.remote(
+            query=query,
+            partition=partition,
+            top_k=self.top_k,
+            similarity_threshold=self.similarity_threshold,
+        )
+
+        extra_documents = []
+        for chunk in chunks:
+            file_id = chunk.metadata['file_id']
+
+            linked_documents = await db.get_related_files.remote(file_id, 'email_thread', partition, True)
+            log.info(f'Got {len(linked_documents)} chunks from documents linked with {file_id}')
+            for linked_file_id in set([ d.metadata['file_id'] for d in linked_documents ]):
+                log.info("Linked : " + linked_file_id)
+
+            extra_documents.extend(self.get_branch(linked_documents, chunk))
+
+        chunks.extend(extra_documents)
+
+        for file_id in set([ d.metadata['file_id'] for d in chunks ]):
+            log.info(f'Found: {file_id}')
+
+        return chunks
@@ -147,6 +261,7 @@ class RetrieverFactory:
         "single": SingleRetriever,
         "multiQuery": MultiQueryRetriever,
         "hyde": HyDeRetriever,
+        "email": EmailRetriever,
     }
 
     @classmethod
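
# Sketch: how RETRIEVER_TYPE=email reaches this factory. Hydra resolves
# `retriever: ${oc.env:RETRIEVER_TYPE, single}` to retriever/email.yaml, whose
# `type: email` picks EmailRetriever out of the mapping above. Assuming the factory
# exposes a create() classmethod (truncated here), usage would look like:
#
#   retriever = RetrieverFactory.create(config)  # hypothetical signature
#   docs = await retriever.retrieve(partition=["emails"], query="Penguinone")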