CNDB-15669: Fully off-heap memtable#2308
| else | ||
| { | ||
| // We are making a copy of another PartitionData object. | ||
| buffer.putLongOrdered(inBufferPos + PARTITIONDATA_OFFSET_ROW_COUNT, partitionData.rowCountIncludingStatic()); |
There was a problem hiding this comment.
partitionData.rowCountIncludingStatic() returns an int, so writing it with putLongOrdered writes 8 bytes. I presume this won't be a problem for us at ds/ibm, but I've heard that oss is sometimes run on big-endian hardware. If so, the value promoted to a 64-bit long will place the row count at offset 4 (overwriting the tombstone count) and write 0 at offset 0.
Correct me if I am wrong, please.
There was a problem hiding this comment.
This is definitely a mistake as all other uses of PARTITIONDATA_OFFSET_ROW_COUNT use putIntOrdered. Fixed.
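The width mismatch discussed in this thread can be reproduced with plain java.nio.ByteBuffer. The following is a minimal sketch, not the PR's code: the two-counter layout, the offsets 0 and 4, the sample values and the class name RowCountWidthDemo are assumptions for illustration, and ByteBuffer stands in for the UnsafeBuffer used in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class RowCountWidthDemo
{
    // Hypothetical layout: two adjacent 4-byte counters.
    static final int OFFSET_ROW_COUNT = 0;
    static final int OFFSET_TOMBSTONE_COUNT = 4;

    /**
     * Stores a tombstone count of 7, then a row count of 42 written either as a
     * long (the reported mistake) or as an int (the fix).
     * Returns { rowCount, tombstoneCount } as read back from the buffer.
     */
    static int[] writeAndRead(ByteOrder order, boolean rowCountAsLong)
    {
        ByteBuffer buf = ByteBuffer.allocate(8).order(order);
        buf.putInt(OFFSET_TOMBSTONE_COUNT, 7);
        int rowCount = 42;
        if (rowCountAsLong)
            buf.putLong(OFFSET_ROW_COUNT, rowCount); // int is promoted to long: 8 bytes written
        else
            buf.putInt(OFFSET_ROW_COUNT, rowCount);  // 4 bytes written, neighbour untouched
        return new int[]{ buf.getInt(OFFSET_ROW_COUNT), buf.getInt(OFFSET_TOMBSTONE_COUNT) };
    }

    public static void main(String[] args)
    {
        for (ByteOrder order : new ByteOrder[]{ ByteOrder.LITTLE_ENDIAN, ByteOrder.BIG_ENDIAN })
        {
            int[] asLong = writeAndRead(order, true);
            int[] asInt = writeAndRead(order, false);
            System.out.println(order + " long write: rows=" + asLong[0] + " tombstones=" + asLong[1]);
            System.out.println(order + " int write:  rows=" + asInt[0] + " tombstones=" + asInt[1]);
        }
    }
}
```

In this sketch the 8-byte write disturbs the neighbouring counter under either byte order; what differs is which field ends up holding the row count. Whether that is observable in the real code depends on the order in which the fields are written, which the diff excerpt does not show.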
| try | ||
| { | ||
| SHARD_COUNT = Integer.valueOf(shardCount); | ||
| SHARD_COUNT = Integer.parseInt(shardCount); |
There was a problem hiding this comment.
SHARD_COUNT is mutable via JMX, but a change only applies to new memtables.
We should add a log message or javadoc warning to clarify that this change only takes effect for newly created memtables.
There was a problem hiding this comment.
Actually, this number is never used.
Changed the config object to defer to AbstractShardedMemtable for the shard count and added comments for the lock fairness.
| final InMemoryTrie<Object> data; | ||
| final InMemoryDeletionAwareTrie<Object, TrieTombstoneMarker> data; | ||
|
|
||
| RegularAndStaticColumns columns; |
There was a problem hiding this comment.
Shouldn't we mark both
RegularAndStaticColumns columns;
EncodingStats stats;
volatile, as they are mutated in put() under a write lock but read concurrently by reader threads via TrieMemtable.columns() and TrieMemtable.encodingStats() without locks?
There was a problem hiding this comment.
I don't know why these were missed when all the other fields were marked volatile. Changed in all TrieMemtable variants.
| if (estimatedAverageRowSize == null || currentOperations.get() > estimatedAverageRowSize.operations * 1.5) | ||
| estimatedAverageRowSize = new MemtableAverageRowSize(this, mergedTrie); |
There was a problem hiding this comment.
estimatedAverageRowSize is not thread safe. IIUC, if there are multiple concurrent reads, they can simultaneously trigger the computation of MemtableAverageRowSize, each traversing the entire trie.
I think we should guarantee that only one thread recalculates this estimate.
There was a problem hiding this comment.
Well, this was meant to be done in such a way that it is quicker to perform twice than to synchronize.
I broke the 100-row limit that was supposed to do that. Removed the special trie code to bring it back.
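The trade-off described in this reply (let concurrent readers occasionally recompute rather than synchronize, provided each recomputation is cheap) can be sketched as follows. This is an illustration only: the class AverageRowSizeEstimateDemo, the long[] of row sizes standing in for the trie, and the recomputations counter are hypothetical; only the 1.5x staleness rule and the 100-row limit come from the discussion.

```java
import java.util.concurrent.atomic.AtomicLong;

public class AverageRowSizeEstimateDemo
{
    static final int MAX_ROWS = 100; // bound on the rows sampled per recomputation

    /** Immutable snapshot, so a racing reader sees either the old or the new estimate in full. */
    static final class Estimate
    {
        final long operations;
        final long rowSize;

        Estimate(long operations, long rowSize)
        {
            this.operations = operations;
            this.rowSize = rowSize;
        }
    }

    private final AtomicLong currentOperations = new AtomicLong();
    private volatile Estimate estimate;
    int recomputations = 0; // demonstration counter only, not thread safe

    void recordOperations(long count)
    {
        currentOperations.addAndGet(count);
    }

    /** Recomputes when the cached estimate is missing or stale; concurrent callers may both recompute. */
    long estimatedAverageRowSize(long[] rowSizes)
    {
        Estimate e = estimate;
        if (e == null || currentOperations.get() > e.operations * 1.5)
        {
            e = compute(rowSizes, currentOperations.get());
            estimate = e; // last writer wins; both results are equally valid
            ++recomputations;
        }
        return e.rowSize;
    }

    /** Looks at no more than MAX_ROWS rows, which keeps a duplicated computation cheap. */
    static Estimate compute(long[] rowSizes, long operations)
    {
        int n = Math.min(rowSizes.length, MAX_ROWS);
        long total = 0;
        for (int i = 0; i < n; ++i)
            total += rowSizes[i];
        return new Estimate(operations, n == 0 ? 0 : total / n);
    }
}
```

The design choice is that a duplicated computation costs at most MAX_ROWS steps, so skipping the lock is cheaper than taking it. Without the bound, every racing reader would walk the whole data set, which is the concern raised in the review comment.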
| } | ||
|
|
||
| @Override | ||
| public String dumpContent(UnsafeBuffer buffer, int inBufferPos, int offsetBits) |
There was a problem hiding this comment.
Can a cell be EmbeddedNoTTL here? If so, this could fail, IIUC.
There was a problem hiding this comment.
Not fail but rather put data behind the "ttl" and "ldt" labels; as this is only for debugging and the data is still visible, this is okay.
Added comment.
| static long positionForSkippingBranch(long encodedBranchPosition) | ||
| { | ||
| return encodedBranchPosition + (1L << TRANSITION_SHIFT); | ||
| } | ||
|
|
||
| /// Returns true if the given `currPosition` as returned by `advance`, `advanceMultiple` or `skipTo` is the result |
There was a problem hiding this comment.
Under normal circumstances, this increments the transition byte. However, if the current transition byte is 0xFF (the last transition in forward direction, or 0x00 in reverse which is encoded as ~0x00 = 0xFF), adding 1 carries over to bit 28 (the 9th transition bit).
So, IIUC, when skipTo subsequently decodes this position via Cursor.incomingTransition(encodedSkipPosition), the carry-over causes issues:
Forward Direction: The decoding masks the transition with & 0xFF (line 183), discarding bit 28. The returned skipTransition is evaluated as 0 instead of 0x100 (256).
Reverse Direction: Since DIRECTION_BIT is set, incomingTransition flips all bits. The carry-over bit 28 is flipped from 1 to 0, and bits 20–27 are flipped from 0 to 0xFF (255).
Because the skip transition is parsed as 0 (forward) or 255 (reverse) instead of 256, the cursor thinks it is looking for a sibling transition >= 0 (or <= 255 in reverse order) rather than ascending:
If the parent is a SPLIT node: With assertions enabled, this immediately triggers a crash in nextValidSplitTransition via: assert midIndex != direction.select(0, SPLIT_START_LEVEL_LIMIT - 1) (which fails because 3 != 3 is false). With assertions disabled, it descends into the first non-null child of mid (e.g. 0xC0), which has already been visited, causing duplicate emissions and/or an infinite loop.
If the parent is a SPARSE node: advanceToSparseTransition stops at the first sibling in the order word (which was already processed), repeating the subtree.
Maybe adjust Cursor.incomingTransition to keep the 9th bit (the 0x100 carry-over) and bypass direction XORing for this bit?
There was a problem hiding this comment.
Added a new method specifically for skipTo implementations.
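The carry described in this thread can be shown in a few lines for the forward direction. This is a hedged sketch rather than the actual Cursor code: TRANSITION_SHIFT = 20 is inferred from the reviewer's "bits 20–27", the position here holds only the transition bits, and narrowTransition and wideTransition are hypothetical names for an 8-bit decode and a 9-bit decode that keeps the overflow.

```java
public class SkipPositionOverflowDemo
{
    // Assumed from the review comment: the transition byte occupies bits 20-27.
    static final int TRANSITION_SHIFT = 20;

    /** Hypothetical simplified position that carries nothing but the transition. */
    static long positionWithTransition(int transition)
    {
        return (long) transition << TRANSITION_SHIFT;
    }

    /** Same arithmetic as the method in the diff. */
    static long positionForSkippingBranch(long encodedBranchPosition)
    {
        return encodedBranchPosition + (1L << TRANSITION_SHIFT);
    }

    /** 8-bit decode: the carry into bit 28 is masked away. */
    static int narrowTransition(long position)
    {
        return (int) (position >> TRANSITION_SHIFT) & 0xFF;
    }

    /** 9-bit decode: an overflowed position reads as 0x100, i.e. "past every sibling". */
    static int wideTransition(long position)
    {
        return (int) (position >> TRANSITION_SHIFT) & 0x1FF;
    }

    public static void main(String[] args)
    {
        long skip = positionForSkippingBranch(positionWithTransition(0xFF));
        System.out.println("narrow decode: " + narrowTransition(skip));
        System.out.println("wide decode:   " + wideTransition(skip));
    }
}
```

With the narrow decode, skipping past transition 0xFF is indistinguishable from seeking transition 0, which is the revisiting behaviour the reviewer describes. The reverse direction adds the bit flipping discussed above and is not modelled here.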
| /// @inheritDoc | ||
| /// Range tries may have two content values. Handle this possibility here. | ||
| @Override | ||
| S processPrefix(int node, int depth, int transition) | ||
| { | ||
| S content1 = processPrefixEntry(node, depth, transition, PREFIX_CONTENT_OFFSET); | ||
| S content2 = processPrefixEntry(node, depth, transition, PREFIX_ALTERNATE_OFFSET); | ||
| assert (content1 == null) || (content2 == null) : "Prefix node with incompatible content pair " + content1 + " and " + content2; | ||
| // It's not okay to have two backtracks either, but this is not trivial to check. | ||
| return content1 == null ? content2 : content1; | ||
| } |
There was a problem hiding this comment.
If a prefix node contains two return-path boundaries (where shouldPresentOnTheReturnPath evaluates to true for both), won't processPrefix push two backtracking entries at the same depth (depth - 1)?
Maybe add a safety check here?
There was a problem hiding this comment.
The "It's not okay to have two backtracks either, but this is not trivial to check." comment is specifically for this case. It's a bit of a pain to check it.
Do you insist on doing so?
| private static RangesCursor tailCopyOf(RangesCursor copyFrom, Direction newDirection) | ||
| { | ||
| assert !Cursor.isOnReturnPath(copyFrom.currentPosition) | ||
| : "Cannot take tail of a position " + Cursor.toString(copyFrom.currentPosition) + " on the return path."; | ||
| boolean directionMatches = newDirection == copyFrom.direction(); | ||
|
|
||
| // Calculate the span of boundaries that are still active for the tail, not including any matching return path | ||
| // (the latter has the same effect as the set being open-ended at this tail). | ||
| int startInclusive = copyFrom.currentIdx; | ||
| int endExclusive = startInclusive; | ||
| while (endExclusive < copyFrom.endIdx && | ||
| Cursor.compare(copyFrom.nextPositions[endExclusive], | ||
| copyFrom.currentPosition | ON_RETURN_PATH_BIT) < 0) | ||
| ++endExclusive; | ||
|
|
||
| // We can only drop an even number of boundaries on either side. Expand the indexes to make them even. | ||
| int arrayStart = startInclusive & ~1; | ||
| int arrayEnd = ((endExclusive + 1) & ~1); | ||
| // Note: if endExclusive == startInclusive, arrayEnd - arrayStart is 0 if branch is not included, 2 if included | ||
| // (i.e. startInclusive is odd). | ||
|
|
||
| final long depthDiff = Cursor.depthCorrectionValue(copyFrom.currentPosition); | ||
| ByteSource.Peekable[] sources = new ByteSource.Peekable[arrayEnd - arrayStart]; | ||
| final long[] nextPositions = new long[arrayEnd - arrayStart]; | ||
|
|
||
| int newStartIdx; | ||
|
|
||
| // Duplicate all selected boundaries, adjust depths and reverse the order if the direction doesn't match. | ||
| if (directionMatches) | ||
| { | ||
| for (int i = startInclusive; i < endExclusive; ++i) | ||
| { | ||
| sources[i - arrayStart] = copyFrom.duplicateSource(i); | ||
| nextPositions[i - arrayStart] = copyFrom.nextPositions[i] - depthDiff; | ||
| } | ||
| newStartIdx = startInclusive - arrayStart; | ||
| } | ||
| else | ||
| { | ||
| for (int i = startInclusive; i < endExclusive; ++i) | ||
| { | ||
| int destIndex = arrayEnd - 1 - i; | ||
| sources[destIndex] = copyFrom.duplicateSource(i); | ||
| nextPositions[destIndex] = (copyFrom.nextPositions[i] - depthDiff) ^ TRANSITION_MASK; | ||
| if (sources[destIndex].peek() == ByteSource.END_OF_STREAM) | ||
| nextPositions[destIndex] ^= ON_RETURN_PATH_BIT; | ||
| } | ||
| newStartIdx = arrayEnd - endExclusive; | ||
| } | ||
|
|
||
| // Determine the state the root needs to present. | ||
| boolean startIsContained = (newStartIdx & 1) != 0; | ||
| RangeState rootState = startIsContained ? newDirection.select(RangeState.START, RangeState.END) | ||
| : RangeState.NOT_CONTAINED; | ||
| long rootPosition = Cursor.rootPosition(newDirection); | ||
|
|
||
| // Add an onReturnPath root position for open-ended sets. | ||
| int last = nextPositions.length - 1; | ||
| if (last > 0 && sources[last] == null) | ||
| { | ||
| sources[last] = ByteSource.EMPTY; | ||
| nextPositions[last] = rootPosition | ON_RETURN_PATH_BIT; | ||
| } | ||
|
|
||
| return new RangesCursor(copyFrom.byteComparableVersion, | ||
| copyFrom.endsAfterMask, | ||
| nextPositions, | ||
| sources, | ||
| newStartIdx, | ||
| nextPositions.length, | ||
| rootPosition, | ||
| rootState); | ||
| } |
There was a problem hiding this comment.
We allocate sources and nextPositions each time; maybe we could reuse them?
There was a problem hiding this comment.
We have to duplicate sources so we can't use parent's array. nextPositions must also be distinct because the child cursor modifies them.
| /// Allocate a new position in the object array. Used by the memory allocation strategy to allocate a content spot | ||
| /// when it runs out of recycled positions. | ||
| private int allocateNewObject() | ||
| { | ||
| int index = reservedCount++; | ||
| int leadBit = getBufferIdx(index, CONTENTS_START_SHIFT, CONTENTS_START_SIZE); | ||
| AtomicReferenceArray<T> array = contentArrays[leadBit]; | ||
| if (array == null) | ||
| { | ||
| assert inBufferOffset(index, leadBit, CONTENTS_START_SIZE) == 0 : "Error in content arrays configuration."; | ||
| contentArrays[leadBit] = new AtomicReferenceArray<>(CONTENTS_START_SIZE << leadBit); | ||
| } | ||
| return index; | ||
| } |
There was a problem hiding this comment.
Is it possible for us to go over the 2^28 allocations? If so, we could throw TrieSpaceExhaustedException with an explicit check.
There was a problem hiding this comment.
It's actually 2^29 (the largest array is 2^28), and we can't fill it, because the cell space is limited to 2GiB and it takes 4 bytes to store an int.
Doesn't hurt to have a check, though.
If we are using bytes + specials assigned by pojo (which we don't currently do), we have 2GiB / 32 possible special indexes and it is actually possible to exhaust them. Added check.
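The doubling-array indexing and the explicit exhaustion check discussed here can be sketched as below. The names getBufferIdx, inBufferOffset, CONTENTS_START_SHIFT and CONTENTS_START_SIZE appear in the diff, but their bodies and values in this example are assumptions, as are the class ContentIndexDemo, the deliberately tiny ARRAY_COUNT and the use of IllegalStateException in place of TrieSpaceExhaustedException.

```java
public class ContentIndexDemo
{
    // Assumed small values so that exhaustion is easy to reach in a demo.
    static final int CONTENTS_START_SHIFT = 4;
    static final int CONTENTS_START_SIZE = 1 << CONTENTS_START_SHIFT;
    static final int ARRAY_COUNT = 4; // array i holds CONTENTS_START_SIZE << i entries

    /** Index of the array that holds position pos when each array is twice the size of the previous one. */
    static int getBufferIdx(int pos, int minBits, int minSize)
    {
        return 31 - minBits - Integer.numberOfLeadingZeros(pos + minSize);
    }

    /** Offset of position pos inside the array selected by getBufferIdx. */
    static int inBufferOffset(int pos, int bufferIdx, int minSize)
    {
        return pos + minSize - (minSize << bufferIdx);
    }

    private final Object[][] contentArrays = new Object[ARRAY_COUNT][];
    private int reservedCount = 0;

    /** Reserves the next content slot, failing explicitly instead of indexing past the last array. */
    int allocateNewObject()
    {
        int index = reservedCount;
        int leadBit = getBufferIdx(index, CONTENTS_START_SHIFT, CONTENTS_START_SIZE);
        if (leadBit >= contentArrays.length)
            throw new IllegalStateException("Content space exhausted"); // stand-in exception type
        if (contentArrays[leadBit] == null)
            contentArrays[leadBit] = new Object[CONTENTS_START_SIZE << leadBit];
        ++reservedCount;
        return index;
    }
}
```

With these assumed constants the four arrays hold 16 + 32 + 64 + 128 = 240 slots, so the 241st allocation fails with the explicit exception rather than an ArrayIndexOutOfBoundsException.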
| @Override | ||
| public ColumnData clone(Cloner cloner) | ||
| { | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Shouldn't we clone the wrapped column here? Why return null?
| @Override | ||
| public T content() | ||
| { | ||
| T content = data.content(); | ||
| if (content == null) | ||
| return null; | ||
| if (deletions == null) | ||
| return content; | ||
|
|
||
| E applicableDeletions = atDeletions ? deletions.content() : null; | ||
| if (applicableDeletions == null) | ||
| { | ||
| applicableDeletions = deletions.precedingState(); | ||
| if (applicableDeletions == null) | ||
| return content; | ||
| } | ||
|
|
||
| return resolver.apply(applicableDeletions, content); | ||
| } |
There was a problem hiding this comment.
In RangesApplyCursor.content() we have an exhaustion guard check. Shouldn't we do the same here?
There was a problem hiding this comment.
Well, adding this guard would seriously complicate other code.
Instead went for officially allowing state and precedingState to be called in exhausted state, and also refactored some of the definition of TrieSetCursor and its verification to ensure it's in better agreement with RangeCursor.
Implements a row-level trie memtable that uses deletion-aware tries to store deletions separately from live data, together with the associated TrieBackedPartition and TriePartitionUpdate.

Refactors trie hierarchy to support multiple trie types:
- plain
- range, which stores range boundaries and is able to answer questions about the range that applies to every point in the trie
- deletion aware, which combines a data part and a deletion range trie

Every trie type supports suitable operations, including merging and intersection that make sense for the type of trie. In particular, deletion-aware tries apply range branches to delete data during merges.

Adds a new method to UnfilteredRowIterator that is implemented by the new trie-backed partitions to ask them to stop issuing tombstones. This is done on filtering (i.e. conversion from UnfilteredRowIterator to RowIterator) where tombstones have already done their job and are no longer needed.

Adds JMH tests of tombstones that demonstrate tombstone-independent performance on memtable queries.

# Conflicts:
# test/burn/org/apache/cassandra/index/sai/LongVectorTest.java
in a combined `encodedState` returned by advancing methods. This saves megamorphic calls to `incomingTransition` and can be augmented by further information at no cost.
This functionality has two main applications:
- it allows reverse walks that present prefix content in the correct byte-comparable order (i.e. prefixes after children)
- it makes it possible to have full control over what is and isn't included in a trie's ranges (e.g. making it possible to have a branch set and nested ranges)
…and TrieMemtable to Stage3 version Remove duplicate configuration object and add tests for stage 3
This change extends the coverage of the memtable trie to the cell level, defining mappings of trie branches to and from the legacy concepts of complex columns and rows.
This makes it possible to have completely off-heap trie memtable, where cell data is stored inside the trie structure if it is small enough to fit, or placed in natively-allocated memory and referenced by memory address.
Rework row filtering to avoid moving deletion boundaries
Implement generalized range intersection and use it to perform row filtering
Drop mapping merge
Add opOrder safety for some iterations over memtable data that aren't in an opOrder-protected group. Make sure all TrieMemtable buffers are copied, including for on-heap tries that can be overwritten. Add a facility to overwrite data and a simple test.
when deletionsAtFixedPoints is not true
taking into account overflow positions constructed by positionForSkippingBranch
Change TrieSetCursor to be fully compatible with RangeCursor and implement a separate nonNullState method.
Change incomingPosition to support overflows and improve its implementation.
❌ Build ds-cassandra-pr-gate/PR-2308 rejected by Butler
5 regressions found
Found 5 new test failures
Found 6 known test failures



What is the issue
https://github.com/riptano/cndb/issues/15669
https://github.com/riptano/cndb/issues/10302
What does this PR fix and why was it fixed
Implementation of the fully off-heap, tombstone-aware memtable.
The first commit is CNDB-10302 as reviewed in #2005, adding tombstone support. The second refactors some of the access interfaces to combine the cursor position into a single long for efficiency and extra flexibility, which the third commit uses to lift some restrictions on the kinds of ranges that the tries can support. The fifth commit extends the memtable trie all the way to individual cells, and the sixth makes it possible to store data in trie cells. When used with the offheap_objects allocation type, this memtable is fully off-heap, with ~100KiB of on-heap presence irrespective of data size.
Each commit should compile and pass tests, and comes with documentation in the included markdown files.