Skip to content

#714 Release all locks when Pramen job exits abnormally.#715

Merged
yruslan merged 2 commits intomainfrom
feature/714-auto-release-all-locks
Mar 4, 2026
Merged

#714 Release all locks when Pramen job exits abnormally.#715
yruslan merged 2 commits intomainfrom
feature/714-auto-release-all-locks

Conversation

@yruslan
Copy link
Collaborator

@yruslan yruslan commented Feb 27, 2026

Closes #714

Summary by CodeRabbit

  • New Features

    • Centralized token lock registry to track and release active locks
    • Locks are now automatically released when a pipeline finishes
    • Token locks expose a token identifier property
  • Refactor

    • Enhanced lock lifecycle and cleanup to ensure deregistration on release
    • Factory now propagates token into created lock instances
  • Tests

    • Added a test to verify registry-driven release of all locks

@coderabbitai
Copy link

coderabbitai bot commented Feb 27, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9858334b-5946-47c1-9552-03b7b32e8e8c

📥 Commits

Reviewing files that changed from the base of the PR and between 7fa6b3e and 1ade7f5.

📒 Files selected for processing (2)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala

Walkthrough

Introduces a token property to the TokenLock API and adds a TokenLockRegistry singleton to track, unregister, and release active token locks. TokenLockBase and implementations now register/unregister with the registry; pipeline shutdown calls releaseAllLocks() to clean up remaining locks.

Changes

Cohort / File(s) Summary
Token Lock API
pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala
Added abstract def token: String to TokenLock trait (API surface change).
Token Lock Base & Registry
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala
TokenLockBase constructor now exposes override val token: String; acquisition registers lock via TokenLockRegistry.registerLock(this) and release ensures unregisterLock(this) in finally. New TokenLockRegistry singleton added to manage active locks and bulk release.
Implementations & Factory
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala
TokenLockAllow now accepts override val token: String and implements close(); factory constructs TokenLockAllow with token parameter.
Pipeline integration
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
onAppFinish() now calls TokenLockRegistry.releaseAllLocks() to release remaining locks on finish.
Tests / Mocks
pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala
TokenLockMock constructor exposes override val token: String = "mock"; new test asserts registry releaseAllLocks() releases acquired locks.

Sequence Diagram

sequenceDiagram
    participant Job as Job / Pipeline
    participant Lock as TokenLock Instance
    participant Registry as TokenLockRegistry
    participant State as PipelineStateImpl

    Job->>Lock: tryAcquire()
    activate Lock
    Lock->>Lock: evaluate isAcquired
    alt acquired
        Lock->>Registry: registerLock(this)
        activate Registry
        Registry->>Registry: add lock to list
        deactivate Registry
        Lock->>Job: return true
    else not acquired
        Lock->>Job: return false
    end
    deactivate Lock

    Job->>Lock: release()
    activate Lock
    Lock->>Lock: release guard & resource
    Lock->>Registry: unregisterLock(this)
    activate Registry
    Registry->>Registry: remove lock from list
    deactivate Registry
    deactivate Lock

    Job->>State: onAppFinish()
    activate State
    State->>Registry: releaseAllLocks()
    activate Registry
    Registry->>Registry: snapshot active locks
    loop for each lock
        Registry->>Lock: release()
        activate Lock
        Lock->>Lock: release resource
        deactivate Lock
    end
    Registry->>Registry: clear registry
    deactivate Registry
    State->>State: continue shutdown steps
    deactivate State
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

🐇 I hold a token, small and bright,
I register locks by day and night,
On finish call I hop and free,
No stray locks left — that's right for me! 🥕

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: automatically releasing all locks when a Pramen job exits abnormally, which is the primary objective of the PR.
Linked Issues check ✅ Passed The PR implements the core requirement from issue #714: automatically releasing token locks when the job exits abnormally via System.exit() or signals by introducing TokenLockRegistry and integrating lock release into the pipeline shutdown flow.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing automatic token lock release on abnormal job termination, with no unrelated modifications to other features or components detected.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/714-auto-release-all-locks

Tip

Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs).
Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala (1)

100-106: ⚠️ Potential issue | 🟠 Major

Ensure registry unregistration and hook removal run even when guard release fails.

If releaseGuardLock() throws, TokenLockRegistry.unregisterLock(this) and JvmUtils.safeRemoveShutdownHook(shutdownHook) are skipped, leaving stale lifecycle state.

🛠️ Proposed fix
     if (wasAcquired) {
       watcherThreadOpt.foreach(_.interrupt())
       watcherThreadOpt = None
-      releaseGuardLock()
-      JvmUtils.safeRemoveShutdownHook(shutdownHook)
-      TokenLockRegistry.unregisterLock(this)
+      try {
+        releaseGuardLock()
+      } finally {
+        JvmUtils.safeRemoveShutdownHook(shutdownHook)
+        TokenLockRegistry.unregisterLock(this)
+      }
       log.info(s"Lock released: '$escapedToken'.")
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala`
around lines 100 - 106, The existing release path may skip
JvmUtils.safeRemoveShutdownHook(shutdownHook) and
TokenLockRegistry.unregisterLock(this) if releaseGuardLock() throws; update the
code to ensure shutdownHook removal and registry unregistration always run by
wrapping releaseGuardLock() in a try/finally (or equivalent) so that after
attempting releaseGuardLock() you always call
JvmUtils.safeRemoveShutdownHook(shutdownHook) and
TokenLockRegistry.unregisterLock(this); preserve the existing watcherThreadOpt
interruption and watcherThreadOpt = None/wasAcquired handling but move the
removal/unregister calls into the finally block to guarantee cleanup even on
exceptions.
🧹 Nitpick comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala (1)

202-214: Make lock cleanup unconditional in onAppFinish().

TokenLockRegistry.releaseAllLocks() is currently the last statement. Wrapping preceding steps in try/finally keeps lock release guaranteed even if new failure paths are introduced later.

♻️ Proposed refactor
   private[state] def onAppFinish(): Unit = {
     if (!exitedNormally && failureException.isEmpty && signalException.isDefined) {
       failureException = signalException
       exitCode |= EXIT_CODE_SIGNAL_RECEIVED
     }

     finishedInstant = Option(Instant.now())
-    sendPipelineNotifications()
-    runCustomShutdownHook()
-    removeSignalHandlers()
-    sendNotificationEmail()
-    TokenLockRegistry.releaseAllLocks()
+    try {
+      sendPipelineNotifications()
+      runCustomShutdownHook()
+      removeSignalHandlers()
+      sendNotificationEmail()
+    } finally {
+      TokenLockRegistry.releaseAllLocks()
+    }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala`
around lines 202 - 214, The lock cleanup call
TokenLockRegistry.releaseAllLocks() must run unconditionally from onAppFinish();
wrap the existing sequence (the conditional setting of
failureException/exitCode, finishedInstant assignment,
sendPipelineNotifications(), runCustomShutdownHook(), removeSignalHandlers(),
sendNotificationEmail()) in a try block and move
TokenLockRegistry.releaseAllLocks() into the finally block so locks are always
released even if any of those methods throw; preserve existing variable updates
(exitedNormally, failureException, exitCode, finishedInstant) and ensure
exceptions continue to propagate after locks are released.
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala (1)

102-108: Strengthen this test to verify external lock release, not only internal state.

Checking isAcquired confirms local state, but re-acquiring the same token validates that the backing lock was actually released.

🧪 Proposed test improvement
     "lock registry releases all locks" in {
       val lock1 = getLock("token1")
       assert(lock1.tryAcquire())

       TokenLockRegistry.releaseAllLocks()
       assert(!lock1.asInstanceOf[TokenLockBase].isAcquired)
+
+      val lock2 = getLock("token1")
+      assert(lock2.tryAcquire())
+      lock2.release()
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala`
around lines 102 - 108, The test currently only checks internal state via
TokenLockBase.isAcquired after calling TokenLockRegistry.releaseAllLocks();
instead, after releasing, attempt to actually re-acquire the backing lock for
the same token to ensure external release: call
TokenLockRegistry.releaseAllLocks(), then obtain a new lock for the same token
using getLock("token1") (or reuse lock1) and assert that tryAcquire() returns
true (and release it afterwards). Reference getLock,
TokenLockRegistry.releaseAllLocks, TokenLockBase.isAcquired and tryAcquire to
locate where to add the re-acquire assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala`:
- Around line 55-60: The catch block in TokenLockRegistry where
currentListCopy.foreach calls l.release() swallows the exception; update the
NonFatal(ex) handler to include the exception when logging so failures to
release locks are visible—e.g., change the log.warn call in the catch for
NonFatal(ex) to pass the exception (or its message) along with the descriptive
text that includes l.token, ensuring the warning contains the exception/stack
trace for diagnostics.

---

Outside diff comments:
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala`:
- Around line 100-106: The existing release path may skip
JvmUtils.safeRemoveShutdownHook(shutdownHook) and
TokenLockRegistry.unregisterLock(this) if releaseGuardLock() throws; update the
code to ensure shutdownHook removal and registry unregistration always run by
wrapping releaseGuardLock() in a try/finally (or equivalent) so that after
attempting releaseGuardLock() you always call
JvmUtils.safeRemoveShutdownHook(shutdownHook) and
TokenLockRegistry.unregisterLock(this); preserve the existing watcherThreadOpt
interruption and watcherThreadOpt = None/wasAcquired handling but move the
removal/unregister calls into the finally block to guarantee cleanup even on
exceptions.

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala`:
- Around line 202-214: The lock cleanup call TokenLockRegistry.releaseAllLocks()
must run unconditionally from onAppFinish(); wrap the existing sequence (the
conditional setting of failureException/exitCode, finishedInstant assignment,
sendPipelineNotifications(), runCustomShutdownHook(), removeSignalHandlers(),
sendNotificationEmail()) in a try block and move
TokenLockRegistry.releaseAllLocks() into the finally block so locks are always
released even if any of those methods throw; preserve existing variable updates
(exitedNormally, failureException, exitCode, finishedInstant) and ensure
exceptions continue to propagate after locks are released.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala`:
- Around line 102-108: The test currently only checks internal state via
TokenLockBase.isAcquired after calling TokenLockRegistry.releaseAllLocks();
instead, after releasing, attempt to actually re-acquire the backing lock for
the same token to ensure external release: call
TokenLockRegistry.releaseAllLocks(), then obtain a new lock for the same token
using getLock("token1") (or reuse lock1) and assert that tryAcquire() returns
true (and release it afterwards). Reference getLock,
TokenLockRegistry.releaseAllLocks, TokenLockBase.isAcquired and tryAcquire to
locate where to add the re-acquire assertion.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a7f048 and 7fa6b3e.

📒 Files selected for processing (8)
  • pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala

@github-actions
Copy link

github-actions bot commented Feb 27, 2026

Unit Test Coverage

Overall Project 84.36% -0.01% 🍏
Files changed 91.94% 🍏

Module Coverage
pramen:core Jacoco Report 86.31% -0.01% 🍏
Files
Module File Coverage
pramen:core Jacoco Report TokenLockFactoryAllow.scala 100% 🍏
TokenLockRegistry.scala 95.89% -9.59% 🍏
PipelineStateImpl.scala 90.41% 🍏
TokenLockBase.scala 87.11% 🍏
TokenLockAllow.scala 69.23% -23.08%

@yruslan yruslan merged commit 70d2a69 into main Mar 4, 2026
7 checks passed
@yruslan yruslan deleted the feature/714-auto-release-all-locks branch March 4, 2026 08:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Automatically release token locks if the job invokes 'System.exit()'

1 participant