Fix threads in offline deployment #333
Conversation
- Add max_threads configuration to settings.json with separate values for local (4) and online (2) deployments
- Add get_max_threads() helper function in common.py that returns the appropriate thread count based on deployment mode
- Add a Threads number input in parameter_section for the local-mode UI
- Update CommandExecutor.run_multiple_commands() to use semaphore limiting based on the max_threads setting
- Update CommandExecutor.run_topp() to pass the -threads parameter to TOPP tools with intelligently distributed thread allocation

Thread distribution algorithm:
- parallel_commands = min(num_files, max_threads)
- threads_per_command = max(1, max_threads // parallel_commands)

This ensures optimal resource usage: with 4 max_threads and 2 files, it runs 2 parallel commands with 2 threads each; with 4 files, it runs 4 parallel commands with 1 thread each.

https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
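For reference, the distribution rule above boils down to the following sketch (the function name and the num_files >= 1 assumption are illustrative, not taken from the PR diff):

```python
# Sketch of the thread-distribution rule described in the commit message above;
# the real logic lives in the PR's CommandExecutor changes.
def distribute_threads(num_files: int, max_threads: int) -> tuple[int, int]:
    """Return (parallel_commands, threads_per_command); assumes num_files >= 1 and max_threads >= 1."""
    parallel_commands = min(num_files, max_threads)
    threads_per_command = max(1, max_threads // parallel_commands)
    return parallel_commands, threads_per_command

# With max_threads=4: 2 files -> (2, 2); 4 files -> (4, 1), matching the examples above.
```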
Pre-initialize max_threads_override in session state before rendering the widget, and remove the value parameter to let Streamlit read from session state. This ensures user changes are properly tracked. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
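In Streamlit terms, the pattern described here looks roughly like this (a sketch with an assumed default of 4, not the exact PR code):

```python
import streamlit as st

# Seed the key before the widget renders, then let Streamlit own the value;
# passing value= together with key= can clobber user edits on rerun.
if "max_threads_override" not in st.session_state:
    st.session_state.max_threads_override = 4  # assumed local default
st.number_input("Threads", min_value=1, key="max_threads_override")
```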
⚠️ Rate limit exceeded: CodeRabbit enforces hourly rate limits for each developer per organization; after a brief timeout, a review can be triggered again. Spacing out commits helps avoid hitting the limit.

📝 Walkthrough
Replaced the direct Streamlit number input with the unified input_widget(): the selected max_threads value is persisted via the ParameterManager, resolved by get_max_threads(), and used by CommandExecutor when scheduling commands.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant UI as UI (Streamlit input_widget)
    participant Sess as SessionState
    participant PM as ParameterManager
    participant Common as common.get_max_threads
    participant Exec as CommandExecutor

    rect rgba(200,200,255,0.5)
        UI->>Sess: render/input `max_threads` (key="max_threads")
        Sess-->>PM: persist/reflect param (optional)
    end

    rect rgba(200,255,200,0.5)
        Exec->>Common: get_max_threads(parameter_manager=PM)
        Common->>PM: load persisted params (local path)
        PM-->>Common: return persisted max_threads (or none)
        Common-->>Exec: resolved max_threads
    end

    rect rgba(255,200,200,0.5)
        Exec->>Exec: schedule/run commands using max_threads
    end
```
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/common/common.py`:
- Around line 35-55: get_max_threads may return zero, negative, or non-integer values from settings or the UI override, which can cause Semaphore(0) deadlocks or divide-by-zero in threads_per_command. Update get_max_threads to coerce the chosen value to an integer and clamp it to a minimum of 1 before returning: validate values coming from max_threads_config.get("local"/"online") and st.session_state.get("max_threads_override"), and ensure any non-numeric input falls back to the configured default and then to 1. The fix should use get_max_threads, max_threads_config, and the "max_threads_override" session key when determining and normalizing the returned value.
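A minimal sketch of the requested normalization, assuming the settings and session-state layout named in the comment (not the actual code from the PR):

```python
import streamlit as st

def get_max_threads() -> int:
    """Resolve max_threads and clamp it to an integer >= 1 (illustrative sketch)."""
    settings = st.session_state.settings
    max_threads_config = settings.get("max_threads", {})
    mode = "online" if settings.get("online_deployment", False) else "local"
    default = max_threads_config.get(mode, 1)
    raw = st.session_state.get("max_threads_override", default)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        # Non-numeric input: fall back to the configured default, then to 1.
        try:
            value = int(default)
        except (TypeError, ValueError):
            value = 1
    return max(1, value)
```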
🧹 Nitpick comments (2)
src/workflow/StreamlitUI.py (1)
1111-1122: Consider capping “Threads” to available CPU cores. This prevents accidental oversubscription on local runs and keeps UI intent aligned with host capacity.
♻️ Proposed tweak
```diff
 if not st.session_state.settings.get("online_deployment", False):
     # Initialize session state with default if not set
     if "max_threads_override" not in st.session_state:
         max_threads_config = st.session_state.settings.get("max_threads", {})
         st.session_state.max_threads_override = max_threads_config.get("local", 4)
+    max_cpu = os.cpu_count() or 1
     st.number_input(
         "Threads",
         min_value=1,
+        max_value=max_cpu,
         key="max_threads_override",
         help="Maximum threads for parallel processing. Threads are distributed between parallel commands and per-tool thread allocation."
     )
```

src/workflow/CommandExecutor.py (1)
35-78: Prefer a bounded ThreadPoolExecutor instead of spawning one thread per command. Even with a semaphore, this still creates N threads; a pool limits thread count and reduces overhead for large batches.
♻️ Proposed refactor
```diff
     def run_multiple_commands(
         self, commands: list[str]
     ) -> bool:
 @@
         max_threads = get_max_threads()
         num_commands = len(commands)
         parallel_commands = min(num_commands, max_threads)
+        if num_commands == 0:
+            self.logger.log("No commands to run.", 1)
+            return True
+
         # Log the start of command execution
         self.logger.log(f"Running {num_commands} commands (max {parallel_commands} parallel, {max_threads} total threads)...", 1)
         start_time = time.time()
 
-        results = []
-        lock = threading.Lock()
-        semaphore = threading.Semaphore(parallel_commands)
-
-        def run_and_track(cmd):
-            with semaphore:
-                success = self.run_command(cmd)
-                with lock:
-                    results.append(success)
-
-        # Initialize a list to keep track of threads
-        threads = []
-
-        # Start a new thread for each command
-        for cmd in commands:
-            thread = threading.Thread(target=run_and_track, args=(cmd,))
-            thread.start()
-            threads.append(thread)
-
-        # Wait for all threads to complete
-        for thread in threads:
-            thread.join()
+        from concurrent.futures import ThreadPoolExecutor
+        with ThreadPoolExecutor(max_workers=parallel_commands) as pool:
+            futures = [pool.submit(self.run_command, cmd) for cmd in commands]
+            results = [f.result() for f in futures]
```
Use input_widget(), which integrates with the parameter manager to persist the value to params.json. Also copy the value to session state so get_max_threads() can access it. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
- Pass parameter_manager to get_max_threads() to read the persisted value
- Remove the session-state copying workaround in StreamlitUI
- Cleaner design that reads directly from params.json

https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
- Move the function from common.py to CommandExecutor as a private method
- Simplifies the code by using self.parameter_manager directly
- Remove the unnecessary import and parameter passing

https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
Coerce to int and clamp to a minimum of 1 to prevent:
- Semaphore(0) deadlocks
- Divide-by-zero in the threads_per_command calculation
- Issues from non-numeric config values

https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
Logs to minimal log (level 0) when max_threads is non-numeric or < 1. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
UI already enforces min_value=1; keep just the validation. https://claude.ai/code/session_01XGuw7AxoKRWHr9EZX1SDXi
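Taken together, these commits suggest validation along the following lines (a sketch of the coerce/clamp/log behavior; the helper name is illustrative, and how the raw value is read from params.json via the parameter manager is assumed, not shown here):

```python
def validate_max_threads(raw, logger) -> int:
    """Coerce max_threads to an int >= 1; log to the minimal log (level 0) when invalid."""
    try:
        value = int(raw)
    except (TypeError, ValueError):
        logger.log(f"Invalid max_threads value {raw!r}; falling back to 1.", 0)
        return 1
    if value < 1:
        logger.log(f"max_threads must be >= 1, got {value}; falling back to 1.", 0)
        return 1
    return value
```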
Summary by CodeRabbit
Refactor
New Features