Skip to content

Commit 6c87b5f

Browse files
[Refactor] Split data cache engine into disk cache engine and memory cache engine. (backport #62760) (#62799)
Signed-off-by: trueeyu <[email protected]> Co-authored-by: trueeyu <[email protected]>
1 parent 790a87c commit 6c87b5f

35 files changed

+140
-112
lines changed

be/src/agent/task_worker_pool.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,8 @@ void* ReportDataCacheMetricsTaskWorkerPool::_worker_thread_callback(void* arg_th
848848
request.__set_report_version(g_report_version.load(std::memory_order_relaxed));
849849

850850
TDataCacheMetrics t_metrics{};
851-
const LocalCacheEngine* cache = DataCache::GetInstance()->local_cache();
851+
// TODO: mem_metrics + disk_metrics
852+
const LocalCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
852853
if (cache != nullptr && cache->is_initialized()) {
853854
const auto metrics = cache->cache_metrics();
854855
DataCacheUtils::set_metrics_from_thrift(t_metrics, metrics);

be/src/bench/object_cache_bench.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,6 @@ void ObjectCacheBench::init_cache(CacheType cache_type) {
129129
_page_cache = std::make_shared<StoragePageCache>();
130130
_page_cache->init(_lru_cache.get());
131131
} else {
132-
opt.engine = "starcache";
133-
134132
_star_cache = std::make_shared<StarCacheEngine>();
135133
Status st = _star_cache->init(opt);
136134
if (!st.ok()) {

be/src/cache/block_cache/block_cache.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ BlockCache::~BlockCache() {
3636
(void)shutdown();
3737
}
3838

39-
Status BlockCache::init(const CacheOptions& options, std::shared_ptr<LocalCacheEngine> local_cache,
39+
Status BlockCache::init(const BlockCacheOptions& options, std::shared_ptr<LocalCacheEngine> local_cache,
4040
std::shared_ptr<RemoteCacheEngine> remote_cache) {
4141
_block_size = std::min(options.block_size, MAX_BLOCK_SIZE);
4242
_local_cache = std::move(local_cache);

be/src/cache/block_cache/block_cache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class BlockCache {
3333
~BlockCache();
3434

3535
// Init the block cache instance
36-
Status init(const CacheOptions& options, std::shared_ptr<LocalCacheEngine> local_cache,
36+
Status init(const BlockCacheOptions& options, std::shared_ptr<LocalCacheEngine> local_cache,
3737
std::shared_ptr<RemoteCacheEngine> remote_cache);
3838

3939
// Write data buffer to cache, the `offset` must be aligned by block size

be/src/cache/cache_options.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,15 @@ struct DirSpace {
4242
size_t size;
4343
};
4444

45-
struct CacheOptions {
45+
struct RemoteCacheOptions {
46+
double skip_read_factor = 0;
47+
};
48+
49+
struct MemCacheOptions {
50+
size_t mem_space_size = 0;
51+
};
52+
53+
struct DiskCacheOptions {
4654
// basic
4755
size_t mem_space_size = 0;
4856
std::vector<DirSpace> dir_spaces;
@@ -54,7 +62,6 @@ struct CacheOptions {
5462
bool enable_direct_io = false;
5563
bool enable_tiered_cache = true;
5664
bool enable_datacache_persistence = false;
57-
std::string engine;
5865
size_t max_concurrent_inserts = 0;
5966
size_t max_flying_memory_mb = 0;
6067
double scheduler_threads_per_cpu = 0;
@@ -63,6 +70,10 @@ struct CacheOptions {
6370
std::string eviction_policy;
6471
};
6572

73+
struct BlockCacheOptions {
74+
size_t block_size = 0;
75+
};
76+
6677
struct WriteCacheOptions {
6778
int8_t priority = 0;
6879
// If ttl_seconds=0 (default), no ttl restriction will be set. If an old one exists, remove it.

be/src/cache/datacache.cpp

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,32 @@ Status DataCache::init(const std::vector<StorePath>& store_paths) {
4444
_page_cache = std::make_shared<StoragePageCache>();
4545

4646
#if defined(WITH_STARCACHE)
47-
if (config::datacache_engine == "" || config::datacache_engine == "starcache") {
48-
config::datacache_engine = "starcache";
49-
} else {
50-
config::datacache_engine = "lrucache";
51-
}
52-
#else
53-
config::datacache_engine = "lrucache";
47+
_local_disk_cache_engine = "starcache";
5448
#endif
49+
_local_mem_cache_engine = "lrucache";
5550

5651
if (!config::datacache_enable) {
5752
config::disable_storage_page_cache = true;
5853
config::block_cache_enable = false;
5954
return Status::OK();
6055
}
6156

62-
ASSIGN_OR_RETURN(auto cache_options, _init_cache_options());
57+
ASSIGN_OR_RETURN(auto mem_cache_options, _init_mem_cache_options());
6358

64-
if (config::datacache_engine == "starcache") {
6559
#if defined(WITH_STARCACHE)
66-
RETURN_IF_ERROR(_init_starcache_engine(&cache_options));
67-
RETURN_IF_ERROR(_init_peer_cache(cache_options));
60+
ASSIGN_OR_RETURN(auto disk_cache_options, _init_disk_cache_options());
61+
RETURN_IF_ERROR(_init_starcache_engine(&disk_cache_options));
6862

69-
if (config::block_cache_enable) {
70-
RETURN_IF_ERROR(_block_cache->init(cache_options, _local_cache, _remote_cache));
71-
}
72-
#else
73-
return Status::InternalError("starcache engine is not supported");
74-
#endif
75-
} else {
76-
RETURN_IF_ERROR(_init_lrucache_engine(cache_options));
63+
auto remote_cache_options = _init_remote_cache_options();
64+
RETURN_IF_ERROR(_init_peer_cache(remote_cache_options));
65+
66+
if (config::block_cache_enable) {
67+
auto block_cache_options = _init_block_cache_options();
68+
RETURN_IF_ERROR(_block_cache->init(block_cache_options, _local_disk_cache, _remote_cache));
7769
}
70+
#endif
71+
72+
RETURN_IF_ERROR(_init_lrucache_engine(mem_cache_options));
7873

7974
RETURN_IF_ERROR(_init_page_cache());
8075

@@ -100,14 +95,15 @@ void DataCache::destroy() {
10095
LOG(INFO) << "pagecache shutdown successfully";
10196

10297
_block_cache.reset();
103-
_local_cache.reset();
98+
_local_mem_cache.reset();
99+
_local_disk_cache.reset();
104100
_remote_cache.reset();
105101
LOG(INFO) << "datacache shutdown successfully";
106102
}
107103

108104
bool DataCache::adjust_mem_capacity(int64_t delta, size_t min_capacity) {
109-
if (_local_cache != nullptr) {
110-
Status st = _local_cache->adjust_mem_quota(delta, min_capacity);
105+
if (_local_mem_cache != nullptr) {
106+
Status st = _local_mem_cache->adjust_mem_quota(delta, min_capacity);
111107
if (st.ok()) {
112108
return true;
113109
} else {
@@ -119,52 +115,67 @@ bool DataCache::adjust_mem_capacity(int64_t delta, size_t min_capacity) {
119115
}
120116

121117
size_t DataCache::get_mem_capacity() const {
122-
if (_local_cache != nullptr) {
123-
return _local_cache->mem_quota();
118+
if (_local_mem_cache != nullptr) {
119+
return _local_mem_cache->mem_quota();
124120
} else {
125121
return 0;
126122
}
127123
}
128124

129-
Status DataCache::_init_lrucache_engine(const CacheOptions& cache_options) {
130-
_local_cache = std::make_shared<LRUCacheEngine>();
131-
RETURN_IF_ERROR(_local_cache->init(cache_options));
125+
Status DataCache::_init_lrucache_engine(const MemCacheOptions& cache_options) {
126+
_local_mem_cache = std::make_shared<LRUCacheEngine>();
127+
RETURN_IF_ERROR(reinterpret_cast<LRUCacheEngine*>(_local_mem_cache.get())->init(cache_options));
132128
LOG(INFO) << "lrucache engine init successfully";
133129
return Status::OK();
134130
}
135131

136132
Status DataCache::_init_page_cache() {
137-
_page_cache->init(_local_cache.get());
133+
_page_cache->init(_local_mem_cache.get());
138134
_page_cache->init_metrics();
139135
LOG(INFO) << "storage page cache init successfully";
140136
return Status::OK();
141137
}
142138

143139
#if defined(WITH_STARCACHE)
144-
Status DataCache::_init_starcache_engine(CacheOptions* cache_options) {
140+
Status DataCache::_init_starcache_engine(DiskCacheOptions* cache_options) {
145141
// init starcache & disk monitor
146142
// TODO: DiskSpaceMonitor needs to be decoupled from StarCacheEngine.
147-
_local_cache = std::make_shared<StarCacheEngine>();
148-
_disk_space_monitor = std::make_shared<DiskSpaceMonitor>(_local_cache.get());
143+
_local_disk_cache = std::make_shared<StarCacheEngine>();
144+
_disk_space_monitor = std::make_shared<DiskSpaceMonitor>(_local_disk_cache.get());
149145
RETURN_IF_ERROR(_disk_space_monitor->init(&cache_options->dir_spaces));
150-
RETURN_IF_ERROR(_local_cache->init(*cache_options));
146+
RETURN_IF_ERROR(reinterpret_cast<StarCacheEngine*>(_local_disk_cache.get())->init(*cache_options));
151147
_disk_space_monitor->start();
152148
return Status::OK();
153149
}
154150

155-
Status DataCache::_init_peer_cache(const CacheOptions& cache_options) {
151+
Status DataCache::_init_peer_cache(const RemoteCacheOptions& cache_options) {
156152
_remote_cache = std::make_shared<PeerCacheEngine>();
157153
return _remote_cache->init(cache_options);
158154
}
159155
#endif
160156

161-
StatusOr<CacheOptions> DataCache::_init_cache_options() {
162-
CacheOptions cache_options;
157+
RemoteCacheOptions DataCache::_init_remote_cache_options() {
158+
RemoteCacheOptions cache_options{.skip_read_factor = config::datacache_skip_read_factor};
159+
return cache_options;
160+
}
161+
162+
StatusOr<MemCacheOptions> DataCache::_init_mem_cache_options() {
163+
MemCacheOptions cache_options;
163164
RETURN_IF_ERROR(DataCacheUtils::parse_conf_datacache_mem_size(
164165
config::datacache_mem_size, _global_env->process_mem_limit(), &cache_options.mem_space_size));
165-
cache_options.engine = config::datacache_engine;
166+
return cache_options;
167+
}
168+
169+
BlockCacheOptions DataCache::_init_block_cache_options() {
170+
BlockCacheOptions cache_options;
171+
cache_options.block_size = config::datacache_block_size;
172+
return cache_options;
173+
}
174+
175+
StatusOr<DiskCacheOptions> DataCache::_init_disk_cache_options() {
176+
DiskCacheOptions cache_options;
166177

167-
if (config::datacache_engine == "starcache") {
178+
if (_local_disk_cache_engine == "starcache") {
168179
#ifdef USE_STAROS
169180
std::vector<string> corresponding_starlet_dirs;
170181
if (config::datacache_unified_instance_enable && !config::starlet_cache_dir.empty()) {
@@ -276,8 +287,8 @@ void DataCache::try_release_resource_before_core_dump() {
276287
return release_all || modules.contains(name);
277288
};
278289

279-
if (_local_cache != nullptr && need_release("data_cache")) {
280-
(void)_local_cache->update_mem_quota(0, false);
290+
if (_local_mem_cache != nullptr && need_release("data_cache")) {
291+
(void)_local_mem_cache->update_mem_quota(0, false);
281292
}
282293
}
283294

be/src/cache/datacache.h

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace starrocks {
2323
class Status;
2424
class StorePath;
2525
class RemoteCacheEngine;
26-
class CacheOptions;
26+
class DiskCacheOptions;
2727
class GlobalEnv;
2828
class DiskSpaceMonitor;
2929
class MemSpaceMonitor;
@@ -39,10 +39,16 @@ class DataCache {
3939

4040
void try_release_resource_before_core_dump();
4141

42-
void set_local_cache(std::shared_ptr<LocalCacheEngine> local_cache) { _local_cache = std::move(local_cache); }
42+
void set_local_mem_cache(std::shared_ptr<LocalCacheEngine> local_mem_cache) {
43+
_local_mem_cache = std::move(local_mem_cache);
44+
}
45+
void set_local_disk_cache(std::shared_ptr<LocalCacheEngine> local_disk_cache) {
46+
_local_disk_cache = std::move(local_disk_cache);
47+
}
4348
void set_page_cache(std::shared_ptr<StoragePageCache> page_cache) { _page_cache = std::move(page_cache); }
4449

45-
LocalCacheEngine* local_cache() { return _local_cache.get(); }
50+
LocalCacheEngine* local_mem_cache() { return _local_mem_cache.get(); }
51+
LocalCacheEngine* local_disk_cache() { return _local_disk_cache.get(); }
4652
BlockCache* block_cache() const { return _block_cache.get(); }
4753
void set_block_cache(std::shared_ptr<BlockCache> block_cache) { _block_cache = std::move(block_cache); }
4854
StoragePageCache* page_cache() const { return _page_cache.get(); }
@@ -56,19 +62,26 @@ class DataCache {
5662
size_t get_mem_capacity() const;
5763

5864
private:
59-
StatusOr<CacheOptions> _init_cache_options();
65+
StatusOr<MemCacheOptions> _init_mem_cache_options();
66+
StatusOr<DiskCacheOptions> _init_disk_cache_options();
67+
RemoteCacheOptions _init_remote_cache_options();
68+
BlockCacheOptions _init_block_cache_options();
69+
6070
#if defined(WITH_STARCACHE)
61-
Status _init_starcache_engine(CacheOptions* cache_options);
62-
Status _init_peer_cache(const CacheOptions& cache_options);
71+
Status _init_starcache_engine(DiskCacheOptions* cache_options);
72+
Status _init_peer_cache(const RemoteCacheOptions& cache_options);
6373
#endif
64-
Status _init_lrucache_engine(const CacheOptions& cache_options);
74+
Status _init_lrucache_engine(const MemCacheOptions& cache_options);
6575
Status _init_page_cache();
6676

6777
GlobalEnv* _global_env;
6878
std::vector<StorePath> _store_paths;
6979

7080
// cache engine
71-
std::shared_ptr<LocalCacheEngine> _local_cache;
81+
std::string _local_mem_cache_engine;
82+
std::string _local_disk_cache_engine;
83+
std::shared_ptr<LocalCacheEngine> _local_mem_cache;
84+
std::shared_ptr<LocalCacheEngine> _local_disk_cache;
7285
std::shared_ptr<RemoteCacheEngine> _remote_cache;
7386

7487
std::shared_ptr<BlockCache> _block_cache;

be/src/cache/local_cache_engine.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ class LocalCacheEngine {
2727
public:
2828
virtual ~LocalCacheEngine() = default;
2929

30-
virtual Status init(const CacheOptions& options) = 0;
3130
virtual bool is_initialized() const = 0;
3231

3332
// Write data to cache

be/src/cache/lrucache_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#include <butil/fast_rand.h>
1818

1919
namespace starrocks {
20-
Status LRUCacheEngine::init(const CacheOptions& options) {
20+
Status LRUCacheEngine::init(const MemCacheOptions& options) {
2121
_cache = std::make_unique<ShardedLRUCache>(options.mem_space_size);
2222
_initialized.store(true, std::memory_order_relaxed);
2323
return Status::OK();

be/src/cache/lrucache_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class LRUCacheEngine final : public LocalCacheEngine {
2525
LRUCacheEngine() = default;
2626
virtual ~LRUCacheEngine() override = default;
2727

28-
Status init(const CacheOptions& options) override;
28+
Status init(const MemCacheOptions& options);
2929
bool is_initialized() const override { return _initialized.load(std::memory_order_relaxed); }
3030

3131
Status write(const std::string& key, const IOBuffer& buffer, WriteCacheOptions* options) override;

0 commit comments

Comments
 (0)