Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ struct ErrorCode {
static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
/// Deletion operations are disabled on this table.
static constexpr int DELETION_DISABLED_EXCEPTION = 57;

/// Returns true if retrying the request may succeed. Mirrors Java's RetriableException hierarchy.
static constexpr bool IsRetriable(int32_t code) {
return code == NETWORK_EXCEPTION || code == CORRUPT_MESSAGE ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we update docs, btw?

code == SCHEMA_NOT_EXIST || code == LOG_STORAGE_EXCEPTION ||
code == KV_STORAGE_EXCEPTION || code == NOT_LEADER_OR_FOLLOWER ||
code == CORRUPT_RECORD_EXCEPTION ||
code == UNKNOWN_TABLE_OR_BUCKET_EXCEPTION || code == REQUEST_TIME_OUT ||
code == STORAGE_EXCEPTION ||
code == NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION ||
code == NOT_ENOUGH_REPLICAS_EXCEPTION || code == LEADER_NOT_AVAILABLE_EXCEPTION;
}
};

struct Date {
Expand Down Expand Up @@ -326,6 +338,9 @@ struct Result {
std::string error_message;

bool Ok() const { return error_code == 0; }

/// Returns true if retrying the request may succeed. Client-side errors always return false.
bool IsRetriable() const { return ErrorCode::IsRetriable(error_code); }
};

struct TablePath {
Expand Down
2 changes: 2 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,8 @@ class FlussError(Exception):
error_code: int
def __init__(self, message: str, error_code: int = -2) -> None: ...
def __str__(self) -> str: ...
@property
def is_retriable(self) -> bool: ...

class LakeSnapshot:
def __init__(self, snapshot_id: int) -> None: ...
Expand Down
10 changes: 10 additions & 0 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ impl FlussError {
format!("FlussError: {}", self.message)
}
}

/// Returns ``True`` if retrying the request may succeed. Client-side errors always return ``False``.
#[getter]
fn is_retriable(&self) -> bool {
use fluss::rpc::FlussError as CoreFlussError;
if self.error_code == CLIENT_ERROR_CODE {
return false;
}
CoreFlussError::for_code(self.error_code).is_retriable()
}
}

impl FlussError {
Expand Down
11 changes: 11 additions & 0 deletions crates/fluss/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,17 @@ impl Error {
None
}
}

/// Returns `true` if retrying the request may succeed.
/// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`] delegates to
/// [`ApiError::is_retriable`]; all other variants are not.
pub fn is_retriable(&self) -> bool {
match self {
Error::RpcError { .. } => true,
Error::FlussAPIError { api_error } => api_error.is_retriable(),
_ => false,
}
}
}

impl From<ArrowError> for Error {
Expand Down
77 changes: 77 additions & 0 deletions crates/fluss/src/rpc/fluss_api_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ impl Display for ApiError {
}
}

impl ApiError {
/// Returns `true` if retrying the request may succeed. Delegates to [`FlussError::is_retriable`].
pub fn is_retriable(&self) -> bool {
FlussError::for_code(self.code).is_retriable()
}
}

/// Fluss protocol errors. These errors are part of the client-server protocol.
/// The error codes cannot be changed, but the names can be.
///
Expand Down Expand Up @@ -172,6 +179,25 @@ impl FlussError {
*self as i32
}

pub fn is_retriable(&self) -> bool {
matches!(
self,
FlussError::NetworkException
| FlussError::CorruptMessage
| FlussError::SchemaNotExist
| FlussError::LogStorageException
| FlussError::KvStorageException
| FlussError::NotLeaderOrFollower
| FlussError::CorruptRecordException
| FlussError::UnknownTableOrBucketException
| FlussError::RequestTimeOut
| FlussError::StorageException
| FlussError::NotEnoughReplicasAfterAppendException
| FlussError::NotEnoughReplicasException
| FlussError::LeaderNotAvailableException
)
}

/// Returns a friendly description of the error.
pub fn message(&self) -> &'static str {
match self {
Expand Down Expand Up @@ -403,4 +429,55 @@ mod tests {
let fluss_error = FlussError::from(api_error);
assert_eq!(fluss_error, FlussError::TableNotExist);
}

#[test]
fn is_retriable_known_retriable_errors() {
let retriable = [
FlussError::NetworkException,
FlussError::CorruptMessage,
FlussError::SchemaNotExist,
FlussError::LogStorageException,
FlussError::KvStorageException,
FlussError::NotLeaderOrFollower,
FlussError::CorruptRecordException,
FlussError::UnknownTableOrBucketException,
FlussError::RequestTimeOut,
FlussError::StorageException,
FlussError::NotEnoughReplicasAfterAppendException,
FlussError::NotEnoughReplicasException,
FlussError::LeaderNotAvailableException,
];
for err in &retriable {
assert!(err.is_retriable(), "{err:?} should be retriable");
}
}

#[test]
fn is_retriable_known_non_retriable_errors() {
let non_retriable = [
FlussError::UnknownServerError,
FlussError::None,
FlussError::TableNotExist,
FlussError::AuthenticateException,
FlussError::AuthorizationException,
FlussError::RecordTooLargeException,
FlussError::DeletionDisabledException,
FlussError::InvalidCoordinatorException,
FlussError::FencedLeaderEpochException,
FlussError::FencedTieringEpochException,
FlussError::RetriableAuthenticateException,
];
for err in &non_retriable {
assert!(!err.is_retriable(), "{err:?} should not be retriable");
}
}

#[test]
fn api_error_is_retriable_delegates_to_fluss_error() {
let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
assert!(retriable_api.is_retriable());

let permanent_api = FlussError::TableNotExist.to_api_error(None);
assert!(!permanent_api.is_retriable());
}
}