2 changes: 1 addition & 1 deletion fs_attachment/__manifest__.py
@@ -5,7 +5,7 @@
 {
     "name": "Base Attachment Object Store",
     "summary": "Store attachments on external object store",
-    "version": "16.0.2.0.1",
+    "version": "16.0.2.0.3",
     "author": "Camptocamp, ACSONE SA/NV, Odoo Community Association (OCA)",
     "license": "AGPL-3",
     "development_status": "Beta",
94 changes: 55 additions & 39 deletions fs_attachment/models/fs_file_gc.py
@@ -1,5 +1,6 @@
 # Copyright 2023 ACSONE SA/NV
 # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+import gc
 import logging
 import threading
 from contextlib import closing, contextmanager
@@ -118,51 +119,66 @@ def _gc_files(self) -> None:
         # commit to release the lock
         cr.commit()  # pylint: disable=invalid-commit

+    # Max orphan files processed per batch. Bounds the per-run memory
+    # footprint of the autovacuum job when many orphans are queued:
+    # fsspec backend clients (adlfs, s3fs, ...) keep response buffers
+    # and connection-pool state alive until their referents are
+    # collected. Loading tens of thousands of fnames into a single
+    # Python list and iterating ``fs.rm`` in one pass was enough to
+    # hit Odoo's ``limit_memory_hard`` on production workers.
+    _GC_BATCH_SIZE = 500
+
     def _gc_files_unsafe(self) -> None:
         # get the list of fs.storage codes that must be autovacuumed
         codes = (
             self.env["fs.storage"].search([]).filtered("autovacuum_gc").mapped("code")
         )
         if not codes:
             return
-        # we process by batch of storage codes.
-        self._cr.execute(
-            """
-            SELECT
-                fs_storage_code,
-                array_agg(store_fname)
-
-            FROM
-                fs_file_gc
-            WHERE
-                fs_storage_code IN %s
-                AND NOT EXISTS (
-                    SELECT 1
-                    FROM ir_attachment
-                    WHERE store_fname = fs_file_gc.store_fname
-                )
-            GROUP BY
-                fs_storage_code
-            """,
-            (tuple(codes),),
-        )
-        for code, store_fnames in self._cr.fetchall():
+        # Process one storage at a time and paginate both the SELECT and
+        # the fs.rm loop so neither the Python list of file names nor
+        # the storage SDK's response buffers can grow unbounded.
+        for code in codes:
             self.env["fs.storage"].get_by_code(code)
             fs = self.env["fs.storage"].get_fs_by_code(code)
-            for store_fname in store_fnames:
-                try:
-                    file_path = store_fname.partition("://")[2]
-                    fs.rm(file_path)
-                except Exception:
-                    _logger.debug("Failed to remove file %s", store_fname)
-
-        # delete the records from the table fs_file_gc
-        self._cr.execute(
-            """
-            DELETE FROM
-                fs_file_gc
-            WHERE
-                fs_storage_code IN %s
-            """,
-            (tuple(codes),),
-        )
+            while True:
Thank you for your proposal. This makes the code more robust, since it becomes less sensitive to the amount of data the garbage collector has to process. I'm still not sure about the first part of the change (#596), but this one LGTM. 😏 So let's discuss #596 so we can move forward.

+                self._cr.execute(
+                    """
+                    SELECT store_fname
+                    FROM fs_file_gc
+                    WHERE fs_storage_code = %s
+                      AND NOT EXISTS (
+                          SELECT 1
+                          FROM ir_attachment
+                          WHERE store_fname = fs_file_gc.store_fname
+                      )
+                    LIMIT %s
+                    """,
+                    (code, self._GC_BATCH_SIZE),
+                )
+                rows = self._cr.fetchall()
+                if not rows:
+                    break
+                fnames = [row[0] for row in rows]
+                for store_fname in fnames:
+                    try:
+                        file_path = store_fname.partition("://")[2]
+                        fs.rm(file_path)
+                    except Exception:
+                        _logger.debug("Failed to remove file %s", store_fname)
+                # Always clear this batch from fs_file_gc — fs.rm failures
+                # leak the blob in the backend (same behaviour as the
+                # pre-batching implementation) but the DB row must go or
+                # the next SELECT would re-fetch the same rows and the
+                # loop would never terminate.
+                self._cr.execute(
+                    "DELETE FROM fs_file_gc WHERE store_fname = ANY(%s)",
+                    (fnames,),
+                )
+                # Force a collection between batches to reclaim response
+                # buffers and connection objects held by the storage SDK
+                # that would otherwise only be freed on worker exit. Do
+                # NOT commit here: the caller (_gc_files) holds a SHARE
+                # lock on fs_file_gc and ir_attachment for consistency
+                # and commits at the end.
+                gc.collect()
@TecnologiaIG This looks like a bug in the related fsspec client backends... but batch processing could be a good idea. Taking into account your PR and #599, I have the feeling that we must define a proper API that would allow the two addons to share the same methods. I'll think about it and make a proposal.

16 changes: 16 additions & 0 deletions fs_attachment/readme/newsfragments/gc_batching.bugfix
@@ -0,0 +1,16 @@
+Paginate the autovacuum GC loop to bound worker memory.
+
+``FsFileGC._gc_files_unsafe`` used to load the entire backlog of orphan
+files into a single Python list via ``array_agg(store_fname)`` and iterate
+``fs.rm`` over all of them in one pass. With the Azure Blob backend and
+tens of thousands of orphans queued, each HEAD+DELETE pair retained
+response buffers and connection-pool state inside the adlfs client that
+was only released when the worker exited. The autovacuum cron hit Odoo's
+``limit_memory_hard`` and got ``SIGKILL``'d mid-run every time, so the
+queue never drained and the next worker ran the same failing loop.
+
+The SELECT and the ``fs.rm`` loop are now paginated in batches of 500 per
+storage, with an explicit ``gc.collect()`` between batches. The caller
+(``_gc_files``) still holds the ``SHARE`` lock and performs the final
+commit, so the consistency guarantees and transactional semantics are
+unchanged.
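
The batching loop described in this change note can be tried outside Odoo with a small standalone sketch. It is an illustration under stated assumptions, not the addon's code: `sqlite3` stands in for PostgreSQL (so the `DELETE` uses `executemany` rather than `ANY(%s)`), `FakeFS` is a hypothetical stand-in for the fsspec filesystem, and `drain_orphans`, `demo` and the batch size of 3 are made up for the example.

```python
import gc
import sqlite3

BATCH_SIZE = 3  # hypothetical value for the demo; the PR uses 500


class FakeFS:
    """Hypothetical stand-in for an fsspec filesystem; records removed paths."""

    def __init__(self, failing=()):
        self.removed = []
        self.failing = set(failing)

    def rm(self, path):
        if path in self.failing:
            raise OSError(f"cannot remove {path}")
        self.removed.append(path)


def drain_orphans(conn, fs, code, batch_size=BATCH_SIZE):
    """Remove orphan files for one storage code, one batch at a time."""
    batches = 0
    while True:
        rows = conn.execute(
            """
            SELECT store_fname FROM fs_file_gc
            WHERE fs_storage_code = ?
              AND NOT EXISTS (
                  SELECT 1 FROM ir_attachment
                  WHERE ir_attachment.store_fname = fs_file_gc.store_fname
              )
            LIMIT ?
            """,
            (code, batch_size),
        ).fetchall()
        if not rows:
            break
        fnames = [row[0] for row in rows]
        for store_fname in fnames:
            try:
                fs.rm(store_fname.partition("://")[2])
            except Exception:
                pass  # the PR logs the failure at debug level here
        # Drop the whole batch from the queue, even after rm failures,
        # so the next SELECT cannot return the same rows again.
        conn.executemany(
            "DELETE FROM fs_file_gc WHERE store_fname = ?",
            [(name,) for name in fnames],
        )
        gc.collect()
        batches += 1
    return batches


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE fs_file_gc (store_fname TEXT, fs_storage_code TEXT)")
    conn.execute("CREATE TABLE ir_attachment (store_fname TEXT)")
    names = [f"az://bucket/file{i}" for i in range(7)]
    conn.executemany("INSERT INTO fs_file_gc VALUES (?, 'az')", [(n,) for n in names])
    # file0 is still referenced by an attachment, so it is not an orphan.
    conn.execute("INSERT INTO ir_attachment VALUES (?)", (names[0],))
    fs = FakeFS(failing={"bucket/file3"})
    batches = drain_orphans(conn, fs, "az")
    remaining = [r[0] for r in conn.execute("SELECT store_fname FROM fs_file_gc")]
    return batches, fs.removed, remaining
```

Calling `demo()` drains six orphans in two batches. The file whose `rm` fails is still dropped from the queue, which is what guarantees the loop terminates. In this sketch the row for the still-referenced file is never selected and so stays in `fs_file_gc`, whereas the removed code deleted every queue row for the selected storage codes; whether that difference matters is a question for the PR discussion.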