4 changes: 1 addition & 3 deletions — `Cargo.toml`

```diff
@@ -18,9 +18,8 @@
 [workspace.package]
 authors = ["Apache Fluss <dev@fluss.apache.org>"]
 categories = ["api-bindings", "database"]
-description = "The rust implementation of fluss"
 edition = "2024"
-homepage = "https://fluss.apache.org/"
+homepage = "https://clients.fluss.apache.org/"
 license = "Apache-2.0"
 repository = "https://github.com/apache/fluss-rust"
 rust-version = "1.85"
@@ -36,7 +35,6 @@ fluss = { package = "fluss-rs", version = "0.2.0", path = "crates/fluss", featur
 tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
-
 bigdecimal = "0.4"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
```
9 changes: 5 additions & 4 deletions — `crates/fluss/Cargo.toml`

```diff
@@ -22,10 +22,12 @@ rust-version = { workspace = true }
 version = { workspace = true }
 name = "fluss-rs"
 authors = { workspace = true }
-description = { workspace = true }
-homepage = { workspace = true }
+description = "The official rust client of Apache Fluss (Incubating)"
+homepage = "https://clients.fluss.apache.org/"
 repository = { workspace = true }
 keywords = { workspace = true }
 categories = { workspace = true }
+documentation = "https://docs.rs/fluss-rs"
+
 
 [lib]
 name = "fluss"
@@ -80,6 +82,5 @@ jiff = { workspace = true, features = ["js"] }
 testcontainers = "0.25.0"
 test-env-helpers = "0.2.2"
 
-
 [build-dependencies]
-prost-build = { version = "0.14" }
+prost-build = "0.14"
```

> **Contributor Author** (on the `homepage` line): will wait until https://clients.fluss.apache.org/ is ready.
110 changes: 95 additions & 15 deletions — `crates/fluss/README.md`

````diff
@@ -1,25 +1,105 @@
-# Apache Fluss™ Rust Client (Incubating)
+# Apache Fluss (Incubating) Official Rust Client
 
-Rust client library for [Apache Fluss](https://fluss.apache.org/). This crate provides the core client used by the fluss-rust workspace and by the Python and C++ bindings.
+Official Rust client library for [Apache Fluss (Incubating)](https://fluss.apache.org/).
 
-# Todo: move how to use to the first, and how to build to the last, https://github.com/apache/opendal/blob/main/core/README.md
-# is a good reference
+[![crates.io](https://img.shields.io/crates/v/fluss-rs.svg)](https://crates.io/crates/fluss-rs)
+[![docs.rs](https://img.shields.io/docsrs/fluss-rs)](https://docs.rs/fluss-rs/)
 
-## Requirements
+## Usage
 
-- Rust (see [rust-toolchain.toml](../../rust-toolchain.toml) at repo root)
-- protobuf (for build)
+The following example shows both **primary key (KV) tables** and **log tables** in one flow: connect, create a KV table (upsert + lookup), then create a log table (append + scan).
 
-## Build
+```rust
+use fluss::client::EARLIEST_OFFSET;
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::time::Duration;
 
-From the repository root:
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut config = Config::default();
+    config.bootstrap_servers = "127.0.0.1:9123".to_string();
+    let connection = FlussConnection::new(config).await?;
+    let admin = connection.get_admin().await?;
 
-```bash
-cargo build -p fluss-rs
-```
+    // ---- Primary key (KV) table: upsert and lookup ----
+    let kv_path = TablePath::new("fluss", "users");
+    let kv_schema = Schema::builder()
+        .column("id", DataTypes::int())
+        .column("name", DataTypes::string())
+        .column("age", DataTypes::bigint())
+        .primary_key(vec!["id"]);
+    let kv_descriptor = TableDescriptor::builder()
+        .schema(kv_schema.build()?)
+        .build()?;
+    admin.create_table(&kv_path, &kv_descriptor, false).await?;
+
+    let kv_table = connection.get_table(&kv_path).await?;
+    let upsert_writer = kv_table.new_upsert()?.create_writer()?;
+    let mut row = GenericRow::new(3);
+    row.set_field(0, 1i32);
+    row.set_field(1, "Alice");
+    row.set_field(2, 30i64);
+    upsert_writer.upsert(&row)?;
+    upsert_writer.flush().await?;
+
+    let mut lookuper = kv_table.new_lookup()?.create_lookuper()?;
+    let mut key = GenericRow::new(1);
+    key.set_field(0, 1i32);
+    let result = lookuper.lookup(&key).await?;
+    if let Some(r) = result.get_single_row()? {
+        println!("KV lookup: id={}, name={}, age={}",
+            r.get_int(0)?, r.get_string(1)?, r.get_long(2)?);
+    }
+
+    // ---- Log table: append and scan ----
+    let log_path = TablePath::new("fluss", "events");
+    let log_schema = Schema::builder()
+        .column("ts", DataTypes::bigint())
+        .column("message", DataTypes::string())
+        .build()?;
+    let log_descriptor = TableDescriptor::builder()
+        .schema(log_schema)
+        .build()?;
+    admin.create_table(&log_path, &log_descriptor, false).await?;
+
+    let log_table = connection.get_table(&log_path).await?;
+    let append_writer = log_table.new_append()?.create_writer()?;
+    let mut event = GenericRow::new(2);
+    event.set_field(0, 1700000000i64);
+    event.set_field(1, "hello");
+    append_writer.append(&event)?;
+    append_writer.flush().await?;
+
+    let scanner = log_table.new_scan().create_log_scanner()?;
+    scanner.subscribe(0, EARLIEST_OFFSET).await?;
+    let scan_records = scanner.poll(Duration::from_secs(1)).await?;
+    for record in scan_records {
+        let r = record.row();
+        println!("Log scan: ts={}, message={}", r.get_long(0)?, r.get_string(1)?);
+    }
+
+    Ok(())
+}
+```
 
-## Quick start and examples
+## Storage Support
 
+The Fluss client reads remote data by accessing Fluss's **remote files** (e.g. log segments and snapshots) directly. The following **remote file systems** are supported; enable the matching feature(s) for your deployment:
+
+| Storage Backend   | Feature Flag  | Status    | Description                          |
+|-------------------|---------------|-----------|--------------------------------------|
+| Local Filesystem  | `storage-fs`  | ✅ Stable | Local filesystem storage             |
+| Amazon S3         | `storage-s3`  | ✅ Stable | Amazon S3 storage                    |
+| Alibaba Cloud OSS | `storage-oss` | ✅ Stable | Alibaba Cloud Object Storage Service |
 
-## TODO
-- [ ] Expand API documentation and usage examples in this README.
-- [ ] Add more examples for table, log scan, and write flows.
+You can enable all storage backends at once using the `storage-all` feature flag.
+
+Example usage in Cargo.toml:
+```toml
+[dependencies]
+fluss-rs = { version = "0.x.x", features = ["storage-s3", "storage-fs"] }
+```
````

**Review thread on the usage snippet:**

> **Contributor:** I wonder how easy it is to maintain this code snippet long-term in the README: since it's not compiled, API changes will silently make it stale. We already have examples in `crates/examples/` that cover these flows. Would we consider keeping a short quickstart snippet to just give users a feel for the API, and linking the examples folder for detailed workflows? This was inspired by what OpenDAL does: https://github.com/apache/opendal/blob/main/core/README.md
>
> **Contributor:** @charlesdong1991 This crossed my mind as well (I partially mitigate this for the Python package docs by generating them from other existing website documentation). Generally there is duplication across test cases, examples, documentation, and package READMEs. There may be an opportunity to improve this, e.g. treating test cases as the ground truth, with scripts that populate doc snippets from them. That way there is less effort in separately maintaining docs while ensuring they stay accurate; the savings grow as we add more language bindings, and the script could even highlight gaps in test or feature coverage. But this might be something to consider in the mid/longer term. It's certainly worth coming up with a proposal (another open-source project may already solve this). Happy to collaborate on proposing something.
>
> **Contributor Author:** Thanks for the thoughtful discussion! I think we can keep the current approach for now: the snippet is relatively small, and the API it demonstrates is stable enough that it's unlikely to go stale anytime soon. Let's leave it as-is and revisit if it becomes a real maintenance pain point down the road.
>
> **Contributor:** Cool! Let's keep it as is and revisit if it becomes an issue long term. Thanks for the discussion!
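One low-cost option for the snippet-staleness concern raised in the review thread, sketched here as a suggestion rather than anything this PR adopts: rustdoc can compile the README's ` ```rust ` blocks as doctests by including the file as documentation. The sketch below assumes the standard crate layout where `README.md` sits one directory above `src/`, as in `crates/fluss/`.

```rust
// Sketch (not part of this PR): placed in src/lib.rs, this makes
// `cargo test --doc` compile every ```rust block in README.md, so an
// API change breaks the build instead of silently staling the docs.
// The item only exists when rustdoc collects doctests.
#[cfg(doctest)]
#[doc = include_str!("../README.md")]
struct ReadmeDoctests;
```

Snippets that need a live cluster (like the usage example above) can be fenced as ` ```rust,no_run ` so they are compiled but never executed during `cargo test`.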