feat: remote stage in #468

Open
cmeesters wants to merge 27 commits into main from feat/remote_stage_in

@cmeesters cmeesters commented Jun 8, 2026

Member

This PR adds an option for multi-node stage-in on SLURM clusters. It only works together with a corresponding update of the jobstep plugin.

The ultimate goal is to enable pooling of shared-memory tasks (e.g. all mapping rules of a workflow within one SLURM job, possibly spanning multiple nodes). This is useful because:

  • it is demanded by some cluster admins, who otherwise see too many jobs
  • we then need only one stage-in process instead of many (this PR for this plugin, like the fs storage plugin in general, currently requires one per job).

However, together with the corresponding PR for the jobstep plugin (snakemake/snakemake-executor-plugin-slurm-jobstep#48), this has the additional charm that there is no dependency on the fs storage plugin.

Furthermore, pooling of SMP tasks is hardly beneficial without an automatic stage-in process, as it will inevitably lead to random I/O. This is already an issue: some users do not throttle the submission rate, so many jobs read the same shared reference file, which simply causes random access and impedes file system performance.

After this PR, the plugin uses sbcast for multi-node stage-in of files <= 4G and scp for larger files. scp relies on ssh being possible between the nodes of a cluster (either through authentication-based logins to nodes running one's own jobs or by other means). Currently, there is no fallback to sbcast if a file is too big and ssh is not working.
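
To make the selection rule concrete, here is a minimal sketch of the size-based choice described above. It is not the plugin's code: the names `choose_transfer`, `Transfer` and `SBCAST_LIMIT_BYTES` are hypothetical, and reading "4G" as 4 GiB is an assumption, since the description does not say whether the limit is binary or decimal.

```python
from enum import Enum

# Assumption: "4G" is read as 4 GiB; the PR text does not say whether
# the limit is 4 * 1024**3 or 4 * 1000**3 bytes.
SBCAST_LIMIT_BYTES = 4 * 1024**3


class Transfer(Enum):
    """Hypothetical labels for the two stage-in mechanisms."""

    SBCAST = "sbcast"
    SCP = "scp"


def choose_transfer(size_bytes: int, limit: int = SBCAST_LIMIT_BYTES) -> Transfer:
    """Pick sbcast for files up to the limit and scp for anything larger."""
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    return Transfer.SBCAST if size_bytes <= limit else Transfer.SCP


if __name__ == "__main__":
    for size in (1024, SBCAST_LIMIT_BYTES, SBCAST_LIMIT_BYTES + 1):
        print(size, choose_transfer(size).value)
```

Note that the rule has no third branch: a file above the limit always maps to scp, which matches the stated lack of a fallback when ssh is unavailable.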

Summary by CodeRabbit

  • New Features
    • Automatic node-local stage-in for SLURM runs when inputs use random/mixed access patterns, configurable via node-local-prefix and suppress-auto-stagein.
    • Improved SLURM jobstep flag handling, including optional pass-command-as-script support.
    • Adds warnings when staged inputs may be slow for larger files.
  • Documentation
    • Added “Automatic Stage-In” documentation, including enable/disable behavior and transfer mechanism details.
  • Tests
    • Added integration coverage for stage-in via sbcast and SSH.
  • Chores
    • Updated CI formatting checks to run Black with an additional debug step.

@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Contributor

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 7ed2e9ec-4d78-4cad-b15b-9aad3ada8498

📥 Commits

Reviewing files that changed from the base of the PR and between 6869eb5 and 411d053.

📒 Files selected for processing (3)
  • .github/workflows/ci.yml
  • pyproject.toml
  • snakemake_executor_plugin_slurm/__init__.py

Walkthrough

The PR adds automatic node-local stage-in support to the SLURM executor plugin. Two new utility functions (encode_deferred_envvars, get_file_size) are added to utils.py. ExecutorSettings gains node_local_prefix and suppress_auto_stagein fields. The executor wires the new prefix into jobstep arguments and warns about large RANDOM/MIXED-access inputs before job submission. Two Snakefile test fixtures and integration tests validate sbcast and SSH stage-in paths. Documentation and a CI debug step are also included.

Changes

Automatic Node-Local Stage-In Feature

| Layer / File(s) | Summary |
| --- | --- |
| Stage-in utility functions and unit tests (snakemake_executor_plugin_slurm/utils.py, tests/test_node_local_prefix.py) | encode_deferred_envvars() rewrites unescaped $VAR/${VAR} to __ENV_VAR__ placeholders; get_file_size() sums job input bytes recursively and returns GB rounded via round_half_up. Unit tests cover rewriting and literal-passthrough. |
| ExecutorSettings fields and executor arg/job wiring (snakemake_executor_plugin_slurm/__init__.py) | Imports AccessPattern, STORE_KEY, and get_file_size. Adds node_local_prefix and suppress_auto_stagein dataclass fields. additional_general_args() appends --slurm-jobstep-node-local-prefix with encoded envvars when enabled. run_jobs() scans RANDOM/MIXED access-pattern inputs and logs a size warning for inputs exceeding 100 GB. String formatting for preemption warning is also adjusted. |
| Snakefile fixtures and integration tests (tests/testcases/stagein_sbcast/Snakefile, tests/testcases/stagein_ssh/Snakefile, tests/test_stagein.py) | stagein_sbcast uses access.random to count words via wc -l; stagein_ssh creates a large sparse file and stages it via access.multi. LocalStageinTestcasesBase runs workflows via SnakemakeApi; TestStageInSbcast and TestStageInSSH assert expected output files and contents. |
| Automatic Stage-In documentation (docs/further.md) | Adds the #### Automatic Stage-In subsection describing enablement, prefix configuration, suppression option, sbcast-vs-scp selection logic, and current limitations. |
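
As a rough illustration of the envvar rewriting summarized above, the following sketch replaces unescaped `$VAR` and `${VAR}` references with placeholders. It is not the code from utils.py: the function name `encode_envvars_sketch`, the exact placeholder shape `__ENV_<NAME>__`, and the rule that a preceding backslash marks an escaped reference are all assumptions made for this example.

```python
import re

# Matches $NAME or ${NAME} unless the dollar sign is preceded by a backslash.
# Assumption: variable names follow the usual shell identifier rules.
_ENV_REF = re.compile(
    r"(?<!\\)\$(?:\{([A-Za-z_][A-Za-z0-9_]*)\}|([A-Za-z_][A-Za-z0-9_]*))"
)


def encode_envvars_sketch(text: str) -> str:
    """Replace unescaped env var references with __ENV_<NAME>__ placeholders."""

    def _placeholder(match: "re.Match[str]") -> str:
        # Group 1 holds the ${NAME} form, group 2 the bare $NAME form.
        name = match.group(1) or match.group(2)
        return f"__ENV_{name}__"

    return _ENV_REF.sub(_placeholder, text)


if __name__ == "__main__":
    for prefix in ("$TMPDIR/stage", "${SLURM_JOB_ID}/in", r"\$HOME/literal", "/scratch"):
        print(prefix, "->", encode_envvars_sketch(prefix))
```

Deferring expansion this way presumably lets the prefix be resolved on the compute node rather than on the submitting host; the sketch only covers the rewriting step, not the later decoding.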

CI Black Debug Step

| Layer / File(s) | Summary |
| --- | --- |
| CI Black environment debug step (.github/workflows/ci.yml) | Inserts a step in the formatting job that runs black --version and black --check --diff . before the existing formatting check. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 Hop, hop, the files go zoom,
Stage-in magic fills the room!
sbcast for the small, scp for big,
Envvars encoded — what a jig.
Node-local paths all neatly set,
The fastest HPC run yet! 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: remote stage in' directly corresponds to the main feature being added: remote stage-in functionality for SLURM clusters enabling multi-node operations. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@cmeesters

Member Author

Good; only the dependency on the jobstep executor is not working yet. It needs a release first.

Now:

  • docs
  • and a test for working ssh. Perhaps check from within a running job whether ssh to a node is possible, and raise an error when it is not?
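
One way such an ssh check could look is sketched below. This is only an assumption about a possible approach, not something implemented in this PR: the helper names `build_ssh_probe` and `ssh_reachable` are hypothetical, and the probe simply runs `true` on the target node with the standard OpenSSH options `BatchMode=yes` (never prompt for a password) and `ConnectTimeout`.

```python
import subprocess
from typing import Callable, List


def build_ssh_probe(node: str, timeout_s: int = 5) -> List[str]:
    """Build a non-interactive ssh command that runs `true` on the node."""
    return [
        "ssh",
        "-o", "BatchMode=yes",  # fail instead of prompting for credentials
        "-o", f"ConnectTimeout={timeout_s}",
        node,
        "true",
    ]


def ssh_reachable(
    node: str,
    timeout_s: int = 5,
    run: Callable[..., "subprocess.CompletedProcess"] = subprocess.run,
) -> bool:
    """Return True if the probe command exits with status 0."""
    try:
        result = run(
            build_ssh_probe(node, timeout_s),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_s + 5,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        # ssh binary missing, or the probe hung past the timeout.
        return False
    return result.returncode == 0
```

The `run` parameter exists only so the check can be exercised in tests without a cluster; a caller inside a job would use the default and turn a `False` result into an explicit error message.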

@cmeesters cmeesters marked this pull request as ready for review June 17, 2026 09:04
@cmeesters

Member Author

@johanneskoester & @fbartusch: first, the corresponding jobstep PR needs to be merged for the tests to succeed (or to be refined as necessary).

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/test_stagein.py (1)

114-124: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

test_stagein_ssh assertions are too weak to validate SSH behavior.

Lines 121-123 only verify output/log presence and a broad substring. That can pass without proving SSH stage-in happened or that the expected “SSH unavailable” error path is handled. Please assert explicit stage-in mechanism evidence (or explicit failure messaging) in ssh.log.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_stagein.py` around lines 114 - 124, The assertions in
test_stagein_ssh only verify that files exist and check for a broad substring in
size_file, but do not validate the actual SSH stage-in behavior or proper error
handling. Replace or supplement the weak assertions (lines checking
size_file.exists(), ssh_log.exists(), and the "data/big" substring check) with
stronger assertions that explicitly verify SSH behavior evidence or expected
error messaging in ssh.log. Specifically, read the contents of ssh.log and
assert for specific SSH mechanism details or expected failure messages that
prove either successful SSH stage-in occurred or that SSH unavailability was
properly handled.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.github/workflows/ci.yml:
- Around line 27-31: The "Debug black environment" step runs black with the
--check flag, which causes it to exit with a non-zero code when formatting
issues are found, blocking subsequent steps from executing. To make this debug
step non-blocking while preserving diagnostic output, add `continue-on-error:
true` as a property of the step (at the same indentation level as the name and
run keys), or alternatively remove the --check flag from the black command and
keep only the diagnostic outputs (version and diff). This allows the subsequent
"Check formatting" step to always execute and report the official formatting
check result.

In `@snakemake_executor_plugin_slurm/__init__.py`:
- Around line 697-715: The get_file_size function call in the job input file
validation loop is incorrect. The function expects a JobExecutorInterface
parameter but is being passed inp.path instead, which will cause a runtime
error. Replace the get_file_size(inp.path) call with the correct invocation that
passes the job object. Additionally, update the warning message to reflect the
correct unit of measurement returned by get_file_size (GB) instead of bytes,
since the function returns rounded GB values not byte counts.

In `@tests/testcases/stagein_ssh/Snakefile`:
- Around line 17-19: The default value assigned to size_bytes (approximately 2
GiB) is below the 4 GB threshold needed to trigger the scp stage-in path for
SSH. According to the documentation, sbcast handles files ≤ 4 GB while scp
handles files > 4 GB. Increase the default value in the os.environ.get call for
"STAGEIN_TEST_BIG_BYTES" from 2 * 1024 * 1024 * 1024 + 1 to a value larger than
4 GB (such as 4 * 1024 * 1024 * 1024 + 1 or greater) to ensure the test
exercises the scp path as intended.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 4481e155-74c3-40cd-b083-4df76bbc9da7

📥 Commits

Reviewing files that changed from the base of the PR and between cdb97d5 and 037495e.

📒 Files selected for processing (8)
  • .github/workflows/ci.yml
  • docs/further.md
  • snakemake_executor_plugin_slurm/__init__.py
  • snakemake_executor_plugin_slurm/utils.py
  • tests/test_node_local_prefix.py
  • tests/test_stagein.py
  • tests/testcases/stagein_sbcast/Snakefile
  • tests/testcases/stagein_ssh/Snakefile

Comment thread .github/workflows/ci.yml
Comment on lines +697 to +715
            for job in jobs:
                for inp in job.input:
                    if isinstance(
                        inp.flags.get(STORE_KEY), AccessPattern
                    ) and inp.flags[STORE_KEY] in {
                        AccessPattern.RANDOM,
                        AccessPattern.MIXED,
                    }:
                        size = get_file_size(inp.path)
                        if size is not None and size > 100:
                            self.logger.warning(
                                f"Job '{job.name}' has input file '{inp.path}' with "
                                f"random or mixed access pattern and size {size} "
                                "bytes. Snakemake will attempt to stage in this file "
                                "to the node local directory specified by "
                                f"{self.workflow.executor_settings.node_local_prefix}. "
                                "However, for files of this size the process might be "
                                "slow."
                            )

Contributor

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Fix get_file_size call contract; current code can crash before submission.

Line 705 passes inp.path into get_file_size, but that helper expects a JobExecutorInterface and iterates job.input_files. This will raise at runtime on RANDOM/MIXED inputs and break job submission. Also, the log text says “bytes” although the helper returns rounded GB.
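
For orientation, the contract described here (sum all input sizes, return whole GB rounded half up) can be sketched as follows. The real get_file_size in utils.py is not shown in this thread, so this is only an approximation: the names `total_input_gb` and `path_size_bytes`, the divisor of 1024**3, and the `bytes_per_gb` parameter (added so the rounding can be checked with tiny files) are assumptions.

```python
import tempfile
from decimal import ROUND_HALF_UP, Decimal
from pathlib import Path
from typing import Iterable

# Assumption: 1 GB is treated as 1024**3 bytes; the helper under review
# may use a different convention.
BYTES_PER_GB = 1024**3


def path_size_bytes(path: Path) -> int:
    """Size of a file, or the recursive total for a directory; 0 if missing."""
    if path.is_dir():
        return sum(p.stat().st_size for p in path.rglob("*") if p.is_file())
    return path.stat().st_size if path.exists() else 0


def total_input_gb(input_files: Iterable[str], bytes_per_gb: int = BYTES_PER_GB) -> int:
    """Sum all input sizes and round to whole GB, with halves rounded up."""
    total = sum(path_size_bytes(Path(f)) for f in input_files)
    gb = Decimal(total) / Decimal(bytes_per_gb)
    return int(gb.quantize(Decimal("1"), rounding=ROUND_HALF_UP))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "sample.bin"
        sample.write_bytes(b"x" * 25)
        # With a toy "GB" of 10 bytes, 25 bytes is 2.5 and rounds up.
        print(total_input_gb([str(sample)], bytes_per_gb=10))
```

Because the result is a per-job total in GB, it should be compared against a GB threshold and reported with a GB unit, which is the mismatch this comment points out.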

Proposed fix
-            for job in jobs:
-                for inp in job.input:
-                    if isinstance(
-                        inp.flags.get(STORE_KEY), AccessPattern
-                    ) and inp.flags[STORE_KEY] in {
-                        AccessPattern.RANDOM,
-                        AccessPattern.MIXED,
-                    }:
-                        size = get_file_size(inp.path)
-                        if size is not None and size > 100:
-                            self.logger.warning(
-                                f"Job '{job.name}' has input file '{inp.path}' with "
-                                f"random or mixed access pattern and size {size} "
-                                "bytes. Snakemake will attempt to stage in this file "
-                                "to the node local directory specified by "
-                                f"{self.workflow.executor_settings.node_local_prefix}. "
-                                "However, for files of this size the process might be "
-                                "slow."
-                            )
+            for job in jobs:
+                has_random_or_mixed = any(
+                    isinstance(inp.flags.get(STORE_KEY), AccessPattern)
+                    and inp.flags[STORE_KEY] in {AccessPattern.RANDOM, AccessPattern.MIXED}
+                    for inp in job.input
+                )
+                if has_random_or_mixed:
+                    size_gb = get_file_size(job)
+                    if size_gb > 100:
+                        self.logger.warning(
+                            f"Job '{job.name}' has RANDOM/MIXED inputs totaling "
+                            f"about {size_gb} GB. Snakemake will attempt node-local "
+                            f"stage-in via {self.workflow.executor_settings.node_local_prefix}, "
+                            "which may be slow for inputs of this size."
+                        )

Comment thread tests/testcases/stagein_ssh/Snakefile
cmeesters and others added 2 commits June 17, 2026 11:57
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

@coderabbitai coderabbitai Bot left a comment

Contributor

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
snakemake_executor_plugin_slurm/__init__.py (1)

698-716: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: Variable shadowing, redundant loop, and API mismatch break stage-in detection.

The current code has multiple issues:

  1. Variable shadowing: The outer for inp in job.input: (line 698) is immediately shadowed by the inner any(... for inp in job.input) (line 703). In Python 3 the generator expression has its own scope, so the outer inp is not overwritten, but the reused name obscures which input is meant.

  2. Redundant outer loop: has_random_or_mixed is computed identically on every iteration since the any() always iterates over all inputs.

  3. Wrong variable after loop: At line 706, inp references the last input from the outer loop, not necessarily one with random/mixed access pattern.

  4. API mismatch: get_file_size(inp.path) will fail at runtime—the function expects a JobExecutorInterface, not a path (see utils.py contract).

  5. Unit error: Warning text says "bytes" but get_file_size returns rounded GB.
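
Items 1 to 3 can be reproduced in isolation with plain strings. The sketch below is only a stand-in for the reviewed code: `is_random` is a hypothetical replacement for the AccessPattern check, and the file names are made up. In Python 3 the generator expression's loop variable is local to the generator, so the outer `inp` keeps its value inside the loop; the practical problem is that after the loop `inp` is just the last input, whether or not it matches.

```python
from typing import List, Tuple


def is_random(name: str) -> bool:
    """Stand-in for the AccessPattern check; purely illustrative."""
    return name.endswith(".random")


def reviewed_pattern(inputs: List[str]) -> Tuple[bool, str, List[str]]:
    """Mimic the reviewed loop structure and report what each name ends up as."""
    seen_in_loop: List[str] = []
    has_random = False
    inp = ""
    for inp in inputs:
        # The generator's `inp` is local to the generator expression.
        has_random = any(is_random(inp) for inp in inputs)
        seen_in_loop.append(inp)  # still the outer loop's value
    # After the loop, `inp` is simply the last input, matching or not.
    return has_random, inp, seen_in_loop


def matching_inputs(inputs: List[str]) -> List[str]:
    """Select exactly the inputs that satisfy the check."""
    return [inp for inp in inputs if is_random(inp)]


if __name__ == "__main__":
    files = ["ref.random", "reads.sequential"]
    print(reviewed_pattern(files))
    print(matching_inputs(files))
```

With these inputs the reviewed structure reports a random-access input but leaves `inp` pointing at `reads.sequential`, so any later use of `inp` would refer to the wrong file; selecting the matching inputs explicitly avoids that.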

Proposed fix
         if (
             not self.workflow.executor_settings.suppress_auto_stagein
             and self.workflow.executor_settings.node_local_prefix
         ):
             for job in jobs:
-                for inp in job.input:
-                    has_random_or_mixed = any(
-                        isinstance(inp.flags.get(STORE_KEY), AccessPattern)
-                        and inp.flags[STORE_KEY]
-                        in {AccessPattern.RANDOM, AccessPattern.MIXED}
-                        for inp in job.input
-                    )
-                if has_random_or_mixed:
-                    size = get_file_size(inp.path)
-                    if size is not None and size > 100:
-                        self.logger.warning(
-                            f"Job '{job.name}' has input file '{inp.path}' with "
-                            f"random or mixed access pattern and size {size} "
-                            "bytes. Snakemake will attempt to stage in this file "
-                            "to the node local directory specified by "
-                            f"{self.workflow.executor_settings.node_local_prefix}. "
-                            "However, for files of this size the process might be "
-                            "slow."
-                        )
+                has_random_or_mixed = any(
+                    isinstance(inp.flags.get(STORE_KEY), AccessPattern)
+                    and inp.flags[STORE_KEY] in {AccessPattern.RANDOM, AccessPattern.MIXED}
+                    for inp in job.input
+                )
+                if has_random_or_mixed:
+                    size_gb = get_file_size(job)
+                    if size_gb is not None and size_gb > 100:
+                        self.logger.warning(
+                            f"Job '{job.name}' has RANDOM/MIXED inputs totaling "
+                            f"about {size_gb} GB. Snakemake will attempt node-local "
+                            f"stage-in via {self.workflow.executor_settings.node_local_prefix}, "
+                            "which may be slow for inputs of this size."
+                        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@snakemake_executor_plugin_slurm/__init__.py` around lines 698 - 716, Remove
the outer redundant for loop that starts at line 698 and move the
has_random_or_mixed check outside to compute it once. Then create a separate
loop after the check to iterate only through inputs that have random or mixed
access patterns, checking the file size for each matching input. Fix the
get_file_size() function call to pass the correct parameters according to its
signature in utils.py (it expects a JobExecutorInterface or similar, not just a
path). Finally, update the warning message to use the correct unit label that
get_file_size actually returns (likely GB instead of bytes) to match the actual
return value.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: def6cc08-111f-431f-81c7-b6a7e2b33dc8

📥 Commits

Reviewing files that changed from the base of the PR and between 024803e and 6869eb5.

📒 Files selected for processing (1)
  • snakemake_executor_plugin_slurm/__init__.py
