Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.mobilenativefoundation.store.store5.impl.extensions.now
import org.mobilenativefoundation.store.store5.internal.concurrent.ThreadSafety
import org.mobilenativefoundation.store.store5.internal.definition.WriteRequestQueue
import org.mobilenativefoundation.store.store5.internal.result.EagerConflictResolutionResult
import org.mobilenativefoundation.store.store5.internal.result.StoreDelegateWriteResult

@OptIn(ExperimentalStoreApi::class)
internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local : Any>(
Expand Down Expand Up @@ -85,25 +86,30 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
val storeWriteResponse =
try {
// Always write to local first.
delegate.write(writeRequest.key, writeRequest.value)

// Try to sync to network.
val updaterResult = tryUpdateServer(writeRequest)

// Convert UpdaterResult -> StoreWriteResponse.
when (updaterResult) {
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)
is UpdaterResult.Success.Typed<*> -> {
val typedValue = updaterResult.value as? Response
if (typedValue == null) {
StoreWriteResponse.Success.Untyped(updaterResult.value)
} else {
StoreWriteResponse.Success.Typed(updaterResult.value)
// Only proceed to network if local write succeeded.
when (val delegateWriteResult = delegate.write(writeRequest.key, writeRequest.value)) {
is StoreDelegateWriteResult.Error.Exception -> {
StoreWriteResponse.Error.Exception(delegateWriteResult.error)
}
is StoreDelegateWriteResult.Error.Message -> {
StoreWriteResponse.Error.Message(delegateWriteResult.error)
}
is StoreDelegateWriteResult.Success -> {
// Try to sync to network.
when (val updaterResult = tryUpdateServer(writeRequest)) {
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)
is UpdaterResult.Success.Typed<*> -> {
val typedValue = updaterResult.value as? Response
if (typedValue == null) {
StoreWriteResponse.Success.Untyped(updaterResult.value)
} else {
StoreWriteResponse.Success.Typed(updaterResult.value)
}
}
is UpdaterResult.Success.Untyped -> StoreWriteResponse.Success.Untyped(updaterResult.value)
}
}

is UpdaterResult.Success.Untyped -> StoreWriteResponse.Success.Untyped(updaterResult.value)
}
} catch (throwable: Throwable) {
StoreWriteResponse.Error.Exception(throwable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,13 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
value: Output,
): StoreDelegateWriteResult =
try {
memCache?.put(key, value)
sourceOfTruth?.write(key, converter.fromOutputToLocal(value))
StoreDelegateWriteResult.Success
val writeException = sourceOfTruth?.write(key, converter.fromOutputToLocal(value))
if (writeException != null) {
StoreDelegateWriteResult.Error.Exception(writeException)
} else {
memCache?.put(key, value)
StoreDelegateWriteResult.Success
}
} catch (error: Throwable) {
StoreDelegateWriteResult.Error.Exception(error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,18 @@ internal class SourceOfTruthWithBarrier<Key : Any, Network : Any, Output : Any,
}
}

/**
* Writes a value to the underlying [SourceOfTruth] and returns any error that occurred.
*
* @return The [SourceOfTruth.WriteException] if the write failed, or null if successful.
* Callers like [RealStore.write] can check this to determine if the write succeeded.
* The barrier mechanism also notifies readers of the error via [BarrierMsg.Open.writeError].
*/
@Suppress("UNCHECKED_CAST")
suspend fun write(
key: Key,
value: Local,
) {
): SourceOfTruth.WriteException? {
val barrier = barriers.acquire(key)
try {
barrier.emit(BarrierMsg.Blocked(versionCounter.incrementAndGet()))
Expand All @@ -164,24 +171,27 @@ internal class SourceOfTruthWithBarrier<Key : Any, Network : Any, Output : Any,
}
}

// Avoid double-wrapping if the error is already a WriteException.
val writeException =
writeError?.let {
writeError as? SourceOfTruth.WriteException
?: SourceOfTruth.WriteException(
key = key,
value = value,
cause = writeError,
)
}

barrier.emit(
BarrierMsg.Open(
version = versionCounter.incrementAndGet(),
writeError =
writeError?.let {
SourceOfTruth.WriteException(
key = key,
value = value,
cause = writeError,
)
},
writeError = writeException,
),
)
if (writeError is CancellationException) {
// only throw if it failed because of cancelation.
// otherwise, we take care of letting downstream know that there was a write error
throw writeError
}

// Return the error so callers know the operation failed.
// The barrier message above notifies readers of the error.
return writeException
} finally {
barriers.release(key, barrier)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreWriteRequest
Expand Down Expand Up @@ -297,6 +298,115 @@ class RealMutableStoreTest {
assertNotNull(testBookkeeper.getLastFailedSync("exceptionKey"))
}

@Test
fun write_givenSourceOfTruthFailure_whenCalled_thenSurfacesWriteError() =
runTest {
// Given
val key = "key"
val errorMessage = "write error"
testSourceOfTruth.throwOnWrite(key) {
IllegalStateException(errorMessage)
}
val request =
StoreWriteRequest.of<String, Note, Unit>(
key = key,
value = Note(key, "content"),
created = 3333L,
onCompletions = null,
)

// When
val response = mutableStore.write(request)

// Then
val errorResponse = assertIs<StoreWriteResponse.Error.Exception>(response)
val writeException = assertIs<SourceOfTruth.WriteException>(errorResponse.error)
val cause = assertIs<IllegalStateException>(writeException.cause)
assertEquals(errorMessage, cause.message)
}

@Test
fun write_givenSourceOfTruthFailure_whenCalled_thenNetworkSyncNotAttempted() =
runTest {
// Given
val key = "key"
testUpdater.postCallCount = 0
testSourceOfTruth.throwOnWrite(key) { IllegalStateException("SOT failure") }

val request =
StoreWriteRequest.of<String, Note, Unit>(
key = key,
value = Note(key, "content"),
created = 4444L,
onCompletions = null,
)

// When
val response = mutableStore.write(request)

// Then
assertIs<StoreWriteResponse.Error.Exception>(response)
assertEquals(0, testUpdater.postCallCount, "Network updater should not be called when SOT write fails")
}

@Test
fun write_givenSourceOfTruthFailure_whenCalled_thenMemCacheNotUpdated() =
runTest {
// Given
val key = "key"
testSourceOfTruth.throwOnWrite(key) { IllegalStateException("SOT failure") }

val request =
StoreWriteRequest.of<String, Note, Unit>(
key = key,
value = Note(key, "content"),
created = 6666L,
onCompletions = null,
)

// When
val response = mutableStore.write(request)

// Then
assertIs<StoreWriteResponse.Error.Exception>(response)
assertNull(delegateStore.latestOrNull(key), "Value should not be in cache after SOT write failure")
}

@Test
fun write_givenNoSourceOfTruth_whenCalled_thenSucceeds() =
runTest {
// Given
val storeWithoutSot =
testStore(
fetcher = testFetcher,
sourceOfTruth = null,
converter = testConverter,
validator = testValidator,
memoryCache = testCache,
)
val mutableStoreWithoutSot =
RealMutableStore(
delegate = storeWithoutSot,
updater = testUpdater,
bookkeeper = testBookkeeper,
logger = testLogger,
)

val request =
StoreWriteRequest.of<String, Note, Unit>(
key = "noSotKey",
value = Note("id", "content"),
created = 5555L,
onCompletions = null,
)

// When
val response = mutableStoreWithoutSot.write(request)

// Then
assertIs<StoreWriteResponse.Success>(response)
}

@Test
fun clearAll_givenSomeKeys_whenCalled_thenDelegateIsCleared() =
runTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal fun <Key : Any, Network : Any, Output : Any, Local : Any> testStore(
dispatcher: CoroutineDispatcher = Dispatchers.Default,
scope: CoroutineScope = CoroutineScope(dispatcher),
fetcher: Fetcher<Key, Network> = TestFetcher(),
sourceOfTruth: SourceOfTruth<Key, Local, Output> = TestSourceOfTruth(),
sourceOfTruth: SourceOfTruth<Key, Local, Output>? = TestSourceOfTruth(),
converter: Converter<Network, Local, Output> = TestConverter(),
validator: Validator<Output> = TestValidator(),
memoryCache: Cache<Key, Output> = TestCache(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ class TestUpdater<Key : Any, Output : Any, Response : Any> : Updater<Key, Output
var exception: Throwable? = null
var errorMessage: String? = null
var successValue: Response? = null
var postCallCount: Int = 0

override suspend fun post(
key: Key,
value: Output,
): UpdaterResult {
postCallCount++
exception?.let { return UpdaterResult.Error.Exception(it) }
errorMessage?.let { return UpdaterResult.Error.Message(it) }
successValue?.let { return UpdaterResult.Success.Typed(it) }
Expand Down
Loading