diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index f8e121ad6a..5045a81fd6 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -199,6 +199,26 @@ requests.post( "http://localhost:6566/push", data=json.dumps(push_data)) ``` +#### Offline write batching for `/push` + +The Python feature server supports configurable batching for the **offline** +portion of writes executed via the `/push` endpoint. + +Only the offline part of a push is affected: + +- `to: "offline"` → **fully batched** +- `to: "online_and_offline"` → **online written immediately**, **offline batched** +- `to: "online"` → unaffected, always immediate + +Enable batching in your `feature_store.yaml`: + +```yaml +feature_server: + type: local + offline_push_batching_enabled: true + offline_push_batching_batch_size: 1000 + offline_push_batching_batch_interval_seconds: 10 +``` ### Materializing features diff --git a/docs/reference/feature-store-yaml.md b/docs/reference/feature-store-yaml.md index 01f586c047..820731064f 100644 --- a/docs/reference/feature-store-yaml.md +++ b/docs/reference/feature-store-yaml.md @@ -25,6 +25,22 @@ online_store: * **project\_id** — Optional parameter for the datastore online store. Sets the GCP project id used by Feast, if not set Feast will use the default GCP project id in the local environment. * **project** — Defines a namespace for the entire feature store. Can be used to isolate multiple deployments in a single installation of Feast. +### feature_server + +The `feature_server` block configures the Python Feature Server when it is used +to serve online features and handle `/push` requests. This section is optional +and only applies when running the Python feature server. 
+ +An example configuration: + +```yaml +feature_server: + type: local + offline_push_batching_enabled: true # Enables batching of offline writes processed by /push. Online writes are unaffected. + offline_push_batching_batch_size: 100 # Maximum number of buffered rows before writing to the offline store. + offline_push_batching_batch_interval_seconds: 5 # Maximum time rows may remain buffered before a forced flush. +``` + ## Providers The `provider` field defines the environment in which Feast will execute data flows. As a result, it also determines the default values for other fields. diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index e3ec16496c..b5d98fa9a6 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,13 +1,29 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import asyncio import os import sys import threading import time import traceback +from collections import defaultdict from contextlib import asynccontextmanager from datetime import datetime from importlib import resources as importlib_resources -from typing import Any, Dict, List, Optional, Union +from types import SimpleNamespace +from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Union import pandas as pd import psutil @@ -195,6 +211,41 @@ def get_app( registry_proto = None shutting_down = False active_timer: Optional[threading.Timer] = None + # --- Offline write batching config and batcher --- + fs_cfg = getattr(store.config, "feature_server", None) + batching_cfg = None + if fs_cfg is not None: + enabled = getattr(fs_cfg, "offline_push_batching_enabled", False) + batch_size = getattr(fs_cfg, "offline_push_batching_batch_size", None) + batch_interval_seconds = getattr( + fs_cfg, "offline_push_batching_batch_interval_seconds", None + ) + + if enabled is True: + size_ok = isinstance(batch_size, int) and not isinstance(batch_size, bool) + interval_ok = isinstance(batch_interval_seconds, int) and not isinstance( + batch_interval_seconds, bool + ) + if size_ok and interval_ok: + batching_cfg = SimpleNamespace( + enabled=True, + batch_size=batch_size, + batch_interval_seconds=batch_interval_seconds, + ) + else: + logger.warning( + "Offline write batching enabled but missing or invalid numeric values; " + "disabling batching (batch_size=%r, batch_interval_seconds=%r)", + batch_size, + batch_interval_seconds, + ) + + offline_batcher: Optional[OfflineWriteBatcher] = None + if batching_cfg is not None and batching_cfg.enabled is True: + offline_batcher = OfflineWriteBatcher(store=store, cfg=batching_cfg) + logger.debug("Offline write batching is ENABLED") + else: + logger.debug("Offline write batching is DISABLED") def stop_refresh(): nonlocal shutting_down @@ -219,9 +270,13 @@ def async_refresh(): async def lifespan(app: FastAPI): await 
store.initialize() async_refresh() - yield - stop_refresh() - await store.close() + try: + yield + finally: + stop_refresh() + if offline_batcher is not None: + offline_batcher.shutdown() + await store.close() app = FastAPI(lifespan=lifespan) @@ -326,22 +381,58 @@ async def push(request: PushFeaturesRequest) -> None: for feature_view in fvs_with_push_sources: assert_permissions(resource=feature_view, actions=actions) - push_params = dict( - push_source_name=request.push_source_name, - df=df, - allow_registry_cache=request.allow_registry_cache, - to=to, - transform_on_write=request.transform_on_write, - ) + async def _push_with_to(push_to: PushMode) -> None: + """ + Helper for performing a single push operation. + + NOTE: + - Feast providers **do not currently support async offline writes**. + - Therefore: + * ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write + * OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers. + - The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only. 
+ """ + push_source_name = request.push_source_name + allow_registry_cache = request.allow_registry_cache + transform_on_write = request.transform_on_write + + # Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as there's no async for offline store + if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and ( + store._get_provider().async_supported.online.write + ): + await store.push_async( + push_source_name=push_source_name, + df=df, + allow_registry_cache=allow_registry_cache, + to=push_to, + transform_on_write=transform_on_write, + ) + else: + await run_in_threadpool( + lambda: store.push( + push_source_name=push_source_name, + df=df, + allow_registry_cache=allow_registry_cache, + to=push_to, + transform_on_write=transform_on_write, + ) + ) - should_push_async = ( - store._get_provider().async_supported.online.write - and to in [PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE] - ) - if should_push_async: - await store.push_async(**push_params) + needs_online = to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) + needs_offline = to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE) + + if offline_batcher is None or not needs_offline: + await _push_with_to(to) else: - store.push(**push_params) + if needs_online: + await _push_with_to(PushMode.ONLINE) + + offline_batcher.enqueue( + push_source_name=request.push_source_name, + df=df, + allow_registry_cache=request.allow_registry_cache, + transform_on_write=request.transform_on_write, + ) async def _get_feast_object( feature_view_name: str, allow_registry_cache: bool @@ -683,3 +774,170 @@ def start_server( ) else: uvicorn.run(app, host=host, port=port, access_log=(not no_access_log)) + + +class _OfflineBatchKey(NamedTuple): + push_source_name: str + allow_registry_cache: bool + transform_on_write: bool + + +class OfflineWriteBatcher: + """ + In-process offline write batcher for /push requests. 
+ + - Buffers DataFrames per (push_source_name, allow_registry_cache, transform_on_write) + - Flushes when either: + * total rows in a buffer >= batch_size, or + * time since last flush >= batch_interval_seconds + - Flush runs in a dedicated background thread so the HTTP event loop stays unblocked. + """ + + def __init__(self, store: "feast.FeatureStore", cfg: Any): + self._store = store + self._cfg = cfg + + # Buffers and timestamps keyed by batch key + self._buffers: DefaultDict[_OfflineBatchKey, List[pd.DataFrame]] = defaultdict( + list + ) + self._last_flush: DefaultDict[_OfflineBatchKey, float] = defaultdict(time.time) + + self._lock = threading.Lock() + self._stop_event = threading.Event() + + # Start background flusher thread + self._thread = threading.Thread( + target=self._run, name="offline_write_batcher", daemon=True + ) + self._thread.start() + + logger.debug( + "OfflineWriteBatcher initialized: batch_size=%s, batch_interval_seconds=%s", + getattr(cfg, "batch_size", None), + getattr(cfg, "batch_interval_seconds", None), + ) + + # ---------- Public API ---------- + + def enqueue( + self, + push_source_name: str, + df: pd.DataFrame, + allow_registry_cache: bool, + transform_on_write: bool, + ) -> None: + """ + Enqueue a dataframe for offline write, grouped by push source + flags. + Cheap and non-blocking; heavy I/O happens in background thread. + """ + key = _OfflineBatchKey( + push_source_name=push_source_name, + allow_registry_cache=allow_registry_cache, + transform_on_write=transform_on_write, + ) + + with self._lock: + self._buffers[key].append(df) + total_rows = sum(len(d) for d in self._buffers[key]) + + # Size-based flush + if total_rows >= self._cfg.batch_size: + logger.debug( + "OfflineWriteBatcher size threshold reached for %s: %s rows", + key, + total_rows, + ) + self._flush_locked(key) + + def flush_all(self) -> None: + """ + Flush all buffers synchronously. Intended for graceful shutdown. 
+ """ + with self._lock: + keys = list(self._buffers.keys()) + for key in keys: + self._flush_locked(key) + + def shutdown(self, timeout: float = 5.0) -> None: + """ + Stop the background thread and perform a best-effort flush. + """ + logger.debug("Shutting down OfflineWriteBatcher") + self._stop_event.set() + try: + self._thread.join(timeout=timeout) + except Exception: + logger.exception("Error joining OfflineWriteBatcher thread") + + # Best-effort final flush + try: + self.flush_all() + except Exception: + logger.exception("Error during final OfflineWriteBatcher flush") + + # ---------- Internal helpers ---------- + + def _run(self) -> None: + """ + Background loop: periodically checks for buffers that should be flushed + based on time since last flush. + """ + interval = max(1, int(getattr(self._cfg, "batch_interval_seconds", 30))) + logger.debug( + "OfflineWriteBatcher background loop started with check interval=%s", + interval, + ) + + while not self._stop_event.wait(timeout=interval): + now = time.time() + try: + with self._lock: + for key, dfs in list(self._buffers.items()): + if not dfs: + continue + last = self._last_flush[ + key + ] # this will also init the default timestamp + age = now - last + if age >= self._cfg.batch_interval_seconds: + logger.debug( + "OfflineWriteBatcher time threshold reached for %s: age=%s", + key, + age, + ) + self._flush_locked(key) + except Exception: + logger.exception("Error in OfflineWriteBatcher background loop") + + logger.debug("OfflineWriteBatcher background loop exiting") + + def _flush_locked(self, key: _OfflineBatchKey) -> None: + """ + Flush a single buffer; caller must hold self._lock. 
+ """ + dfs = self._buffers.get(key) + if not dfs: + return + + batch_df = pd.concat(dfs, ignore_index=True) + self._buffers[key].clear() + self._last_flush[key] = time.time() + + logger.debug( + "Flushing offline batch for push_source=%s with %s rows", + key.push_source_name, + len(batch_df), + ) + + # NOTE: offline writes are currently synchronous only, so we call directly + try: + self._store.push( + push_source_name=key.push_source_name, + df=batch_df, + allow_registry_cache=key.allow_registry_cache, + to=PushMode.OFFLINE, + transform_on_write=key.transform_on_write, + ) + except Exception: + logger.exception("Error flushing offline batch for %s", key) diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index 1a348032e1..b13e23d035 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -1,3 +1,16 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from typing import Optional from pydantic import StrictBool, StrictInt @@ -32,3 +45,12 @@ class BaseFeatureServerConfig(FeastConfigBaseModel): feature_logging: Optional[FeatureLoggingConfig] = None """ Feature logging configuration """ + + offline_push_batching_enabled: Optional[StrictBool] = None + """Whether to batch writes to the offline store via the `/push` endpoint.""" + + offline_push_batching_batch_size: Optional[StrictInt] = None + """The maximum batch size for offline writes via `/push`.""" + + offline_push_batching_batch_interval_seconds: Optional[StrictInt] = None + """The batch interval between offline writes via `/push`.""" diff --git a/sdk/python/tests/unit/test_feature_server.py b/sdk/python/tests/unit/test_feature_server.py index 21c01d6176..1999be9447 100644 --- a/sdk/python/tests/unit/test_feature_server.py +++ b/sdk/python/tests/unit/test_feature_server.py @@ -1,4 +1,20 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import json +import time +from collections import Counter +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest @@ -207,3 +223,241 @@ def test_materialize_request_model(): assert req2.disable_event_timestamp is False assert req2.start_ts == "2021-01-01T00:00:00" assert req2.end_ts == "2021-01-02T00:00:00" + + +def _enable_offline_batching_config( + fs, enabled: bool = True, batch_size: int = 1, batch_interval_seconds: int = 60 +): + """ + Attach a minimal feature_server.offline_push_batching config + to a mocked FeatureStore. + """ + if not hasattr(fs, "config") or fs.config is None: + fs.config = SimpleNamespace() + + if not hasattr(fs.config, "feature_server") or fs.config.feature_server is None: + fs.config.feature_server = SimpleNamespace() + + fs.config.feature_server.offline_push_batching_enabled = enabled + fs.config.feature_server.offline_push_batching_batch_size = batch_size + fs.config.feature_server.offline_push_batching_batch_interval_seconds = ( + batch_interval_seconds + ) + + +def push_body_many(push_mode=PushMode.ONLINE, count: int = 2, id_start: int = 100): + """Build a push body with multiple entities.""" + driver_ids = list(range(id_start, id_start + count)) + lats = [float(i) for i in driver_ids] + longs = [str(lat) for lat in lats] + event_ts = [str(_utc_now()) for _ in range(count)] + created_ts = [str(_utc_now()) for _ in range(count)] + + return { + "push_source_name": "driver_locations_push", + "df": { + "driver_lat": lats, + "driver_long": longs, + "driver_id": driver_ids, + "event_timestamp": event_ts, + "created_timestamp": created_ts, + }, + "to": push_mode.name.lower(), + } + + +@pytest.mark.parametrize("online_write", [True, False]) +@pytest.mark.parametrize("batching_enabled", [True, False]) +@pytest.mark.parametrize( + "push_mode", + [PushMode.ONLINE, PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE], +) +def test_push_batched_matrix( + online_write, batching_enabled, push_mode, mock_fs_factory 
+): + """ + Matrix over: + - online_write ∈ {True, False} + - batching_enabled ∈ {True, False} + - push_mode ∈ {ONLINE, OFFLINE, ONLINE_AND_OFFLINE} + + Asserts: + - which of fs.push / fs.push_async are called + - how many times + - with which `to` values + + For batching_enabled=True, batch_size=1 ensures immediate flush of offline part. + """ + fs = mock_fs_factory(online_write=online_write) + + _enable_offline_batching_config( + fs, + enabled=batching_enabled, + batch_size=1, # flush immediately on a single offline request + batch_interval_seconds=60, + ) + + client = TestClient(get_app(fs)) + + # use a multi-row payload to ensure we test non-trivial dfs + resp = client.post("/push", json=push_body_many(push_mode, count=2, id_start=100)) + assert resp.status_code == 200 + + # Collect calls + sync_calls = fs.push.call_args_list + async_calls = fs.push_async.await_args_list + sync_tos = [c.kwargs.get("to") for c in sync_calls] + async_tos = [c.kwargs.get("to") for c in async_calls] + + # ------------------------------- + # Build expectations + # ------------------------------- + expected_sync_calls = 0 + expected_async_calls = 0 + expected_sync_tos = [] + expected_async_tos = [] + + if push_mode == PushMode.ONLINE: + # Only online path, batching irrelevant + if online_write: + expected_async_calls = 1 + expected_async_tos = [PushMode.ONLINE] + else: + expected_sync_calls = 1 + expected_sync_tos = [PushMode.ONLINE] + + elif push_mode == PushMode.OFFLINE: + # Only offline path, never async + if batching_enabled: + # via batcher, but externally still one push(to=OFFLINE) + expected_sync_calls = 1 + expected_sync_tos = [PushMode.OFFLINE] + else: + # direct push(to=OFFLINE) + expected_sync_calls = 1 + expected_sync_tos = [PushMode.OFFLINE] + + elif push_mode == PushMode.ONLINE_AND_OFFLINE: + if not batching_enabled: + # Old behaviour: single call with to=ONLINE_AND_OFFLINE + if online_write: + expected_async_calls = 1 + expected_async_tos = 
[PushMode.ONLINE_AND_OFFLINE] + else: + expected_sync_calls = 1 + expected_sync_tos = [PushMode.ONLINE_AND_OFFLINE] + else: + # Batching enabled: ONLINE part and OFFLINE part are split + if online_write: + # async ONLINE + sync OFFLINE (via batcher) + expected_async_calls = 1 + expected_async_tos = [PushMode.ONLINE] + expected_sync_calls = 1 + expected_sync_tos = [PushMode.OFFLINE] + else: + # both ONLINE and OFFLINE via sync push + expected_sync_calls = 2 + expected_sync_tos = [PushMode.ONLINE, PushMode.OFFLINE] + + # ------------------------------- + # Assert counts + # ------------------------------- + assert fs.push.call_count == expected_sync_calls + assert fs.push_async.await_count == expected_async_calls + + # Allow ordering differences by comparing as multisets + assert Counter(sync_tos) == Counter(expected_sync_tos) + assert Counter(async_tos) == Counter(expected_async_tos) + + +def test_offline_batches_are_separated_by_flags(mock_fs_factory): + """ + Offline batches must be separated by (allow_registry_cache, transform_on_write). + + If we send three offline pushes with the same push_source_name but different + combinations of allow_registry_cache / transform_on_write, they must result + in three separate fs.push(...) calls, not one merged batch. + """ + fs = mock_fs_factory(online_write=True) + # Large batch_size so we rely on interval-based flush, not size-based. 
+ _enable_offline_batching_config( + fs, enabled=True, batch_size=100, batch_interval_seconds=1 + ) + + client = TestClient(get_app(fs)) + + # Base body: allow_registry_cache=True, transform_on_write=True (default) + body_base = push_body_many(PushMode.OFFLINE, count=2, id_start=100) + + # 1) Default flags: allow_registry_cache=True, transform_on_write=True + resp1 = client.post("/push", json=body_base) + assert resp1.status_code == 200 + + # 2) Different allow_registry_cache + body_allow_false = dict(body_base) + body_allow_false["allow_registry_cache"] = False + resp2 = client.post("/push", json=body_allow_false) + assert resp2.status_code == 200 + + # 3) Different transform_on_write + body_transform_false = dict(body_base) + body_transform_false["transform_on_write"] = False + resp3 = client.post("/push", json=body_transform_false) + assert resp3.status_code == 200 + + # Immediately after: no flush expected yet (interval-based) + assert fs.push.call_count == 0 + + # Wait up to ~3 seconds for interval-based flush + deadline = time.time() + 3.0 + while time.time() < deadline and fs.push.call_count < 3: + time.sleep(0.1) + + # We expect exactly 3 separate pushes, each with 2 rows and to=OFFLINE + assert fs.push.call_count == 3 + + lengths = [c.kwargs["df"].shape[0] for c in fs.push.call_args_list] + tos = [c.kwargs["to"] for c in fs.push.call_args_list] + allow_flags = [c.kwargs["allow_registry_cache"] for c in fs.push.call_args_list] + transform_flags = [c.kwargs["transform_on_write"] for c in fs.push.call_args_list] + + assert all(t == PushMode.OFFLINE for t in tos) + assert lengths == [2, 2, 2] + + # Ensure we really saw 3 distinct (allow_registry_cache, transform_on_write) combos + assert len({(a, t) for a, t in zip(allow_flags, transform_flags)}) == 3 + + +def test_offline_batcher_interval_flush(mock_fs_factory): + """ + With batching enabled and a large batch_size, ensure that the time-based + flush still triggers even when the size threshold is never 
reached. + """ + fs = mock_fs_factory(online_write=True) + _enable_offline_batching_config( + fs, + enabled=True, + batch_size=100, # won't be hit by this test + batch_interval_seconds=1, # small interval + ) + + client = TestClient(get_app(fs)) + + # Send a single OFFLINE push (2 rows), below size threshold + resp = client.post( + "/push", json=push_body_many(PushMode.OFFLINE, count=2, id_start=500) + ) + assert resp.status_code == 200 + + # Immediately after: no sync push yet (buffer only) + assert fs.push.call_count == 0 + + # Wait up to ~3 seconds for interval-based flush + deadline = time.time() + 3.0 + while time.time() < deadline and fs.push.call_count < 1: + time.sleep(0.1) + + assert fs.push.call_count == 1 + kwargs = fs.push.call_args.kwargs + assert kwargs["to"] == PushMode.OFFLINE + assert len(kwargs["df"]) == 2