diff --git a/Cargo.toml b/Cargo.toml
index 58489778..d4d262ad 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,9 +18,8 @@
 [workspace.package]
 authors = ["Apache Fluss "]
 categories = ["api-bindings", "database"]
-description = "The rust implementation of fluss"
 edition = "2024"
-homepage = "https://fluss.apache.org/"
+homepage = "https://clients.fluss.apache.org/"
 license = "Apache-2.0"
 repository = "https://github.com/apache/fluss-rust"
 rust-version = "1.85"
@@ -36,7 +35,6 @@ fluss = { package = "fluss-rs", version = "0.2.0", path = "crates/fluss", featur
 tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
-
 bigdecimal = "0.4"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index db1348a0..e33ecb50 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -22,10 +22,12 @@ rust-version = { workspace = true }
 version = { workspace = true }
 name = "fluss-rs"
 authors = { workspace = true }
-description = { workspace = true }
-homepage = { workspace = true }
+description = "The official Rust client of Apache Fluss (Incubating)"
+homepage = "https://clients.fluss.apache.org/"
 repository = { workspace = true }
 keywords = { workspace = true }
+categories = { workspace = true }
+documentation = "https://docs.rs/fluss-rs"
 
 [lib]
 name = "fluss"
@@ -80,6 +82,5 @@ jiff = { workspace = true, features = ["js"] }
 
 testcontainers = "0.25.0"
 test-env-helpers = "0.2.2"
-
 [build-dependencies]
-prost-build = { version = "0.14" }
+prost-build = "0.14"
diff --git a/crates/fluss/README.md b/crates/fluss/README.md
index aad8de96..b37a75df 100644
--- a/crates/fluss/README.md
+++ b/crates/fluss/README.md
@@ -1,25 +1,105 @@
-# Apache Fluss™ Rust Client (Incubating)
+# Apache Fluss (Incubating) Official Rust Client
-Rust client library for [Apache Fluss™](https://fluss.apache.org/).
-This crate provides the core client used by the fluss-rust workspace and by the Python and C++ bindings.
+Official Rust client library for [Apache Fluss (Incubating)](https://fluss.apache.org/).
-# Todo: move how to use to the first, and how to build to the last, https://github.com/apache/opendal/blob/main/core/README.md
-# is a good reference
+[![crates.io](https://img.shields.io/crates/v/fluss-rs.svg)](https://crates.io/crates/fluss-rs)
+[![docs.rs](https://img.shields.io/docsrs/fluss-rs)](https://docs.rs/fluss-rs/)
-## Requirements
+## Usage
-- Rust (see [rust-toolchain.toml](../../rust-toolchain.toml) at repo root)
-- protobuf (for build)
+The following example shows both **primary key (KV) tables** and **log tables** in one flow: connect, create a KV table (upsert + lookup), then create a log table (append + scan).
-## Build
+```rust
+use fluss::client::EARLIEST_OFFSET;
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::time::Duration;
-From the repository root:
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_servers = "127.0.0.1:9123".to_string();
+    let connection = FlussConnection::new(config).await?;
+    let admin = connection.get_admin().await?;
-```bash
-cargo build -p fluss-rs
+    // ---- Primary key (KV) table: upsert and lookup ----
+    let kv_path = TablePath::new("fluss", "users");
+    let mut kv_schema = Schema::builder()
+        .column("id", DataTypes::int())
+        .column("name", DataTypes::string())
+        .column("age", DataTypes::bigint())
+        .primary_key(vec!["id"]);
+    let kv_descriptor = TableDescriptor::builder()
+        .schema(kv_schema.build()?)
+        .build()?;
+    admin.create_table(&kv_path, &kv_descriptor, false).await?;
+
+    let kv_table = connection.get_table(&kv_path).await?;
+    let upsert_writer = kv_table.new_upsert()?.create_writer()?;
+    let mut row = GenericRow::new(3);
+    row.set_field(0, 1i32);
+    row.set_field(1, "Alice");
+    row.set_field(2, 30i64);
+    upsert_writer.upsert(&row)?;
+    upsert_writer.flush().await?;
+
+    let mut lookuper = kv_table.new_lookup()?.create_lookuper()?;
+    let mut key = GenericRow::new(1);
+    key.set_field(0, 1i32);
+    let result = lookuper.lookup(&key).await?;
+    if let Some(r) = result.get_single_row()? {
+        println!("KV lookup: id={}, name={}, age={}",
+            r.get_int(0)?, r.get_string(1)?, r.get_long(2)?);
+    }
+
+    // ---- Log table: append and scan ----
+    let log_path = TablePath::new("fluss", "events");
+    let log_schema = Schema::builder()
+        .column("ts", DataTypes::bigint())
+        .column("message", DataTypes::string())
+        .build()?;
+    let log_descriptor = TableDescriptor::builder()
+        .schema(log_schema)
+        .build()?;
+    admin.create_table(&log_path, &log_descriptor, false).await?;
+
+    let log_table = connection.get_table(&log_path).await?;
+    let append_writer = log_table.new_append()?.create_writer()?;
+    let mut event = GenericRow::new(2);
+    event.set_field(0, 1700000000i64);
+    event.set_field(1, "hello");
+    append_writer.append(&event)?;
+    append_writer.flush().await?;
+
+    let scanner = log_table.new_scan().create_log_scanner()?;
+    scanner.subscribe(0, EARLIEST_OFFSET).await?;
+    let scan_records = scanner.poll(Duration::from_secs(1)).await?;
+    for record in scan_records {
+        let r = record.row();
+        println!("Log scan: ts={}, message={}", r.get_long(0)?, r.get_string(1)?);
+    }
+
+    Ok(())
+}
 ```
-## Quick start and examples
+## Storage Support
+
+The Fluss client reads remote data by accessing Fluss’s **remote files** (e.g. log segments and snapshots) directly.
+The following **remote file systems** are supported; enable the matching feature(s) for your deployment:
+
+| Storage Backend | Feature Flag | Status | Description |
+|----------------|--------------|--------|-------------|
+| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage |
+| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage |
+| Alibaba Cloud OSS | `storage-oss` | ✅ Stable | Alibaba Cloud Object Storage Service |
-## TODO
-- [ ] Expand API documentation and usage examples in this README.
-- [ ] Add more examples for table, log scan, and write flows.
+You can enable all storage backends at once using the `storage-all` feature flag.
+
+Example usage in Cargo.toml:
+```toml
+[dependencies]
+fluss-rs = { version = "0.2.0", features = ["storage-s3", "storage-fs"] }
+```