Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 60 additions & 32 deletions burr/core/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
except ImportError:
Self = None

# Error message template for uninitialized SQLitePersister
_UNINITIALIZED_PERSISTER_ERROR = (
"Uninitialized persister: table '{table_name}' does not exist. "
"Make sure to call .initialize() on the persister before passing it "
"to the ApplicationBuilder."
)


class PersistedStateData(TypedDict):
partition_key: str
Expand Down Expand Up @@ -444,12 +451,19 @@ def list_app_ids(self, partition_key: Optional[str], **kwargs) -> list[str]:
)

cursor = self.connection.cursor()
cursor.execute(
f"SELECT DISTINCT app_id FROM {self.table_name} "
f"WHERE partition_key = ? "
f"ORDER BY created_at DESC",
(partition_key,),
)
try:
cursor.execute(
f"SELECT DISTINCT app_id FROM {self.table_name} "
f"WHERE partition_key = ? "
f"ORDER BY created_at DESC",
(partition_key,),
)
except sqlite3.OperationalError as e:
if "no such table" in str(e):
raise RuntimeError(
_UNINITIALIZED_PERSISTER_ERROR.format(table_name=self.table_name)
) from e
raise
app_ids = [row[0] for row in cursor.fetchall()]
return app_ids

Expand All @@ -475,27 +489,34 @@ def load(
)
logger.debug("Loading %s, %s, %s", partition_key, app_id, sequence_id)
cursor = self.connection.cursor()
if app_id is None:
# get latest for all app_ids
cursor.execute(
f"SELECT position, state, sequence_id, app_id, created_at, status FROM {self.table_name} "
f"WHERE partition_key = ? "
f"ORDER BY CREATED_AT DESC LIMIT 1",
(partition_key,),
)
elif sequence_id is None:
cursor.execute(
f"SELECT position, state, sequence_id, app_id, created_at, status FROM {self.table_name} "
f"WHERE partition_key = ? AND app_id = ? "
f"ORDER BY sequence_id DESC LIMIT 1",
(partition_key, app_id),
)
else:
cursor.execute(
f"SELECT position, state, sequence_id, app_id, created_at, status FROM {self.table_name} "
f"WHERE partition_key = ? AND app_id = ? AND sequence_id = ?",
(partition_key, app_id, sequence_id),
)
try:
if app_id is None:
# get latest for all app_ids
cursor.execute(
f"SELECT position, state, sequence_id, app_id, created_at, status FROM {self.table_name} "
f"WHERE partition_key = ? "
f"ORDER BY CREATED_AT DESC LIMIT 1",
(partition_key,),
)
elif sequence_id is None:
cursor.execute(
f"SELECT position, state, sequence_id, app_id, created_at, status FROM {self.table_name} "
f"WHERE partition_key = ? AND app_id = ? "
f"ORDER BY sequence_id DESC LIMIT 1",
(partition_key, app_id),
)
else:
cursor.execute(
f"SELECT position, state, sequence_id, app_id, created_at, status FROM {self.table_name} "
f"WHERE partition_key = ? AND app_id = ? AND sequence_id = ?",
(partition_key, app_id, sequence_id),
)
except sqlite3.OperationalError as e:
if "no such table" in str(e):
raise RuntimeError(
_UNINITIALIZED_PERSISTER_ERROR.format(table_name=self.table_name)
) from e
raise
row = cursor.fetchone()
if row is None:
return None
Expand Down Expand Up @@ -551,11 +572,18 @@ def save(
)
cursor = self.connection.cursor()
json_state = json.dumps(state.serialize(**self.serde_kwargs))
cursor.execute(
f"INSERT INTO {self.table_name} (partition_key, app_id, sequence_id, position, state, status) "
f"VALUES (?, ?, ?, ?, ?, ?)",
(partition_key, app_id, sequence_id, position, json_state, status),
)
try:
cursor.execute(
f"INSERT INTO {self.table_name} (partition_key, app_id, sequence_id, position, state, status) "
f"VALUES (?, ?, ?, ?, ?, ?)",
(partition_key, app_id, sequence_id, position, json_state, status),
)
except sqlite3.OperationalError as e:
if "no such table" in str(e):
raise RuntimeError(
_UNINITIALIZED_PERSISTER_ERROR.format(table_name=self.table_name)
) from e
raise
self.connection.commit()

def cleanup(self):
Expand Down
32 changes: 32 additions & 0 deletions tests/core/test_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,38 @@ def test_sqlite_persistence_is_initialized_true_new_connection(tmp_path):
p2.cleanup()


def test_sqlite_persister_load_without_initialize_raises_runtime_error():
"""Test that calling load() without initialize() raises a clear RuntimeError."""
persister = SQLLitePersister(db_path=":memory:", table_name="test_table")
try:
with pytest.raises(RuntimeError, match="Uninitialized persister"):
persister.load("partition_key", "app_id")
finally:
persister.cleanup()


def test_sqlite_persister_save_without_initialize_raises_runtime_error():
"""Test that calling save() without initialize() raises a clear RuntimeError."""
persister = SQLLitePersister(db_path=":memory:", table_name="test_table")
try:
with pytest.raises(RuntimeError, match="Uninitialized persister"):
persister.save(
"partition_key", "app_id", 1, "position", State({"key": "value"}), "completed"
)
finally:
persister.cleanup()


def test_sqlite_persister_list_app_ids_without_initialize_raises_runtime_error():
"""Test that calling list_app_ids() without initialize() raises a clear RuntimeError."""
persister = SQLLitePersister(db_path=":memory:", table_name="test_table")
try:
with pytest.raises(RuntimeError, match="Uninitialized persister"):
persister.list_app_ids("partition_key")
finally:
persister.cleanup()


@pytest.mark.parametrize(
"method_name,kwargs",
[
Expand Down
Loading