diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 9a62828c..355332aa 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -180,6 +180,18 @@ struct ErrorCode { static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56; /// Deletion operations are disabled on this table. static constexpr int DELETION_DISABLED_EXCEPTION = 57; + + /// Returns true if retrying the request may succeed. Mirrors Java's RetriableException hierarchy. + static constexpr bool IsRetriable(int32_t code) { + return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE || + code == SCHEMA_NOT_EXIST || code == LOG_STORAGE_EXCEPTION || + code == KV_STORAGE_EXCEPTION || code == NOT_LEADER_OR_FOLLOWER || + code == CORRUPT_RECORD_EXCEPTION || + code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code == REQUEST_TIME_OUT || + code == STORAGE_EXCEPTION || + code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION || + code == NOT_ENOUGH_REPLICAS_EXCEPTION || code == LEADER_NOT_AVAILABLE_EXCEPTION; + } }; struct Date { @@ -326,6 +338,9 @@ struct Result { std::string error_message; bool Ok() const { return error_code == 0; } + + /// Returns true if retrying the request may succeed. Client-side errors always return false. + bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); } }; struct TablePath { diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 6f9ae0b3..f8e37c78 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -772,6 +772,8 @@ class FlussError(Exception): error_code: int def __init__(self, message: str, error_code: int = -2) -> None: ... def __str__(self) -> str: ... + @property + def is_retriable(self) -> bool: ... class LakeSnapshot: def __init__(self, snapshot_id: int) -> None: ... diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs index 606b9f4f..10c6cfa0 100644 --- a/bindings/python/src/error.rs +++ b/bindings/python/src/error.rs @@ -51,6 +51,16 @@ impl FlussError { format!("FlussError: {}", self.message) } } + + /// Returns ``True`` if retrying the request may succeed. Client-side errors always return ``False``. + #[getter] + fn is_retriable(&self) -> bool { + use fluss::rpc::FlussError as CoreFlussError; + if self.error_code == CLIENT_ERROR_CODE { + return false; + } + CoreFlussError::for_code(self.error_code).is_retriable() + } } impl FlussError { diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index 59524a63..f6468377 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -165,6 +165,17 @@ impl Error { None } } + + /// Returns `true` if retrying the request may succeed. + /// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`] delegates to + /// [`ApiError::is_retriable`]; all other variants are not. + pub fn is_retriable(&self) -> bool { + match self { + Error::RpcError { .. } => true, + Error::FlussAPIError { api_error } => api_error.is_retriable(), + _ => false, + } + } } impl From for Error { diff --git a/crates/fluss/src/rpc/fluss_api_error.rs b/crates/fluss/src/rpc/fluss_api_error.rs index a501b997..95a39c69 100644 --- a/crates/fluss/src/rpc/fluss_api_error.rs +++ b/crates/fluss/src/rpc/fluss_api_error.rs @@ -39,6 +39,13 @@ impl Display for ApiError { } } +impl ApiError { + /// Returns `true` if retrying the request may succeed. Delegates to [`FlussError::is_retriable`]. + pub fn is_retriable(&self) -> bool { + FlussError::for_code(self.code).is_retriable() + } +} + /// Fluss protocol errors. These errors are part of the client-server protocol. /// The error codes cannot be changed, but the names can be. /// @@ -172,6 +179,25 @@ impl FlussError { *self as i32 } + pub fn is_retriable(&self) -> bool { + matches!( + self, + FlussError::NetworkException + | FlussError::CorruptMessage + | FlussError::SchemaNotExist + | FlussError::LogStorageException + | FlussError::KvStorageException + | FlussError::NotLeaderOrFollower + | FlussError::CorruptRecordException + | FlussError::UnknownTableOrBucketException + | FlussError::RequestTimeOut + | FlussError::StorageException + | FlussError::NotEnoughReplicasAfterAppendException + | FlussError::NotEnoughReplicasException + | FlussError::LeaderNotAvailableException + ) + } + /// Returns a friendly description of the error. pub fn message(&self) -> &'static str { match self { @@ -403,4 +429,55 @@ mod tests { let fluss_error = FlussError::from(api_error); assert_eq!(fluss_error, FlussError::TableNotExist); } + + #[test] + fn is_retriable_known_retriable_errors() { + let retriable = [ + FlussError::NetworkException, + FlussError::CorruptMessage, + FlussError::SchemaNotExist, + FlussError::LogStorageException, + FlussError::KvStorageException, + FlussError::NotLeaderOrFollower, + FlussError::CorruptRecordException, + FlussError::UnknownTableOrBucketException, + FlussError::RequestTimeOut, + FlussError::StorageException, + FlussError::NotEnoughReplicasAfterAppendException, + FlussError::NotEnoughReplicasException, + FlussError::LeaderNotAvailableException, + ]; + for err in &retriable { + assert!(err.is_retriable(), "{err:?} should be retriable"); + } + } + + #[test] + fn is_retriable_known_non_retriable_errors() { + let non_retriable = [ + FlussError::UnknownServerError, + FlussError::None, + FlussError::TableNotExist, + FlussError::AuthenticateException, + FlussError::AuthorizationException, + FlussError::RecordTooLargeException, + FlussError::DeletionDisabledException, + FlussError::InvalidCoordinatorException, + FlussError::FencedLeaderEpochException, + FlussError::FencedTieringEpochException, + FlussError::RetriableAuthenticateException, + ]; + for err in &non_retriable { + assert!(!err.is_retriable(), "{err:?} should not be retriable"); + } + } + + #[test] + fn api_error_is_retriable_delegates_to_fluss_error() { + let retriable_api = FlussError::RequestTimeOut.to_api_error(None); + assert!(retriable_api.is_retriable()); + + let permanent_api = FlussError::TableNotExist.to_api_error(None); + assert!(!permanent_api.is_retriable()); + } } diff --git a/website/docs/user-guide/cpp/error-handling.md b/website/docs/user-guide/cpp/error-handling.md index 76b03e3e..9e0cc150 100644 --- a/website/docs/user-guide/cpp/error-handling.md +++ b/website/docs/user-guide/cpp/error-handling.md @@ -97,6 +97,53 @@ if (!result.Ok()) { See `fluss::ErrorCode` in `fluss.hpp` for the full list of named constants. +## Retry Logic + +Some errors are transient, where the server may be temporarily unavailable, mid-election, or under load. `IsRetriable()` can be used for deciding to to retry an operation rather than treating the error as permanent. + +`ErrorCode::IsRetriable(int32_t code)` is a static helper available directly on the error code: + +```cpp +fluss::Result result = writer.Append(row); +if (!result.Ok()) { + if (result.IsRetriable()) { + // Transient failure — safe to retry + } else { + // Permanent failure — log and abort + std::cerr << "Fatal error (code " << result.error_code + << "): " << result.error_message << std::endl; + } +} +``` + +`Result::IsRetriable()` delegates to `ErrorCode::IsRetriable()`, so you can also call it directly on the code: + +```cpp +if (fluss::ErrorCode::IsRetriable(result.error_code)) { + // retry +} +``` + +### Retriable Error Codes + +| Constant | Code | Reason | +|-------------------------------------------------------------|------|-------------------------------------------| +| `ErrorCode::NETWORK_EXCEPTION` | 1 | Server disconnected | +| `ErrorCode::CORRUPT_MESSAGE` | 3 | CRC or size error | +| `ErrorCode::SCHEMA_NOT_EXIST` | 9 | Schema may not exist | +| `ErrorCode::LOG_STORAGE_EXCEPTION` | 10 | Transient log storage error | +| `ErrorCode::KV_STORAGE_EXCEPTION` | 11 | Transient KV storage error | +| `ErrorCode::NOT_LEADER_OR_FOLLOWER` | 12 | Leader election in progress | +| `ErrorCode::CORRUPT_RECORD_EXCEPTION` | 14 | Corrupt record | +| `ErrorCode::UNKNOWN_TABLE_OR_BUCKET_EXCEPTION` | 21 | Metadata not yet available | +| `ErrorCode::REQUEST_TIME_OUT` | 25 | Request timed out | +| `ErrorCode::STORAGE_EXCEPTION` | 26 | Transient storage error | +| `ErrorCode::NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION` | 28 | Wrote to server but with low ISR size | +| `ErrorCode::NOT_ENOUGH_REPLICAS_EXCEPTION` | 29 | Low ISR size at write time | +| `ErrorCode::LEADER_NOT_AVAILABLE_EXCEPTION` | 44 | No leader available for partition | + +Client-side errors (`ErrorCode::CLIENT_ERROR`, code -2) always return `false` from `IsRetriable()`. + ## Common Error Scenarios ### Connection Refused