diff --git a/docs/disk_hnsw_multithreaded_architecture.md b/docs/disk_hnsw_multithreaded_architecture.md
new file mode 100644
index 000000000..4716b90c7
--- /dev/null
+++ b/docs/disk_hnsw_multithreaded_architecture.md
@@ -0,0 +1,327 @@
# HNSWDisk Multi-Threading Architecture

## Overview

This document describes the multi-threaded architecture of the HNSWDisk index, focusing on synchronization, concurrent disk writes, and performance optimizations.

## Key Architectural Components

### 1. Lightweight Insert Jobs

Each insert job submitted to the thread pool is lightweight and stores only metadata (`vectorId`, `elementMaxLevel`). Vector data is looked up from shared storage when the job executes, which keeps memory usage low when many jobs are queued.

```
┌──────────────────────────────────────────────────────────────┐
│ HNSWDiskSingleInsertJob                                      │
├──────────────────────────────────────────────────────────────┤
│ - vectorId                                                   │
│ - elementMaxLevel                                            │
└──────────────────────────────────────────────────────────────┘
```

At execution time, jobs access vector data via:
- **Raw vectors**: `shared_ptr` from `rawVectorsInRAM` (refcount increment, no copy)
- **Processed vectors**: direct access from the `this->vectors` container

### 2. Segmented Neighbor Cache

To reduce lock contention in multi-threaded scenarios, the neighbor changes cache is partitioned into **64 independent segments**:

```cpp
static constexpr size_t NUM_CACHE_SEGMENTS = 64; // Power of 2 for efficient modulo

struct alignas(64) CacheSegment {
    std::shared_mutex guard;                                        // Per-segment lock
    std::unordered_map<uint64_t, vecsim_stl::vector<idType>> cache; // Neighbor lists
    std::unordered_set<uint64_t> dirty;                             // Nodes needing disk write
    std::unordered_set<uint64_t> newNodes;                          // Nodes never written to disk
};
```
Note: `NUM_CACHE_SEGMENTS` can be raised to spread the cache across more segments,
at the cost of additional RAM. It could be made configurable by the user or derived
from the expected number of vectors in the index.

**Key benefits:**
- Threads accessing different segments proceed in parallel.
- Cache-line alignment (`alignas(64)`) prevents false sharing.
- Hash-based key mixing distributes nodes evenly across segments.

#### Cache Memory Management

**Current Behavior: No Eviction**

The segment cache (`cacheSegment.cache`) currently **grows without bound** and is **never evicted**. Once a node's neighbor list is loaded into the cache (either from disk or created during insert), it remains in memory indefinitely.

**Why the Cache is the Source of Truth**

The cache cannot simply be cleared because it serves as the **source of truth** for pending updates that haven't been flushed to disk yet. The Swap-and-Flush pattern relies on:
1. The cache always holding the latest neighbor lists
2. The `dirty` set tracking which nodes need to be written
3. The flusher reading the current cache state (not stale data)

**We still need to decide which eviction strategy to implement (if any).**
**Another option is to drop the neighbor cache entirely and always read from disk.**

**1. LRU Eviction for Clean Entries**

Evict least-recently-used entries that are **not dirty** (already persisted to disk):
```cpp
// Pseudocode
if (cacheSize > maxCacheSize) {
    size_t evicted = 0;
    for (auto &entry : lruOrder) { // coldest entries first
        if (!dirty.contains(entry.key)) {
            cache.erase(entry.key);
            if (++evicted >= targetEviction) break;
        }
    }
}
```
*Pros:* Simple, safe (dirty entries are always kept)
*Cons:* Requires LRU tracking overhead (linked list + map); see the sketch below
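To make strategy 1 concrete, here is a minimal self-contained sketch of clean-entry LRU eviction for a single segment. The `lruOrder`/`lruPos` bookkeeping, the `touch()` helper, and the plain `std::vector<uint32_t>` neighbor lists are illustrative assumptions; the real index would use the `vecsim_stl` containers and run this under the segment's exclusive lock:

```cpp
#include <cstddef>
#include <cstdint>
#include <list>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Sketch: one cache segment with LRU tracking for clean-entry eviction.
// The caller is assumed to hold the segment's exclusive lock.
struct LruSegmentSketch {
    std::list<uint64_t> lruOrder; // front = hottest, back = coldest
    std::unordered_map<uint64_t, std::list<uint64_t>::iterator> lruPos;
    std::unordered_map<uint64_t, std::vector<uint32_t>> cache;
    std::unordered_set<uint64_t> dirty;

    // Record an access: move the key to the hot end of the LRU list.
    void touch(uint64_t key) {
        auto it = lruPos.find(key);
        if (it != lruPos.end())
            lruOrder.erase(it->second);
        lruOrder.push_front(key);
        lruPos[key] = lruOrder.begin();
    }

    // Evict cold entries until the cache fits maxEntries, skipping dirty
    // ones: they remain the source of truth until flushed to disk.
    void evictClean(size_t maxEntries) {
        auto it = lruOrder.end();
        while (cache.size() > maxEntries && it != lruOrder.begin()) {
            --it;
            if (dirty.count(*it))
                continue; // unflushed entry, must stay cached
            uint64_t key = *it;
            it = lruOrder.erase(it); // erase returns the next iterator
            lruPos.erase(key);
            cache.erase(key);
        }
    }
};
```

The key design point is that eviction never drops a dirty entry, so the Swap-and-Flush invariants described below still hold; only the `touch()` bookkeeping on every read adds overhead.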
**2. Time-Based Eviction**

Evict clean entries older than a threshold:
```cpp
// Pseudocode
for (auto it = cache.begin(); it != cache.end();) {
    if (!dirty.contains(it->first) &&
        now - it->second.lastAccessTime > evictionTimeout) {
        it = cache.erase(it); // erase returns the next valid iterator
    } else {
        ++it;
    }
}
```
*Pros:* Predictable memory behavior
*Cons:* Requires timestamp tracking per entry

**3. Write-Through with Immediate Eviction**

After flushing to disk, immediately evict the written entries:
```cpp
// In flushDirtyNodesToDisk(), after a successful write:
for (uint64_t key : flushedNodes) {
    cacheSegment.cache.erase(key); // Evict after persisting
}
```
*Pros:* Minimal memory usage, no tracking overhead
*Cons:* Increases disk reads on subsequent access

**4. Size-Limited Cache with Eviction Policy**

Configure a maximum cache size and evict when it is exceeded:
```cpp
size_t maxCacheEntries = 100000; // Configurable
// On insert, check the size and evict clean entries if needed
```
*Pros:* Bounded memory usage
*Cons:* Still needs an eviction policy (e.g., one of the above) to pick victims

### 3. Lock Hierarchy

| Lock | Type | Protects | Notes |
|------|------|----------|-------|
| `indexDataGuard` | `shared_mutex` | `entrypointNode`, `maxLevel`, `idToMetaData`, `labelToIdMap` | Metadata access during graph construction |
| `vectorsGuard` | `shared_mutex` | Vectors container (prevents resize during access) | Vector access during graph construction |
| `rawVectorsGuard` | `shared_mutex` | `rawVectorsInRAM` map | Raw vector access during graph construction |
| `stagedUpdatesGuard` | `shared_mutex` | Staged graph updates for deletions | |
| `diskWriteGuard` | `mutex` | Serializes global flush operations | |
| `cacheSegments_[i].guard` | `shared_mutex` | Per-segment cache, dirty set, newNodes | |

### 4. Atomic Variables

```cpp
std::atomic<size_t> curElementCount;             // Thread-safe element counting
std::atomic<size_t> totalDirtyCount_{0};         // Fast threshold check without locking
std::atomic<size_t> pendingSingleInsertJobs_{0}; // Track pending async jobs
```
Note: more fields could likely be made atomic to further improve performance;
for now, only the most important counters are.

## Concurrency Patterns

### Swap-and-Flush Pattern (Critical for Preventing Lost Updates)

The `writeDirtyNodesToDisk` and `flushDirtyNodesToDisk` methods implement a three-phase approach:

```
Phase 1: Read cache data under SHARED lock (concurrent writers allowed)
  - Read neighbor lists
  - Build RocksDB WriteBatch

Phase 2: Clear dirty flags under EXCLUSIVE lock (brief, per-segment)
  - "Atomically" (under the lock) swap out the dirty set contents
  - Release the lock immediately

Phase 3: Write to disk (NO locks held)
  - Any new inserts after Phase 2 re-add their nodes to the dirty set
  - On failure: re-add the flushed nodes to the dirty set for retry
```

This prevents the "lost update" race condition where a concurrent insert's changes could be dropped.

### Atomic Check-and-Add for Neighbor Lists

```cpp
bool tryAddNeighborToCacheIfCapacity(nodeId, level, newNeighborId, maxCapacity) {
    // Under exclusive lock:
    // 1. Check if already present (avoid duplicates)
    // 2. Check capacity
    // 3. If space available: add neighbor, mark dirty
    // 4. If full: return false (caller must use the heuristic)
}
```

This atomic check-and-add prevents race conditions when multiple threads add neighbors to the same node.

### Read-Copy-Update Pattern for Cache Access

```
// Read path (getNeighborsFromCache):
1. Try shared lock → read from cache
2. If miss: read from disk (no lock held during I/O; RocksDB is thread-safe)
3. Acquire exclusive lock → insert into cache (double-check)

// Write path (addNeighborToCache):
1. Acquire exclusive lock
2. Load from disk if needed (release/re-acquire around the I/O; RocksDB is thread-safe)
3. Add neighbor, mark dirty
```
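The read path above is the classic double-checked pattern. Below is a minimal self-contained sketch of it for one segment; `readFromDisk` is a stand-in for the RocksDB `Get` plus deserialize step, and the container types are simplified assumptions rather than the actual index code:

```cpp
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

using Neighbors = std::vector<uint32_t>;

// Stand-in for the RocksDB lookup; the real code deserializes a graph value.
static Neighbors readFromDisk(uint64_t /*key*/) { return {}; }

struct Segment {
    std::shared_mutex guard;
    std::unordered_map<uint64_t, Neighbors> cache;
};

Neighbors getNeighborsSketch(Segment &seg, uint64_t key) {
    {
        // Fast path: shared lock, many readers proceed in parallel.
        std::shared_lock<std::shared_mutex> lock(seg.guard);
        auto it = seg.cache.find(key);
        if (it != seg.cache.end())
            return it->second;
    }
    // Miss: perform the I/O with no lock held.
    Neighbors fromDisk = readFromDisk(key);

    std::unique_lock<std::shared_mutex> lock(seg.guard);
    // Double-check: another thread may have populated (or updated) the
    // entry while we were on disk. Its version wins, because the cache is
    // the source of truth for pending updates; try_emplace never overwrites.
    auto it = seg.cache.try_emplace(key, std::move(fromDisk)).first;
    return it->second;
}
```

If the `newNodes` set marks the key as never persisted, the real implementation can skip the disk read entirely and treat the miss as an empty neighbor list.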
## Disk Write Batching

Controlled by `diskWriteBatchThreshold`:

| Value | Behavior |
|-------|----------|
| `0` | Flush after every insert (no batching) |
| `>0` | Accumulate until `totalDirtyCount_ >= threshold`, then flush |

```cpp
if (diskWriteBatchThreshold == 0) {
    writeDirtyNodesToDisk(modifiedNodes, rawVectorData, vectorId);
} else if (totalDirtyCount_ >= diskWriteBatchThreshold) {
    flushDirtyNodesToDisk(); // Flush ALL dirty nodes
}
```

## Job Queue Integration

### Async Processing Flow

```
addVector()
  │
  ├── Single-threaded path (no job queue):
  │     └── executeGraphInsertionCore() [inline]
  │
  └── Multi-threaded path (with job queue):
        ├── Create HNSWDiskSingleInsertJob (just vectorId + level, no vector copy)
        ├── Submit via SubmitJobsToQueue callback
        └── Worker thread executes:
              └── executeSingleInsertJob()
                    ├── Get shared_ptr to raw vector from rawVectorsInRAM
                    ├── Get processed vector from this->vectors
                    └── executeGraphInsertionCore()
```

### Job Structure

```cpp
struct HNSWDiskSingleInsertJob : public AsyncJob {
    idType vectorId;
    size_t elementMaxLevel;
    // No vector data stored - looked up from the index when the job executes.
    // This saves memory: 100M pending jobs don't need 100M vector copies.
};
```

Jobs look up vector data at execution time:
- **Raw vectors**: accessed via `shared_ptr` from `rawVectorsInRAM` (just increments the refcount, no copy)
- **Processed vectors**: accessed from the `this->vectors` container

This eliminates memory duplication while maintaining thread safety through reference counting.

## Data Flow During Insert

```
1. addVector()
   ├── Atomically allocate vectorId (curElementCount.fetch_add)
   ├── Store raw vector in rawVectorsInRAM (for other jobs to access)
   ├── Preprocess vector (quantization)
   ├── Store processed vector in vectors container
   └── Store metadata (label, topLevel)

2. executeGraphInsertionCore()
   ├── insertElementToGraph()
   │     ├── greedySearchLevel() [levels > element_max_level]
   │     └── For each level:
   │           ├── searchLayer() → find neighbors
   │           └── mutuallyConnectNewElement()
   │                 ├── setNeighborsInCache() [new node's neighbors]
   │                 └── For each neighbor:
   │                       ├── tryAddNeighborToCacheIfCapacity()
   │                       └── or revisitNeighborConnections() [if full]
   └── Handle disk write (based on threshold)

3. Disk Write (when triggered)
   ├── writeDirtyNodesToDisk() [per-insert path]
   └── or flushDirtyNodesToDisk() [batch flush path]
```

## Performance Optimizations

### 1. Lock-Free Hot Paths

```cpp
// Fast deleted-flag check without acquiring indexDataGuard
template <Flags FLAG>
bool isMarkedAsUnsafe(idType internalId) const {
    return __atomic_load_n(&idToMetaData[internalId].flags, 0) & FLAG;
}
```

Used in `processCandidate()` to avoid lock contention during search.

### 2. Atomic Counters for Fast Threshold Checks

```cpp
// No locking needed to check whether a flush is required
if (totalDirtyCount_.load(std::memory_order_relaxed) >= diskWriteBatchThreshold) {
    flushDirtyNodesToDisk();
}
```
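One plausible way to wire this check to the global `diskWriteGuard` is sketched below: the relaxed load keeps the hot path lock-free, and a `try_lock` ensures that when several inserts cross the threshold at once, only one of them runs the flush. The `maybeFlush` name and the skip-on-contention policy are assumptions for illustration, not the current code:

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>

std::atomic<size_t> totalDirtyCount_{0};
std::mutex diskWriteGuard; // serializes global flush operations
constexpr size_t diskWriteBatchThreshold = 1000;

// Stand-in for the real flush: swap per-segment dirty sets under their
// locks, build a RocksDB WriteBatch, and write it with no locks held.
void flushDirtyNodesToDisk() {}

void maybeFlush() {
    // Hot path: a single relaxed load, no locks taken.
    if (totalDirtyCount_.load(std::memory_order_relaxed) < diskWriteBatchThreshold)
        return;
    // Several inserting threads may reach this point together. One of them
    // flushes; the rest continue, since the flusher drains every segment's
    // dirty set and their nodes are picked up anyway.
    std::unique_lock<std::mutex> lock(diskWriteGuard, std::try_to_lock);
    if (lock.owns_lock())
        flushDirtyNodesToDisk();
}
```

Whether skipping (rather than blocking) on contention is acceptable depends on how strictly the dirty backlog must be bounded between flushes.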
### 3. newNodes Tracking

Nodes created in the current batch are tracked in `cacheSegment.newNodes`:
- Avoids disk lookups for nodes that haven't been written yet
- Cleared after a successful flush to disk

### 4. Raw Vectors in RAM with shared_ptr

Raw vectors are stored in `rawVectorsInRAM` using `std::shared_ptr`:

```cpp
std::unordered_map<idType, std::shared_ptr<std::string>> rawVectorsInRAM;
```

**Benefits:**
- Allows concurrent jobs to access vectors before the disk write
- Eliminates redundant disk reads during graph construction
- **Zero-copy job execution**: jobs increment a refcount instead of copying the entire vector
- **Safe concurrent deletion**: if a vector is erased from the map while a job is executing, the `shared_ptr` keeps the data alive until the job completes
- Protected by `rawVectorsGuard` (a `shared_mutex`)

**Execution flow:**
```cpp
// Job execution - no data copy, just a refcount increment
std::shared_ptr<std::string> localRawRef;
{
    std::shared_lock lock(rawVectorsGuard);
    localRawRef = rawVectorsInRAM[job->vectorId]; // refcount++
}
// Lock released, but the data stays alive via localRawRef
// Use localRawRef->data() for graph insertion and the disk write
```
diff --git a/src/VecSim/algorithms/hnsw/hnsw_disk.h b/src/VecSim/algorithms/hnsw/hnsw_disk.h
index f00d2ebd4..df95d3020 100644
--- a/src/VecSim/algorithms/hnsw/hnsw_disk.h
+++ b/src/VecSim/algorithms/hnsw/hnsw_disk.h
@@ -13,6 +13,7 @@
 #include "VecSim/memory/vecsim_malloc.h"
 #include "VecSim/utils/vecsim_stl.h"
 #include "VecSim/utils/vec_utils.h"
+#include
 #include
 // #include "VecSim/containers/data_block.h"
 // #include "VecSim/containers/raw_data_container_interface.h"
@@ -54,6 +55,11 @@
 // #include
 // #include
 #include
+#include
+#include
+
+// Forward declaration for AsyncJob
+#include "VecSim/vec_sim_tiered_index.h"
 
 using std::pair;
@@ -119,6 +125,35 @@ struct GraphKey {
 
 #pragma pack()
 
+////////////////////////////////////// HNSW Disk Job Structures //////////////////////////////////////
+
+/**
+ * Forward declaration of HNSWDiskIndex for job structures.
+ */
+template <typename DataType, typename DistType>
+class HNSWDiskIndex;
+
+/**
+ * Definition of the job that inserts a single vector completely from start to end.
+ * Each job is self-contained and writes directly to disk upon completion.
+ * No batching or staging - optimized for workloads where the disk write is cheap
+ * but reading (searching for neighbors) is the bottleneck.
+ *
+ * The job stores no vector data; it looks the vector up from the index at execution time.
+ */
+struct HNSWDiskSingleInsertJob : public AsyncJob {
+    idType vectorId;
+    size_t elementMaxLevel;
+    // No vector data stored - looked up from the index when the job executes.
+    // This saves memory: 100M pending jobs don't need 100M vector copies.
+
+    HNSWDiskSingleInsertJob(std::shared_ptr<VecSimAllocator> allocator, idType vectorId_,
+                            size_t elementMaxLevel_, JobCallback insertCb,
+                            VecSimIndex *index_)
+        : AsyncJob(allocator, HNSW_DISK_SINGLE_INSERT_JOB, insertCb, index_),
+          vectorId(vectorId_), elementMaxLevel(elementMaxLevel_) {}
+};
+
 //////////////////////////////////// HNSW index implementation ////////////////////////////////////
 
 template <typename DataType, typename DistType>
@@ -149,8 +184,9 @@ class HNSWDiskIndex : public VecSimIndexAbstract
     mutable std::default_random_engine levelGenerator;
     // multithreaded scenario.
- size_t curElementCount; + std::atomic curElementCount; size_t numMarkedDeleted; + idType entrypointNode; size_t maxLevel; // this is the top level of the entry point's element @@ -166,12 +202,6 @@ class HNSWDiskIndex : public VecSimIndexAbstract mutable std::shared_mutex indexDataGuard; mutable VisitedNodesHandlerPool visitedNodesHandlerPool; - // Batch processing state - size_t batchThreshold; // Number of vectors to accumulate before batch update - vecsim_stl::vector pendingVectorIds; // Vector IDs waiting to be indexed - vecsim_stl::vector pendingMetadata; // Metadata for pending vectors - size_t pendingVectorCount; // Count of vectors in memory - /** * Threshold for batching delete operations. * When the number of pending deletions reaches this value, the deletions are processed in a batch. @@ -245,9 +275,82 @@ class HNSWDiskIndex : public VecSimIndexAbstract vecsim_stl::vector stagedInsertNeighborUpdates; // Temporary storage for raw vectors in RAM (until flush batch) - // Maps idType -> raw vector data (stored as string for simplicity) - std::unordered_map rawVectorsInRAM; + // Maps idType -> raw vector data (using shared_ptr to avoid copying in job execution) + // When a job executes, it just increments refcount instead of copying the entire vector + std::unordered_map> rawVectorsInRAM; + + + /********************************** Multi-threading Support **********************************/ + + // Job queue parameters (similar to tiered index) + void *jobQueue = nullptr; + void *jobQueueCtx = nullptr; + SubmitCB SubmitJobsToQueue = nullptr; + + // Lock for protecting staged updates during merge from parallel insert jobs + // Uses shared_mutex for better read concurrency - multiple readers can access simultaneously + mutable std::shared_mutex stagedUpdatesGuard; + + // Lock for protecting vectors container during concurrent access + // Needed because addElement can resize the container, invalidating pointers + mutable std::shared_mutex vectorsGuard; + // Note: metadataGuard was consolidated into indexDataGuard + // indexDataGuard now protects: entrypointNode, maxLevel, idToMetaData, labelToIdMap + + // Lock for protecting rawVectorsInRAM during concurrent access + // Needed because unordered_map can rehash during insert, invalidating iterators + mutable std::shared_mutex rawVectorsGuard; + + + // Segmented neighbor cache for reduced lock contention in multi-threaded scenarios + // Cache is partitioned into NUM_CACHE_SEGMENTS independent segments + // Each segment has its own lock, cache map, and dirty set + // This allows threads accessing different segments to proceed in parallel + // Tradeoff: Increased memory usage for NUM_CACHE_SEGMENTS * (cache map + dirty set) + + static constexpr size_t NUM_CACHE_SEGMENTS = 64; // Power of 2 for efficient modulo + + // Cache segment structure - each segment is cache-line aligned to prevent false sharing + struct alignas(64) CacheSegment { + std::shared_mutex guard; + std::unordered_map> cache; + std::unordered_set dirty; + // Track nodes created in current batch (never written to disk yet) + // This helps avoid disk lookups for new nodes + std::unordered_set newNodes; + + CacheSegment() = default; + CacheSegment(const CacheSegment&) = delete; + CacheSegment& operator=(const CacheSegment&) = delete; + CacheSegment(CacheSegment&&) = delete; + CacheSegment& operator=(CacheSegment&&) = delete; + }; + + // Array of cache segments - using unique_ptr for lazy initialization + mutable std::unique_ptr cacheSegments_; + + // Atomic counter for total dirty 
nodes (for fast threshold check without locking) + mutable std::atomic totalDirtyCount_{0}; + + // Helper function to compute segment index from cache key + // Uses mixing function for better distribution + static size_t getSegmentIndex(uint64_t key) { + // Mix the bits for better distribution (splitmix64-style mixing) + key ^= key >> 33; + key *= 0xff51afd7ed558ccdULL; + key ^= key >> 33; + return key % NUM_CACHE_SEGMENTS; + } + + // Threshold for flushing dirty nodes to disk (0 = flush after each insert, default = batch) + size_t diskWriteBatchThreshold = 1000; + + // Lock for protecting dirty nodes flush operations (global flush serialization) + mutable std::mutex diskWriteGuard; + + // Atomic counter for pending single insert jobs (batchless mode) + std::atomic pendingSingleInsertJobs_{0}; protected: HNSWDiskIndex() = delete; // default constructor is disabled. @@ -275,18 +378,53 @@ class HNSWDiskIndex : public VecSimIndexAbstract public: // Core vector addition methods - void insertElementToGraph(idType element_id, size_t element_max_level, idType entry_point, - size_t global_max_level, const void *raw_vector_data, const void *vector_data); + void insertElementToGraph(idType element_id, size_t element_max_level, + idType entry_point, size_t global_max_level, + const void *raw_vector_data, const void *vector_data, + vecsim_stl::vector &modifiedNodes); idType mutuallyConnectNewElement(idType new_node_id, - vecsim_stl::updatable_max_heap &top_candidates, size_t level); - - // Batch processing methods - void processBatch(); - void flushBatch(); // Force flush current batch + vecsim_stl::updatable_max_heap &top_candidates, + size_t level, vecsim_stl::vector &modifiedNodes); + // Delete batch processing methods void processDeleteBatch(); void flushDeleteBatch(); // Force flush current delete batch - void setBatchThreshold(size_t threshold); // Set batch threshold + + // Job submission helpers + void submitSingleJob(AsyncJob *job); + void submitJobs(vecsim_stl::vector &jobs); + + // Job execution + static void executeSingleInsertJobWrapper(AsyncJob *job); + void executeSingleInsertJob(HNSWDiskSingleInsertJob *job); + + // Cache management methods + void addNeighborToCache(idType nodeId, size_t level, idType newNeighborId); + void setNeighborsInCache(idType nodeId, size_t level, const vecsim_stl::vector &neighbors, bool isNewNode = false); + void getNeighborsFromCache(idType nodeId, size_t level, vecsim_stl::vector &result) const; + // Atomic check-and-add for cache - returns true if added, false if at capacity + bool tryAddNeighborToCacheIfCapacity(idType nodeId, size_t level, idType newNeighborId, size_t maxCapacity); + +private: + // Helper to load neighbors from disk into cache if not already present + // Returns true if data is available in cache after call, false otherwise + // Caller must hold lock on cacheSegment.guard (will be temporarily released during disk I/O) + void loadNeighborsFromDiskIfNeeded(uint64_t key, CacheSegment& cacheSegment, + std::unique_lock& lock); + + // Write dirty cache entries to disk + void writeDirtyNodesToDisk(const vecsim_stl::vector &modifiedNodes, + const void *newVectorRawData, idType newVectorId); + + // Unified core function for graph insertion - used by both single-threaded and multi-threaded paths + // Batching is controlled by diskWriteBatchThreshold (0 = no batching, >0 = batch size) + void executeGraphInsertionCore(idType vectorId, size_t elementMaxLevel, + idType entryPoint, size_t globalMaxLevel, + const void *rawVectorData, const void 
*processedVectorData); + + // Helper to write a single vector's entry to disk + void writeVectorToDisk(idType vectorId, const void *rawVectorData, + const vecsim_stl::vector &neighbors); // Helper methods void emplaceHeap(vecsim_stl::abstract_priority_queue &heap, DistType dist, @@ -294,13 +432,10 @@ class HNSWDiskIndex : public VecSimIndexAbstract void emplaceHeap(vecsim_stl::abstract_priority_queue &heap, DistType dist, idType id) const; void getNeighbors(idType nodeId, size_t level, vecsim_stl::vector& result) const; - void getNeighborsAndVector(idType nodeId, size_t level, vecsim_stl::vector& result, void* vector_data) const; - void getNeighborsAndVector(labelType nodeId, size_t level, vecsim_stl::vector& result, void* vector_data) const; size_t getNeighborsCount(idType nodeId, size_t level) const; - void searchPendingVectors(const void* query_data, candidatesLabelsMaxHeap& top_candidates, size_t k) const; - // Manual control of staged updates - void flushStagedUpdates(); // Manually flush any pending staged updates + // Manual control of staged delete updates + void flushStagedDeleteUpdates(); // Manually flush any pending staged delete updates protected: // Helper method to filter deleted or invalid nodes from a neighbor list (DRY principle) @@ -308,8 +443,16 @@ class HNSWDiskIndex : public VecSimIndexAbstract // Filters out: nodes marked as deleted, and nodes with invalid IDs (>= curElementCount) inline bool filterDeletedNodes(vecsim_stl::vector& neighbors) const { size_t original_size = neighbors.size(); + size_t elementCount = curElementCount.load(std::memory_order_acquire); + // Hold shared lock to prevent idToMetaData resize during access + std::shared_lock lock(indexDataGuard); + size_t metadataSize = idToMetaData.size(); auto new_end = std::remove_if(neighbors.begin(), neighbors.end(), - [this](idType id) { return id >= curElementCount || isMarkedDeleted(id); }); + [this, elementCount, metadataSize](idType id) { + // Use lock-free check: bounds check first, then atomic flag read + return id >= elementCount || id >= metadataSize || + isMarkedAsUnsafe(id); + }); neighbors.erase(new_end, neighbors.end()); return neighbors.size() < original_size; } @@ -323,10 +466,11 @@ class HNSWDiskIndex : public VecSimIndexAbstract void flushStagedGraphUpdates(vecsim_stl::vector& graphUpdates, vecsim_stl::vector& neighborUpdates); - // New method for handling neighbor connection updates when neighbor lists are full - void stageRevisitNeighborConnections(idType new_node_id, idType selected_neighbor, - size_t level, DistType distance); - + // Re-evaluate neighbor connections when a neighbor's list is full + // Applies heuristic to select best neighbors including the new node + void revisitNeighborConnections(idType new_node_id, idType selected_neighbor, + size_t level, DistType distance, + vecsim_stl::vector &modifiedNodes); public: // Methods needed by benchmark framework @@ -344,6 +488,7 @@ class HNSWDiskIndex : public VecSimIndexAbstract candidatesLabelsMaxHeap *getNewMaxPriorityQueue() const; bool isMarkedDeleted(idType id) const; bool isMarkedDeleted(labelType id) const; + bool isMarkedDeletedUnsafe(idType id) const; // Lock-free version for hot paths labelType getExternalLabel(idType id) const; // Helper methods for emplacing to heaps (overloaded for idType and labelType) @@ -353,7 +498,7 @@ class HNSWDiskIndex : public VecSimIndexAbstract DistType dist, idType id) const; template - void processCandidate(Identifier candidate_id, const void *data_point, size_t level, size_t ef, + void 
processCandidate(idType curNodeId, const void *data_point, size_t level, size_t ef, std::unordered_set *visited_set, vecsim_stl::updatable_max_heap &top_candidates, candidatesMaxHeap &candidate_set, DistType &lowerBound) const; @@ -366,6 +511,8 @@ class HNSWDiskIndex : public VecSimIndexAbstract const void* query_data, size_t k) const; protected: + // Internal version that assumes caller already holds the lock (or is inside a locked section) + bool getRawVectorInternal(idType id, void* output_buffer) const; idType searchBottomLayerEP(const void *query_data, void *timeoutCtx = nullptr, VecSimQueryReply_Code *rc = nullptr) const; @@ -374,7 +521,8 @@ class HNSWDiskIndex : public VecSimIndexAbstract HNSWDiskIndex(const HNSWParams *params, const AbstractIndexInitParams &abstractInitParams, const IndexComponents &components, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *cf, const std::string &dbPath = "", - size_t random_seed = 100); + size_t random_seed = 100, void *jobQueue = nullptr, void *jobQueueCtx = nullptr, + SubmitCB submitCb = nullptr); virtual ~HNSWDiskIndex(); /*************************** Index API ***************************/ @@ -434,14 +582,24 @@ class HNSWDiskIndex : public VecSimIndexAbstract // Flagging API template void markAs(idType internalId) { + std::shared_lock lock(indexDataGuard); __atomic_fetch_or(&idToMetaData[internalId].flags, FLAG, 0); } template void unmarkAs(idType internalId) { + std::shared_lock lock(indexDataGuard); __atomic_fetch_and(&idToMetaData[internalId].flags, ~FLAG, 0); } template bool isMarkedAs(idType internalId) const { + std::shared_lock lock(indexDataGuard); + return __atomic_load_n(&idToMetaData[internalId].flags, 0) & FLAG; + } + + // Lock-free version for hot paths where we know the ID is valid and in bounds + // (caller must ensure internalId < idToMetaData.size()) + template + bool isMarkedAsUnsafe(idType internalId) const { return __atomic_load_n(&idToMetaData[internalId].flags, 0) & FLAG; } @@ -454,6 +612,19 @@ class HNSWDiskIndex : public VecSimIndexAbstract size_t getDeleteBatchThreshold() const { return deleteBatchThreshold; } size_t getPendingDeleteCount() const { return pendingDeleteIds.size(); } + // Disk write batching control + void setDiskWriteBatchThreshold(size_t threshold) { diskWriteBatchThreshold = threshold; } + size_t getDiskWriteBatchThreshold() const { return diskWriteBatchThreshold; } + size_t getPendingDiskWriteCount() const { return totalDirtyCount_.load(std::memory_order_relaxed); } + void flushDirtyNodesToDisk(); // Flush all pending dirty nodes to disk + + // Job queue configuration (for multi-threaded processing) + void setJobQueue(void *jobQueue_, void *jobQueueCtx_, SubmitCB submitCb_) { + jobQueue = jobQueue_; + jobQueueCtx = jobQueueCtx_; + SubmitJobsToQueue = submitCb_; + } + // Debug methods to inspect graph structure void debugPrintGraphStructure() const; void debugPrintNodeNeighbors(idType node_id) const; @@ -507,17 +678,18 @@ template HNSWDiskIndex::HNSWDiskIndex( const HNSWParams *params, const AbstractIndexInitParams &abstractInitParams, const IndexComponents &components, rocksdb::DB *db, - rocksdb::ColumnFamilyHandle *cf, const std::string &dbPath, size_t random_seed) + rocksdb::ColumnFamilyHandle *cf, const std::string &dbPath, size_t random_seed, + void *jobQueue_, void *jobQueueCtx_, SubmitCB submitCb_) : VecSimIndexAbstract(abstractInitParams, components), idToMetaData(INITIAL_CAPACITY, this->allocator), labelToIdMap(this->allocator), db(db), cf(cf), dbPath(dbPath), indexDataGuard(), 
visitedNodesHandlerPool(INITIAL_CAPACITY, this->allocator), - batchThreshold(10), - pendingVectorIds(this->allocator), pendingMetadata(this->allocator), pendingVectorCount(0), pendingDeleteIds(this->allocator), num_visited_nodes(0), num_visited_nodes_higher_levels(0), stagedInsertUpdates(this->allocator), stagedDeleteUpdates(this->allocator), stagedRepairUpdates(this->allocator), - stagedInsertNeighborUpdates(this->allocator) { + stagedInsertNeighborUpdates(this->allocator), + jobQueue(jobQueue_), jobQueueCtx(jobQueueCtx_), SubmitJobsToQueue(submitCb_), + cacheSegments_(new CacheSegment[NUM_CACHE_SEGMENTS]) { M = params->M ? params->M : HNSW_DEFAULT_M; M0 = M * 2; @@ -531,10 +703,8 @@ HNSWDiskIndex::HNSWDiskIndex( dbOptions = db->GetOptions(); curElementCount = 0; numMarkedDeleted = 0; - - // initializations for special treatment of the first node entrypointNode = INVALID_ID; - maxLevel = HNSW_INVALID_LEVEL; + maxLevel = 0; if (M <= 1) throw std::runtime_error("HNSW index parameter M cannot be 1"); @@ -554,8 +724,6 @@ HNSWDiskIndex::~HNSWDiskIndex() { stagedInsertNeighborUpdates.clear(); // Clear pending vectors - pendingVectorIds.clear(); - pendingMetadata.clear(); pendingDeleteIds.clear(); // Clear raw vectors in RAM @@ -581,7 +749,8 @@ HNSWDiskIndex::topKQuery(const void *query_data, size_t k, auto rep = new VecSimQueryReply(this->allocator); this->lastMode = STANDARD_KNN; - if ((curElementCount == 0 && pendingVectorCount == 0) || k == 0) { + // Check if index is empty + if (curElementCount.load(std::memory_order_acquire) == 0 || k == 0) { return rep; } @@ -617,11 +786,6 @@ HNSWDiskIndex::topKQuery(const void *query_data, size_t k, // Step 2: Search bottom layer using quantized distances auto results = searchLayerLabels(bottom_layer_ep, processed_query, 0, query_ef); - if (pendingVectorCount > 0) { - // Search pending vectors using the helper method - searchPendingVectors(query_data, results, k); - } - // Step 3: Re-rank candidates using raw float32 distances for improved recall if (useRawData && !results.empty()) { rerankWithRawDistances(results, query_data, k); @@ -761,145 +925,104 @@ int HNSWDiskIndex::addVector( const void *vector, labelType label ) { - // Store raw vector in RAM first (until flush batch) + // Atomically get a unique element ID + // fetch_add returns the OLD value before incrementing, giving us a unique ID + idType newElementId = static_cast(curElementCount.fetch_add(1, std::memory_order_acq_rel)); + + // Store raw vector in RAM first (until written to disk) // We need to store the original vector before preprocessing - idType newElementId = curElementCount; + // NOTE: In batchless mode, we still use rawVectorsInRAM so other concurrent jobs can access + // the raw vectors of vectors that haven't been written to disk yet + // Using shared_ptr so job execution can just increment refcount instead of copying const char* raw_data = reinterpret_cast(vector); - rawVectorsInRAM[newElementId] = std::string(raw_data, this->inputBlobSize); + auto rawVectorPtr = std::make_shared(raw_data, this->inputBlobSize); + { + std::lock_guard lock(rawVectorsGuard); + rawVectorsInRAM[newElementId] = rawVectorPtr; + } // Preprocess the vector ProcessedBlobs processedBlobs = this->preprocess(vector); - // Store the processed vector in memory - size_t containerId = this->vectors->size(); - this->vectors->addElement(processedBlobs.getStorageBlob(), containerId); + // Store the processed vector in memory (protected by vectorsGuard) + { + std::lock_guard lock(vectorsGuard); + size_t containerId 
= this->vectors->size(); + this->vectors->addElement(processedBlobs.getStorageBlob(), containerId); + } // Create new element ID and metadata size_t elementMaxLevel = getRandomLevel(mult); DiskElementMetaData new_element(label, elementMaxLevel); - // Ensure capacity for the new element ID - if (newElementId >= indexCapacity()) { - size_t new_cap = ((newElementId + this->blockSize) / this->blockSize) * this->blockSize; - visitedNodesHandlerPool.resize(new_cap); - idToMetaData.resize(new_cap); - labelToIdMap.reserve(new_cap); - } - - // Store metadata immediately - idToMetaData[newElementId] = new_element; - labelToIdMap[label] = newElementId; - - // Increment vector count immediately - curElementCount++; - - - // Resize visited nodes handler pool to accommodate new elements - visitedNodesHandlerPool.resize(curElementCount); - - // Add only the vector ID to pending vectors for indexing - pendingVectorIds.push_back(newElementId); - pendingVectorCount++; + // Ensure capacity for the new element ID (protected by indexDataGuard) + { + std::lock_guard lock(indexDataGuard); + if (newElementId >= indexCapacity()) { + size_t new_cap = ((newElementId + this->blockSize) / this->blockSize) * this->blockSize; + visitedNodesHandlerPool.resize(new_cap); + idToMetaData.resize(new_cap); + labelToIdMap.reserve(new_cap); + } - // Process batch if threshold reached - if (pendingVectorCount >= batchThreshold) { - processBatch(); + // Store metadata immediately + idToMetaData[newElementId] = new_element; + labelToIdMap[label] = newElementId; } - return 1; // Success -} - -template -void HNSWDiskIndex::insertElementToGraph(idType element_id, - size_t element_max_level, - idType entry_point, - size_t global_max_level, - const void *raw_vector_data, - const void *vector_data) { - - idType curr_element = entry_point; - DistType cur_dist = std::numeric_limits::max(); - size_t max_common_level; - if (element_max_level < global_max_level) { - max_common_level = element_max_level; - cur_dist = this->calcDistance(vector_data, getDataByInternalId(curr_element)); - for (auto level = static_cast(global_max_level); - level > static_cast(element_max_level); level--) { - // this is done for the levels which are above the max level - // to which we are going to insert the new element. We do - // a greedy search in the graph starting from the entry point - // at each level, and move on with the closest element we can find. - // When there is no improvement to do, we take a step down. 
- greedySearchLevel(vector_data, level, curr_element, cur_dist); + // Resize visited nodes handler pool to accommodate new elements + // Use load() to read atomic value + visitedNodesHandlerPool.resize(curElementCount.load(std::memory_order_acquire)); + + // Each vector is processed immediately and written to disk + // Get entry point info + idType currentEntryPoint; + size_t currentMaxLevel; + { + std::shared_lock lock(indexDataGuard); + currentEntryPoint = entrypointNode; + currentMaxLevel = maxLevel; + } + + // Handle first vector (becomes entry point) + if (currentEntryPoint == INVALID_ID) { + std::unique_lock lock(indexDataGuard); + if (entrypointNode == INVALID_ID) { + entrypointNode = newElementId; + maxLevel = elementMaxLevel; } - } else { - max_common_level = global_max_level; - } - - for (auto level = static_cast(max_common_level); level >= 0; level--) { - vecsim_stl::updatable_max_heap top_candidates = - searchLayer(curr_element, vector_data, level, efConstruction); - - // If the entry point was marked deleted between iterations, we may receive an empty - // candidates set. - if (!top_candidates.empty()) { - curr_element = mutuallyConnectNewElement(element_id, top_candidates, level); - } else { - this->log(VecSimCommonStrings::LOG_WARNING_STRING, - "WARNING: No candidates found at level %d!", level); + // Write initial vector to disk with empty neighbors + vecsim_stl::vector emptyNeighbors(this->allocator); + for (size_t level = 0; level <= elementMaxLevel; level++) { + GraphKey graphKey(newElementId, level); + std::string value = serializeGraphValue(vector, emptyNeighbors); + auto writeOptions = rocksdb::WriteOptions(); + writeOptions.disableWAL = true; + db->Put(writeOptions, cf, graphKey.asSlice(), value); } + // Remove raw vector from RAM now that it's on disk + { + std::lock_guard rawLock(rawVectorsGuard); + rawVectorsInRAM.erase(newElementId); + } + return 1; } -} - -template -idType HNSWDiskIndex::mutuallyConnectNewElement( - idType new_node_id, vecsim_stl::updatable_max_heap &top_candidates, size_t level) { - - // The maximum number of neighbors allowed for an existing neighbor (not new). - size_t max_M_cur = level ? M : M0; - - // Filter the top candidates to the selected neighbors by the algorithm heuristics. - // First, we need to copy the top candidates to a vector. - candidatesList top_candidates_list(this->allocator); - top_candidates_list.insert(top_candidates_list.end(), top_candidates.begin(), - top_candidates.end()); - // Use the heuristic to filter the top candidates, and get the next closest entry point. 
- idType next_closest_entry_point = getNeighborsByHeuristic2(top_candidates_list, M); - assert(top_candidates_list.size() <= M && - "Should be not be more than M candidates returned by the heuristic"); - - // Instead of writing to disk immediately, stage the updates in memory - // Stage the new node's neighbors - vecsim_stl::vector neighbor_ids(this->allocator); - neighbor_ids.reserve(top_candidates_list.size()); - for (size_t i = 0; i < top_candidates_list.size(); ++i) { - neighbor_ids.push_back(top_candidates_list[i].second); - } - - // Add to staged graph updates (for insertions) - uint64_t insert_key = makeRepairKey(new_node_id, level); - stagedInsertMap[insert_key] = stagedInsertUpdates.size(); - stagedInsertUpdates.emplace_back(new_node_id, level, neighbor_ids, this->allocator); - - // Stage updates to existing nodes to include the new node in their neighbor lists - for (const auto &neighbor_data : top_candidates_list) { - idType selected_neighbor = neighbor_data.second; - DistType distance = neighbor_data.first; - // Check if the neighbor's neighbor list has capacity - // Use the helper function to get the current neighbor count (checks staged updates too) - size_t current_neighbor_count = getNeighborsCount(selected_neighbor, level); + // Check if we have a job queue for async processing + if (SubmitJobsToQueue != nullptr) { + // Multi-threaded: submit job for async processing + // No vector copies in job - job will look up from rawVectorsInRAM and this->vectors + auto *job = new (this->allocator) HNSWDiskSingleInsertJob( + this->allocator, newElementId, elementMaxLevel, + HNSWDiskIndex::executeSingleInsertJobWrapper, this); - if (current_neighbor_count < max_M_cur) { - // Neighbor has capacity, just add the new node - stagedInsertNeighborUpdates.emplace_back(selected_neighbor, level, new_node_id); - } else { - // Neighbor is full, need to re-evaluate connections using revisitNeighborConnections - // logic - stageRevisitNeighborConnections(new_node_id, selected_neighbor, level, distance); - } + submitSingleJob(job); + } else { + // Single-threaded: execute inline (batching controlled by diskWriteBatchThreshold) + executeGraphInsertionCore(newElementId, elementMaxLevel, currentEntryPoint, + currentMaxLevel, vector, processedBlobs.getStorageBlob()); } - return next_closest_entry_point; + return 1; } template @@ -928,9 +1051,9 @@ void HNSWDiskIndex::flushStagedGraphUpdates( continue; } - // Get raw vector data - - if (!getRawVector(update.node_id, raw_vector_buffer.data())) { + // Get raw vector data (use internal version since we already hold the lock or + // are being called from a context where locking is managed by the caller) + if (!getRawVectorInternal(update.node_id, raw_vector_buffer.data())) { this->log(VecSimCommonStrings::LOG_WARNING_STRING, "WARNING: Skipping graph update for node %u at level %zu - no raw vector data available", update.node_id, update.level); @@ -997,7 +1120,7 @@ void HNSWDiskIndex::flushStagedGraphUpdates( } } - getRawVector(node_id, raw_vector_buffer.data()); + getRawVectorInternal(node_id, raw_vector_buffer.data()); // Serialize with new format and add to batch std::string graph_value = serializeGraphValue(raw_vector_buffer.data(), updated_neighbors); @@ -1018,28 +1141,12 @@ void HNSWDiskIndex::flushStagedGraphUpdates( } template -void HNSWDiskIndex::stageRevisitNeighborConnections(idType new_node_id, - idType selected_neighbor, - size_t level, - DistType distance) { - // Read the neighbor's current neighbor list from disk - // TODO: perhaps cache the 
neigbhors for stage update - GraphKey neighborKey(selected_neighbor, level); - std::string graph_value; - rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, neighborKey.asSlice(), &graph_value); - - if (!status.ok()) { - this->log(VecSimCommonStrings::LOG_WARNING_STRING, - " WARNING: Could not read existing neighbors for node %u at level %zu", - selected_neighbor, level); - // Fall back to simple neighbor update - stagedInsertNeighborUpdates.emplace_back(selected_neighbor, level, new_node_id); - return; - } - - // Parse existing neighbors using new format +void HNSWDiskIndex::revisitNeighborConnections( + idType new_node_id, idType selected_neighbor, size_t level, DistType distance, + vecsim_stl::vector &modifiedNodes) { + // Read the neighbor's current neighbor list from cache (source of truth) vecsim_stl::vector existing_neighbors(this->allocator); - deserializeGraphValue(graph_value, existing_neighbors); + getNeighborsFromCache(selected_neighbor, level, existing_neighbors); // Collect all candidates: existing neighbors + new node candidatesList candidates(this->allocator); @@ -1049,56 +1156,28 @@ void HNSWDiskIndex::stageRevisitNeighborConnections(idType n candidates.emplace_back(distance, new_node_id); // Add existing neighbors with their distances to the selected neighbor - const void* selected_neighbor_data = getDataByInternalId(selected_neighbor); - for (size_t j = 0; j < existing_neighbors.size(); j++) { - idType existing_neighbor_id = existing_neighbors[j]; - const void *existing_neighbor_data = getDataByInternalId(existing_neighbor_id); - DistType existing_distance = - this->calcDistance(existing_neighbor_data, selected_neighbor_data); - candidates.emplace_back(existing_distance, existing_neighbor_id); + const void *selected_neighbor_data = getDataByInternalId(selected_neighbor); + for (idType existing_neighbor_id : existing_neighbors) { + const void *existing_data = getDataByInternalId(existing_neighbor_id); + DistType existing_dist = this->calcDistance(existing_data, selected_neighbor_data); + candidates.emplace_back(existing_dist, existing_neighbor_id); } - // Use the heuristic to select the best neighbors (similar to revisitNeighborConnections in - // hnsw.h) - size_t max_M_cur = level ? M : M0; - // Apply the neighbor selection heuristic + size_t max_M_cur = level ? 
M : M0; vecsim_stl::vector removed_candidates(this->allocator); getNeighborsByHeuristic2(candidates, max_M_cur, removed_candidates); - // Check if the new node was selected as a neighbor - bool new_node_selected = false; + // Extract selected neighbor IDs + vecsim_stl::vector new_neighbors(this->allocator); + new_neighbors.reserve(candidates.size()); for (const auto &candidate : candidates) { - if (candidate.second == new_node_id) { - new_node_selected = true; - break; - } + new_neighbors.push_back(candidate.second); } - if (new_node_selected) { - // The new node was selected, so we need to update the neighbor's neighbor list - // Extract the selected neighbor IDs - vecsim_stl::vector selected_neighbor_ids(this->allocator); - selected_neighbor_ids.reserve(candidates.size()); - for (const auto &candidate : candidates) { - selected_neighbor_ids.push_back(candidate.second); - } - - // Stage this update - the neighbor's neighbor list will be completely replaced - // We'll need to handle this specially in flushStagedGraphUpdates - uint64_t insert_key = makeRepairKey(selected_neighbor, level); - stagedInsertMap[insert_key] = stagedInsertUpdates.size(); - stagedInsertUpdates.emplace_back(selected_neighbor, level, selected_neighbor_ids, - this->allocator); - - // Also stage the bidirectional connection from new node to selected neighbor - stagedInsertNeighborUpdates.emplace_back(new_node_id, level, selected_neighbor); - - } else { - // The new node was not selected, so we only need to stage the unidirectional connection - // from new node to selected neighbor - stagedInsertNeighborUpdates.emplace_back(new_node_id, level, selected_neighbor); - } + // Update cache with the new neighbor list + setNeighborsInCache(selected_neighbor, level, new_neighbors); + modifiedNodes.push_back(makeRepairKey(selected_neighbor, level)); } template @@ -1106,10 +1185,9 @@ idType HNSWDiskIndex::searchBottomLayerEP(const void *query_ void *timeoutCtx, VecSimQueryReply_Code *rc) const { if (rc) *rc = VecSim_QueryReply_OK; - - // auto [curr_element, max_level] = safeGetEntryPointState(); - auto curr_element = entrypointNode; - auto max_level = maxLevel; + + // Use safeGetEntryPointState to read entry point with proper locking + auto [curr_element, max_level] = safeGetEntryPointState(); if (curr_element == INVALID_ID) return curr_element; // index is empty. 
@@ -1195,8 +1273,10 @@ const void* HNSWDiskIndex::getVectorFromGraphValue(const std template const void *HNSWDiskIndex::getDataByInternalId(idType id) const { - assert(id < curElementCount); + assert(id < curElementCount.load(std::memory_order_acquire)); + // Acquire shared lock to prevent concurrent resize of vectors container + std::shared_lock lock(vectorsGuard); const void* result = this->vectors->getElement(id); if (result != nullptr) { return result; @@ -1207,27 +1287,30 @@ const void *HNSWDiskIndex::getDataByInternalId(idType id) co template bool HNSWDiskIndex::getRawVector(idType id, void* output_buffer) const { - - if (id >= curElementCount) { + size_t elementCount = curElementCount.load(std::memory_order_acquire); + if (id >= elementCount) { this->log(VecSimCommonStrings::LOG_WARNING_STRING, "WARNING: getRawVector called with invalid id %u (current count: %zu)", - id, curElementCount); + id, elementCount); return false; } - // First check RAM (for vectors not yet flushed) - auto it = rawVectorsInRAM.find(id); - if (it != rawVectorsInRAM.end()) { - const char* data_ptr = it->second.data(); - std::memcpy(output_buffer, data_ptr, this->inputBlobSize); - return true; - + // First check RAM (for vectors not yet flushed) - protected by shared lock + { + std::shared_lock lock(rawVectorsGuard); + auto it = rawVectorsInRAM.find(id); + if (it != rawVectorsInRAM.end()) { + const char* data_ptr = it->second->data(); + std::memcpy(output_buffer, data_ptr, this->inputBlobSize); + return true; + } } - // If not in RAM or cache, retrieve from disk + // If not in RAM or cache, retrieve from disk (RocksDB is thread-safe) GraphKey graphKey(id, 0); std::string level0_graph_value; - rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &level0_graph_value); + rocksdb::Status status = + db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &level0_graph_value); if (status.ok()) { // Extract vector data @@ -1250,6 +1333,52 @@ bool HNSWDiskIndex::getRawVector(idType id, void* output_buf } +// Internal version for use during flush operations +template +bool HNSWDiskIndex::getRawVectorInternal(idType id, void* output_buffer) const { + size_t elementCount = curElementCount.load(std::memory_order_acquire); + if (id >= elementCount) { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "WARNING: getRawVectorInternal called with invalid id %u (current count: %zu)", + id, elementCount); + return false; + } + + // First check RAM (for vectors not yet flushed) - protected by shared lock + { + std::shared_lock lock(rawVectorsGuard); + auto it = rawVectorsInRAM.find(id); + if (it != rawVectorsInRAM.end()) { + const char* data_ptr = it->second->data(); + std::memcpy(output_buffer, data_ptr, this->inputBlobSize); + return true; + } + } + + // If not in RAM, retrieve from disk (NO LOCK - caller is expected to hold lock) + GraphKey graphKey(id, 0); + std::string level0_graph_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &level0_graph_value); + + if (status.ok()) { + // Extract vector data + const void* vector_data = getVectorFromGraphValue(level0_graph_value); + if (vector_data != nullptr) { + std::memcpy(output_buffer, vector_data, this->inputBlobSize); + return true; + } + } else if (status.IsNotFound()) { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "WARNING: Raw vector not found in RAM or on disk for id %u (isMarkedDeleted: %d)", + id, isMarkedDeleted(id)); + } else { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + 
"WARNING: Failed to retrieve raw vector for id %u: %s", id, + status.ToString().c_str()); + } + return false; +} + template void HNSWDiskIndex::rerankWithRawDistances( vecsim_stl::updatable_max_heap& candidates, @@ -1368,7 +1497,9 @@ HNSWDiskIndex::searchLayerLabels(idType ep_id, const void *d } candidate_set.pop(); - processCandidate(getExternalLabel(curr_el_pair.second), data_point, layer, ef, + // Pass internal ID to processCandidate - getNeighbors expects idType, not labelType + // The emplaceHeap overload handles conversion to labelType for top_candidates + processCandidate(curr_el_pair.second, data_point, layer, ef, &visited_set, top_candidates, candidate_set, lowerBound); } @@ -1389,6 +1520,16 @@ bool HNSWDiskIndex::isMarkedDeleted(idType id) const { return id < idToMetaData.size() && isMarkedAs(id); } +// Lock-free version for hot paths - still needs bounds checking +template +bool HNSWDiskIndex::isMarkedDeletedUnsafe(idType id) const { + // Must check bounds even in unsafe version - accessing out of bounds is UB + if (id >= idToMetaData.size()) { + return true; // Treat out-of-bounds as deleted (won't be added to results) + } + return isMarkedAsUnsafe(id); +} + template bool HNSWDiskIndex::isMarkedDeleted(labelType id) const { auto it = labelToIdMap.find(id); @@ -1409,6 +1550,9 @@ template void HNSWDiskIndex::greedySearchLevel(const void *data_point, size_t level, idType &curr_element, DistType &cur_dist) const { + // Hold shared lock for entire search to prevent idToMetaData resize during isMarkedDeletedUnsafe calls + std::shared_lock indexLock(indexDataGuard); + bool changed; idType bestCand = curr_element; idType bestNonDeletedCand = bestCand; @@ -1417,20 +1561,15 @@ void HNSWDiskIndex::greedySearchLevel(const void *data_point do { changed = false; - // Read neighbors from RocksDB for the current node at this level - GraphKey graphKey(bestCand, level); - std::string graph_value; - rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value); + // Read neighbors (checks staged updates first, then RocksDB) + vecsim_stl::vector neighbors(this->allocator); + getNeighbors(bestCand, level, neighbors); - if (!status.ok()) { + if (neighbors.empty()) { // No neighbors found for this node at this level, stop searching break; } - // Parse the neighbors using new format - vecsim_stl::vector neighbors(this->allocator); - deserializeGraphValue(graph_value, neighbors); - // Check each neighbor to find a better candidate for (size_t i = 0; i < neighbors.size(); i++) { idType candidate = neighbors[i]; @@ -1447,7 +1586,8 @@ void HNSWDiskIndex::greedySearchLevel(const void *data_point cur_dist = d; bestCand = candidate; changed = true; - if (!running_query && !isMarkedDeleted(candidate)) { + // Use lock-free version - candidate was already validated by getNeighbors + if (!running_query && !isMarkedDeletedUnsafe(candidate)) { bestNonDeletedCand = bestCand; } } @@ -1507,7 +1647,7 @@ size_t HNSWDiskIndex::getRandomLevel(double reverse_size) { template template void HNSWDiskIndex::processCandidate( - Identifier curNodeId, const void *data_point, size_t level, size_t ef, std::unordered_set *visited_set, + idType curNodeId, const void *data_point, size_t level, size_t ef, std::unordered_set *visited_set, vecsim_stl::updatable_max_heap &top_candidates, candidatesMaxHeap &candidate_set, DistType &lowerBound) const { assert(visited_set != nullptr); @@ -1520,13 +1660,13 @@ void HNSWDiskIndex::processCandidate( for (idType candidate_id : neighbors) { // Skip invalid 
neighbors - assert(candidate_id < curElementCount); + assert(candidate_id < curElementCount.load(std::memory_order_acquire)); if (visited_set->find(candidate_id) != visited_set->end()) { continue; } visited_set->insert(candidate_id); - // TODO: possibly use cached raw vectors + // TODO: possibly use cached raw vectors DistType cur_dist = this->calcDistance(data_point, getDataByInternalId(candidate_id)); if (lowerBound > cur_dist || top_candidates.size() < ef) { @@ -1534,8 +1674,9 @@ void HNSWDiskIndex::processCandidate( candidate_set.emplace(-cur_dist, candidate_id); // Insert the candidate to the top candidates heap only if it is not marked as - // deleted. - if (!isMarkedDeleted(candidate_id)) + // deleted. Use lock-free version since candidate_id was already bounds-checked + // by getNeighbors (which filters invalid IDs). + if (!isMarkedDeletedUnsafe(candidate_id)) emplaceHeap(top_candidates, cur_dist, candidate_id); if (top_candidates.size() > ef) @@ -1550,8 +1691,6 @@ void HNSWDiskIndex::processCandidate( } } - - template VecSimQueryReply * HNSWDiskIndex::rangeQuery(const void *query_data, double radius, @@ -1603,7 +1742,9 @@ size_t HNSWDiskIndex::indexCapacity() const { template size_t HNSWDiskIndex::indexSize() const { - return this->curElementCount - this->numMarkedDeleted; + // Return active element count (curElementCount never decreases in disk mode, + // so we subtract numMarkedDeleted to get the actual number of active elements) + return curElementCount.load(std::memory_order_acquire) - this->numMarkedDeleted; } template @@ -1633,243 +1774,130 @@ void HNSWDiskIndex::getNeighbors(idType nodeId, size_t level // Clear the result vector first result.clear(); - // First check staged graph updates using hash maps for O(1) lookup uint64_t lookup_key = makeRepairKey(nodeId, level); - - // Check insert staging area - auto insert_it = stagedInsertMap.find(lookup_key); - if (insert_it != stagedInsertMap.end()) { - const auto &update = stagedInsertUpdates[insert_it->second]; - result.reserve(update.neighbors.size()); - for (size_t i = 0; i < update.neighbors.size(); i++) { - result.push_back(update.neighbors[i]); - } - // Filter out deleted nodes using helper - filterDeletedNodes(result); - return; - } - - // Check delete staging area - auto delete_it = stagedDeleteMap.find(lookup_key); - if (delete_it != stagedDeleteMap.end()) { - const auto &update = stagedDeleteUpdates[delete_it->second]; - result.reserve(update.neighbors.size()); - for (size_t i = 0; i < update.neighbors.size(); i++) { - result.push_back(update.neighbors[i]); - } - // Filter out deleted nodes using helper - filterDeletedNodes(result); - return; - } - - // Also check staged repair updates (already cleaned neighbors waiting to be flushed) - auto repair_it = stagedRepairMap.find(lookup_key); - if (repair_it != stagedRepairMap.end()) { - auto &update = stagedRepairUpdates[repair_it->second]; - result.reserve(update.neighbors.size()); - for (size_t i = 0; i < update.neighbors.size(); i++) { - result.push_back(update.neighbors[i]); - } - // Filter in case nodes were deleted after this repair was staged - if (filterDeletedNodes(result)) { - // Update the existing repair entry with the more up-to-date cleaned list - update.neighbors = result; - } - return; - } - - // If not found in staged updates, check disk - GraphKey graphKey(nodeId, level); - - std::string graph_value; - rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value); - - if (status.ok()) { - 
deserializeGraphValue(graph_value, result); - // Filter out deleted nodes and check if any were filtered - if (filterDeletedNodes(result)) { - // Lazy repair: if we filtered any deleted nodes, stage for cleanup - // Use hash map for O(1) duplicate detection - uint64_t repair_key = makeRepairKey(nodeId, level); - if (stagedRepairMap.find(repair_key) == stagedRepairMap.end()) { - stagedRepairMap[repair_key] = stagedRepairUpdates.size(); - stagedRepairUpdates.emplace_back(nodeId, level, result, this->allocator); + bool foundInCache = false; + + // Step 1: Check cache first (source of truth for pending updates) + { + size_t segment = getSegmentIndex(lookup_key); + auto& cacheSegment = cacheSegments_[segment]; + std::shared_lock cacheLock(cacheSegment.guard); + auto it = cacheSegment.cache.find(lookup_key); + if (it != cacheSegment.cache.end()) { + result.reserve(it->second.size()); + for (idType id : it->second) { + result.push_back(id); } + foundInCache = true; + } else if (cacheSegment.newNodes.find(lookup_key) != cacheSegment.newNodes.end()) { + // New node not yet connected - return empty + foundInCache = true; // Treat as found (empty is valid) } } -} - -template -void HNSWDiskIndex::getNeighborsAndVector(idType nodeId, size_t level, vecsim_stl::vector& result, void* vector_data) const { - // Clear the result vector first - result.clear(); - // First check staged graph updates - for (const auto& update : stagedInsertUpdates) { - if (update.node_id == nodeId && update.level == level) { + // Step 2: Check delete staging area (for deletion operations) + if (!foundInCache) { + std::shared_lock lock(stagedUpdatesGuard); + auto delete_it = stagedDeleteMap.find(lookup_key); + if (delete_it != stagedDeleteMap.end()) { + const auto &update = stagedDeleteUpdates[delete_it->second]; result.reserve(update.neighbors.size()); for (size_t i = 0; i < update.neighbors.size(); i++) { result.push_back(update.neighbors[i]); } + foundInCache = true; + } + + // Also check staged repair updates (already cleaned neighbors waiting to be flushed) + if (!foundInCache) { + auto repair_it = stagedRepairMap.find(lookup_key); + if (repair_it != stagedRepairMap.end()) { + const auto &update = stagedRepairUpdates[repair_it->second]; + result.reserve(update.neighbors.size()); + for (size_t i = 0; i < update.neighbors.size(); i++) { + result.push_back(update.neighbors[i]); + } + foundInCache = true; + } } } - auto it = rawVectorsInRAM.find(nodeId); - if (it != rawVectorsInRAM.end()) { - std::memcpy(vector_data, it->second.data(), this->inputBlobSize); - } - if (!result.empty() && it != rawVectorsInRAM.end()) { - return; - } - // If not found in staged updates, check disk - GraphKey graphKey(nodeId, level); - std::string graph_value; - rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value); + // Step 3: If not found in cache or staging, read from disk + if (!foundInCache) { + GraphKey graphKey(nodeId, level); + std::string graph_value; + rocksdb::Status status = + db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value); - if (status.ok()) { - deserializeGraphValue(graph_value, result); - std::memcpy(vector_data, graph_value.data(), this->inputBlobSize); + if (status.ok()) { + deserializeGraphValue(graph_value, result); + } } -} -template -void HNSWDiskIndex::getNeighborsAndVector(labelType nodeId, size_t level, vecsim_stl::vector& result, void* vector_data) const { - // Check if label exists in the map (it won't if it's been marked as deleted) - auto it = 
labelToIdMap.find(nodeId); - if (it == labelToIdMap.end()) { - // Label doesn't exist (has been marked as deleted), return empty neighbors - result.clear(); - return; - } - getNeighborsAndVector(it->second, level, result, vector_data); + // Note: We do NOT filter deleted nodes here. During search, we need to explore + // through deleted nodes to find non-deleted neighbors. The filtering happens in + // processCandidate when adding to top_candidates (via isMarkedDeletedUnsafe check). + // + // Lazy repair for deleted nodes is handled separately when we detect stale edges + // during graph maintenance operations, not during search. } template size_t HNSWDiskIndex::getNeighborsCount(idType nodeId, size_t level) const { - // First check staged graph updates using hash maps for O(1) lookup uint64_t lookup_key = makeRepairKey(nodeId, level); - // Check insert staging area - auto insert_it = stagedInsertMap.find(lookup_key); - if (insert_it != stagedInsertMap.end()) { - const auto &update = stagedInsertUpdates[insert_it->second]; - - return update.neighbors.size(); + // Step 1: Check cache first (source of truth for pending updates) + { + size_t segment = getSegmentIndex(lookup_key); + auto& cacheSegment = cacheSegments_[segment]; + std::shared_lock cacheLock(cacheSegment.guard); + auto it = cacheSegment.cache.find(lookup_key); + if (it != cacheSegment.cache.end()) { + return it->second.size(); + } + // Check if this is a new node (never written to disk) + if (cacheSegment.newNodes.find(lookup_key) != cacheSegment.newNodes.end()) { + return 0; // New node not yet connected + } } - // If not found in staged updates, check disk + // Step 2: If not found in cache, check disk GraphKey graphKey(nodeId, level); std::string graph_value; rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value); if (status.ok()) { - const char* ptr = graph_value.data(); ptr += this->inputBlobSize; size_t neighbor_count = *reinterpret_cast(ptr); - return neighbor_count; } - return 0; // Not found + return 0; } -template -void HNSWDiskIndex::searchPendingVectors( - const void *query_data, candidatesLabelsMaxHeap &top_candidates, size_t k) const { - for (size_t i = 0; i < pendingVectorCount; i++) { - idType vectorId = pendingVectorIds[i]; - if (isMarkedDeleted(vectorId)) { - // Skip deleted vectors - continue; - } - - // Get the vector data from memory - const void* vector_data = rawVectorsInRAM.find(vectorId)->second.data(); +/********************************** Multi-threaded Job Execution **********************************/ - // Get metadata for this vector - const DiskElementMetaData &metadata = idToMetaData[vectorId]; - labelType label = metadata.label; - - // Calculate distance - DistType dist = this->calcDistanceRaw(query_data, vector_data); +template +void HNSWDiskIndex::submitSingleJob(AsyncJob *job) { + this->SubmitJobsToQueue(this->jobQueue, this->jobQueueCtx, &job, &job->Execute, 1); +} - // Add to candidates if it's good enough - if (top_candidates.size() < k) { - top_candidates.emplace(dist, label); - } else if (dist < top_candidates.top().first) { - top_candidates.pop(); - top_candidates.emplace(dist, label); - } +template +void HNSWDiskIndex::submitJobs(vecsim_stl::vector &jobs) { + vecsim_stl::vector callbacks(jobs.size(), this->allocator); + for (size_t i = 0; i < jobs.size(); i++) { + callbacks[i] = jobs[i]->Execute; } + this->SubmitJobsToQueue(this->jobQueue, this->jobQueueCtx, jobs.data(), callbacks.data(), + jobs.size()); } -/********************************** Batch 
Processing Methods **********************************/ - template -void HNSWDiskIndex::processBatch() { - if (pendingVectorCount == 0) { - return; - } - - // Clear any previous staged updates (for insertions) - stagedInsertUpdates.clear(); - stagedInsertMap.clear(); - stagedInsertNeighborUpdates.clear(); - - // Process each pending vector ID (vectors are already stored in memory) - for (size_t i = 0; i < pendingVectorCount; i++) { - idType vectorId = pendingVectorIds[i]; - if (isMarkedDeleted(vectorId)) { - // Skip deleted vectors - continue; - } - - // Get the vector data from memory - const void* vector_data = this->vectors->getElement(vectorId); - const void* raw_vector_data = rawVectorsInRAM.find(vectorId)->second.data(); - // Get metadata for this vector - DiskElementMetaData &metadata = idToMetaData[vectorId]; - size_t elementMaxLevel = metadata.topLevel; - - // Insert into graph if not the first element - if (entrypointNode != INVALID_ID) { - insertElementToGraph(vectorId, elementMaxLevel, entrypointNode, maxLevel, raw_vector_data, vector_data); - // If the new element has a higher level than the current max level, - // it becomes the new entry point - if (elementMaxLevel > maxLevel) { - entrypointNode = vectorId; - maxLevel = elementMaxLevel; - } - } else { - // First element becomes the entry point - entrypointNode = vectorId; - maxLevel = elementMaxLevel; - } - } - // std::cout << "processBatch memory: " << this->getAllocationSize()/1024/1024 << " MB" << std::endl; - - // Now flush all staged graph updates to disk in a single batch operation - flushStagedGraphUpdates(stagedInsertUpdates, stagedInsertNeighborUpdates); - stagedInsertMap.clear(); - - // Clear the pending vector IDs - pendingVectorIds.clear(); - rawVectorsInRAM.clear(); - pendingMetadata.clear(); - pendingVectorCount = 0; -} - -template -void HNSWDiskIndex::flushBatch() { - processBatch(); -} - -template -void HNSWDiskIndex::repairNeighborConnections( - idType neighbor_id, size_t level, idType deleted_id, - const vecsim_stl::vector &deleted_node_neighbors, - vecsim_stl::vector &neighbor_neighbors) { +void HNSWDiskIndex::repairNeighborConnections( + idType neighbor_id, size_t level, idType deleted_id, + const vecsim_stl::vector &deleted_node_neighbors, + vecsim_stl::vector &neighbor_neighbors) { // ===== Graph Repair Strategy ===== // When deleting a node, we need to repair its neighbors' connections to maintain @@ -1891,10 +1919,11 @@ void HNSWDiskIndex::repairNeighborConnections( // Use a hash set to track candidate IDs for O(1) duplicate detection std::unordered_set candidate_ids; + size_t elementCount = curElementCount.load(std::memory_order_acquire); // Add existing neighbors (excluding the deleted node) with their distances for (idType nn : neighbor_neighbors) { - if (nn != deleted_id && nn < curElementCount && !isMarkedDeleted(nn)) { + if (nn != deleted_id && nn < elementCount && !isMarkedDeleted(nn)) { const void *nn_data = getDataByInternalId(nn); DistType dist = this->calcDistance(nn_data, neighbor_data); candidates.emplace_back(dist, nn); @@ -1904,7 +1933,7 @@ void HNSWDiskIndex::repairNeighborConnections( // Add deleted node's neighbors (excluding current neighbor) as repair candidates for (idType candidate_id : deleted_node_neighbors) { - if (candidate_id != neighbor_id && candidate_id < curElementCount && + if (candidate_id != neighbor_id && candidate_id < elementCount && !isMarkedDeleted(candidate_id)) { // Check if already in candidates to avoid duplicates using O(1) hash set lookup if 
(candidate_ids.find(candidate_id) == candidate_ids.end()) { @@ -1920,7 +1949,7 @@ void HNSWDiskIndex::repairNeighborConnections( vecsim_stl::unordered_set original_neighbors_set(this->allocator); original_neighbors_set.reserve(neighbor_neighbors.size()); for (idType nn : neighbor_neighbors) { - if (nn != deleted_id && nn < curElementCount) { + if (nn != deleted_id && nn < elementCount) { original_neighbors_set.insert(nn); } } @@ -1952,11 +1981,12 @@ void HNSWDiskIndex::processDeleteBatch() { // Create a set of IDs being deleted in this batch for quick lookup std::unordered_set deletingIds(pendingDeleteIds.begin(), pendingDeleteIds.end()); + size_t elementCount = curElementCount.load(std::memory_order_acquire); // Process each deleted node for (idType deleted_id : pendingDeleteIds) { // Skip if already processed or invalid - if (deleted_id >= curElementCount || deleted_id >= idToMetaData.size()) { + if (deleted_id >= elementCount || deleted_id >= idToMetaData.size()) { continue; } @@ -1976,7 +2006,7 @@ void HNSWDiskIndex::processDeleteBatch() { // For each neighbor of the deleted node for (idType neighbor_id : deleted_node_neighbors) { // Skip if neighbor is also deleted, invalid, or in the current deletion batch - if (neighbor_id >= curElementCount || isMarkedDeleted(neighbor_id) || + if (neighbor_id >= elementCount || isMarkedDeleted(neighbor_id) || deletingIds.find(neighbor_id) != deletingIds.end()) { continue; } @@ -2014,7 +2044,7 @@ void HNSWDiskIndex::processDeleteBatch() { // Mark metadata as invalid and clean up raw vectors AFTER processing all nodes // This ensures getNeighbors() and other methods work correctly during graph repair for (idType deleted_id : pendingDeleteIds) { - if (deleted_id >= curElementCount || deleted_id >= idToMetaData.size()) { + if (deleted_id >= elementCount || deleted_id >= idToMetaData.size()) { continue; } // Mark the metadata as invalid @@ -2025,7 +2055,6 @@ void HNSWDiskIndex::processDeleteBatch() { if (ram_it != rawVectorsInRAM.end()) { rawVectorsInRAM.erase(ram_it); } - } // Flush all staged graph updates to disk in a single batch operation @@ -2060,17 +2089,13 @@ void HNSWDiskIndex::flushDeleteBatch() { processDeleteBatch(); } -template -void HNSWDiskIndex::setBatchThreshold(size_t threshold) { - batchThreshold = threshold; -} - /********************************** Debug Methods **********************************/ template void HNSWDiskIndex::debugPrintGraphStructure() const { + size_t elementCount = curElementCount.load(std::memory_order_acquire); this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "=== HNSW Disk Index Graph Structure ==="); - this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Total elements: %zu", curElementCount); + this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Total elements: %zu", elementCount); this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Entry point: %u", entrypointNode); this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Max level: %zu", maxLevel); this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "M: %zu, M0: %zu", M, M0); @@ -2081,7 +2106,7 @@ void HNSWDiskIndex::debugPrintGraphStructure() const { // Print metadata for each element this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Element metadata:"); - for (size_t i = 0; i < std::min(curElementCount, idToMetaData.size()); ++i) { + for (size_t i = 0; i < std::min(elementCount, idToMetaData.size()); ++i) { if (idToMetaData[i].label != INVALID_LABEL) { this->log(VecSimCommonStrings::LOG_DEBUG_STRING, " Element %zu: label=%u, topLevel=%zu", i, idToMetaData[i].label, @@ 
-2095,9 +2120,10 @@ void HNSWDiskIndex::debugPrintGraphStructure() const { template void HNSWDiskIndex::debugPrintNodeNeighbors(idType node_id) const { - if (node_id >= curElementCount) { + size_t elementCount = curElementCount.load(std::memory_order_acquire); + if (node_id >= elementCount) { this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Node %u is out of range (max: %zu)", - node_id, (curElementCount - 1)); + node_id, (elementCount - 1)); return; } @@ -2207,8 +2233,9 @@ size_t HNSWDiskIndex::debugCountGraphEdges() const { template void HNSWDiskIndex::debugValidateGraphConnectivity() const { this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "=== Graph Connectivity Validation ==="); + size_t elementCount = curElementCount.load(std::memory_order_acquire); - if (curElementCount == 0) { + if (elementCount == 0) { this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Index is empty, nothing to validate"); return; } @@ -2222,7 +2249,7 @@ void HNSWDiskIndex::debugValidateGraphConnectivity() const { } // Check connectivity for first few elements - size_t elements_to_check = std::min(curElementCount, size_t(5)); + size_t elements_to_check = std::min(elementCount, size_t(5)); this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Checking connectivity for first %zu elements:", elements_to_check); @@ -2237,7 +2264,7 @@ void HNSWDiskIndex::debugValidateGraphConnectivity() const { this->log(VecSimCommonStrings::LOG_DEBUG_STRING, "Checking for isolated nodes..."); size_t isolated_count = 0; - for (size_t i = 0; i < curElementCount; ++i) { + for (size_t i = 0; i < elementCount; ++i) { if (idToMetaData[i].label == INVALID_LABEL) continue; @@ -2277,20 +2304,12 @@ void HNSWDiskIndex::debugValidateGraphConnectivity() const { template -void HNSWDiskIndex::flushStagedUpdates() { - // Flush both insert and delete staged updates +void HNSWDiskIndex::flushStagedDeleteUpdates() { + // Flush staged delete updates to disk // Note: This is a non-const method that modifies the staging areas - flushStagedGraphUpdates(stagedInsertUpdates, stagedInsertNeighborUpdates); - stagedInsertMap.clear(); vecsim_stl::vector emptyNeighborUpdates(this->allocator); flushStagedGraphUpdates(stagedDeleteUpdates, emptyNeighborUpdates); stagedDeleteMap.clear(); - - // Also flush staged repair updates (opportunistic cleanup from getNeighbors) - if (!stagedRepairUpdates.empty()) { - flushStagedGraphUpdates(stagedRepairUpdates, emptyNeighborUpdates); - stagedRepairMap.clear(); - } } template @@ -2497,14 +2516,14 @@ vecsim_stl::vector HNSWDiskIndex::markDelete(labelTy const idType internalId = it->second; // Check if already marked deleted - if (isMarkedDeleted(internalId)) { + if (idToMetaData[internalId].flags & DELETE_MARK) { // Already deleted, return empty vector return internal_ids; } // Mark as deleted (but don't clean up raw vectors yet - they're needed for graph repair // in processDeleteBatch. Cleanup happens there after repair is complete.) 
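+    // DELETE_MARK is a bit in the element's flags word: it is set below with
+    // `flags |= DELETE_MARK` and tested with `flags & DELETE_MARK`, the same
+    // lock-free check that replaceEntryPoint() uses.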
-    markAs<DELETE_MARK>(internalId);
+    idToMetaData[internalId].flags |= DELETE_MARK;
     this->numMarkedDeleted++;

     // If this is the entrypoint, we need to replace it
@@ -2525,11 +2544,13 @@ void HNSWDiskIndex<DataType, DistType>::replaceEntryPoint() {
     // This method is called when the current entrypoint is marked as deleted
     // We need to find a new entrypoint from the remaining non-deleted nodes
     idType old_entry_point_id = entrypointNode;
+    size_t currentMaxLevel = maxLevel;

-    // Try to find a new entrypoint at the current max level
-    while (maxLevel != HNSW_INVALID_LEVEL) {
-        // First, try to find a neighbor of the old entrypoint at the top level
-        GraphKey graphKey(old_entry_point_id, maxLevel);
+    // Try to find a new entrypoint at the current max level (including level 0).
+    // currentMaxLevel is unsigned, so loop with an explicit break at level 0
+    // instead of a `>= 0` condition (which would underflow).
+    while (true) {
+        // First, try to find a neighbor of the old entrypoint at this level
+        GraphKey graphKey(old_entry_point_id, currentMaxLevel);
         std::string graph_value;
         rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value);
@@ -2541,30 +2562,667 @@ void HNSWDiskIndex<DataType, DistType>::replaceEntryPoint() {

             // Try to find a non-deleted neighbor
             for (size_t i = 0; i < neighbors.size(); i++) {
-                if (!isMarkedDeleted(neighbors[i])) {
+                if (neighbors[i] < idToMetaData.size() &&
+                    !(idToMetaData[neighbors[i]].flags & DELETE_MARK)) {
                     entrypointNode = neighbors[i];
+                    maxLevel = currentMaxLevel;
                     return;
                 }
             }
         }

         // If no suitable neighbor found, search for any non-deleted node at this level
-        for (idType id = 0; id < curElementCount; id++) {
+        size_t elementCount = curElementCount.load();
+        for (idType id = 0; id < elementCount; id++) {
             if (id != old_entry_point_id && id < idToMetaData.size() &&
-                idToMetaData[id].label != INVALID_LABEL && idToMetaData[id].topLevel == maxLevel &&
-                !isMarkedDeleted(id)) {
+                idToMetaData[id].label != INVALID_LABEL &&
+                idToMetaData[id].topLevel >= currentMaxLevel &&
+                !(idToMetaData[id].flags & DELETE_MARK)) {
                 entrypointNode = id;
+                maxLevel = currentMaxLevel;
                 return;
             }
         }

         // No non-deleted nodes at this level, decrease maxLevel and try again
-        maxLevel--;
+        if (currentMaxLevel == 0) break;
+        currentMaxLevel--;
     }

     // If we get here, the index is empty or all nodes are deleted
     entrypointNode = INVALID_ID;
-    maxLevel = HNSW_INVALID_LEVEL;
+    maxLevel = 0;
+}
+
+/********************************** Batchless Mode Implementation **********************************/
+
+template <typename DataType, typename DistType>
+void HNSWDiskIndex<DataType, DistType>::getNeighborsFromCache(
+    idType nodeId, size_t level, vecsim_stl::vector<idType> &result) const {
+
+    result.clear();
+    uint64_t key = makeRepairKey(nodeId, level);
+    size_t segment = getSegmentIndex(key);
+    auto& cacheSegment = cacheSegments_[segment];
+
+    // Step 1: Check in-memory cache (includes pending writes from all jobs)
+    {
+        std::shared_lock<std::shared_mutex> lock(cacheSegment.guard);
+        auto it = cacheSegment.cache.find(key);
+        if (it != cacheSegment.cache.end()) {
+            // Copy from std::vector to vecsim_stl::vector
+            result.reserve(it->second.size());
+            for (idType id : it->second) {
+                result.push_back(id);
+            }
+            filterDeletedNodes(result);
+            return;
+        }
+
+        // Check if this is a new node (never written to disk) - return empty
+        if (cacheSegment.newNodes.find(key) != cacheSegment.newNodes.end()) {
+            // New node not yet in cache - return empty (will be populated when connected)
+            return;
+        }
+    }
+
+    // Step 2: Not in cache and not a new node - read from disk
+    GraphKey graphKey(nodeId, level);
+    std::string graph_value;
+    rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf,
graphKey.asSlice(), &graph_value); + + if (status.ok()) { + deserializeGraphValue(graph_value, result); + } + + // Add to cache for future reads (convert to unique_lock for write) + { + std::unique_lock lock(cacheSegment.guard); + // Double-check: another thread may have populated it + auto it = cacheSegment.cache.find(key); + if (it == cacheSegment.cache.end()) { + // Copy from vecsim_stl::vector to std::vector + std::vector cacheEntry; + cacheEntry.reserve(result.size()); + for (idType id : result) { + cacheEntry.push_back(id); + } + cacheSegment.cache[key] = std::move(cacheEntry); + } + } + + filterDeletedNodes(result); +} + +// Helper to load neighbors from disk into cache if not already present +// Caller must hold unique_lock on cacheSegment.guard (will be temporarily released during disk I/O) +template +void HNSWDiskIndex::loadNeighborsFromDiskIfNeeded( + uint64_t key, CacheSegment& cacheSegment, std::unique_lock& lock) { + + auto it = cacheSegment.cache.find(key); + if (it != cacheSegment.cache.end()) { + return; // Already in cache + } + + // Check if this is a new node (never written to disk) - skip disk lookup + bool isNewNode = (cacheSegment.newNodes.find(key) != cacheSegment.newNodes.end()); + + if (!isNewNode) { + // First modification of existing node - need to load current state from disk + idType nodeId = static_cast(key >> 32); + size_t level = static_cast(key & 0xFFFFFFFF); + + lock.unlock(); + vecsim_stl::vector diskNeighbors(this->allocator); + GraphKey graphKey(nodeId, level); + std::string graph_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), cf, graphKey.asSlice(), &graph_value); + if (status.ok()) { + deserializeGraphValue(graph_value, diskNeighbors); + } + lock.lock(); + + // Re-check: another thread might have populated it + if (cacheSegment.cache.find(key) == cacheSegment.cache.end()) { + // Convert vecsim_stl::vector to std::vector + std::vector cacheEntry; + cacheEntry.reserve(diskNeighbors.size()); + for (idType id : diskNeighbors) { + cacheEntry.push_back(id); + } + cacheSegment.cache[key] = std::move(cacheEntry); + } + } else { + // New node - initialize with empty neighbor list + cacheSegment.cache[key] = std::vector(); + } +} + +template +void HNSWDiskIndex::addNeighborToCache( + idType nodeId, size_t level, idType newNeighborId) { + + uint64_t key = makeRepairKey(nodeId, level); + size_t segment = getSegmentIndex(key); + auto& cacheSegment = cacheSegments_[segment]; + + std::unique_lock lock(cacheSegment.guard); + + // Load from disk if not in cache (handles newNodes check internally) + loadNeighborsFromDiskIfNeeded(key, cacheSegment, lock); + + // Add new neighbor (avoid duplicates) + auto &neighbors = cacheSegment.cache[key]; + if (std::find(neighbors.begin(), neighbors.end(), newNeighborId) == neighbors.end()) { + neighbors.push_back(newNeighborId); + } + + // Mark as dirty (needs disk write) and increment atomic counter + auto insertResult = cacheSegment.dirty.insert(key); + if (insertResult.second) { // Only increment if newly inserted + totalDirtyCount_.fetch_add(1, std::memory_order_relaxed); + } +} + +template +bool HNSWDiskIndex::tryAddNeighborToCacheIfCapacity( + idType nodeId, size_t level, idType newNeighborId, size_t maxCapacity) { + + uint64_t key = makeRepairKey(nodeId, level); + size_t segment = getSegmentIndex(key); + auto& cacheSegment = cacheSegments_[segment]; + + std::unique_lock lock(cacheSegment.guard); + + // Load from disk if not in cache (handles newNodes check internally) + loadNeighborsFromDiskIfNeeded(key, 
cacheSegment, lock);
+
+    // Atomic check-and-add under the lock
+    auto &neighbors = cacheSegment.cache[key];
+
+    // Check if already present (avoid duplicates)
+    if (std::find(neighbors.begin(), neighbors.end(), newNeighborId) != neighbors.end()) {
+        return true; // Already added, consider it success
+    }
+
+    // Check capacity
+    if (neighbors.size() >= maxCapacity) {
+        return false; // No capacity
+    }
+
+    // Has capacity - add the neighbor
+    neighbors.push_back(newNeighborId);
+    auto insertResult = cacheSegment.dirty.insert(key);
+    if (insertResult.second) { // Only increment if newly inserted
+        totalDirtyCount_.fetch_add(1, std::memory_order_relaxed);
+    }
+    return true;
+}
+
+template <typename DataType, typename DistType>
+void HNSWDiskIndex<DataType, DistType>::setNeighborsInCache(
+    idType nodeId, size_t level, const vecsim_stl::vector<idType> &neighbors, bool isNewNode) {
+
+    uint64_t key = makeRepairKey(nodeId, level);
+    size_t segment = getSegmentIndex(key);
+    auto& cacheSegment = cacheSegments_[segment];
+
+    // Convert vecsim_stl::vector to std::vector
+    std::vector<idType> cacheEntry;
+    cacheEntry.reserve(neighbors.size());
+    for (idType id : neighbors) {
+        cacheEntry.push_back(id);
+    }
+
+    std::unique_lock<std::shared_mutex> lock(cacheSegment.guard);
+    cacheSegment.cache[key] = std::move(cacheEntry);
+
+    // If this is a new node, track it to avoid disk lookups
+    if (isNewNode) {
+        cacheSegment.newNodes.insert(key);
+    }
+
+    auto insertResult = cacheSegment.dirty.insert(key);
+    if (insertResult.second) { // Only increment if newly inserted
+        totalDirtyCount_.fetch_add(1, std::memory_order_relaxed);
+    }
+}
+
+template <typename DataType, typename DistType>
+void HNSWDiskIndex<DataType, DistType>::writeVectorToDisk(
+    idType vectorId, const void *rawVectorData,
+    const vecsim_stl::vector<idType> &neighbors) {
+
+    auto writeOptions = rocksdb::WriteOptions();
+    writeOptions.disableWAL = true;
+
+    // Write all levels for this vector
+    size_t topLevel = idToMetaData[vectorId].topLevel;
+    rocksdb::WriteBatch batch;
+
+    for (size_t level = 0; level <= topLevel; level++) {
+        GraphKey graphKey(vectorId, level);
+        std::string value = serializeGraphValue(rawVectorData, neighbors);
+        batch.Put(cf, graphKey.asSlice(), value);
+    }
+
+    db->Write(writeOptions, &batch);
+}
+
+template <typename DataType, typename DistType>
+void HNSWDiskIndex<DataType, DistType>::writeDirtyNodesToDisk(
+    const vecsim_stl::vector<uint64_t> &modifiedNodes,
+    const void *newVectorRawData, idType newVectorId) {
+
+    if (modifiedNodes.empty()) {
+        return;
+    }
+
+    auto writeOptions = rocksdb::WriteOptions();
+    writeOptions.disableWAL = true;
+
+    rocksdb::WriteBatch batch;
+    std::vector<char> rawVectorBuffer(this->inputBlobSize);
+    vecsim_stl::vector<uint64_t> nodesToWrite(this->allocator);
+
+    // ==================== SWAP-AND-FLUSH PATTERN ====================
+    // To avoid the "Lost Update" race condition, dirty flags are cleared
+    // BEFORE writing to disk (not after): if a new insert occurs while we are
+    // writing, it simply re-adds the node to the dirty set, so no update is lost.
+    // Two-phase approach to minimize lock contention:
+    //   Phase 1: Read cache data under SHARED lock (allows concurrent writers)
+    //   Phase 2: Clear dirty flags under EXCLUSIVE lock (brief, after all reads done)
+    // If writing fails, we re-add nodes to dirty set.
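+    // Dirty-set keys pack (nodeId, level) into one uint64_t, as the decoding in
+    // Phase 1 below implies:
+    //   key    == (uint64_t(nodeId) << 32) | uint64_t(level);
+    //   nodeId == idType(key >> 32);  level == size_t(key & 0xFFFFFFFF);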
+ // ================================================================ + + // Phase 1: Read cache data under shared locks (allows other writers to proceed) + for (uint64_t key : modifiedNodes) { + idType nodeId = static_cast(key >> 32); + size_t level = static_cast(key & 0xFFFFFFFF); + size_t segment = getSegmentIndex(key); + auto& cacheSegment = cacheSegments_[segment]; + + // Get neighbors from cache under SHARED lock (concurrent reads allowed) + vecsim_stl::vector neighbors(this->allocator); + { + std::shared_lock segmentLock(cacheSegment.guard); + auto it = cacheSegment.cache.find(key); + if (it == cacheSegment.cache.end()) { + continue; + } + const std::vector &cacheNeighbors = it->second; + neighbors.reserve(cacheNeighbors.size()); + for (idType id : cacheNeighbors) { + neighbors.push_back(id); + } + } + // Shared lock released - other threads can add to cache + + // Get raw vector data (no cache lock held during disk I/O) + const void *rawData; + if (nodeId == newVectorId) { + rawData = newVectorRawData; + } else { + // Read from RAM or disk + if (!getRawVectorInternal(nodeId, rawVectorBuffer.data())) { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "WARNING: Could not get raw vector for node %u", nodeId); + continue; + } + rawData = rawVectorBuffer.data(); + } + + GraphKey graphKey(nodeId, level); + std::string value = serializeGraphValue(rawData, neighbors); + batch.Put(cf, graphKey.asSlice(), value); + nodesToWrite.push_back(key); + } + + // Phase 2: Clear dirty flags BEFORE writing (swap-and-flush pattern) + // Brief exclusive locks - one segment at a time + for (uint64_t key : nodesToWrite) { + size_t segment = getSegmentIndex(key); + auto& cacheSegment = cacheSegments_[segment]; + std::unique_lock segmentLock(cacheSegment.guard); + if (cacheSegment.dirty.erase(key) > 0) { + totalDirtyCount_.fetch_sub(1, std::memory_order_relaxed); + } + } + + if (nodesToWrite.empty()) { + return; + } + + // Phase 3: Write to disk (no locks held) + // Dirty flags already cleared in Phase 2 - any concurrent modifications + // will re-add to dirty set and be picked up in next flush + rocksdb::Status status = db->Write(writeOptions, &batch); + if (!status.ok()) { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "ERROR: Failed to write dirty nodes to disk: %s", status.ToString().c_str()); + + // On failure, re-add nodes to dirty set so they'll be retried + for (uint64_t key : nodesToWrite) { + size_t segment = getSegmentIndex(key); + auto& cacheSegment = cacheSegments_[segment]; + std::unique_lock segmentLock(cacheSegment.guard); + if (cacheSegment.dirty.insert(key).second) { + totalDirtyCount_.fetch_add(1, std::memory_order_relaxed); + } + } + } + // Success case: dirty flags already cleared, nothing more to do +} + +template +void HNSWDiskIndex::flushDirtyNodesToDisk() { + std::lock_guard flushLock(diskWriteGuard); + + // Check if there are any dirty nodes using atomic counter (fast path) + if (totalDirtyCount_.load(std::memory_order_relaxed) == 0) { + return; + } + + // ==================== SWAP-AND-FLUSH PATTERN ==================== + // Process one segment at a time to minimize lock contention: + // 1. Acquire exclusive lock on segment (blocks inserts briefly - nanoseconds for swap) + // 2. Swap dirty set with empty set (atomic ownership transfer) + // 3. Release lock immediately (segment is free for new inserts) + // 4. 
Process swapped nodes and write to disk (no locks held) + // + // If a new insert happens after the swap, it sees an empty dirty set + // and correctly adds itself. No updates are lost. + // ================================================================ + + auto writeOptions = rocksdb::WriteOptions(); + writeOptions.disableWAL = true; + + rocksdb::WriteBatch batch; + std::vector rawVectorBuffer(this->inputBlobSize); + std::unordered_set successfulLevel0Ids; + + // Track all nodes we're trying to write (for error recovery) + vecsim_stl::vector allNodesToFlush(this->allocator); + + // Process each segment independently + for (size_t s = 0; s < NUM_CACHE_SEGMENTS; ++s) { + auto& cacheSegment = cacheSegments_[s]; + + // Local container to take ownership of dirty nodes + std::unordered_set nodesToFlush; + + { + // 1. Acquire exclusive lock (brief - just for the swap) + std::unique_lock segmentLock(cacheSegment.guard); + + if (cacheSegment.dirty.empty()) { + continue; + } + + // 2. Atomic swap - move dirty set contents to local, leave dirty empty + // New inserts after this will add to the now-empty dirty set + nodesToFlush.swap(cacheSegment.dirty); + + // 3. Update global counter - we've taken ownership of these nodes + totalDirtyCount_.fetch_sub(nodesToFlush.size(), std::memory_order_relaxed); + } + // 4. Lock released here - segment is free for inserts + + // 5. Process nodes and add to batch (no locks held) + for (uint64_t key : nodesToFlush) { + idType nodeId = static_cast(key >> 32); + size_t level = static_cast(key & 0xFFFFFFFF); + + // Get neighbors from cache (need shared lock briefly) + vecsim_stl::vector neighbors(this->allocator); + { + std::shared_lock segmentLock(cacheSegment.guard); + auto it = cacheSegment.cache.find(key); + if (it == cacheSegment.cache.end()) { + // Node not in cache - shouldn't happen, but skip if it does + continue; + } + const std::vector &cacheNeighbors = it->second; + neighbors.reserve(cacheNeighbors.size()); + for (idType id : cacheNeighbors) { + neighbors.push_back(id); + } + } + + // Get raw vector data (no segment lock needed) + if (!getRawVectorInternal(nodeId, rawVectorBuffer.data())) { + // Raw vector not available - this shouldn't happen for dirty nodes + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "WARNING: Raw vector not found for dirty node %u at level %zu", nodeId, level); + continue; + } + + GraphKey graphKey(nodeId, level); + std::string value = serializeGraphValue(rawVectorBuffer.data(), neighbors); + batch.Put(cf, graphKey.asSlice(), value); + allNodesToFlush.push_back(key); + + if (level == 0) { + successfulLevel0Ids.insert(nodeId); + } + } + } + + // Write entire batch to RocksDB (no locks held during I/O) + if (batch.Count() > 0) { + rocksdb::Status status = db->Write(writeOptions, &batch); + if (!status.ok()) { + this->log(VecSimCommonStrings::LOG_WARNING_STRING, + "ERROR: Failed to flush dirty nodes to disk: %s", status.ToString().c_str()); + + // On failure, re-add nodes to dirty set so they'll be retried + for (uint64_t key : allNodesToFlush) { + size_t segment = getSegmentIndex(key); + auto& cacheSegment = cacheSegments_[segment]; + std::unique_lock segmentLock(cacheSegment.guard); + if (cacheSegment.dirty.insert(key).second) { + totalDirtyCount_.fetch_add(1, std::memory_order_relaxed); + } + } + return; // Don't clear newNodes or remove raw vectors on failure + } + } + + // Success: Clear newNodes flags for all flushed nodes + for (uint64_t key : allNodesToFlush) { + size_t segment = getSegmentIndex(key); + auto& 
cacheSegment = cacheSegments_[segment];
+        std::unique_lock<std::shared_mutex> segmentLock(cacheSegment.guard);
+        cacheSegment.newNodes.erase(key);
+    }
+
+    // Remove raw vectors for level 0 nodes that were written to disk
+    if (!successfulLevel0Ids.empty()) {
+        std::lock_guard<std::shared_mutex> lock(rawVectorsGuard);
+        for (idType vectorId : successfulLevel0Ids) {
+            rawVectorsInRAM.erase(vectorId);
+        }
+    }
+}
+
+template <typename DataType, typename DistType>
+void HNSWDiskIndex<DataType, DistType>::insertElementToGraph(
+    idType element_id, size_t element_max_level, idType entry_point, size_t global_max_level,
+    const void *raw_vector_data, const void *vector_data,
+    vecsim_stl::vector<uint64_t> &modifiedNodes) {
+
+    idType curr_element = entry_point;
+    DistType cur_dist = std::numeric_limits<DistType>::max();
+    size_t max_common_level;
+
+    if (element_max_level < global_max_level) {
+        max_common_level = element_max_level;
+        cur_dist = this->calcDistance(vector_data, getDataByInternalId(curr_element));
+        for (auto level = static_cast<int>(global_max_level);
+             level > static_cast<int>(element_max_level); level--) {
+            greedySearchLevel(vector_data, level, curr_element, cur_dist);
+        }
+    } else {
+        max_common_level = global_max_level;
+    }
+
+    for (auto level = static_cast<int>(max_common_level); level >= 0; level--) {
+        vecsim_stl::updatable_max_heap<DistType, idType> top_candidates =
+            searchLayer(curr_element, vector_data, level, efConstruction);
+
+        if (!top_candidates.empty()) {
+            curr_element = mutuallyConnectNewElement(element_id, top_candidates, level,
+                                                     modifiedNodes);
+        } else {
+            this->log(VecSimCommonStrings::LOG_WARNING_STRING,
+                      "WARNING: No candidates found at level %d!", level);
+        }
+    }
+}
+
+template <typename DataType, typename DistType>
+idType HNSWDiskIndex<DataType, DistType>::mutuallyConnectNewElement(
+    idType new_node_id, vecsim_stl::updatable_max_heap<DistType, idType> &top_candidates,
+    size_t level, vecsim_stl::vector<uint64_t> &modifiedNodes) {
+
+    size_t max_M_cur = level ?
M : M0; + + // Copy candidates to list for heuristic processing + candidatesList top_candidates_list(this->allocator); + top_candidates_list.insert(top_candidates_list.end(), top_candidates.begin(), + top_candidates.end()); + + // Use heuristic to filter candidates + idType next_closest_entry_point = getNeighborsByHeuristic2(top_candidates_list, M); + + // Extract selected neighbor IDs + vecsim_stl::vector neighbor_ids(this->allocator); + neighbor_ids.reserve(top_candidates_list.size()); + for (size_t i = 0; i < top_candidates_list.size(); ++i) { + neighbor_ids.push_back(top_candidates_list[i].second); + } + + // Update cache for the new node's neighbors + setNeighborsInCache(new_node_id, level, neighbor_ids); + modifiedNodes.push_back(makeRepairKey(new_node_id, level)); + + // Update existing nodes to include the new node in their neighbor lists + for (const auto &neighbor_data : top_candidates_list) { + idType selected_neighbor = neighbor_data.second; + DistType distance = neighbor_data.first; + + // Use atomic check-and-add to prevent race conditions + if (tryAddNeighborToCacheIfCapacity(selected_neighbor, level, new_node_id, max_M_cur)) { + // Successfully added - mark as modified + modifiedNodes.push_back(makeRepairKey(selected_neighbor, level)); + } else { + // Full - need to re-evaluate using heuristic + revisitNeighborConnections(new_node_id, selected_neighbor, level, distance, + modifiedNodes); + } + } + + return next_closest_entry_point; +} + +template +void HNSWDiskIndex::executeSingleInsertJobWrapper(AsyncJob *job) { + auto *insertJob = static_cast(job); + auto *index = static_cast *>(job->index); + index->executeSingleInsertJob(insertJob); +} + +template +void HNSWDiskIndex::executeSingleInsertJob(HNSWDiskSingleInsertJob *job) { + if (!job->isValid) { + delete job; + return; + } + + // Get shared_ptr to raw vector from rawVectorsInRAM (just increments refcount, no copy) + // This keeps the data alive even if erased from map before job finishes + std::shared_ptr localRawRef = [&]() -> std::shared_ptr { + std::shared_lock lock(rawVectorsGuard); + auto it = rawVectorsInRAM.find(job->vectorId); + if (it == rawVectorsInRAM.end()) { + // Vector was already erased (e.g., deleted before job executed) + return nullptr; + } + return it->second; + }(); + + if (!localRawRef) { + delete job; + return; + } + + // Get processed vector from vectors container + const void *processedVector = [&]() -> const void * { + std::shared_lock lock(vectorsGuard); + return this->vectors->getElement(job->vectorId); + }(); + + // Get current entry point and max level + auto [currentEntryPoint, currentMaxLevel] = safeGetEntryPointState(); + + // Use unified core function (batching controlled by diskWriteBatchThreshold) + executeGraphInsertionCore(job->vectorId, job->elementMaxLevel, currentEntryPoint, + currentMaxLevel, localRawRef->data(), processedVector); + + delete job; +} + +template +void HNSWDiskIndex::executeGraphInsertionCore( + idType vectorId, size_t elementMaxLevel, + idType entryPoint, size_t globalMaxLevel, + const void *rawVectorData, const void *processedVectorData) { + + if (entryPoint == INVALID_ID || vectorId == entryPoint) { + // Entry point or first vector - nothing to connect + // Raw vector cleanup happens in addVector for entry point case + return; + } + + // Track modified nodes for disk write + vecsim_stl::vector modifiedNodes(this->allocator); + + // Hold shared lock during graph insertion to prevent idToMetaData resize + // while we're reading from it in 
searchLayer/processCandidate + { + std::shared_lock lock(indexDataGuard); + // Insert into graph + insertElementToGraph(vectorId, elementMaxLevel, entryPoint, globalMaxLevel, + rawVectorData, processedVectorData, modifiedNodes); + } + + // Handle disk writes - both single-threaded and multi-threaded support batching + // diskWriteBatchThreshold controls when to flush: + // 0 = flush after every insert (no batching) + // >0 = flush when dirty count reaches threshold + if (diskWriteBatchThreshold == 0) { + // No batching: write immediately + writeDirtyNodesToDisk(modifiedNodes, rawVectorData, vectorId); + std::lock_guard lock(rawVectorsGuard); + rawVectorsInRAM.erase(vectorId); + } else if (totalDirtyCount_.load(std::memory_order_relaxed) >= diskWriteBatchThreshold) { + // Threshold reached: flush all dirty nodes + flushDirtyNodesToDisk(); + } + // else: batching enabled but threshold not reached, keep accumulating + + // Update entry point if this vector has higher level + if (elementMaxLevel > globalMaxLevel) { + std::unique_lock lock(indexDataGuard); + if (elementMaxLevel > maxLevel) { + entrypointNode = vectorId; + maxLevel = elementMaxLevel; + } + } } #ifdef BUILD_TESTS diff --git a/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h b/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h index db3914e12..5a8f6eeb6 100644 --- a/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h +++ b/src/VecSim/algorithms/hnsw/hnsw_disk_serializer.h @@ -56,14 +56,14 @@ HNSWDiskIndex::HNSWDiskIndex( : VecSimIndexAbstract(abstractInitParams, components), idToMetaData(this->allocator), labelToIdMap(this->allocator), db(db), cf(cf), dbPath(""), indexDataGuard(), visitedNodesHandlerPool(INITIAL_CAPACITY, this->allocator), - batchThreshold(0), // Will be restored from file - pendingVectorIds(this->allocator), pendingMetadata(this->allocator), - pendingVectorCount(0), pendingDeleteIds(this->allocator), + pendingDeleteIds(this->allocator), stagedInsertUpdates(this->allocator), stagedDeleteUpdates(this->allocator), stagedRepairUpdates(this->allocator), - stagedInsertNeighborUpdates(this->allocator) { + stagedInsertNeighborUpdates(this->allocator), + jobQueue(nullptr), jobQueueCtx(nullptr), SubmitJobsToQueue(nullptr), + cacheSegments_(new CacheSegment[NUM_CACHE_SEGMENTS]) { - // Restore index fields from file (including batchThreshold) + // Restore index fields from file this->restoreIndexFields(input); // Validate the restored fields @@ -72,7 +72,7 @@ HNSWDiskIndex::HNSWDiskIndex( // Initialize level generator with seed based on curElementCount for better distribution // Using curElementCount ensures different sequences for indexes with different sizes // Add a constant offset to avoid seed=0 for empty indexes - this->levelGenerator.seed(200 + this->curElementCount); + this->levelGenerator.seed(200 + this->curElementCount.load(std::memory_order_acquire)); // Restore graph and vectors from file this->restoreGraph(input, version); @@ -101,10 +101,11 @@ template void HNSWDiskIndex::restoreVectorsFromFile(std::ifstream &input, EncodingVersion version) { auto start_time = std::chrono::steady_clock::now(); + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); // Read vectors directly from file // The vectors are stored as processed blobs (storage format) - for (idType id = 0; id < this->curElementCount; id++) { + for (idType id = 0; id < elementCount; id++) { // Allocate memory for the processed vector auto vector_blob = this->allocator->allocate_unique(this->dataSize); @@ -129,7 +130,7 @@ void 
HNSWDiskIndex::restoreVectorsFromFile(std::ifstream &in double elapsed_seconds = std::chrono::duration(end_time - start_time).count(); this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, "Restored %zu processed vectors from metadata file in %f seconds", - this->curElementCount, elapsed_seconds); + elementCount, elapsed_seconds); } /** @@ -154,7 +155,8 @@ template void HNSWDiskIndex::restoreVectorsFromRocksDB(EncodingVersion version) { // Iterate through all elements and restore their processed vectors auto start_time = std::chrono::steady_clock::now(); - for (idType id = 0; id < this->curElementCount; id++) { + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); + for (idType id = 0; id < elementCount; id++) { // Retrieve the raw vector from RocksDB (stored in level-0 graph value) GraphKey graphKey(id, 0); std::string level0_graph_value; @@ -195,7 +197,7 @@ void HNSWDiskIndex::restoreVectorsFromRocksDB(EncodingVersio double elapsed_seconds = std::chrono::duration(end_time - start_time).count(); this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, "Restored %zu processed vectors from RocksDB checkpoint in %f seconds", - this->curElementCount, elapsed_seconds); + this->curElementCount.load(std::memory_order_acquire), elapsed_seconds); } /** @@ -250,14 +252,11 @@ void HNSWDiskIndex::restoreVectors(std::ifstream &input, Enc */ template void HNSWDiskIndex::saveIndexIMP(std::ofstream &output) { - // Flush any pending updates before saving to ensure consistent snapshot - this->flushStagedUpdates(); - this->flushBatch(); + // Flush any pending delete updates before saving to ensure consistent snapshot + this->flushStagedDeleteUpdates(); + this->flushDirtyNodesToDisk(); // Verify that all pending state has been flushed // These assertions ensure data integrity during serialization - if (!pendingVectorIds.empty()) { - throw std::runtime_error("Serialization error: pendingVectorIds not empty after flush"); - } if (!stagedInsertUpdates.empty()) { throw std::runtime_error("Serialization error: stagedInsertUpdates not empty after flush"); } @@ -270,9 +269,6 @@ void HNSWDiskIndex::saveIndexIMP(std::ofstream &output) { if (!rawVectorsInRAM.empty()) { throw std::runtime_error("Serialization error: rawVectorsInRAM not empty after flush"); } - if (pendingVectorCount != 0) { - throw std::runtime_error("Serialization error: pendingVectorCount not zero after flush"); - } if (!stagedRepairUpdates.empty()) { throw std::runtime_error("Serialization error: stagedRepairUpdates not empty after flush"); } @@ -420,9 +416,10 @@ HNSWIndexMetaData HNSWDiskIndex::checkIntegrity() const { // Store all edges for efficient bidirectional checking: (node_id, level) -> set of neighbors std::unordered_map>> all_edges; + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); // First pass: count deleted and max level - for (idType id = 0; id < this->curElementCount; id++) { + for (idType id = 0; id < elementCount; id++) { if (this->isMarkedDeleted(id)) { num_deleted++; } @@ -440,7 +437,7 @@ HNSWIndexMetaData HNSWDiskIndex::checkIntegrity() const { } // Validate entry point - if (this->curElementCount > 0 && this->entrypointNode == INVALID_ID) { + if (elementCount > 0 && this->entrypointNode == INVALID_ID) { this->log(VecSimCommonStrings::LOG_WARNING_STRING, "checkIntegrity failed: no entry point set for non-empty index"); return res; @@ -475,7 +472,7 @@ HNSWIndexMetaData HNSWDiskIndex::checkIntegrity() const { idType neighborId = neighbors[i]; // Check for invalid neighbor - if 
(neighborId >= this->curElementCount || neighborId == gk->id) { + if (neighborId >= elementCount || neighborId == gk->id) { this->log(VecSimCommonStrings::LOG_WARNING_STRING, "checkIntegrity failed: invalid neighbor %u for node %u at level %zu", neighborId, gk->id, gk->level); @@ -527,7 +524,7 @@ HNSWIndexMetaData HNSWDiskIndex::checkIntegrity() const { // Check for isolated nodes (non-deleted nodes with no neighbors at any level) size_t isolated_count = 0; - for (idType id = 0; id < this->curElementCount; id++) { + for (idType id = 0; id < elementCount; id++) { // Skip deleted nodes if (this->isMarkedDeleted(id)) { continue; @@ -592,13 +589,19 @@ void HNSWDiskIndex::restoreIndexFields(std::ifstream &input) Serializer::readBinaryPOD(input, this->mult); // Restore index state - Serializer::readBinaryPOD(input, this->curElementCount); + // Note: curElementCount is atomic, so we read into a temporary first + size_t tempElementCount; + Serializer::readBinaryPOD(input, tempElementCount); + this->curElementCount.store(tempElementCount, std::memory_order_release); Serializer::readBinaryPOD(input, this->numMarkedDeleted); + // Read entry point state Serializer::readBinaryPOD(input, this->maxLevel); Serializer::readBinaryPOD(input, this->entrypointNode); - // Restore batch processing configuration - Serializer::readBinaryPOD(input, this->batchThreshold); + // Restore batch processing configuration (legacy field, now unused - read and discard) + size_t legacyBatchThreshold; + Serializer::readBinaryPOD(input, legacyBatchThreshold); + (void)legacyBatchThreshold; // Suppress unused warning // Restore dbPath (string: length + data) size_t dbPathLength; @@ -637,10 +640,12 @@ void HNSWDiskIndex::restoreIndexFields(std::ifstream &input) template void HNSWDiskIndex::restoreGraph(std::ifstream &input, EncodingVersion version) { + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); + // Phase 1: Restore metadata for all elements - this->idToMetaData.resize(this->curElementCount); + this->idToMetaData.resize(elementCount); - for (idType id = 0; id < this->curElementCount; id++) { + for (idType id = 0; id < elementCount; id++) { labelType label; size_t topLevel; elementFlags flags; @@ -683,19 +688,18 @@ void HNSWDiskIndex::restoreGraph(std::ifstream &input, "RocksDB checkpoint loaded. 
Vectors will be loaded on-demand from disk during queries."); // Clear any pending state (must be empty after deserialization) - this->pendingVectorIds.clear(); - this->pendingMetadata.clear(); - this->pendingVectorCount = 0; this->stagedInsertUpdates.clear(); this->stagedDeleteUpdates.clear(); this->stagedInsertNeighborUpdates.clear(); // Resize visited nodes handler pool - this->visitedNodesHandlerPool.resize(this->curElementCount); + this->visitedNodesHandlerPool.resize(elementCount); } template void HNSWDiskIndex::saveIndexFields(std::ofstream &output) const { + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); + // Save index type Serializer::writeBinaryPOD(output, VecSimAlgo_HNSWLIB_DISK); @@ -705,7 +709,7 @@ void HNSWDiskIndex::saveIndexFields(std::ofstream &output) c Serializer::writeBinaryPOD(output, this->metric); Serializer::writeBinaryPOD(output, this->blockSize); Serializer::writeBinaryPOD(output, this->isMulti); - Serializer::writeBinaryPOD(output, this->curElementCount); // Use curElementCount as initial capacity + Serializer::writeBinaryPOD(output, elementCount); // Use curElementCount as initial capacity // Save HNSW build parameters Serializer::writeBinaryPOD(output, this->M); @@ -720,13 +724,15 @@ void HNSWDiskIndex::saveIndexFields(std::ofstream &output) c Serializer::writeBinaryPOD(output, this->mult); // Save index state - Serializer::writeBinaryPOD(output, this->curElementCount); + Serializer::writeBinaryPOD(output, elementCount); Serializer::writeBinaryPOD(output, this->numMarkedDeleted); + // Write entry point state Serializer::writeBinaryPOD(output, this->maxLevel); Serializer::writeBinaryPOD(output, this->entrypointNode); - // Save batch processing configuration - Serializer::writeBinaryPOD(output, this->batchThreshold); + // Save batch processing configuration (legacy field for compatibility, write 0) + size_t legacyBatchThreshold = 0; + Serializer::writeBinaryPOD(output, legacyBatchThreshold); // Save dbPath (string: length + data) size_t dbPathLength = this->dbPath.length(); @@ -736,8 +742,10 @@ void HNSWDiskIndex::saveIndexFields(std::ofstream &output) c template void HNSWDiskIndex::saveGraph(std::ofstream &output) const { + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); + // Phase 1: Save metadata for all elements - for (idType id = 0; id < this->curElementCount; id++) { + for (idType id = 0; id < elementCount; id++) { labelType label = this->idToMetaData[id].label; size_t topLevel = this->idToMetaData[id].topLevel; elementFlags flags = this->idToMetaData[id].flags; @@ -787,9 +795,10 @@ void HNSWDiskIndex::saveGraph(std::ofstream &output) const { template void HNSWDiskIndex::saveVectorsToFile(std::ofstream &output) const { auto start_time = std::chrono::steady_clock::now(); + size_t elementCount = this->curElementCount.load(std::memory_order_acquire); // Save all processed vectors - for (idType id = 0; id < this->curElementCount; id++) { + for (idType id = 0; id < elementCount; id++) { // Get the processed vector from the vectors container const void *vector_data = this->vectors->getElement(id); @@ -811,5 +820,5 @@ void HNSWDiskIndex::saveVectorsToFile(std::ofstream &output) double elapsed_seconds = std::chrono::duration(end_time - start_time).count(); this->log(VecSimCommonStrings::LOG_VERBOSE_STRING, "Saved %zu processed vectors to metadata file in %f seconds", - this->curElementCount, elapsed_seconds); + elementCount, elapsed_seconds); } diff --git a/src/VecSim/vec_sim_common.h 
b/src/VecSim/vec_sim_common.h index 97cb6ae6f..4ab155be5 100644 --- a/src/VecSim/vec_sim_common.h +++ b/src/VecSim/vec_sim_common.h @@ -256,6 +256,7 @@ typedef enum { HNSW_SEARCH_JOB, HNSW_SWAP_JOB, SVS_BATCH_UPDATE_JOB, + HNSW_DISK_SINGLE_INSERT_JOB, // Batchless: single vector insert from start to disk write INVALID_JOB // to indicate that finding a JobType >= INVALID_JOB is an error } JobType; diff --git a/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h b/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h index a268e0fa9..f4d622a8d 100644 --- a/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h +++ b/tests/benchmark/bm_initialization/bm_hnsw_disk_initialize_fp32.h @@ -62,29 +62,68 @@ REGISTER_FlushBatchDisk(BM_FLUSH_BATCH_DISK); BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimCommon, BM_FUNC_NAME(TopK_DeleteLabel_ProtectGT, HNSWDisk), fp32_index_t) (benchmark::State &st) { TopK_HNSW_DISK_DeleteLabel_ProtectGT(st); } REGISTER_TopK_HNSW_DISK_DeleteLabel_ProtectGT(BM_VecSimCommon, BM_FUNC_NAME(TopK_DeleteLabel_ProtectGT, HNSWDisk)); +// Special disk-based HNSW benchmarks for batch processing with multi-threaded async ingest +// Args: {INDEX_HNSW_DISK, thread_count} +// This benchmark reloads the disk index for each run since async operations modify the index +BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimBasics, BM_ADD_LABEL_ASYNC_DISK, fp32_index_t) +(benchmark::State &st) { + // Reload the disk index fresh for each benchmark run + std::string folder_path = AttachRootPath(hnsw_index_file); -// Range benchmarks -// BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimBasics, BM_FUNC_NAME(Range, BF), fp32_index_t) -// (benchmark::State &st) { Range_BF(st); } -// REGISTER_Range_BF(BM_FUNC_NAME(Range, BF), fp32_index_t); - -// Range HNSW -// BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimBasics, BM_FUNC_NAME(Range, HNSW), fp32_index_t) -// (benchmark::State &st) { Range_HNSW(st); } -// REGISTER_Range_HNSW(BM_FUNC_NAME(Range, HNSW), fp32_index_t); - -// Special disk-based HNSW benchmarks for batch processing -// RE-ENABLED: Async AddLabel and DeleteLabel benchmarks for HNSW disk index now work with populated -// BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimBasics, BM_ADD_LABEL_ASYNC, fp32_index_t) -// (benchmark::State &st) { AddLabel_AsyncIngest(st); } -// BENCHMARK_REGISTER_F(BM_VecSimBasics, BM_ADD_LABEL_ASYNC) -// ->UNIT_AND_ITERATIONS->Arg(INDEX_HNSW_DISK) -// ->ArgName("INDEX_HNSW_DISK"); - -// BENCHMARK_TEMPLATE_DEFINE_F(BM_VecSimBasics, BM_DELETE_LABEL_ASYNC, fp32_index_t) -// (benchmark::State &st) { DeleteLabel_AsyncRepair(st); } -// BENCHMARK_REGISTER_F(BM_VecSimBasics, BM_DELETE_LABEL_ASYNC) -// ->UNIT_AND_ITERATIONS->Arg(1) -// ->Arg(100) -// ->Arg(BM_VecSimGeneral::block_size) -// ->ArgName("SwapJobsThreshold"); + // Clean up existing thread pool and index + if (BM_VecSimGeneral::mock_thread_pool) { + BM_VecSimGeneral::mock_thread_pool->thread_pool_join(); + delete BM_VecSimGeneral::mock_thread_pool; + BM_VecSimGeneral::mock_thread_pool = nullptr; + } + indices[INDEX_HNSW_DISK] = IndexPtr(nullptr); + + // Reload the index from the checkpoint + indices[INDEX_HNSW_DISK] = IndexPtr(HNSWDiskFactory::NewIndex(folder_path)); + + // Create new mock thread pool + BM_VecSimGeneral::mock_thread_pool = new tieredIndexMock(); + auto &mock_thread_pool = *BM_VecSimGeneral::mock_thread_pool; + mock_thread_pool.ctx->index_strong_ref = indices[INDEX_HNSW_DISK].get_shared(); + + // Set up job queue for async operations on the disk index + auto *disk_index = dynamic_cast *>(indices[INDEX_HNSW_DISK].get()); 
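+    // dynamic_cast returns nullptr when the stored index is not an HNSWDiskIndex,
+    // so the job-queue wiring below is skipped for any other index type.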
+ if (disk_index) { + disk_index->setJobQueue(&mock_thread_pool.jobQ, mock_thread_pool.ctx, + tieredIndexMock::submit_callback); + } + + // Configure thread pool size from benchmark argument and start threads + size_t thread_count = st.range(1); + mock_thread_pool.thread_pool_size = thread_count; + mock_thread_pool.init_threads(); + + // Get initial state + auto *index = indices[INDEX_HNSW_DISK].get(); + size_t initial_index_size = VecSimIndex_IndexSize(index); + + // Measure the AddLabel_AsyncIngest benchmark + auto start_time = std::chrono::high_resolution_clock::now(); + AddLabel_AsyncIngest(st); + // Wait for all jobs to complete before measuring end time + mock_thread_pool.thread_pool_wait(); + auto end_time = std::chrono::high_resolution_clock::now(); + + // Calculate stats + size_t final_index_size = VecSimIndex_IndexSize(index); + size_t vectors_added = final_index_size - initial_index_size; + double total_time_ns = std::chrono::duration(end_time - start_time).count(); + double avg_time_per_label_ns = vectors_added > 0 ? total_time_ns / vectors_added : 0; + + // Add custom counters + st.counters["vectors_added"] = vectors_added; + st.counters["total_time_ns"] = total_time_ns; + st.counters["avg_ns_per_label"] = avg_time_per_label_ns; +} +BENCHMARK_REGISTER_F(BM_VecSimBasics, BM_ADD_LABEL_ASYNC_DISK) + ->Unit(benchmark::kNanosecond) + ->Iterations(10000) + ->Args({INDEX_HNSW_DISK, 1}) + ->Args({INDEX_HNSW_DISK, 4}) + ->Args({INDEX_HNSW_DISK, 8}) + ->ArgNames({"IndexType", "Threads"}); diff --git a/tests/benchmark/bm_vecsim_basics.h b/tests/benchmark/bm_vecsim_basics.h index 162d1ff1d..455b97a0f 100644 --- a/tests/benchmark/bm_vecsim_basics.h +++ b/tests/benchmark/bm_vecsim_basics.h @@ -332,7 +332,7 @@ void BM_VecSimBasics::FlushBatchDisk(benchmark::State &st) { auto *hnsw_disk_index = dynamic_cast *>(hnsw_index); size_t flush_threshold = st.range(0); - hnsw_disk_index->setBatchThreshold(flush_threshold); + hnsw_disk_index->setDiskWriteBatchThreshold(flush_threshold); for (size_t i = 0; i < flush_threshold-1; i++) { // add vectors to fill the batch VecSimIndex_AddVector(hnsw_disk_index, QUERIES[i%N_QUERIES].data(), i); diff --git a/tests/benchmark/bm_vecsim_index.h b/tests/benchmark/bm_vecsim_index.h index 8c469d08f..2b6bae189 100644 --- a/tests/benchmark/bm_vecsim_index.h +++ b/tests/benchmark/bm_vecsim_index.h @@ -187,6 +187,9 @@ void BM_VecSimIndex::Initialize() { auto &mock_thread_pool = *BM_VecSimGeneral::mock_thread_pool; mock_thread_pool.ctx->index_strong_ref = indices[INDEX_HNSW_DISK].get_shared(); // Threads will be started on-demand by the benchmark via reconfigure_threads(). + // NOTE: Job queue is NOT set here - individual benchmarks that need async + // processing should call setJobQueue() with appropriate thread configuration. + // The regular AddLabel benchmark uses single-threaded mode (no job queue). 
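+    // A benchmark that does want async ingestion wires it up the same way
+    // BM_ADD_LABEL_ASYNC_DISK does:
+    //   disk_index->setJobQueue(&mock_thread_pool.jobQ, mock_thread_pool.ctx,
+    //                           tieredIndexMock::submit_callback);
+    //   mock_thread_pool.init_threads();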
} } diff --git a/tests/benchmark/data/scripts/CMakeLists.txt b/tests/benchmark/data/scripts/CMakeLists.txt index d19e0bc1e..18984fc77 100644 --- a/tests/benchmark/data/scripts/CMakeLists.txt +++ b/tests/benchmark/data/scripts/CMakeLists.txt @@ -21,7 +21,15 @@ find_package(PkgConfig REQUIRED) pkg_check_modules(ROCKSDB REQUIRED IMPORTED_TARGET rocksdb) # Create the executable -add_executable(hnsw_disk_serializer hnsw_disk_serializer.cpp) +add_executable(hnsw_disk_serializer + hnsw_disk_serializer.cpp + ${CMAKE_SOURCE_DIR}/tests/utils/mock_thread_pool.cpp +) + +# Include the mock_thread_pool header +target_include_directories(hnsw_disk_serializer PRIVATE + ${CMAKE_SOURCE_DIR}/tests/utils +) # Link against VecSim and RocksDB (using CMake targets from parent project) target_link_libraries(hnsw_disk_serializer diff --git a/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp b/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp index 1af29add1..c653f0dba 100644 --- a/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp +++ b/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp @@ -5,7 +5,7 @@ * It supports both .raw files (no header) and .fbin files (with header). * * Usage: - * ./hnsw_disk_serializer [M] [efConstruction] + * ./hnsw_disk_serializer [M] [efConstruction] [threads] [diskWriteBatchThreshold] * * Arguments: * input_file - Binary file containing vectors (.raw or .fbin) @@ -17,14 +17,16 @@ * type - Data type: FLOAT32, FLOAT64, BFLOAT16, FLOAT16, INT8, UINT8 * M - HNSW M parameter (default: 64) * efConstruction - HNSW efConstruction parameter (default: 512) + * threads - Number of threads for parallel indexing (default: 4, use 0 for single-threaded) + * diskWriteBatchThreshold - Threshold for disk write batching (default: 1, larger = fewer disk writes) * * Examples: * # Using .raw file (dimension required) * ./hnsw_disk_serializer dbpedia-cosine-dim768-test_vectors.raw \ * dbpedia-cosine-dim768-M64-efc512 768 Cosine FLOAT32 64 512 * - * # Using .fbin file (auto-detect dimension) - * ./hnsw_disk_serializer vectors.fbin output 0 Cosine FLOAT32 64 512 + * # Using .fbin file (auto-detect dimension) with 8 threads + * ./hnsw_disk_serializer vectors.fbin output 0 Cosine FLOAT32 64 512 8 * * # Using .fbin file (verify dimension) * ./hnsw_disk_serializer vectors.fbin output 768 Cosine FLOAT32 64 512 @@ -35,6 +37,7 @@ #include "VecSim/algorithms/hnsw/hnsw_disk.h" #include "VecSim/types/bfloat16.h" #include "VecSim/types/float16.h" +#include "mock_thread_pool.h" #include #include #include @@ -43,6 +46,7 @@ #include #include #include +#include using bfloat16 = vecsim_types::bfloat16; using float16 = vecsim_types::float16; @@ -264,7 +268,7 @@ void saveIndexByType(VecSimIndex *index, const std::string &output_file) { int main(int argc, char *argv[]) { if (argc < 6) { - std::cerr << "Usage: " << argv[0] << " [M] [efConstruction]\n"; + std::cerr << "Usage: " << argv[0] << " [M] [efConstruction] [threads] [diskWriteBatchThreshold]\n"; std::cerr << "\nArguments:\n"; std::cerr << " input_file - Binary file (.raw or .fbin)\n"; std::cerr << " output_name - Base name for output files\n"; @@ -273,6 +277,8 @@ int main(int argc, char *argv[]) { std::cerr << " type - Data type: FLOAT32, FLOAT64, BFLOAT16, FLOAT16, INT8, UINT8\n"; std::cerr << " M - HNSW M parameter (default: 64)\n"; std::cerr << " efConstruction - HNSW efConstruction parameter (default: 512)\n"; + std::cerr << " threads - Number of threads for parallel indexing (default: 4, use 0 for single-threaded)\n"; + std::cerr << " 
diff --git a/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp b/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp
index 1af29add1..c653f0dba 100644
--- a/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp
+++ b/tests/benchmark/data/scripts/hnsw_disk_serializer.cpp
@@ -5,7 +5,7 @@
  * It supports both .raw files (no header) and .fbin files (with header).
  *
  * Usage:
- *   ./hnsw_disk_serializer [M] [efConstruction]
+ *   ./hnsw_disk_serializer [M] [efConstruction] [threads] [diskWriteBatchThreshold]
  *
  * Arguments:
  *   input_file - Binary file containing vectors (.raw or .fbin)
@@ -17,14 +17,16 @@
  *   type - Data type: FLOAT32, FLOAT64, BFLOAT16, FLOAT16, INT8, UINT8
  *   M - HNSW M parameter (default: 64)
  *   efConstruction - HNSW efConstruction parameter (default: 512)
+ *   threads - Number of threads for parallel indexing (default: 4, use 0 for single-threaded)
+ *   diskWriteBatchThreshold - Threshold for disk write batching (default: 1, larger = fewer disk writes)
  *
  * Examples:
  *   # Using .raw file (dimension required)
  *   ./hnsw_disk_serializer dbpedia-cosine-dim768-test_vectors.raw \
  *       dbpedia-cosine-dim768-M64-efc512 768 Cosine FLOAT32 64 512
  *
- *   # Using .fbin file (auto-detect dimension)
- *   ./hnsw_disk_serializer vectors.fbin output 0 Cosine FLOAT32 64 512
+ *   # Using .fbin file (auto-detect dimension) with 8 threads
+ *   ./hnsw_disk_serializer vectors.fbin output 0 Cosine FLOAT32 64 512 8
 *
 *   # Using .fbin file (verify dimension)
 *   ./hnsw_disk_serializer vectors.fbin output 768 Cosine FLOAT32 64 512
@@ -35,6 +37,7 @@
 #include "VecSim/algorithms/hnsw/hnsw_disk.h"
 #include "VecSim/types/bfloat16.h"
 #include "VecSim/types/float16.h"
+#include "mock_thread_pool.h"
 #include 
 #include 
 #include 
@@ -43,6 +46,7 @@
 #include 
 #include 
 #include 
+#include 
 
 using bfloat16 = vecsim_types::bfloat16;
 using float16 = vecsim_types::float16;
@@ -264,7 +268,7 @@ void saveIndexByType(VecSimIndex *index, const std::string &output_file) {
 
 int main(int argc, char *argv[]) {
     if (argc < 6) {
-        std::cerr << "Usage: " << argv[0] << " [M] [efConstruction]\n";
+        std::cerr << "Usage: " << argv[0] << " [M] [efConstruction] [threads] [diskWriteBatchThreshold]\n";
         std::cerr << "\nArguments:\n";
         std::cerr << "  input_file - Binary file (.raw or .fbin)\n";
         std::cerr << "  output_name - Base name for output files\n";
@@ -273,6 +277,8 @@ int main(int argc, char *argv[]) {
         std::cerr << "  type - Data type: FLOAT32, FLOAT64, BFLOAT16, FLOAT16, INT8, UINT8\n";
         std::cerr << "  M - HNSW M parameter (default: 64)\n";
         std::cerr << "  efConstruction - HNSW efConstruction parameter (default: 512)\n";
+        std::cerr << "  threads - Number of threads for parallel indexing (default: 4, use 0 for single-threaded)\n";
+        std::cerr << "  diskWriteBatchThreshold - Threshold for disk write batching (default: 1)\n";
         return 1;
     }
@@ -283,6 +289,8 @@ int main(int argc, char *argv[]) {
     VecSimType type = parseType(argv[5]);
     size_t M = (argc > 6) ? std::stoull(argv[6]) : 64;
     size_t efConstruction = (argc > 7) ? std::stoull(argv[7]) : 512;
+    size_t num_threads = (argc > 8) ? std::stoull(argv[8]) : 4;
+    size_t disk_write_batch_threshold = (argc > 9) ? std::stoull(argv[9]) : 1;
 
     // Check if input file exists
     if (!std::filesystem::exists(input_file)) {
@@ -340,6 +348,8 @@ int main(int argc, char *argv[]) {
     std::cout << "Type: " << getTypeName(type) << "\n";
     std::cout << "M: " << M << "\n";
     std::cout << "efConstruction: " << efConstruction << "\n";
+    std::cout << "Threads: " << (num_threads > 0 ? std::to_string(num_threads) : "single-threaded") << "\n";
+    std::cout << "DiskWriteBatchThreshold: " << disk_write_batch_threshold << "\n";
     std::cout << "Number of vectors: " << num_vectors << "\n";
     std::cout << "==================================\n\n";
@@ -376,6 +386,29 @@ int main(int argc, char *argv[]) {
         return 1;
     }
 
+    // Set up multi-threaded processing if requested
+    std::unique_ptr mock_thread_pool;
+    if (num_threads > 0) {
+        mock_thread_pool = std::make_unique();
+        mock_thread_pool->ctx->index_strong_ref.reset(index, [](VecSimIndex *) {
+            // Custom deleter that does nothing - we manage index lifetime separately
+        });
+        mock_thread_pool->thread_pool_size = num_threads;
+        mock_thread_pool->init_threads();
+
+        // Configure the disk index to use the job queue
+        auto *disk_index = dynamic_cast *>(index);
+        if (disk_index) {
+            disk_index->setJobQueue(&mock_thread_pool->jobQ, mock_thread_pool->ctx,
+                                    tieredIndexMock::submit_callback);
+            // Set disk write batch threshold for better performance
+            // Larger batches = fewer disk writes = faster indexing
+            disk_index->setDiskWriteBatchThreshold(disk_write_batch_threshold);
+        }
+
+        std::cout << "Multi-threaded indexing enabled with " << num_threads << " threads\n";
+    }
+
     std::cout << "Index created successfully\n";
     std::cout << "Loading vectors from file...\n";
@@ -399,6 +432,55 @@ int main(int argc, char *argv[]) {
         std::cerr << "Warning: Expected " << num_vectors << " vectors but added " << vectors_added << "\n";
     }
 
+    // Wait for all background jobs to complete if using multi-threaded indexing
+    if (mock_thread_pool) {
+        std::cout << "Waiting for background indexing jobs to complete...\n";
+
+        // Custom wait loop with progress reporting
+        auto start_time = std::chrono::steady_clock::now();
+        size_t last_indexed = 0;
+        while (true) {
+            // Check if queue is empty and all jobs done
+            if (mock_thread_pool->isIdle()) {
+                break;
+            }
+
+            // Print progress every 5 seconds
+            size_t current_indexed = VecSimIndex_IndexSize(index);
+            size_t queue_size = mock_thread_pool->jobQ.size();
+
+            auto elapsed = std::chrono::duration_cast(
+                std::chrono::steady_clock::now() - start_time).count();
+
+            if (current_indexed != last_indexed || elapsed % 5 == 0) {
+                std::cout << "\rIndexed: " << current_indexed << "/" << num_vectors
+                          << " | Queue: " << queue_size
+                          << " | Time: " << elapsed << "s " << std::flush;
+                last_indexed = current_indexed;
+            }
+
+            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+        }
+        std::cout << "\n";
+
+        // Wait for all background jobs to complete (batchless mode - no pending vectors)
+        mock_thread_pool->thread_pool_wait();
+        std::cout << "All background jobs completed.\n";
+
+        // Flush any remaining dirty nodes to disk
+        std::cout << "Flushing remaining dirty nodes to disk...\n";
+
+        auto *disk_index = dynamic_cast *>(index);
+        if (disk_index) {
+            disk_index->flushDirtyNodesToDisk();
+        }
+
+        std::cout << "Dirty nodes flushed.\n";
+
+        // Stop the thread pool before saving
+        mock_thread_pool->thread_pool_join();
+    }
+
     auto index_time = std::chrono::high_resolution_clock::now();
     auto indexing_duration = std::chrono::duration_cast(index_time - start_time).count();
     std::cout << "Indexing completed in " << indexing_duration << " ms\n";
@@ -443,6 +525,9 @@ int main(int argc, char *argv[]) {
     std::cout << "Total size: " << (metadata_size + checkpoint_size) / (1024.0 * 1024.0) << " MB\n";
 
     // Cleanup
+    if (mock_thread_pool) {
+        mock_thread_pool.reset();
+    }
     VecSimIndex_Free(index);
     std::filesystem::remove_all(rocksdb_path);
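One idiom in the serializer changes above deserves a callout: `index_strong_ref.reset(index, [](VecSimIndex *) {})` hands the raw index pointer to the thread-pool context as a `shared_ptr` whose deleter does nothing, so the pool shares access without taking ownership. A minimal, self-contained illustration of that pattern (the `Widget` type is hypothetical, for demonstration only):

```cpp
#include <cassert>
#include <memory>

struct Widget {};

int main() {
    Widget w; // lifetime is managed here, not by the shared_ptr
    {
        // No-op deleter: copies of `ref` can be handed to APIs that expect
        // shared ownership, but destroying the last copy never deletes `w`.
        std::shared_ptr<Widget> ref(&w, [](Widget *) { /* intentionally empty */ });
        assert(ref.get() == &w);
    } // all copies gone; `w` is still alive
    return 0; // `w` destroyed normally at end of scope
}
```

The caller takes on the obligation the deleter drops: the object (the index, in the serializer) must outlive every copy of the pointer, which is why the serializer joins the thread pool before freeing the index.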
diff --git a/tests/benchmark/run_files/bm_hnsw_disk_single_fp32.cpp b/tests/benchmark/run_files/bm_hnsw_disk_single_fp32.cpp
index 28fd8fd6f..c9855e993 100644
--- a/tests/benchmark/run_files/bm_hnsw_disk_single_fp32.cpp
+++ b/tests/benchmark/run_files/bm_hnsw_disk_single_fp32.cpp
@@ -22,7 +22,7 @@ size_t BM_VecSimGeneral::EF_C = 256;
 
 // Dataset file paths - using deep-1M dataset
 // For HNSW disk, hnsw_index_file points to the folder containing index.hnsw_disk_v1 and rocksdb/
 const char *BM_VecSimGeneral::hnsw_index_file =
-    "tests/benchmark/data/deep-1M-L2-dim96-M32-efc200-disk-vectors";
+    "tests/benchmark/data/deep-1M-L2-dim96-M32-efc200-disk-vectors.zip";
 const char *BM_VecSimGeneral::test_queries_file = "tests/benchmark/data/deep.query.public.10K.fbin";
 const char *BM_VecSimGeneral::ground_truth_file =
     "tests/benchmark/data/deep.groundtruth.1M.10K.ibin"; // defined only for this benchmark
diff --git a/tests/unit/test_hnsw_disk.cpp b/tests/unit/test_hnsw_disk.cpp
index 300ffb065..a69800de2 100644
--- a/tests/unit/test_hnsw_disk.cpp
+++ b/tests/unit/test_hnsw_disk.cpp
@@ -15,6 +15,7 @@
 #include "VecSim/memory/vecsim_malloc.h"
 #include "unit_test_utils.h"
+#include "mock_thread_pool.h"
 
 using namespace std;
 
@@ -374,12 +375,9 @@ TEST_F(HNSWDiskIndexTest, AddVectorTest) {
     auto test_vector2 = createRandomVector(dim, rng);
     normalizeVector(test_vector2);
 
-    // Add vector to batch (should be stored in memory)
+    // Add vector (written to disk immediately in batchless mode)
     index.addVector(test_vector2.data(), label2);
 
-    // Force flush the batch to disk
-    index.flushBatch();
-
     // Test 3: Query to verify both vectors were added correctly
     VecSimQueryParams queryParams;
     queryParams.hnswRuntimeParams.efRuntime = 50;
@@ -493,10 +491,7 @@ TEST_F(HNSWDiskIndexTest, BatchingTest) {
     EXPECT_EQ(index.indexSize(), 15);
     EXPECT_EQ(index.indexLabelCount(), 15);
 
-    // Force flush to process the batch
-    index.flushBatch();
-
-    // Verify that vectors are now on disk
+    // Vectors are written to disk immediately in batchless mode
     EXPECT_EQ(index.indexSize(), 15);
     EXPECT_EQ(index.indexLabelCount(), 15);
 
@@ -585,8 +580,7 @@ TEST_F(HNSWDiskIndexTest, HierarchicalSearchTest) {
         index.addVector(test_vectors[i].data(), labels[i]);
     }
 
-    // Force flush to process all vectors
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     std::cout << "\n=== Index state after adding vectors ===" << std::endl;
     std::cout << "Index size: " << index.indexSize() << std::endl;
@@ -942,18 +936,28 @@ TEST_F(HNSWDiskIndexTest, markDelete) {
     }
     ASSERT_EQ(index.getNumMarkedDeleted(), n / 2);
-    ASSERT_EQ(index.indexSize(), n);
+    // indexSize() returns active elements (curElementCount - numMarkedDeleted) in disk mode
+    ASSERT_EQ(index.indexSize(), n / 2);
 
     // Search for k results around the middle. Expect to find only non-deleted results.
     auto verify_res_half = [&](size_t id, double score, size_t result_index) {
+        // Verify the result is not from the deleted set
         ASSERT_NE(id % 2, ep_reminder);
         size_t diff_id = (id > 50) ? (id - 50) : (50 - id);
         // The results alternate between below and above 50, with pairs at the same distance
-        // Pattern: 49,51 (diff=1), 47,53 (diff=3), 45,55 (diff=5), etc.
-        // So expected_diff = result_index | 1 (make it odd)
-        size_t expected_diff = result_index | 1;
+        // If ep_reminder == 0 (deleted even), remaining are odd: 49,51 (diff=1), 47,53 (diff=3), etc.
+        //   Pattern: expected_diff = (result_index + 1) | 1 = make it odd, starting from 1
+        // If ep_reminder == 1 (deleted odd), remaining are even: 50 (diff=0), 48,52 (diff=2), etc.
+        //   Pattern: expected_diff = ((result_index + 1) / 2) * 2 = pairs of even numbers
+        size_t expected_diff;
+        if (ep_reminder == 0) {
+            // Deleted even, remaining odd
+            expected_diff = (result_index + 1) | 1;
+        } else {
+            // Deleted odd, remaining even
+            expected_diff = ((result_index + 1) / 2) * 2;
+        }
         ASSERT_EQ(diff_id, expected_diff);
-        // ASSERT_EQ(score, (dim * expected_diff * expected_diff));
     };
 
     // Run search test after marking deleted
@@ -985,7 +989,11 @@ TEST_F(HNSWDiskIndexTest, markDelete) {
         }
     }
 
-    ASSERT_EQ(index.indexSize(), n + n / 2 + 1);
+    // indexSize() returns active elements (curElementCount - numMarkedDeleted)
+    // curElementCount = n + 1 + n/2 = 151 (original + 1 new + re-added)
+    // numMarkedDeleted = n/2 = 50
+    // indexSize() = 151 - 50 = 101 = n + 1
+    ASSERT_EQ(index.indexSize(), n + 1);
     ASSERT_EQ(index.getNumMarkedDeleted(), n / 2);
 
     // Search for k results around the middle again. Expect to find the same results we
@@ -1042,8 +1050,7 @@ TEST_F(HNSWDiskIndexTest, BatchedDeletionTest) {
         index.addVector(vec.data(), label);
     }
 
-    // Flush any pending batches
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     // Verify all vectors were added
     ASSERT_EQ(index.indexSize(), n);
@@ -1062,8 +1069,9 @@ TEST_F(HNSWDiskIndexTest, BatchedDeletionTest) {
     index.flushDeleteBatch();
 
     // Verify the index size and label count
-    // Note: indexSize includes marked deleted vectors
-    ASSERT_EQ(index.indexSize(), n);
+    // indexSize() returns active elements (curElementCount - numMarkedDeleted)
+    // deleted_count = n/2
+    ASSERT_EQ(index.indexSize(), n - deleted_count);
     ASSERT_EQ(index.indexLabelCount(), n - deleted_count);
 
     // Verify that deleted vectors cannot be deleted again (they don't exist)
@@ -1152,8 +1160,7 @@ TEST_F(HNSWDiskIndexTest, InterleavedInsertDeleteTest) {
         ASSERT_EQ(ret, 1) << "Failed to add vector " << label;
     }
 
-    // Flush any pending batches
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     ASSERT_EQ(index.indexSize(), initial_count);
     ASSERT_EQ(index.indexLabelCount(), initial_count);
@@ -1183,15 +1190,15 @@ TEST_F(HNSWDiskIndexTest, InterleavedInsertDeleteTest) {
         ASSERT_EQ(insert_ret, 1) << "Failed to add vector " << insert_label;
     }
 
-    // Flush any pending batches
-    index.flushBatch();
+    // Vectors are written to disk immediately, flush pending deletes
     index.flushDeleteBatch();
 
     // Verify index state
-    // indexSize() returns curElementCount (highest ID + 1)
-    // Without ID recycling, curElementCount grows with each insertion.
-    // After initial_count insertions + insert_count new insertions = initial_count + insert_count
-    ASSERT_EQ(index.indexSize(), initial_count + insert_count);
+    // indexSize() returns active elements (curElementCount - numMarkedDeleted)
+    // curElementCount = initial_count + insert_count (no ID recycling)
+    // numMarkedDeleted = delete_count
+    // indexSize() = initial_count + insert_count - delete_count
+    ASSERT_EQ(index.indexSize(), initial_count + insert_count - delete_count);
     // indexLabelCount() returns labelToIdMap.size() which reflects active (non-deleted) labels
     // So it should be: initial_count - delete_count + insert_count = 100 - 20 + 20 = 100
     ASSERT_EQ(index.indexLabelCount(), initial_count - delete_count + insert_count);
@@ -1254,14 +1261,15 @@ TEST_F(HNSWDiskIndexTest, InterleavedInsertDeleteTest) {
         }
     }
 
-    // Flush any pending batches
-    index.flushBatch();
+    // Vectors are written to disk immediately, flush pending deletes
     index.flushDeleteBatch();
 
     // Final verification
-    // Without ID recycling, indexSize() grows with each insertion.
-    // Total insertions: initial_count + 20 (Phase 2) + 10 (Phase 6) = initial_count + 30
-    ASSERT_EQ(index.indexSize(), initial_count + 30);
+    // indexSize() returns active elements (curElementCount - numMarkedDeleted)
+    // curElementCount = initial_count + 30 (total insertions)
+    // numMarkedDeleted = 30 (total deletions)
+    // indexSize() = initial_count + 30 - 30 = initial_count
+    ASSERT_EQ(index.indexSize(), initial_count);
     // indexLabelCount() = initial_count - total_deletes + total_inserts = 100 - 30 + 30 = 100
     size_t expected_label_count = initial_count - 30 + 30; // deleted 30 total, added 30 total
     ASSERT_EQ(index.indexLabelCount(), expected_label_count);
@@ -1318,8 +1326,7 @@ TEST_F(HNSWDiskIndexTest, StagedRepairTest) {
         ASSERT_EQ(ret, 1) << "Failed to add vector " << label;
    }
 
-    // Flush to disk so all graph data is persisted
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     ASSERT_EQ(index.indexSize(), n);
     ASSERT_EQ(index.indexLabelCount(), n);
@@ -1460,8 +1467,7 @@ TEST_F(HNSWDiskIndexTest, GraphRepairBidirectionalEdges) {
         }
     }
 
-    // Flush to disk
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     ASSERT_EQ(index.indexSize(), n);
 
     // Delete a vector from the middle of a cluster (should trigger repair)
@@ -1575,8 +1581,7 @@ TEST_F(HNSWDiskIndexTest, UnidirectionalEdgeCleanup) {
         ASSERT_EQ(ret, 1) << "Failed to add vector " << label;
     }
 
-    // Flush to disk
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     ASSERT_EQ(index.indexSize(), n);
 
     // Delete some vectors - this may create unidirectional dangling edges
@@ -1692,8 +1697,7 @@ TEST_F(HNSWDiskIndexTest, GraphRepairWithHeuristic) {
         ASSERT_EQ(ret, 1) << "Failed to add vector " << label;
     }
 
-    // Flush to disk
-    index.flushBatch();
+    // Vectors are written to disk immediately in batchless mode
 
     ASSERT_EQ(index.indexSize(), n);
 
     // Delete a vector that likely has many neighbors
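The updated assertions above all follow one accounting rule stated in the diff comments: `indexSize()` now returns active elements, i.e. `curElementCount - numMarkedDeleted`, with IDs never recycled. A small self-contained check of the `markDelete` arithmetic (n = 100), not part of the test suite:

```cpp
#include <cassert>
#include <cstddef>

int main() {
    const size_t n = 100;
    size_t curElementCount = n;      // n vectors inserted; IDs never recycled
    size_t numMarkedDeleted = n / 2; // half of them marked deleted

    // First updated assertion: indexSize() == n / 2
    assert(curElementCount - numMarkedDeleted == n / 2);

    // Then 1 new vector plus n/2 re-added labels: curElementCount = 151
    curElementCount += 1 + n / 2;

    // Second updated assertion: 151 - 50 == 101 == n + 1
    assert(curElementCount - numMarkedDeleted == n + 1);
    return 0;
}
```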
diff --git a/tests/unit/test_quantized_hnsw_disk.cpp b/tests/unit/test_quantized_hnsw_disk.cpp
index f85432029..675b65e18 100644
--- a/tests/unit/test_quantized_hnsw_disk.cpp
+++ b/tests/unit/test_quantized_hnsw_disk.cpp
@@ -104,8 +104,7 @@ TEST_F(QuantizedHNSWDiskTest, BasicQuantization) {
 
     ASSERT_EQ(index->indexSize(), n_vectors);
 
-    // Flush batch to process vectors and build graph
-    index->flushBatch();
+    // Vectors are now written to disk immediately in batchless mode
 
     // Test query
     auto query = vectors[0];
@@ -182,8 +181,7 @@ TEST_F(QuantizedHNSWDiskTest, QuantizationAccuracy) {
     VecSimIndex_AddVector(index, v2.data(), 2);
     VecSimIndex_AddVector(index, v3.data(), 3);
 
-    // Flush batch to process vectors and build graph
-    index->flushBatch();
+    // Vectors are now written to disk immediately in batchless mode
 
     // Query with v1 - should find itself first
     auto *results = VecSimIndex_TopKQuery(index, v1.data(), 3, nullptr, BY_SCORE);
diff --git a/tests/utils/mock_thread_pool.h b/tests/utils/mock_thread_pool.h
index 67fc31111..2ca8607f9 100644
--- a/tests/utils/mock_thread_pool.h
+++ b/tests/utils/mock_thread_pool.h
@@ -128,6 +128,11 @@ class tieredIndexMock {
     void thread_pool_join();
     void thread_pool_wait(size_t waiting_duration = 10);
 
+    // Check if all jobs are complete (queue empty and no in-flight jobs)
+    bool isIdle() const {
+        return jobQ.empty() && executions_status.AllDone();
+    }
+
     // Reconfigure the thread pool to have exactly `new_size` workers.
     void reconfigure_threads(size_t new_size);
 };
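The new `isIdle()` helper is what makes the serializer's progress loop above safe: an empty queue alone is not enough, since a worker may still hold an in-flight job. A sketch of the drain-then-shutdown ordering, assuming `mock_thread_pool` and `disk_index` are configured as in the `hnsw_disk_serializer.cpp` diff (a fragment, not a complete program):

```cpp
// Drain: idle means the queue is empty AND no job is still executing.
while (!mock_thread_pool->isIdle()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
mock_thread_pool->thread_pool_wait(); // settle any final bookkeeping
if (disk_index) {
    disk_index->flushDirtyNodesToDisk(); // persist cached neighbor lists
}
mock_thread_pool->thread_pool_join(); // stop workers before saving the index
```

Note the ordering: flush only once the pool is idle, so nothing is left dirty afterwards, and join before saving or freeing the index.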