diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..bacc9e2 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,34 @@ +name: CI + +on: + push: + branches: [main] + paths-ignore: + - '**.md' + - 'guides/**' + - 'docs/**' + - 'LICENSE' + pull_request: + branches: [main] + paths-ignore: + - '**.md' + - 'guides/**' + - 'docs/**' + - 'LICENSE' + +jobs: + ci: + uses: Taure/erlang-ci/.github/workflows/ci.yml@v2 + permissions: + contents: write + pull-requests: write + with: + enable-ct: true + enable-hank: true + enable-audit: true + enable-sbom: true + enable-sbom-scan: true + enable-dependency-submission: true + enable-mutate: true + enable-summary: true + secrets: inherit diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..0e0f551 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,16 @@ +name: Release + +on: + push: + branches: [main] + paths-ignore: + - '**.md' + - 'guides/**' + - 'docs/**' + - 'LICENSE' + +jobs: + release: + uses: Taure/erlang-ci/.github/workflows/release.yml@v2 + permissions: + contents: write diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..74ddc46 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +_build/ +.rebar3/ +*.beam +*.crashdump +rebar.lock +doc/ +ebin/ +.eunit/ +logs/ +*.iml +.idea/ +.vscode/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f4b329e --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [unreleased] + +### Features + +- Initial v0.1 scaffold: `nova_cache_adapter` and `nova_cache_invalidator` behaviours, public API, `nova_cache_ets` adapter, `nova_cache_invalidator_pg` transport, single-flight `fetch/3`. 
diff --git a/README.md b/README.md index 6c39a10..1c1f33e 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,62 @@ # nova_cache -General-purpose KV cache library for the Nova ecosystem + +General-purpose KV cache library for the Nova ecosystem. + +`nova_cache` is **not** a dependency of Nova core and must never become one. + +## Quick start + +```erlang +%% sys.config +{nova_cache, [ + {caches, #{ + user_lookup => #{ + adapter => nova_cache_ets, + ttl_default => 60_000, + max_size => 10_000 + } + }} +]}. + +%% application code +ok = nova_cache:put(user_lookup, <<"alice">>, #{role => admin}), +{ok, User} = nova_cache:get(user_lookup, <<"alice">>), +{ok, User} = nova_cache:fetch(user_lookup, <<"alice">>, fun load_user/0). +``` + +## Adapters + +| Adapter | Status | +| ------------------ | ------ | +| `nova_cache_ets` | v0.1 | +| `nova_cache_redis` | v0.2 | + +## Invalidation transports + +| Transport | Status | +| --------------------------- | ------ | +| `nova_cache_invalidator_pg` | v0.1 | + +## Build + +```sh +rebar3 compile +rebar3 dialyzer +rebar3 xref +``` + +## Test + +```sh +rebar3 ct +rebar3 eunit +rebar3 mutate +``` + +## Documentation + +See the [guides](guides/) directory. + +## License + +Apache-2.0. 
diff --git a/cliff.toml b/cliff.toml new file mode 100644 index 0000000..515d029 --- /dev/null +++ b/cliff.toml @@ -0,0 +1,38 @@ +[changelog] +header = """ +# Changelog\n +All notable changes to this project will be documented in this file.\n +""" +body = """ +{% if version %}\ + ## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }} +{% else %}\ + ## [unreleased] +{% endif %}\ +{% for group, commits in commits | group_by(attribute="group") %} + ### {{ group | striptags | trim | upper_first }} + {% for commit in commits %} + - {% if commit.scope %}*({{ commit.scope }})* {% endif %}\ + {{ commit.message | upper_first }}\ + {% endfor %} +{% endfor %}\n +""" +trim = true + +[git] +conventional_commits = true +filter_unconventional = true +split_commits = false +commit_parsers = [ + { message = "^feat", group = "Features" }, + { message = "^fix", group = "Bug Fixes" }, + { message = "^docs", group = "Documentation" }, + { message = "^refactor", group = "Refactor" }, + { message = "^test", group = "Testing" }, + { message = "^chore\\(release\\)", skip = true }, + { message = "^chore", group = "Miscellaneous" }, + { message = "^ci", skip = true }, +] +protect_breaking_commits = false +tag_pattern = "v[0-9].*" +sort_commits = "oldest" diff --git a/guides/adapters.md b/guides/adapters.md new file mode 100644 index 0000000..4528ce8 --- /dev/null +++ b/guides/adapters.md @@ -0,0 +1,38 @@ +# Adapters + +Adapters implement the `nova_cache_adapter` behaviour and own their own +storage process. The `State` an adapter registers while starting up is +opaque to `nova_cache` and passed back to every subsequent callback. + +## Shipped adapters + +### `nova_cache_ets` + +In-process ETS table per cache. Direct concurrent reads and writes from the +caller's process. Periodic sweep purges expired rows. Soft `max_size` +enforced at sweep time by evicting rows in ETS traversal order (not true LRU). 
+ +Configuration: + +| Option | Default | Notes | +| ---------------- | ------------ | ----------------------------------------- | +| `ttl_default` | `infinity` | Default TTL applied when `put` omits one. | +| `max_size` | `infinity` | Soft bound; evictions happen on sweep. | +| `sweep_interval` | `60_000` | Milliseconds. | +| `invalidation` | `best_effort` | `best_effort \| ttl_only \| strict`. | + +`strict` mode refuses to start without a finite `ttl_default`. + +## Writing a new adapter + +1. `-behaviour(nova_cache_adapter).` +2. Implement `start_link/2`, `get/2`, `put/4`, `delete/2`, `delete_many/2`, `clear/1`. +3. Optionally implement `get_many/2` and `put_many/2`. +4. Register with `nova_cache_registry:register(Name, ?MODULE, State)` from + `init/1` so the public API can route calls. +5. Subscribe to invalidation events on startup if you want cluster + propagation. + +Adapter callbacks may execute in the caller's process or proxy through the +adapter's own gen_server. That's the adapter's choice; the contract is the +return values, not the process topology. diff --git a/guides/getting-started.md b/guides/getting-started.md new file mode 100644 index 0000000..7d4c5e6 --- /dev/null +++ b/guides/getting-started.md @@ -0,0 +1,71 @@ +# Getting Started + +## Installation + +```erlang +{deps, [ + {nova_cache, {git, "https://github.com/novaframework/nova_cache.git", {branch, "main"}}} +]}. +``` + +## Configuration + +Declare your caches in `sys.config`: + +```erlang +{nova_cache, [ + {caches, #{ + user_lookup => #{ + adapter => nova_cache_ets, + ttl_default => 60_000, + max_size => 10_000, + sweep_interval => 60_000, + invalidation => best_effort + } + }}, + {invalidator, nova_cache_invalidator_pg} +]}. +``` + +One supervised process per declared cache starts under `nova_cache_sup`. + +## Reading and writing + +```erlang +ok = nova_cache:put(user_lookup, <<"alice">>, User). +{ok, User} = nova_cache:get(user_lookup, <<"alice">>). 
+User = nova_cache:get(user_lookup, <<"alice">>, #{role => guest}). +``` + +## get-or-compute + +```erlang +{ok, User} = nova_cache:fetch(user_lookup, <<"alice">>, fun() -> + case load_from_db(<<"alice">>) of + {ok, U} -> {ok, U}; + not_found -> {error, not_found} + end +end). +``` + +Concurrent callers for the same key are deduplicated via single-flight. +Disable with `#{single_flight => false}` if you need pass-through semantics. + +## Negative caching + +Off by default. Opt in per call: + +```erlang +%% short negative TTL (5 seconds): +nova_cache:fetch(user_lookup, <<"alice">>, F, #{cache_errors => {true, #{ttl => 5_000}}}). +``` + +## Invalidation across the cluster + +```erlang +ok = nova_cache:invalidate(user_lookup, <<"alice">>). +``` + +The configured invalidator transport broadcasts the event to every subscribing +node. Each node purges its local copy on receipt. Delivery is best-effort +eventual; see the Invalidation guide for the failure model. diff --git a/guides/invalidation.md b/guides/invalidation.md new file mode 100644 index 0000000..6aa9bac --- /dev/null +++ b/guides/invalidation.md @@ -0,0 +1,46 @@ +# Invalidation + +`nova_cache` ships cluster invalidation as a swappable transport behaviour. +The default transport is `nova_cache_invalidator_pg`, built on `pg`. + +## Guarantee + +**Best-effort eventual.** TTL is the correctness backstop. A node that is +netsplit, GC-paused, or just-joined may miss broadcasts and serve stale data +until the row expires. + +## Per-cache mode + +Configured via the `invalidation` key in the cache spec: + +| Mode | Behaviour | +| ------------- | ------------------------------------------------------------------------ | +| `best_effort` | Subscribe to broadcasts; serve stale on miss. Default. | +| `ttl_only` | Skip broadcasts entirely; rely solely on TTL. | +| `strict` | Best-effort plus refuses to start without `ttl_default`. 
Bounds staleness.| + +## Failure mode: a node misses a broadcast + +It serves stale data until the row's TTL elapses. If the row was written with +`ttl => infinity`, it serves stale data indefinitely. This is the design. + +For workloads where "indefinitely stale" is unacceptable, use `strict` mode +and a finite `ttl_default`. + +## Failure mode: a node joins late + +On join, the node's caches start empty. They subscribe and start receiving +broadcasts immediately. There is no backfill of historical events. Existing +entries on the joining node (e.g. after a netsplit heal) are not purged +automatically in `best_effort` mode -- if you need that, run `clear/1` from +your application's join handler. + +## Writing a new transport + +1. `-behaviour(nova_cache_invalidator).` +2. Implement `start_link/1`, `subscribe/2`, `broadcast/2`. +3. Deliver event payloads to subscribed handlers on every node. +4. Set `{nova_cache, [{invalidator, your_module}]}` in `sys.config`. + +Transports must deliver events unchanged. They may drop events on failure +without compromising correctness, because TTL is the backstop. diff --git a/guides/telemetry.md b/guides/telemetry.md new file mode 100644 index 0000000..32f0dcf --- /dev/null +++ b/guides/telemetry.md @@ -0,0 +1,36 @@ +# Telemetry + +`nova_cache` emits OpenTelemetry counters and spans when `opentelemetry_api` +is available at runtime. The dependency is optional; callers without it still +build and run. + +## Counters + +| Name | Attributes | +| ------------------- | ----------------------- | +| `nova_cache.hit` | `cache.name` | +| `nova_cache.miss` | `cache.name` | +| `nova_cache.evict` | `cache.name`, `reason` | + +`reason` is `ttl` (lazy expiry on get), `sweep` (periodic sweeper), or +`max_size` (LRU eviction). 
+ +## Spans + +| Name | Attributes | +| --------------------- | ------------------------------------------- | +| `nova_cache.fetch` | `cache.name`, `cache.key.length`, `result` | +| `nova_cache.put` | `cache.name`, `cache.key.length` | +| `nova_cache.invalidate` | `cache.name`, `cache.invalidate.scope` | + +`result` is `hit | miss | loaded | error`. + +## Enabling + +Add `opentelemetry_api` to your project's `rebar.config`. `nova_cache` +detects the module at runtime and starts emitting events. No configuration on +`nova_cache` itself is required. + +The OpenTelemetry-aware sibling library `opentelemetry_nova_cache` (planned +for v0.2) will install the trace/metric pipeline; until then, configure it in +your application's own OpenTelemetry setup. diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..a3a6359 --- /dev/null +++ b/rebar.config @@ -0,0 +1,76 @@ +{erl_opts, [debug_info, warnings_as_errors]}. + +{deps, []}. + +{project_plugins, [ + erlfmt, + rebar3_lint, + rebar3_hank, + rebar3_ex_doc, + rebar3_audit, + {rebar3_sbom, + {git, "https://github.com/Taure/rebar3_sbom.git", {branch, "feat/include-otp-components"}}}, + rebar3_mutate +]}. + +{erlfmt, [write]}. + +{hank, [ + {ignore, [ + "src/nova_cache_adapter.erl", + "src/nova_cache_invalidator.erl", + "test/**" + ]} +]}. + +{xref_ignores, [ + {opentelemetry_api, '_', '_'}, + {otel_tracer, '_', '_'}, + {otel_span, '_', '_'}, + {otel_counter, '_', '_'}, + {nova_cache, get, 2}, + {nova_cache, get, 3}, + {nova_cache, put, 3}, + {nova_cache, put, 4}, + {nova_cache, fetch, 3}, + {nova_cache, fetch, 4}, + {nova_cache, delete, 2}, + {nova_cache, delete_many, 2}, + {nova_cache, invalidate, 2}, + {nova_cache, clear, 1}, + {nova_cache_registry, start_link, 0}, + {nova_cache_registry, list, 0}, + {nova_cache_sup, start_cache, 2}, + {nova_cache_sup, stop_cache, 1} +]}. + +{dialyzer, [ + {warnings, [error_handling, unmatched_returns]}, + {exclude_mods, []} +]}. 
+ +{ex_doc, [ + {extras, [ + {"README.md", #{title => <<"Overview">>}}, + {"guides/getting-started.md", #{title => <<"Getting Started">>}}, + {"guides/adapters.md", #{title => <<"Adapters">>}}, + {"guides/invalidation.md", #{title => <<"Invalidation">>}}, + {"guides/telemetry.md", #{title => <<"Telemetry">>}} + ]}, + {main, <<"readme">>}, + {source_url, <<"https://github.com/novaframework/nova_cache">>}, + {groups_for_modules, [ + {<<"Core API">>, [nova_cache]}, + {<<"Behaviours">>, [nova_cache_adapter, nova_cache_invalidator]}, + {<<"Adapters">>, [nova_cache_ets]}, + {<<"Invalidation">>, [nova_cache_invalidator_pg]}, + {<<"Internal">>, [nova_cache_registry, nova_cache_single_flight]} + ]} +]}. + +{profiles, [ + {test, [ + {erl_opts, [nowarn_export_all]}, + {deps, [{meck, "~> 0.9"}]} + ]} +]}. diff --git a/src/nova_cache.app.src b/src/nova_cache.app.src new file mode 100644 index 0000000..ec5caf2 --- /dev/null +++ b/src/nova_cache.app.src @@ -0,0 +1,11 @@ +{application, nova_cache, [ + {description, "General-purpose KV cache library for the Nova ecosystem"}, + {vsn, "git"}, + {registered, [nova_cache_sup, nova_cache_registry]}, + {applications, [kernel, stdlib]}, + {mod, {nova_cache_app, []}}, + {env, [{caches, #{}}]}, + {modules, []}, + {licenses, ["Apache-2.0"]}, + {links, [{"GitHub", "https://github.com/novaframework/nova_cache"}]} +]}. diff --git a/src/nova_cache.erl b/src/nova_cache.erl new file mode 100644 index 0000000..68bc580 --- /dev/null +++ b/src/nova_cache.erl @@ -0,0 +1,197 @@ +-module(nova_cache). +-moduledoc """ +Public API for the nova_cache library. + +A general-purpose KV cache with pluggable adapters and distributed invalidation. +All time values are in milliseconds. + +## Quick start + +```erlang +%% In sys.config / app env: +{nova_cache, [ + {caches, #{ + user_lookup => #{adapter => nova_cache_ets, ttl_default => 60000, max_size => 10000} + }} +]}. 
+
+%% In application code:
+ok = nova_cache:put(user_lookup, <<"alice">>, #{role => admin}),
+{ok, User} = nova_cache:get(user_lookup, <<"alice">>),
+{ok, User} = nova_cache:fetch(user_lookup, <<"alice">>, fun() -> load_user() end).
+```
+
+## Failure model
+
+Distributed invalidation is best-effort eventual. TTL is the correctness
+backstop. See the Invalidation guide for the `invalidation => best_effort |
+ttl_only | strict` knob and netsplit behaviour.
+
+## Stability
+
+`nova_cache` is NOT a dependency of nova core and must never become one.
+""".
+
+%% Public API: reads (get), writes (put), get-or-compute (fetch) and removal
+%% (delete, delete_many, invalidate, clear).
+-export([
+    get/2,
+    get/3,
+    put/3,
+    put/4,
+    fetch/3,
+    fetch/4,
+    delete/2,
+    delete_many/2,
+    invalidate/2,
+    clear/1
+]).
+
+%% A cache is addressed by the atom it was registered under; keys are binaries.
+-type cache_name() :: atom().
+-type key() :: binary().
+-type value() :: term().
+%% Options understood by fetch/4. `single_flight` defaults to true there;
+%% `cache_errors` opts in to storing a failed load for a short TTL.
+-type fetch_opts() :: #{
+    ttl => non_neg_integer(),
+    single_flight => boolean(),
+    cache_errors => boolean() | {true, #{ttl => non_neg_integer()}},
+    timeout => non_neg_integer()
+}.
+-type put_opts() :: #{ttl => non_neg_integer()}.
+
+-export_type([cache_name/0, key/0, value/0, fetch_opts/0, put_opts/0]).
+
+%% Looks the cache up in the registry and delegates to its adapter.
+%% Both an unregistered cache and an adapter `{error, _}` are reported as
+%% `miss`, so callers cannot tell "absent" from "adapter failed" here.
+-spec get(cache_name(), key()) -> {ok, value()} | miss.
+get(Name, Key) ->
+    case nova_cache_registry:lookup(Name) of
+        {ok, Adapter, State} ->
+            case Adapter:get(Key, State) of
+                {ok, V} -> {ok, V};
+                miss -> miss;
+                {error, _} -> miss
+            end;
+        {error, not_found} ->
+            miss
+    end.
+
+%% Same as get/2 but returns the bare value, or `Default` on a miss.
+-spec get(cache_name(), key(), Default :: value()) -> value().
+get(Name, Key, Default) ->
+    case get(Name, Key) of
+        {ok, V} -> V;
+        miss -> Default
+    end.
+
+%% put/3 is put/4 with no per-call options (the adapter decides the TTL).
+-spec put(cache_name(), key(), value()) -> ok | {error, term()}.
+put(Name, Key, Value) ->
+    put(Name, Key, Value, #{}).
+
+%% Stores `Value` through the cache's adapter. Returns the registry's
+%% `{error, _}` unchanged when no cache is registered under `Name`.
+-spec put(cache_name(), key(), value(), put_opts()) -> ok | {error, term()}.
+put(Name, Key, Value, Opts) ->
+    case nova_cache_registry:lookup(Name) of
+        {ok, Adapter, State} -> Adapter:put(Key, Value, Opts, State);
+        {error, _} = E -> E
+    end.
+
+%% fetch/3 is fetch/4 with default options (single-flight on, no negative
+%% caching).
+-spec fetch(cache_name(), key(), fun(() -> {ok, value()} | {error, term()})) ->
+    {ok, value()} | {error, term()}.
+fetch(Name, Key, Fun) ->
+    fetch(Name, Key, Fun, #{}).
+
+%% Get-or-compute. On a hit the cached value is returned; on a miss `Fun` is
+%% run either through the single-flight coordinator (default) or directly in
+%% the caller when `single_flight => false`.
+%%
+%% NOTE(review): with `cache_errors` enabled the failed result is stored as the
+%% plain value `{error, Reason}`, so a later hit is returned from here as
+%% `{ok, {error, Reason}}` rather than `{error, Reason}`. Confirm whether that
+%% shape is intended before relying on negative caching.
+-spec fetch(cache_name(), key(), fun(() -> {ok, value()} | {error, term()}), fetch_opts()) ->
+    {ok, value()} | {error, term()}.
+fetch(Name, Key, Fun, Opts) ->
+    case get(Name, Key) of
+        {ok, V} ->
+            {ok, V};
+        miss ->
+            case maps:get(single_flight, Opts, true) of
+                true -> nova_cache_single_flight:load(Name, Key, Fun, Opts);
+                false -> compute_and_store(Name, Key, Fun, Opts)
+            end
+    end.
+
+%% Removes `Key` from the local cache and drops any in-flight single-flight
+%% entry for it. Does not notify other nodes; see invalidate/2.
+-spec delete(cache_name(), key()) -> ok | {error, term()}.
+delete(Name, Key) ->
+    case nova_cache_registry:lookup(Name) of
+        {ok, Adapter, State} ->
+            R = Adapter:delete(Key, State),
+            nova_cache_single_flight:invalidate_local(Name, Key),
+            R;
+        {error, _} = E ->
+            E
+    end.
+
+%% Local bulk delete. Uses the adapter's delete_many/2 when it is exported and
+%% otherwise falls back to one delete/2 per key.
+-spec delete_many(cache_name(), [key()]) -> ok | {error, term()}.
+delete_many(Name, Keys) ->
+    case nova_cache_registry:lookup(Name) of
+        {ok, Adapter, State} ->
+            R =
+                case erlang:function_exported(Adapter, delete_many, 2) of
+                    true -> Adapter:delete_many(Keys, State);
+                    false -> delete_each(Adapter, State, Keys)
+                end,
+            %% foreach instead of a discarded list comprehension: the calls are
+            %% made purely for their side effect.
+            lists:foreach(
+                fun(K) -> nova_cache_single_flight:invalidate_local(Name, K) end,
+                Keys
+            ),
+            R;
+        {error, _} = E ->
+            E
+    end.
+
+%% Deletes locally and then broadcasts the deletion through the configured
+%% invalidator. A failed local delete (for example an unregistered cache) is
+%% returned as `{error, _}` as the spec promises, instead of crashing the
+%% caller with a badmatch; nothing is broadcast in that case.
+-spec invalidate(cache_name(), key() | [key()]) -> ok | {error, term()}.
+invalidate(Name, Key) when is_binary(Key) ->
+    case delete(Name, Key) of
+        ok -> broadcast_invalidation(Name, {delete, Name, Key});
+        {error, _} = E -> E
+    end;
+invalidate(Name, Keys) when is_list(Keys) ->
+    case delete_many(Name, Keys) of
+        ok -> broadcast_invalidation(Name, {delete_many, Name, Keys});
+        {error, _} = E -> E
+    end.
+
+%% Empties the local cache and broadcasts a `clear` event. The adapter's own
+%% result is what the caller gets back; the broadcast result is not inspected.
+-spec clear(cache_name()) -> ok | {error, term()}.
+clear(Name) ->
+    logger:info(#{event => nova_cache_clear, name => Name}),
+    case nova_cache_registry:lookup(Name) of
+        {ok, Adapter, State} ->
+            R = Adapter:clear(State),
+            _ = broadcast_invalidation(Name, {clear, Name}),
+            R;
+        {error, _} = E ->
+            E
+    end.
+
+%% Internal
+
+%% Pass-through path used by fetch/4 when single-flight is disabled: run the
+%% loader in the caller, store a successful result, optionally store a failure.
+%% A loader that returns anything other than `{ok, _}` / `{error, _}` is
+%% reported as `{error, {bad_fun_return, Other}}` and nothing is cached.
+compute_and_store(Name, Key, Fun, Opts) ->
+    case safe_apply(Fun) of
+        {ok, V} = OK ->
+            %% Asserts the write succeeded; a failing put crashes the caller.
+            ok = put(Name, Key, V, ttl_opts(Opts)),
+            OK;
+        {error, _Reason} = Err ->
+            _ = maybe_cache_error(Name, Key, Err, Opts),
+            Err;
+        Other ->
+            {error, {bad_fun_return, Other}}
+    end.
+
+%% Runs the loader and converts any exception into
+%% `{error, {Class, Reason, Stacktrace}}` so a crashing loader cannot take the
+%% caller down. Only the call to Fun is protected.
+safe_apply(Fun) ->
+    try Fun() of
+        R -> R
+    catch
+        Class:Reason:Stack -> {error, {Class, Reason, Stack}}
+    end.
+
+%% Keeps only the `ttl` key of the fetch options for the put call.
+ttl_opts(#{ttl := T}) -> #{ttl => T};
+ttl_opts(_) -> #{}.
+
+%% Negative caching: stores the `{error, _}` tuple itself as the cached value.
+%% `cache_errors => true` uses a fixed 1000 ms TTL; the `{true, #{ttl := T}}`
+%% form uses the caller's TTL. Anything else caches nothing.
+maybe_cache_error(Name, Key, Err, #{cache_errors := true}) ->
+    put(Name, Key, Err, #{ttl => 1000});
+maybe_cache_error(Name, Key, Err, #{cache_errors := {true, #{ttl := T}}}) ->
+    put(Name, Key, Err, #{ttl => T});
+maybe_cache_error(_, _, _, _) ->
+    ok.
+
+%% Fallback for adapters without delete_many/2: delete one key at a time.
+%% Individual delete results are ignored and the overall result is always `ok`.
+delete_each(_Adapter, _State, []) ->
+    ok;
+delete_each(Adapter, State, [K | Rest]) ->
+    _ = Adapter:delete(K, State),
+    delete_each(Adapter, State, Rest).
+
+%% Hands the event to the transport named by the `invalidator` app env key;
+%% a no-op returning `ok` when no transport is configured.
+broadcast_invalidation(Name, Event) ->
+    case application:get_env(nova_cache, invalidator, undefined) of
+        undefined -> ok;
+        Mod -> Mod:broadcast(Name, Event)
+    end.
diff --git a/src/nova_cache_adapter.erl b/src/nova_cache_adapter.erl
new file mode 100644
index 0000000..da21825
--- /dev/null
+++ b/src/nova_cache_adapter.erl
@@ -0,0 +1,44 @@
+-module(nova_cache_adapter).
+-moduledoc """
+Behaviour for nova_cache storage adapters.
+
+Each adapter owns its own process (typically a gen_server) and holds an opaque
+`State` that nova_cache passes back to subsequent callbacks verbatim. All time
+values are in milliseconds.
+
+## Callback contract
+
+- `start_link/2` is invoked by `nova_cache_sup` to spin up the adapter process
+  for a configured cache. It must register `State` with `nova_cache_registry`
+  before returning so the public API can route calls.
+- Fast-path reads (`get/2`) may execute in the caller's process without going
+  through the adapter's gen_server, provided the adapter's data structures
+  permit concurrent reads (ETS with `read_concurrency` is the canonical case).
+- Writes (`put/4`, `delete/2`, `delete_many/2`, `clear/1`) may be either
+  caller-process or gen_server-mediated at the adapter's discretion.
+
+## Optional callbacks
+
+- `get_many/2` and `put_many/2` are batching helpers. If an adapter does not
+  implement them, nova_cache falls back to iterating the single-key callbacks.
+""".
+
+%% `opts()` is the per-cache spec map handed to start_link/2.
+-type opts() :: #{atom() => term()}.
+-type put_opts() :: #{ttl => non_neg_integer()}.
+-type key() :: binary().
+-type value() :: term().
+
+-export_type([opts/0, put_opts/0, key/0, value/0]).
+
+%% Required callbacks. `State` is whatever the adapter registered for the
+%% cache; it is passed back untouched on every call.
+-callback start_link(Name :: atom(), Opts :: opts()) -> {ok, pid()} | {error, term()}.
+-callback get(Key :: key(), State :: term()) -> {ok, value()} | miss | {error, term()}.
+-callback put(Key :: key(), Value :: value(), Opts :: put_opts(), State :: term()) ->
+    ok | {error, term()}.
+-callback delete(Key :: key(), State :: term()) -> ok | {error, term()}.
+-callback delete_many([key()], State :: term()) -> ok | {error, term()}.
+-callback clear(State :: term()) -> ok | {error, term()}.
+
+%% Optional batching callbacks.
+%% NOTE(review): the `nova_cache` API module exports no get_many/put_many
+%% entry point, so the fallback described in the moduledoc could not be
+%% confirmed from the public API -- verify before relying on it.
+-callback get_many([key()], State :: term()) -> {ok, #{key() => value()}} | {error, term()}.
+-callback put_many([{key(), value()}], State :: term()) -> ok | {error, term()}.
+
+-optional_callbacks([get_many/2, put_many/2]).
diff --git a/src/nova_cache_app.erl b/src/nova_cache_app.erl
new file mode 100644
index 0000000..050abea
--- /dev/null
+++ b/src/nova_cache_app.erl
@@ -0,0 +1,12 @@
+-module(nova_cache_app).
+-moduledoc false.
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+%% Application start callback: boots the top-level supervisor.
+start(_StartType, _StartArgs) ->
+    nova_cache_sup:start_link().
+
+%% Nothing to clean up beyond what the supervision tree already does.
+stop(_State) ->
+    ok.
diff --git a/src/nova_cache_ets.erl b/src/nova_cache_ets.erl
new file mode 100644
index 0000000..013ff84
--- /dev/null
+++ b/src/nova_cache_ets.erl
@@ -0,0 +1,202 @@
+-module(nova_cache_ets).
+-moduledoc """
+Single-node ETS adapter for `nova_cache`.
+
+Each cache instance owns one gen_server and one ETS table.
Reads execute
+directly against ETS from the caller's process (concurrent, lock-free). Writes
+also execute directly. The gen_server handles TTL sweeping, max-size
+eviction, and invalidation event handlers.
+
+## TTL semantics
+
+Each row is stored as `{Key, Value, ExpiresAtMs}` where `ExpiresAtMs` is
+`erlang:monotonic_time(millisecond) + ttl`. A row is considered live when
+`ExpiresAtMs > now`. The sweep timer purges expired rows on its interval
+(default 60 seconds). Lazy eviction on `get/2` returns `miss` for expired rows
+before the sweep runs, so sub-second TTLs are still honoured semantically.
+
+## Max size & eviction
+
+If `max_size` is configured, each sweep compares the table size with the bound
+and deletes the surplus rows in ETS traversal order (`ets:first/1`). Access
+order is not tracked, so this is not a true LRU; the bound is soft within one
+sweep interval.
+""".
+
+-behaviour(gen_server).
+-behaviour(nova_cache_adapter).
+
+-export([start_link/2]).
+-export([get/2, put/4, delete/2, delete_many/2, clear/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
+
+%% gen_server state: owned by the adapter process and used by the sweep and
+%% invalidation handlers.
+-record(state, {
+    name :: atom(),
+    table :: ets:table(),
+    ttl_default :: non_neg_integer() | infinity,
+    max_size :: non_neg_integer() | infinity,
+    sweep_interval :: non_neg_integer(),
+    invalidation_mode :: best_effort | ttl_only | strict
+}).
+
+%% Handle stored in the registry and passed to the adapter callbacks as their
+%% `State` argument; it carries what a caller process needs to hit ETS directly.
+-record(handle, {
+    name :: atom(),
+    table :: ets:table(),
+    server :: pid(),
+    ttl_default :: non_neg_integer() | infinity
+}).
+
+%% Starts the (unnamed) adapter process for cache `Name`.
+start_link(Name, Opts) ->
+    gen_server:start_link(?MODULE, {Name, Opts}, []).
+
+%% Reads straight from ETS in the caller's process. A row whose expiry has
+%% passed is reported as `miss` but is left in the table for the sweeper.
+get(Key, #handle{table = T}) ->
+    Now = erlang:monotonic_time(millisecond),
+    case ets:lookup(T, Key) of
+        [{Key, Value, ExpiresAt}] when ExpiresAt =:= infinity orelse ExpiresAt > Now ->
+            {ok, Value};
+        [{Key, _, _}] ->
+            miss;
+        [] ->
+            miss
+    end.
+
+%% Writes directly to ETS from the caller's process. The TTL comes from the
+%% per-call `ttl` option, falling back to the cache's `ttl_default`; `infinity`
+%% means the row never expires.
+put(Key, Value, Opts, #handle{table = T, ttl_default = Default}) ->
+    Now = erlang:monotonic_time(millisecond),
+    Ttl = maps:get(ttl, Opts, Default),
+    ExpiresAt =
+        case Ttl of
+            infinity -> infinity;
+            N when is_integer(N) -> Now + N
+        end,
+    true = ets:insert(T, {Key, Value, ExpiresAt}),
+    ok.
+
+delete(Key, #handle{table = T}) ->
+    true = ets:delete(T, Key),
+    ok.
+
+%% foreach rather than a discarded list comprehension: the deletes are run
+%% only for their side effect.
+delete_many(Keys, #handle{table = T}) ->
+    lists:foreach(fun(K) -> ets:delete(T, K) end, Keys).
+
+clear(#handle{table = T}) ->
+    true = ets:delete_all_objects(T),
+    ok.
+
+%% Creates the table, registers the handle so the public API can route calls,
+%% subscribes to cluster invalidation (unless the cache is `ttl_only`) and arms
+%% the first sweep.
+init({Name, Opts}) ->
+    TtlDefault = maps:get(ttl_default, Opts, infinity),
+    MaxSize = maps:get(max_size, Opts, infinity),
+    Sweep = maps:get(sweep_interval, Opts, 60_000),
+    Mode = validate_invalidation_mode(maps:get(invalidation, Opts, best_effort), TtlDefault),
+    %% `public` because callers insert and delete from their own processes.
+    Table = ets:new(table_name(Name), [
+        set,
+        public,
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ]),
+    Handle = #handle{
+        name = Name,
+        table = Table,
+        server = self(),
+        ttl_default = TtlDefault
+    },
+    ok = nova_cache_registry:register(Name, ?MODULE, Handle),
+    %% `ttl_only` is documented as skipping broadcasts entirely, so such a
+    %% cache must not subscribe to the invalidation transport.
+    _ =
+        case Mode of
+            ttl_only -> ok;
+            _ -> maybe_subscribe(Name, Handle)
+        end,
+    _ = schedule_sweep(Sweep),
+    {ok, #state{
+        name = Name,
+        table = Table,
+        ttl_default = TtlDefault,
+        max_size = MaxSize,
+        sweep_interval = Sweep,
+        invalidation_mode = Mode
+    }}.
+
+handle_call(_, _, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_, S) ->
+    {noreply, S}.
+
+%% `sweep`: purge expired rows, enforce the size bound, re-arm the timer.
+%% `{nova_cache_invalidation, Event}`: apply an event forwarded by the
+%% subscription handler installed at start-up.
+handle_info(sweep, S = #state{table = T, sweep_interval = I, max_size = Max}) ->
+    _ = sweep_expired(T),
+    enforce_max_size(T, Max),
+    _ = schedule_sweep(I),
+    {noreply, S};
+handle_info({nova_cache_invalidation, Event}, S) ->
+    _ = apply_invalidation(Event, S),
+    {noreply, S};
+handle_info(_, S) ->
+    {noreply, S}.
+
+%% Best-effort unregistration on an orderly stop.
+terminate(_, #state{name = Name}) ->
+    nova_cache_registry:unregister(Name),
+    ok.
+
+%% Internal
+
+%% Label for the (unnamed) ETS table. `Name` comes from application
+%% configuration, so creating an atom from it is bounded by the number of
+%% configured caches.
+table_name(Name) ->
+    list_to_atom("nova_cache_ets_" ++ atom_to_list(Name)).
+
+%% Arms the next sweep; `infinity` disables sweeping altogether.
+schedule_sweep(infinity) ->
+    ok;
+schedule_sweep(I) when is_integer(I) ->
+    erlang:send_after(I, self(), sweep).
+
+%% Deletes every row whose expiry has been reached and returns how many were
+%% removed. Uses `=<` so the sweep agrees with get/2, which treats a row as
+%% live only while `ExpiresAt > Now`.
+sweep_expired(T) ->
+    Now = erlang:monotonic_time(millisecond),
+    MatchSpec = [
+        {
+            {'$1', '$2', '$3'},
+            [{'andalso', {'=/=', '$3', infinity}, {'=<', '$3', Now}}],
+            [true]
+        }
+    ],
+    ets:select_delete(T, MatchSpec).
+
+%% Trims the table back to `Max` rows when it has grown past the bound.
+enforce_max_size(_T, infinity) ->
+    ok;
+enforce_max_size(T, Max) when is_integer(Max) ->
+    case ets:info(T, size) of
+        N when N =< Max -> ok;
+        N -> evict_oldest(T, N - Max)
+    end.
+
+%% Removes up to N rows, taking them in `ets:first/1` order. For a `set` table
+%% that order is not insertion or access order, so "oldest" is nominal.
+evict_oldest(_T, N) when N =< 0 ->
+    ok;
+evict_oldest(T, N) ->
+    case ets:first(T) of
+        '$end_of_table' ->
+            ok;
+        Key ->
+            ets:delete(T, Key),
+            evict_oldest(T, N - 1)
+    end.
+
+%% `strict` needs a finite default TTL so staleness is bounded; any mode other
+%% than the three known ones fails with function_clause.
+validate_invalidation_mode(strict, infinity) ->
+    error({strict_invalidation_requires_ttl_default});
+validate_invalidation_mode(Mode, _) when Mode =:= best_effort; Mode =:= ttl_only; Mode =:= strict ->
+    Mode.
+
+%% Subscribes this adapter process to the configured transport, if any. The
+%% handler only forwards the event as a message; the actual purge happens in
+%% handle_info/2. The subscribe result is deliberately not inspected and the
+%% handle is not needed, hence `_Handle`.
+maybe_subscribe(Name, _Handle) ->
+    case application:get_env(nova_cache, invalidator, undefined) of
+        undefined ->
+            ok;
+        Mod ->
+            Self = self(),
+            _ = Mod:subscribe(Name, fun(Event) ->
+                Self ! {nova_cache_invalidation, Event}
+            end),
+            ok
+    end.
+
+%% Applies an invalidation event to the local table. Events for another cache
+%% name, or of an unknown shape, are ignored. Always returns `ok`.
+apply_invalidation({delete, Name, Key}, #state{name = Name, table = T}) ->
+    true = ets:delete(T, Key),
+    ok;
+apply_invalidation({delete_many, Name, Keys}, #state{name = Name, table = T}) ->
+    lists:foreach(fun(K) -> ets:delete(T, K) end, Keys);
+apply_invalidation({clear, Name}, #state{name = Name, table = T}) ->
+    true = ets:delete_all_objects(T),
+    ok;
+apply_invalidation(_, _) ->
+    ok.
diff --git a/src/nova_cache_invalidator.erl b/src/nova_cache_invalidator.erl
new file mode 100644
index 0000000..5b51f63
--- /dev/null
+++ b/src/nova_cache_invalidator.erl
@@ -0,0 +1,33 @@
+-module(nova_cache_invalidator).
+-moduledoc """
+Behaviour for nova_cache distributed-invalidation transports.
+
+A transport ships invalidation events between nodes so that `delete/2`,
+`delete_many/2`, and `clear/1` operations are reflected across the cluster.
+
+## Guarantees
+
+Best-effort eventual delivery. TTL is the correctness backstop. A node that is
+netsplit, GC-paused, or just-joined may miss events; the documented model is
+`invalidation => best_effort | ttl_only | strict` (configured per cache).
+
+## Event shapes
+
+- `{delete, Name :: atom(), Key :: binary()}`
+- `{delete_many, Name :: atom(), [binary()]}`
+- `{clear, Name :: atom()}`
+
+Transports must deliver these payloads unchanged to the local handler on every
+subscribed node.
+""".
+
+%% The cache name is always the second element, so a transport can route an
+%% event without understanding the rest of it.
+-type event() ::
+    {delete, atom(), binary()}
+    | {delete_many, atom(), [binary()]}
+    | {clear, atom()}.
+
+-export_type([event/0]).
+
+%% start_link/1 starts the transport process; subscribe/2 registers a handler
+%% fun for one cache name; broadcast/2 ships an event to subscribers.
+-callback start_link(Opts :: map()) -> {ok, pid()} | {error, term()}.
+-callback subscribe(Name :: atom(), Handler :: fun((event()) -> any())) -> ok | {error, term()}.
+-callback broadcast(Name :: atom(), Event :: event()) -> ok | {error, term()}.
diff --git a/src/nova_cache_invalidator_pg.erl b/src/nova_cache_invalidator_pg.erl
new file mode 100644
index 0000000..eac40e7
--- /dev/null
+++ b/src/nova_cache_invalidator_pg.erl
@@ -0,0 +1,75 @@
+-module(nova_cache_invalidator_pg).
+-moduledoc """
+`pg`-based distributed invalidation transport for nova_cache.
+
+Each cache subscribes to the process group `{nova_cache, Name}`. Broadcasts
+deliver the event message to every member of the group across the cluster.
+
+This transport depends on the standard `pg` module (no extra dependencies).
+""".
+
+-behaviour(nova_cache_invalidator).
+-behaviour(gen_server).
+
+-export([start_link/1, subscribe/2, broadcast/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+
+%% Dedicated pg scope so these groups do not mix with other pg users.
+-define(SCOPE, nova_cache_pg).
+
+%% Handler funs per cache name, newest first.
+-record(state, {handlers = #{} :: #{atom() => [fun((nova_cache_invalidator:event()) -> any())]}}).
+
+%% Starts the transport, registered locally under the module name. The options
+%% argument is currently ignored.
+start_link(_Opts) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Synchronous call to the locally registered transport process, so the
+%% transport must already be running when a cache subscribes.
+subscribe(Name, Handler) ->
+    gen_server:call(?MODULE, {subscribe, Name, Handler}).
+
+%% Sends the event to every member of the cache's process group. Sends use
+%% `noconnect`/`nosuspend`, so an unreachable or busy peer is skipped rather
+%% than blocking the caller; delivery is best-effort by design.
+broadcast(Name, Event) ->
+    Members = pg:get_members(?SCOPE, {nova_cache, Name}),
+    lists:foreach(
+        fun(Pid) ->
+            erlang:send(Pid, {nova_cache_pg_event, Event}, [noconnect, nosuspend])
+        end,
+        Members
+    ).
+
+init([]) ->
+    _ = ensure_scope(),
+    {ok, #state{}}.
+
+%% Records the handler and, for the first handler of a cache name, joins this
+%% process to the cache's group so it starts receiving broadcasts.
+handle_call({subscribe, Name, Handler}, _From, S = #state{handlers = H}) ->
+    Existing = maps:get(Name, H, []),
+    NewH = H#{Name => [Handler | Existing]},
+    case Existing of
+        [] -> ok = pg:join(?SCOPE, {nova_cache, Name}, self());
+        _ -> ok
+    end,
+    {reply, ok, S#state{handlers = NewH}};
+handle_call(_, _, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_, S) ->
+    {noreply, S}.
+
+%% Fans a received event out to every handler registered for its cache name.
+handle_info({nova_cache_pg_event, Event}, S = #state{handlers = H}) ->
+    Name = event_name(Event),
+    Handlers = maps:get(Name, H, []),
+    lists:foreach(fun(F) -> safe_apply(F, Event) end, Handlers),
+    {noreply, S};
+handle_info(_, S) ->
+    {noreply, S}.
+
+%% Internal
+
+%% Starts the pg scope unless it is already running.
+ensure_scope() ->
+    case pg:start(?SCOPE) of
+        {ok, _Pid} -> ok;
+        {error, {already_started, _}} -> ok
+    end.
+
+event_name({delete, N, _}) -> N;
+event_name({delete_many, N, _}) -> N;
+event_name({clear, N}) -> N.
+
+%% Runs one handler. A failing handler must not take the transport down, but
+%% the failure is logged instead of being swallowed silently. The event itself
+%% is not logged because it carries cache keys.
+safe_apply(F, E) ->
+    try F(E) of
+        _ -> ok
+    catch
+        Class:Reason:Stack ->
+            logger:warning(#{
+                event => nova_cache_invalidation_handler_failed,
+                class => Class,
+                reason => Reason,
+                stacktrace => Stack
+            }),
+            ok
+    end.
diff --git a/src/nova_cache_registry.erl b/src/nova_cache_registry.erl
new file mode 100644
index 0000000..299e000
--- /dev/null
+++ b/src/nova_cache_registry.erl
@@ -0,0 +1,57 @@
+-module(nova_cache_registry).
+-moduledoc false.
+
+-behaviour(gen_server).
+
+-export([start_link/0, register/3, unregister/1, lookup/1, list/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+
+-define(TABLE, nova_cache_registry).
+
+%% Maps each monitor reference to the cache name its process registered.
+-record(state, {monitors = #{} :: #{reference() => atom()}}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Registers the calling process as the owner of cache `Name`.
+register(Name, Adapter, State) ->
+    gen_server:call(?MODULE, {register, Name, Adapter, State, self()}).
+
+unregister(Name) ->
+    gen_server:call(?MODULE, {unregister, Name}).
+
+%% Reads the registry table directly from the caller's process; no message is
+%% sent to the registry server.
+lookup(Name) ->
+    case ets:lookup(?TABLE, Name) of
+        [{Name, Adapter, State}] -> {ok, Adapter, State};
+        [] -> {error, not_found}
+    end.
+
+list() ->
+    [Name || {Name, _, _} <- ets:tab2list(?TABLE)].
+
+init([]) ->
+    _ = ets:new(?TABLE, [named_table, public, set, {read_concurrency, true}]),
+    {ok, #state{}}.
+
+%% register: monitor the owner so its entry is removed when it dies. Any
+%% monitor left over from a previous owner of the same name is dropped first;
+%% otherwise that owner's later 'DOWN' would delete the new registration.
+%% unregister: remove the entry and stop monitoring its owner for the same
+%% reason.
+handle_call({register, Name, Adapter, State, Pid}, _From, S = #state{monitors = M0}) ->
+    M1 = drop_monitors(Name, M0),
+    Ref = erlang:monitor(process, Pid),
+    true = ets:insert(?TABLE, {Name, Adapter, State}),
+    {reply, ok, S#state{monitors = M1#{Ref => Name}}};
+handle_call({unregister, Name}, _From, S = #state{monitors = M}) ->
+    true = ets:delete(?TABLE, Name),
+    {reply, ok, S#state{monitors = drop_monitors(Name, M)}};
+handle_call(_, _, S) ->
+    {reply, {error, unknown_call}, S}.
+
+handle_cast(_, S) ->
+    {noreply, S}.
+
+%% The owner of a registration died: forget the monitor and remove its entry.
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, S = #state{monitors = M}) ->
+    case maps:take(Ref, M) of
+        {Name, M2} ->
+            ets:delete(?TABLE, Name),
+            {noreply, S#state{monitors = M2}};
+        error ->
+            {noreply, S}
+    end;
+handle_info(_, S) ->
+    {noreply, S}.
+
+%% Demonitors (flushing any queued 'DOWN') every monitor recorded for `Name`
+%% and returns the monitor map without them.
+drop_monitors(Name, Monitors) ->
+    maps:filter(
+        fun
+            (Ref, N) when N =:= Name ->
+                true = erlang:demonitor(Ref, [flush]),
+                false;
+            (_Ref, _N) ->
+                true
+        end,
+        Monitors
+    ).
diff --git a/src/nova_cache_single_flight.erl b/src/nova_cache_single_flight.erl
new file mode 100644
index 0000000..fa5aec1
--- /dev/null
+++ b/src/nova_cache_single_flight.erl
@@ -0,0 +1,111 @@
+-module(nova_cache_single_flight).
+-moduledoc false.
+
+-export([load/4, invalidate_local/2]).
+
+-define(TABLE, nova_cache_single_flight).
+
+%% Deduplicates concurrent loads of the same `{Name, Key}`: the caller that
+%% wins the insert_new race runs the loader ("leader"); everyone else waits
+%% for the leader's result.
+-spec load(atom(), binary(), fun(), map()) -> {ok, term()} | {error, term()}.
+load(Name, Key, Fun, Opts) ->
+    _ = ensure_table(),
+    Ref = make_ref(),
+    InsertKey = {Name, Key},
+    case ets:insert_new(?TABLE, {InsertKey, self(), Ref, []}) of
+        true ->
+            compute_and_publish(Name, Key, Fun, Opts, InsertKey);
+        false ->
+            wait_for_leader(InsertKey, Opts)
+    end.
+
+%% Drops the in-flight entry for a key that has just been deleted locally.
+invalidate_local(Name, Key) ->
+    _ = ensure_table(),
+    ets:delete(?TABLE, {Name, Key}),
+    ok.
+
+%% Internal
+
+%% Create the shared table on first use and return its name. Losing the
+%% creation race raises badarg in ets:new/2, which is treated as "already
+%% there".
+ensure_table() ->
+    case ets:info(?TABLE) of
+        undefined ->
+            try
+                ets:new(
+                    ?TABLE,
+                    [named_table, public, set, {read_concurrency, true} | heir_opts()]
+                )
+            catch
+                error:badarg -> ?TABLE
+            end;
+        _ ->
+            ?TABLE
+    end.
+
+%% The table is created by whichever caller gets here first, and an ETS table
+%% is destroyed when its owner exits. Naming the long-lived registry process as
+%% heir keeps the table alive after a short-lived creator terminates. Without a
+%% running registry the table is created without an heir.
+heir_opts() ->
+    case whereis(nova_cache_registry) of
+        Pid when is_pid(Pid) -> [{heir, Pid, ?TABLE}];
+        undefined -> []
+    end.
+
+%% Leader path: compute the value, hand the result to every registered waiter
+%% and remove the in-flight row. If the computation raises, the waiters are
+%% still released (with an error result) and the row is still removed before
+%% the exception is re-raised to the leader's caller unchanged.
+compute_and_publish(Name, Key, Fun, Opts, InsertKey) ->
+    try compute(Name, Key, Fun, Opts) of
+        Result ->
+            notify_and_clear(InsertKey, Result),
+            Result
+    catch
+        Class:Reason:Stack ->
+            notify_and_clear(InsertKey, {error, {Class, Reason, Stack}}),
+            erlang:raise(Class, Reason, Stack)
+    end.
+
+%% Run the loader and store its outcome:
+%%   {ok, V}    -> V is cached (with the caller's ttl, if given) and returned
+%%   {error, _} -> returned as is; cached only when `cache_errors` is set
+%%   other      -> {error, {bad_fun_return, Other}}
+compute(Name, Key, Fun, Opts) ->
+    case safe_apply(Fun) of
+        {ok, V} = OK ->
+            ok = nova_cache:put(Name, Key, V, ttl_opts(Opts)),
+            OK;
+        {error, _} = Err ->
+            _ = cache_error_if_opted_in(Name, Key, Err, Opts),
+            Err;
+        Other ->
+            {error, {bad_fun_return, Other}}
+    end.
+
+%% Call the loader, converting any exception into an error tuple.
+safe_apply(Fun) ->
+    try Fun() of
+        R -> R
+    catch
+        Class:Reason:Stack -> {error, {Class, Reason, Stack}}
+    end.
+
+%% Forward only the `ttl` option to nova_cache:put/4.
+ttl_opts(#{ttl := T}) -> #{ttl => T};
+ttl_opts(_) -> #{}.
+
+%% Cache the error term itself when the caller opted in. `cache_errors => true`
+%% uses a ttl of 1000; `{true, #{ttl := T}}` uses T.
+cache_error_if_opted_in(Name, Key, Err, #{cache_errors := true}) ->
+    nova_cache:put(Name, Key, Err, #{ttl => 1000});
+cache_error_if_opted_in(Name, Key, Err, #{cache_errors := {true, #{ttl := T}}}) ->
+    nova_cache:put(Name, Key, Err, #{ttl => T});
+cache_error_if_opted_in(_, _, _, _) ->
+    ok.
+
+%% Remove the in-flight row and send the result to each waiter listed in it.
+%% ets:take/2 reads and deletes in a single atomic step, so a waiter is either
+%% already in the returned list or finds no row and reports `no_leader`. It can
+%% no longer slip in between a separate lookup and delete and be forgotten.
+notify_and_clear(InsertKey, Result) ->
+    case ets:take(?TABLE, InsertKey) of
+        [{InsertKey, _Leader, _Ref, Waiters}] ->
+            lists:foreach(
+                fun(W) -> W ! {nova_cache_sf_result, InsertKey, Result} end,
+                Waiters
+            );
+        [] ->
+            ok
+    end.
+
+%% Waiter path: register with the current leader and block until its result
+%% arrives or `timeout` (default 5000) elapses.
+%% Returns the leader's result, `{error, single_flight_timeout}` or
+%% `{error, single_flight_lost_leader}`.
+wait_for_leader(InsertKey, Opts) ->
+    Timeout = maps:get(timeout, Opts, 5000),
+    case register_waiter(InsertKey) of
+        ok ->
+            receive
+                {nova_cache_sf_result, InsertKey, Result} -> Result
+            after Timeout ->
+                ok = reap_dead_leader(InsertKey),
+                {error, single_flight_timeout}
+            end;
+        no_leader ->
+            {error, single_flight_lost_leader}
+    end.
+
+%% A leader that was killed mid-flight never clears its row, which would make
+%% every later caller for the key wait and time out. After a timeout, remove
+%% the row if the process that owns it is no longer alive. The match on the
+%% leader pid and ref ensures a newer flight for the same key is left alone.
+reap_dead_leader(InsertKey) ->
+    case ets:lookup(?TABLE, InsertKey) of
+        [{InsertKey, Leader, Ref, _Waiters}] ->
+            reap_if_dead(is_process_alive(Leader), InsertKey, Leader, Ref);
+        [] ->
+            ok
+    end.
+
+reap_if_dead(true, _InsertKey, _Leader, _Ref) ->
+    ok;
+reap_if_dead(false, InsertKey, Leader, Ref) ->
+    _ = ets:select_delete(?TABLE, [{{InsertKey, Leader, Ref, '_'}, [], [true]}]),
+    ok.
+
+%% Add the calling process to the waiter list of the in-flight row for
+%% `InsertKey`. Returns `ok` once the caller is listed, or `no_leader` when no
+%% row exists.
+register_waiter(InsertKey) ->
+    add_waiter(InsertKey, ets:lookup(?TABLE, InsertKey)).
+
+add_waiter(_InsertKey, []) ->
+    no_leader;
+add_waiter(InsertKey, [{InsertKey, Leader, Ref, Waiters} = Seen]) ->
+    Updated = {InsertKey, Leader, Ref, [self() | Waiters]},
+    %% Compare-and-swap: the replacement only applies while the row still looks
+    %% exactly like the one just read. A concurrent change yields 0 and the
+    %% whole read/replace cycle is retried.
+    MatchSpec = [{Seen, [], [{const, Updated}]}],
+    case ets:select_replace(?TABLE, MatchSpec) of
+        0 -> register_waiter(InsertKey);
+        1 -> ok
+    end.
diff --git a/src/nova_cache_sup.erl b/src/nova_cache_sup.erl
new file mode 100644
index 0000000..f72893f
--- /dev/null
+++ b/src/nova_cache_sup.erl
@@ -0,0 +1,44 @@
+-module(nova_cache_sup).
+-moduledoc false.
+
+-behaviour(supervisor).
+
+-export([start_link/0, init/1, start_cache/2, stop_cache/1]).
+
+%% Start the supervisor, registered locally under the module name.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% Children: the registry first, followed by one worker per cache found in the
+%% `caches` application environment.
+init([]) ->
+    RegistrySpec = #{
+        id => nova_cache_registry,
+        start => {nova_cache_registry, start_link, []},
+        restart => permanent,
+        shutdown => 5000,
+        type => worker
+    },
+    Flags = #{strategy => one_for_one, intensity => 10, period => 60},
+    {ok, {Flags, [RegistrySpec | configured_caches()]}}.
+
+%% Start a cache at runtime; returns whatever supervisor:start_child/2 returns.
+start_cache(Name, Spec) ->
+    supervisor:start_child(?MODULE, child_spec(Name, Spec)).
+
+%% Terminate the cache child and, when that succeeds, delete its child spec.
+stop_cache(Name) ->
+    delete_if_terminated(supervisor:terminate_child(?MODULE, Name), Name).
+
+delete_if_terminated(ok, Name) -> supervisor:delete_child(?MODULE, Name);
+delete_if_terminated(Error, _Name) -> Error.
+
+%% Child specs for every cache in the `caches` env map (default: none).
+configured_caches() ->
+    Prepend = fun(Name, Spec, Specs) -> [child_spec(Name, Spec) | Specs] end,
+    maps:fold(Prepend, [], application:get_env(nova_cache, caches, #{})).
+
+%% Permanent worker spec for one cache; the adapter defaults to nova_cache_ets.
+child_spec(Name, Spec) ->
+    Start = {maps:get(adapter, Spec, nova_cache_ets), start_link, [Name, Spec]},
+    #{id => Name, start => Start, restart => permanent, shutdown => 5000, type => worker}.
diff --git a/test/nova_cache_SUITE.erl b/test/nova_cache_SUITE.erl
new file mode 100644
index 0000000..0a87930
--- /dev/null
+++ b/test/nova_cache_SUITE.erl
@@ -0,0 +1,100 @@
+-module(nova_cache_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+all() ->
+    [
+        get_returns_miss_for_unknown_key,
+        get_with_default_returns_default,
+        put_then_get_returns_value,
+        delete_removes_key,
+        delete_many_removes_listed_keys,
+        clear_removes_all_keys,
+        fetch_computes_on_miss,
+        fetch_returns_cached_on_hit,
+        fetch_caches_error_when_opted_in,
+        fetch_does_not_cache_error_by_default,
+        ttl_expires_value
+    ].
+
+%% Configure a single ETS-backed cache named `suite` and start the application.
+init_per_suite(Config) ->
+    _ = application:load(nova_cache),
+    ok = application:set_env(nova_cache, caches, #{
+        suite => #{
+            adapter => nova_cache_ets,
+            ttl_default => 60_000,
+            sweep_interval => 50
+        }
+    }),
+    {ok, _} = application:ensure_all_started(nova_cache),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(nova_cache),
+    ok.
+
+%% Every case starts from an empty cache. A failing clear must abort the case
+%% instead of letting it run against leftover entries.
+init_per_testcase(_, Config) ->
+    ok = nova_cache:clear(suite),
+    Config.
+
+end_per_testcase(_, _) ->
+    ok.
+
+%% The cases use the assert macros so a failure reports both the expected and
+%% the actual value instead of a bare badmatch.
+
+get_returns_miss_for_unknown_key(_) ->
+    ?assertEqual(miss, nova_cache:get(suite, <<"nope">>)).
+
+get_with_default_returns_default(_) ->
+    ?assertEqual(default, nova_cache:get(suite, <<"nope">>, default)).
+
+put_then_get_returns_value(_) ->
+    ?assertEqual(ok, nova_cache:put(suite, <<"k">>, ~"value")),
+    ?assertEqual({ok, ~"value"}, nova_cache:get(suite, <<"k">>)).
+
+delete_removes_key(_) ->
+    ?assertEqual(ok, nova_cache:put(suite, <<"k">>, 1)),
+    ?assertEqual(ok, nova_cache:delete(suite, <<"k">>)),
+    ?assertEqual(miss, nova_cache:get(suite, <<"k">>)).
+
+%% Only the listed keys are removed; <<"c">> must survive.
+delete_many_removes_listed_keys(_) ->
+    ?assertEqual(ok, nova_cache:put(suite, <<"a">>, 1)),
+    ?assertEqual(ok, nova_cache:put(suite, <<"b">>, 2)),
+    ?assertEqual(ok, nova_cache:put(suite, <<"c">>, 3)),
+    ?assertEqual(ok, nova_cache:delete_many(suite, [<<"a">>, <<"b">>])),
+    ?assertEqual(miss, nova_cache:get(suite, <<"a">>)),
+    ?assertEqual(miss, nova_cache:get(suite, <<"b">>)),
+    ?assertEqual({ok, 3}, nova_cache:get(suite, <<"c">>)).
+
+clear_removes_all_keys(_) ->
+    ?assertEqual(ok, nova_cache:put(suite, <<"a">>, 1)),
+    ?assertEqual(ok, nova_cache:put(suite, <<"b">>, 2)),
+    ?assertEqual(ok, nova_cache:clear(suite)),
+    ?assertEqual(miss, nova_cache:get(suite, <<"a">>)),
+    ?assertEqual(miss, nova_cache:get(suite, <<"b">>)).
+
+%% A miss runs the loader and stores what it returns.
+fetch_computes_on_miss(_Config) ->
+    Key = ~"k",
+    Loader = fun() -> {ok, 42} end,
+    {ok, 42} = nova_cache:fetch(suite, Key, Loader),
+    {ok, 42} = nova_cache:get(suite, Key).
+
+%% A hit returns the stored value, not the loader's.
+fetch_returns_cached_on_hit(_Config) ->
+    Key = ~"k",
+    ok = nova_cache:put(suite, Key, 99),
+    Loader = fun() -> {ok, 0} end,
+    {ok, 99} = nova_cache:fetch(suite, Key, Loader).
+
+%% With `cache_errors => true` the error tuple itself becomes the cached value.
+fetch_caches_error_when_opted_in(_Config) ->
+    Key = ~"e",
+    Failure = {error, boom},
+    Failure = nova_cache:fetch(suite, Key, fun() -> Failure end, #{cache_errors => true}),
+    {ok, Failure} = nova_cache:get(suite, Key).
+
+%% Without the option the error is returned but nothing is stored.
+fetch_does_not_cache_error_by_default(_Config) ->
+    Key = ~"e2",
+    Failure = {error, nope},
+    Failure = nova_cache:fetch(suite, Key, fun() -> Failure end),
+    miss = nova_cache:get(suite, Key).
+
+%% An entry written with a 100 ms ttl is readable at once and gone 150 ms later.
+ttl_expires_value(_Config) ->
+    Key = ~"k",
+    ok = nova_cache:put(suite, Key, fresh, #{ttl => 100}),
+    {ok, fresh} = nova_cache:get(suite, Key),
+    timer:sleep(150),
+    miss = nova_cache:get(suite, Key).
diff --git a/test/nova_cache_ets_SUITE.erl b/test/nova_cache_ets_SUITE.erl
new file mode 100644
index 0000000..bc99d4c
--- /dev/null
+++ b/test/nova_cache_ets_SUITE.erl
@@ -0,0 +1,56 @@
+-module(nova_cache_ets_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    [
+        max_size_evicts_lru,
+        sweep_purges_expired_rows,
+        strict_invalidation_requires_ttl_default
+    ].
+
+%% Start the application with no preconfigured caches; each case starts its own.
+init_per_suite(Config) ->
+    _ = application:load(nova_cache),
+    NoCaches = #{},
+    application:set_env(nova_cache, caches, NoCaches),
+    {ok, _Started} = application:ensure_all_started(nova_cache),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = application:stop(nova_cache).
+
+%% With `max_size => 2`, exactly two of three inserted keys must still be
+%% readable after the adapter has had time to evict. Only the number of
+%% survivors is checked, not which keys they are.
+max_size_evicts_lru(_) ->
+    Spec = #{
+        adapter => nova_cache_ets, ttl_default => infinity, max_size => 2, sweep_interval => 50
+    },
+    {ok, _} = nova_cache_sup:start_cache(lru_cache, Spec),
+    ok = nova_cache:put(lru_cache, <<"a">>, 1),
+    ok = nova_cache:put(lru_cache, <<"b">>, 2),
+    ok = nova_cache:put(lru_cache, <<"c">>, 3),
+    %% Twice the configured sweep_interval; presumably eviction may be deferred
+    %% to a sweep -- confirm against the adapter.
+    timer:sleep(100),
+    Hits = [K || K <- [<<"a">>, <<"b">>, <<"c">>], nova_cache:get(lru_cache, K) =/= miss],
+    2 = length(Hits),
+    ok = nova_cache_sup:stop_cache(lru_cache).
+
+%% An entry written under a 50 ms default ttl must read as a miss after 200 ms.
+sweep_purges_expired_rows(_) ->
+    Spec = #{
+        adapter => nova_cache_ets,
+        ttl_default => 50,
+        max_size => infinity,
+        sweep_interval => 100
+    },
+    {ok, _} = nova_cache_sup:start_cache(sweep_cache, Spec),
+    ok = nova_cache:put(sweep_cache, <<"k">>, gone),
+    timer:sleep(200),
+    miss = nova_cache:get(sweep_cache, <<"k">>),
+    ok = nova_cache_sup:stop_cache(sweep_cache).
+
+%% Starting a cache with `invalidation => strict` and no `ttl_default` must be
+%% rejected. supervisor:start_child/2 reports a failing child start as
+%% `{error, Reason}`, so no `catch` is needed. The old-style `catch` would only
+%% turn an unrelated exit (e.g. supervisor not running) into `{'EXIT', _}` and
+%% hide the real failure behind a badmatch.
+strict_invalidation_requires_ttl_default(_) ->
+    Spec = #{adapter => nova_cache_ets, invalidation => strict},
+    {error, _} = nova_cache_sup:start_cache(strict_cache, Spec).