From 14f77b96657ac2e6f7138e9e0622c393643273da Mon Sep 17 00:00:00 2001
From: Liu Yiqun
Date: Wed, 3 Dec 2025 11:40:05 +0800
Subject: [PATCH 1/7] Support saving the running_states.

---
 .../subgraph_decompose_and_evaluation_step.py | 33 ++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py
index eaa926b9e..c20f6d961 100755
--- a/graph_net/subgraph_decompose_and_evaluation_step.py
+++ b/graph_net/subgraph_decompose_and_evaluation_step.py
@@ -119,7 +119,7 @@ def save_decompose_config(
     max_subgraph_size: int,
     tasks_map: Dict[str, Union[int, str, list, dict]],
     incorrect_paths: Union[List[str], Set[str]],
-    failed_decomposition_models: Union[List[str], Set[str]],
+    running_states: Dict[str, Union[int, str, list, dict]],
 ):
     """Saves the current state to a JSON file."""
 
@@ -133,7 +133,7 @@ def save_decompose_config(
         "max_subgraph_size": max_subgraph_size,
         "incorrect_models": list(incorrect_paths),
         "tasks_map": tasks_map_copy,
-        "failed_decomposition_models": list(failed_decomposition_models),
+        "running_states": running_states,
    }
     config_path = get_decompose_config_path(workspace)
 
@@ -283,6 +283,8 @@ def generate_initial_tasks(args):
     """Generates tasks for Pass 0 based on the initial log file."""
     print(f"[Init] Pass 0: Reading from log file: {args.log_file}")
     initial_failures = get_incorrect_models(args.tolerance, args.log_file)
+    # t1_incorrect_models = get_incorrect_models(1, args.log_file)
+    # initial_failures = initial_failures - t1_incorrect_models
 
     tasks_map = {}
     for model_path in initial_failures:
@@ -295,7 +297,8 @@ def generate_initial_tasks(args):
         }
 
     max_subgraph_size = args.max_subgraph_size
-    return tasks_map, max_subgraph_size
+    running_states = {"pass_0": {"incorrect_models": list(initial_failures)}}
+    return tasks_map, max_subgraph_size, running_states
 
 
 def generate_refined_tasks(base_output_dir, current_pass_id):
@@ -306,13 +309,14 @@ def generate_refined_tasks(base_output_dir, current_pass_id):
     prev_config = load_decompose_config(prev_pass_dir)
     prev_incorrect_subgraphs = prev_config.get("incorrect_models", [])
     prev_tasks_map = prev_config.get("tasks_map", {})
+    running_states = prev_config.get("running_states", {})
 
     # Load previous max size as fallback
     prev_max_subgraph_size = prev_config.get("max_subgraph_size")
     max_subgraph_size = prev_max_subgraph_size // 2
 
     if not prev_incorrect_subgraphs:
-        return {}, max_subgraph_size
+        return {}, max_subgraph_size, running_states
 
     tasks_map = {}
     for subgraph_path in prev_incorrect_subgraphs:
@@ -339,14 +343,14 @@ def generate_refined_tasks(base_output_dir, current_pass_id):
             "split_positions": set(),
         }
 
-    return tasks_map, max_subgraph_size
+    return tasks_map, max_subgraph_size, running_states
 
 
 def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
     if current_pass_id == 0:
-        tasks_map, max_subgraph_size = generate_initial_tasks(args)
+        tasks_map, max_subgraph_size, running_states = generate_initial_tasks(args)
     else:
-        tasks_map, max_subgraph_size = generate_refined_tasks(
+        tasks_map, max_subgraph_size, running_states = generate_refined_tasks(
             base_output_dir, current_pass_id
         )
 
@@ -366,7 +370,7 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
         )
         sys.exit(0)
 
-    return tasks_map, max_subgraph_size
+    return tasks_map, max_subgraph_size, running_states
 
 
 def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspace):
@@ -440,7 +444,7 @@ def main(args):
     print("=" * 
80) # --- Step 1: Prepare Tasks and Workspace --- - tasks_map, max_subgraph_size = prepare_tasks_and_verify( + tasks_map, max_subgraph_size, running_states = prepare_tasks_and_verify( args, current_pass_id, base_output_dir ) pass_work_dir = get_decompose_workspace_path(base_output_dir, current_pass_id) @@ -448,7 +452,6 @@ def main(args): os.makedirs(pass_work_dir, exist_ok=True) # --- Step 2: Decomposition --- - failed_decomposition = [] if task_controller.task_scheduler["run_decomposer"]: print("\n--- Phase 1: Decomposition ---", flush=True) ( @@ -458,11 +461,14 @@ def main(args): ) = execute_decomposition_phase( max_subgraph_size, tasks_map, args.framework, pass_work_dir ) + running_states.get(f"pass_{current_pass_id}", {})[ + "failed_decomposition_models" + ] = list(failed_decomposition) else: config = load_decompose_config(pass_work_dir) max_subgraph_size = config["max_subgraph_size"] - failed_decomposition = config["failed_decomposition_models"] tasks_map = config.get("tasks_map", {}) + running_states = config.get("running_states", {}) # --- Step 3: Evaluation --- pass_log_path = os.path.join(pass_work_dir, "batch_test_result.log") @@ -476,6 +482,9 @@ def main(args): print("\n--- Phase 3: Analysis ---") next_round_models = get_incorrect_models(args.tolerance, pass_log_path) print(f"[Analysis] Found {len(next_round_models)} incorrect subgraphs.\n") + running_states[f"pass_{current_pass_id + 1}"] = { + "incorrect_models": list(next_round_models) + } print_summary_and_suggestion(next_round_models, max_subgraph_size) # --- Step 5: Save States --- @@ -484,7 +493,7 @@ def main(args): max_subgraph_size, tasks_map, next_round_models, - failed_decomposition, + running_states, ) From 5678f1c8c86326b8c267470a2510448cf8400692 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Wed, 3 Dec 2025 14:08:28 +0800 Subject: [PATCH 2/7] Define a dataclass DecomposeConfig. 
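
The get_decompose_config_path/load_decompose_config/save_decompose_config
free functions are folded into one dataclass, so the on-disk JSON and the
in-memory state share a single definition. A minimal round-trip sketch,
assuming the module is importable as
graph_net.subgraph_decompose_and_evaluation_step (the field values below
are made up for illustration):

    import tempfile

    from graph_net.subgraph_decompose_and_evaluation_step import DecomposeConfig

    with tempfile.TemporaryDirectory() as work_dir:
        config = DecomposeConfig(max_subgraph_size=8, incorrect_models=["m0"])
        config.save(work_dir)  # writes <work_dir>/decompose_config.json
        restored = DecomposeConfig.load(work_dir)
        assert restored.max_subgraph_size == 8

Since load() is a classmethod, DecomposeConfig.load(work_dir) rebuilds the
instance directly from the saved JSON.
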
---
 .../subgraph_decompose_and_evaluation_step.py | 121 +++++++++---------
 1 file changed, 57 insertions(+), 64 deletions(-)

diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py
index c20f6d961..03b633f7a 100755
--- a/graph_net/subgraph_decompose_and_evaluation_step.py
+++ b/graph_net/subgraph_decompose_and_evaluation_step.py
@@ -7,7 +7,8 @@
 import argparse
 import subprocess
 import glob
-from typing import List, Set, Dict, Any, Union
+from dataclasses import dataclass, field
+from typing import List, Dict, Union
 
 from graph_net.analysis_util import get_incorrect_models
 from graph_net import path_utils
@@ -18,6 +19,14 @@ def convert_b64_string_to_json(b64str):
     return json.loads(base64.b64decode(b64str).decode("utf-8"))
 
 
+def convert_json_to_b64_string(config):
+    return base64.b64encode(json.dumps(config).encode()).decode()
+
+
+def get_pass_name(pass_id):
+    return f"pass_{pass_id}"
+
+
 class TaskController:
     def __init__(self, args):
         self.root_output_dir = os.path.abspath(args.output_dir)
@@ -82,6 +91,37 @@ def _print(self):
         print()
 
 
+@dataclass
+class DecomposeConfig:
+    max_subgraph_size: int = -1
+    incorrect_models: List[str] = field(default_factory=list)
+    tasks_map: Dict[str, Union[int, str, list, dict]] = field(default_factory=dict)
+    running_states: Dict[str, Union[int, str, list, dict]] = field(default_factory=dict)
+
+    def save(self, work_dir):
+        """Save the current config to a JSON file."""
+        config_path = self.get_config_path(work_dir)
+
+        with open(config_path, "w") as f:
+            json.dump(self.__dict__, f, indent=4)
+        print(f"\n[INFO] Save state to: {config_path}")
+
+    @classmethod
+    def load(self, work_dir):
+        """Initialize an object from a JSON file."""
+        config_path = self.get_config_path(work_dir)
+        if not os.path.exists(config_path):
+            raise FileNotFoundError(f"Missing configuration file: {config_path}")
+
+        with open(config_path, "r") as f:
+            data = json.load(f)
+        return self(**data)
+
+    @classmethod
+    def get_config_path(self, work_dir) -> str:
+        return os.path.join(work_dir, "decompose_config.json")
+
+
 def get_rectfied_model_path(model_path):
     graphnet_root = path_utils.get_graphnet_root()
     return os.path.join(graphnet_root, model_path.split("GraphNet/")[-1])
@@ -95,53 +135,10 @@ def count_samples(samples_dir):
     num_samples
     return num_samples
 
 
-def get_decompose_config_path(output_dir: str) -> str:
-    """Returns the full path to the decompose configuration file."""
-    return os.path.join(output_dir, "decompose_config.json")
-
-
 def get_decompose_workspace_path(output_dir, pass_id):
     return os.path.join(output_dir, f"pass_{pass_id}")
 
 
-def load_decompose_config(work_dir: str) -> Dict[str, Any]:
-    """Loads the configuration file from the previous pass."""
-    config_path = get_decompose_config_path(work_dir)
-
-    if not os.path.exists(config_path):
-        raise FileNotFoundError(f"Missing configuration file: {config_path}")
-    with open(config_path, "r") as f:
-        return json.load(f)
-
-
-def save_decompose_config(
-    workspace: str,
-    max_subgraph_size: int,
-    tasks_map: Dict[str, Union[int, str, list, dict]],
-    incorrect_paths: Union[List[str], Set[str]],
-    running_states: Dict[str, Union[int, str, list, dict]],
-):
-    """Saves the current state to a JSON file."""
-
-    tasks_map_copy = {}
-    for model_name, task_info in tasks_map.items():
-        tasks_map_copy[model_name] = {}
-        for key in ["original_path", "split_positions"]:
-            tasks_map_copy[model_name][key] = task_info[key]
-
-    config = {
-        "max_subgraph_size": max_subgraph_size,
-        "incorrect_models": 
list(incorrect_paths), - "tasks_map": tasks_map_copy, - "running_states": running_states, - } - config_path = get_decompose_config_path(workspace) - - with open(config_path, "w") as f: - json.dump(config, f, indent=4) - print(f"\n[INFO] Save state to: {config_path}") - - def get_model_name_with_subgraph_tag(model_path): fields = model_path.rstrip("/").split(os.sep) pattern = r"^subgraph(_\d+)?$" @@ -172,9 +169,7 @@ def run_decomposer_for_single_model( }, }, } - decorator_config_b64 = base64.b64encode( - json.dumps(decorator_config).encode() - ).decode() + decorator_config_b64 = convert_json_to_b64_string(decorator_config) print( f"[Decomposition] model_path: {model_path}, split_positions: {split_positions}" @@ -290,7 +285,6 @@ def generate_initial_tasks(args): for model_path in initial_failures: model_name = get_model_name_with_subgraph_tag(model_path) tasks_map[model_name] = { - "subgraph_path": model_path, "original_path": model_path, "subgraph_size": [0, kMaxGraphSize], "split_positions": set(), @@ -306,13 +300,13 @@ def generate_refined_tasks(base_output_dir, current_pass_id): prev_pass_dir = get_decompose_workspace_path(base_output_dir, current_pass_id - 1) print(f"[Init] Resuming from Pass_{current_pass_id - 1} (Dir: {prev_pass_dir})...") - prev_config = load_decompose_config(prev_pass_dir) - prev_incorrect_subgraphs = prev_config.get("incorrect_models", []) - prev_tasks_map = prev_config.get("tasks_map", {}) - running_states = prev_config.get("running_states", {}) + prev_config = DecomposeConfig.load(prev_pass_dir) + prev_incorrect_subgraphs = prev_config.incorrect_models + prev_tasks_map = prev_config.tasks_map + running_states = prev_config.running_states # Load previous max size as fallback - prev_max_subgraph_size = prev_config.get("max_subgraph_size") + prev_max_subgraph_size = prev_config.max_subgraph_size max_subgraph_size = prev_max_subgraph_size // 2 if not prev_incorrect_subgraphs: @@ -337,7 +331,6 @@ def generate_refined_tasks(base_output_dir, current_pass_id): if model_name not in tasks_map: tasks_map[model_name] = { - "subgraph_path": subgraph_path, "original_path": pre_task_for_model["original_path"], "subgraph_size": subgraph_size[subgraph_idx], "split_positions": set(), @@ -465,10 +458,10 @@ def main(args): "failed_decomposition_models" ] = list(failed_decomposition) else: - config = load_decompose_config(pass_work_dir) - max_subgraph_size = config["max_subgraph_size"] - tasks_map = config.get("tasks_map", {}) - running_states = config.get("running_states", {}) + config = DecomposeConfig.load(pass_work_dir) + max_subgraph_size = config.max_subgraph_size + tasks_map = config.tasks_map + running_states = config.running_states # --- Step 3: Evaluation --- pass_log_path = os.path.join(pass_work_dir, "batch_test_result.log") @@ -488,13 +481,13 @@ def main(args): print_summary_and_suggestion(next_round_models, max_subgraph_size) # --- Step 5: Save States --- - save_decompose_config( - pass_work_dir, - max_subgraph_size, - tasks_map, - next_round_models, - running_states, + config = DecomposeConfig( + max_subgraph_size=max_subgraph_size, + incorrect_models=list(next_round_models), + tasks_map=tasks_map, + running_states=running_states, ) + config.save(pass_work_dir) if __name__ == "__main__": From 7d9581f7faee64ba05409736c9037af4296a0a62 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Wed, 3 Dec 2025 14:10:15 +0800 Subject: [PATCH 3/7] Fix a sample. 
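
Two changes here: Pass 0 now also queries the failures at tolerance 1 and
subtracts them from the initial set, so models that are incorrect even at
tolerance 1 no longer enter the decomposition loop, and the
Mask-RT-DETR-S/subgraph_14 input metadata is repaired by replacing a
non-finite max_val/mean/std record with a finite bound. A hypothetical
sketch of why finite bounds matter if a tensor is re-synthesized from such
metadata (synthesize() below is illustrative, not the project's actual
input generator):

    import numpy as np

    def synthesize(shape, min_val, max_val, dtype="float32"):
        # Uniform sampling degenerates to inf/nan when a bound is not
        # finite, which is what the old max_val = float("inf") record
        # would cause.
        assert np.isfinite(min_val) and np.isfinite(max_val)
        return np.random.uniform(min_val, max_val, size=shape).astype(dtype)

    x = synthesize([1, 8400, 4], -4.36945, 5.0)  # values from the fixed sample
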
--- .../subgraph_decompose_and_evaluation_step.py | 35 ++++++++++--------- .../Mask-RT-DETR-S/subgraph_14/input_meta.py | 4 +-- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py index 03b633f7a..980bab0f6 100755 --- a/graph_net/subgraph_decompose_and_evaluation_step.py +++ b/graph_net/subgraph_decompose_and_evaluation_step.py @@ -278,8 +278,8 @@ def generate_initial_tasks(args): """Generates tasks for Pass 0 based on the initial log file.""" print(f"[Init] Pass 0: Reading from log file: {args.log_file}") initial_failures = get_incorrect_models(args.tolerance, args.log_file) - # t1_incorrect_models = get_incorrect_models(1, args.log_file) - # initial_failures = initial_failures - t1_incorrect_models + t1_incorrect_models = get_incorrect_models(1, args.log_file) + initial_failures = initial_failures - t1_incorrect_models tasks_map = {} for model_path in initial_failures: @@ -291,7 +291,12 @@ def generate_initial_tasks(args): } max_subgraph_size = args.max_subgraph_size - running_states = {"pass_0": {"incorrect_models": list(initial_failures)}} + running_states = { + "pass_0": { + "num_incorrect_models": len(initial_failures), + "incorrect_models": list(sorted(initial_failures)), + } + } return tasks_map, max_subgraph_size, running_states @@ -301,19 +306,14 @@ def generate_refined_tasks(base_output_dir, current_pass_id): print(f"[Init] Resuming from Pass_{current_pass_id - 1} (Dir: {prev_pass_dir})...") prev_config = DecomposeConfig.load(prev_pass_dir) - prev_incorrect_subgraphs = prev_config.incorrect_models - prev_tasks_map = prev_config.tasks_map - running_states = prev_config.running_states - - # Load previous max size as fallback - prev_max_subgraph_size = prev_config.max_subgraph_size - max_subgraph_size = prev_max_subgraph_size // 2 - - if not prev_incorrect_subgraphs: - return {}, max_subgraph_size, running_states + max_subgraph_size = prev_config.max_subgraph_size // 2 + if not prev_config.incorrect_models: + return {}, max_subgraph_size, prev_config.running_states tasks_map = {} - for subgraph_path in prev_incorrect_subgraphs: + prev_tasks_map = prev_config.tasks_map + + for subgraph_path in sorted(prev_config.incorrect_models): # Parse model name and subgraph index model_name_with_subgraph_idx = subgraph_path.rstrip("/").split(os.sep)[-1] model_name = "_".join(model_name_with_subgraph_idx.split("_")[:-1]) @@ -336,7 +336,7 @@ def generate_refined_tasks(base_output_dir, current_pass_id): "split_positions": set(), } - return tasks_map, max_subgraph_size, running_states + return tasks_map, max_subgraph_size, prev_config.running_states def prepare_tasks_and_verify(args, current_pass_id, base_output_dir): @@ -473,10 +473,11 @@ def main(args): next_round_models = set() if task_controller.task_scheduler["post_analysis"]: print("\n--- Phase 3: Analysis ---") - next_round_models = get_incorrect_models(args.tolerance, pass_log_path) + next_round_models = sorted(get_incorrect_models(args.tolerance, pass_log_path)) print(f"[Analysis] Found {len(next_round_models)} incorrect subgraphs.\n") running_states[f"pass_{current_pass_id + 1}"] = { - "incorrect_models": list(next_round_models) + "num_incorrect_models": len(next_round_models), + "incorrect_models": list(next_round_models), } print_summary_and_suggestion(next_round_models, max_subgraph_size) diff --git a/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py 
b/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py index 87587cc85..1076c8f59 100644 --- a/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py +++ b/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py @@ -217,9 +217,7 @@ class Program_weight_tensor_data_21: shape = [1, 8400, 4] dtype = "float32" min_val = float("-4.36945") - max_val = float("inf") - mean = float("inf") - std = float("nan") + max_val = float("5.0") data = None From 864e7b3dafb5e039c988a57e7466c9e93e31fdef Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Wed, 3 Dec 2025 17:44:27 +0800 Subject: [PATCH 4/7] Record the number of original incorrect models. --- .../subgraph_decompose_and_evaluation_step.py | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py index 133b9773b..683b360d9 100755 --- a/graph_net/subgraph_decompose_and_evaluation_step.py +++ b/graph_net/subgraph_decompose_and_evaluation_step.py @@ -28,8 +28,7 @@ def get_pass_name(pass_id): def get_ranged_incorrect_models(tolerance_args: List[int], log_path: str) -> set: - if not os.path.exists(log_path): - return set() + assert os.path.exists(log_path) t_start = tolerance_args[0] models_start = set(get_incorrect_models(t_start, log_path)) @@ -40,13 +39,10 @@ def get_ranged_incorrect_models(tolerance_args: List[int], log_path: str) -> set t_end = tolerance_args[1] models_end = set(get_incorrect_models(t_end, log_path)) - print(f"[Filter] Tolerance Range: {t_start} -> {t_end}") print( - f"[Filter] Fail({t_start}): {len(models_start)}, Fail({t_end}): {len(models_end)}" + f"[Init] number of incorrect models: {len(models_start)} (tolerance={t_start}) - {len(models_end)} (tolerance={t_end})" ) - - diff_set = models_start - models_end - return diff_set + return models_start - models_end class TaskController: @@ -326,6 +322,14 @@ def generate_initial_tasks(args): return tasks_map, max_subgraph_size, running_states +def extract_model_name_and_subgraph_idx(subgraph_path): + # Parse model name and subgraph index + model_name_with_subgraph_idx = subgraph_path.rstrip("/").split(os.sep)[-1] + model_name = "_".join(model_name_with_subgraph_idx.split("_")[:-1]) + subgraph_idx = int(model_name_with_subgraph_idx.split("_")[-1]) + return model_name, subgraph_idx + + def generate_refined_tasks(base_output_dir, current_pass_id): """Generates tasks for Pass > 0 based on previous pass results.""" prev_pass_dir = get_decompose_workspace_path(base_output_dir, current_pass_id - 1) @@ -340,10 +344,7 @@ def generate_refined_tasks(base_output_dir, current_pass_id): prev_tasks_map = prev_config.tasks_map for subgraph_path in sorted(prev_config.incorrect_models): - # Parse model name and subgraph index - model_name_with_subgraph_idx = subgraph_path.rstrip("/").split(os.sep)[-1] - model_name = "_".join(model_name_with_subgraph_idx.split("_")[:-1]) - subgraph_idx = int(model_name_with_subgraph_idx.split("_")[-1]) + model_name, subgraph_idx = extract_model_name_and_subgraph_idx(subgraph_path) assert model_name in prev_tasks_map pre_task_for_model = prev_tasks_map[model_name] @@ -382,11 +383,11 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir): base_output_dir, current_pass_id ) - print(f"[INFO] initial max_subgraph_size: {max_subgraph_size}") - print(f"[INFO] number of incorrect models: {len(tasks_map)}") - for model_name, task_info in tasks_map.items(): + print(f"[Init] initial max_subgraph_size: 
{max_subgraph_size}")
+    print(f"[Init] number of incorrect models: {len(tasks_map)}")
+    for idx, (model_name, task_info) in enumerate(tasks_map.items()):
         original_path = task_info["original_path"]
-        print(f"- {original_path}")
+        print(f"- [{idx}] {original_path}")
 
     if not tasks_map:
         print("[FINISHED] No models need processing.")
@@ -525,12 +526,24 @@ def main(args):
         )
         print(f"\n--- Phase 3: Analysis (tolerance={tolerance}) ---")
         next_round_models = sorted(get_incorrect_models(tolerance, pass_log_path))
+        original_model_paths = set(
+            [
+                model_name
+                for subgraph_path in next_round_models
+                for model_name, _ in [
+                    extract_model_name_and_subgraph_idx(subgraph_path)
+                ]
+            ]
+        )
+
         running_states[f"pass_{current_pass_id + 1}"] = {
-            "num_incorrect_models": len(next_round_models),
+            "num_incorrect_models": len(original_model_paths),
             "incorrect_models": list(next_round_models),
         }
 
-        print(f"[Analysis] Found {len(next_round_models)} incorrect subgraphs.\n")
+        print(
+            f"[Analysis] Found {len(next_round_models)} incorrect subgraphs ({len(original_model_paths)} original models)."
+        )
         for idx, model_path in enumerate(next_round_models):
             print(f"- [{idx}] {model_path}")
 

From 11965497f65873b7ceb10a373a735770e0a19113 Mon Sep 17 00:00:00 2001
From: Liu Yiqun
Date: Mon, 8 Dec 2025 17:32:00 +0800
Subject: [PATCH 5/7] Optimize codes.

---
 .../subgraph_decompose_and_evaluation_step.py | 110 +++++++++---------
 1 file changed, 58 insertions(+), 52 deletions(-)

diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py
index 29546de00..308cb413b 100755
--- a/graph_net/subgraph_decompose_and_evaluation_step.py
+++ b/graph_net/subgraph_decompose_and_evaluation_step.py
@@ -139,6 +139,19 @@ def load(self, work_dir):
     def get_config_path(self, work_dir) -> str:
         return os.path.join(work_dir, "decompose_config.json")
 
+    def update_running_states(self, pass_id, **kwargs):
+        pass_key = get_pass_name(pass_id)
+        if self.running_states.get(pass_key, None) is None:
+            self.running_states[pass_key] = {}
+
+        for key, value in kwargs.items():
+            assert key in [
+                "num_incorrect_models",
+                "incorrect_models",
+                "failed_decomposition_models",
+            ]
+            self.running_states[pass_key][key] = value
+
 
 def get_rectfied_model_path(model_path):
     graphnet_root = path_utils.get_graphnet_root()
@@ -268,11 +281,10 @@ def run_evaluation(
 
 def reconstruct_subgraph_size(split_positions: List[int]) -> List[list]:
     """Reconstructs subgraph size based on sorted split positions."""
-    deduplicated_splits = list(dict.fromkeys(split_positions))
+    deduplicated_splits = sorted(set(split_positions))
 
     subgraph_size = [
-        [deduplicated_splits[i], deduplicated_splits[i + 1]]
-        for i in range(len(deduplicated_splits) - 1)
+        deduplicated_splits[i : i + 2] for i in range(len(deduplicated_splits) - 1)
     ]
     return subgraph_size
 
@@ -328,7 +340,7 @@ def extract_model_name_and_subgraph_idx(subgraph_path):
     return model_name, subgraph_idx
 
 
-def generate_refined_tasks(base_output_dir, current_pass_id):
+def generate_successor_tasks(base_output_dir, current_pass_id):
     """Generates tasks for Pass > 0 based on previous pass results."""
     prev_pass_dir = get_decompose_workspace_path(base_output_dir, current_pass_id - 1)
     print(f"[Init] Resuming from Pass_{current_pass_id - 1} (Dir: {prev_pass_dir})...")
@@ -377,7 +389,7 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
     if current_pass_id == 0:
         tasks_map, max_subgraph_size, running_states = generate_initial_tasks(args)
     else:
-        tasks_map, max_subgraph_size, 
running_states = generate_refined_tasks( + tasks_map, max_subgraph_size, running_states = generate_successor_tasks( base_output_dir, current_pass_id ) @@ -435,15 +447,13 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa os.makedirs(decomposed_samples_dir, exist_ok=True) max_subgraph_size = max(1, max_subgraph_size // 2) for model_name, task_info in tasks_map.items(): - splits = task_info["split_positions"] - if not splits or len(splits) < 2: + split_positions = task_info["split_positions"] + if not split_positions or len(split_positions) < 2: continue - start_pos = splits[0] - first_segment_end = splits[1] - new_splits = calculate_split_positions_for_subgraph( - [start_pos, first_segment_end], max_subgraph_size + new_split_positions = calculate_split_positions_for_subgraph( + split_positions[0:2], max_subgraph_size ) - task_info["split_positions"] = new_splits + task_info["split_positions"] = new_split_positions else: need_decompose = False print() @@ -454,6 +464,15 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa return tasks_map, failed_decomposition, max_subgraph_size +def count_unique_original_models(incorrect_models): + original_model_paths = set( + model_name + for subgraph_path in incorrect_models + for model_name, _ in [extract_model_name_and_subgraph_idx(subgraph_path)] + ) + return len(original_model_paths) + + def print_summary_and_suggestion(next_round_models, max_subgraph_size): """Print suggestion/result.""" print("\n" + "=" * 80) @@ -480,9 +499,14 @@ def main(args): tasks_map, max_subgraph_size, running_states = prepare_tasks_and_verify( args, current_pass_id, base_output_dir ) - pass_work_dir = get_decompose_workspace_path(base_output_dir, current_pass_id) - if not os.path.exists(pass_work_dir): - os.makedirs(pass_work_dir, exist_ok=True) + decompose_config = DecomposeConfig( + max_subgraph_size=max_subgraph_size, + tasks_map=tasks_map, + running_states=running_states, + ) + work_dir = get_decompose_workspace_path(base_output_dir, current_pass_id) + if not os.path.exists(work_dir): + os.makedirs(work_dir, exist_ok=True) # --- Step 2: Decomposition --- if task_controller.task_scheduler["run_decomposer"]: @@ -492,63 +516,45 @@ def main(args): failed_decomposition, max_subgraph_size, ) = execute_decomposition_phase( - max_subgraph_size, tasks_map, args.framework, pass_work_dir + max_subgraph_size, tasks_map, args.framework, work_dir + ) + decompose_config.update_running_states( + current_pass_id, failed_decomposition_models=list(failed_decomposition) ) - running_states.get(f"pass_{current_pass_id}", {})[ - "failed_decomposition_models" - ] = list(failed_decomposition) else: print("\n--- Phase 1: Decomposition (skipped) ---", flush=True) - config = DecomposeConfig.load(pass_work_dir) - max_subgraph_size = config.max_subgraph_size - tasks_map = config.tasks_map - running_states = config.running_states + decompose_config = DecomposeConfig.load(work_dir) # --- Step 3: Evaluation --- - pass_log_path = os.path.join(pass_work_dir, "batch_test_result.log") + log_path = os.path.join(work_dir, f"log_{task_controller.test_module_name}.txt") if task_controller.task_scheduler["run_evaluation"]: print(f"\n--- Phase 2: Evaluation ({task_controller.test_module_name}) ---") - run_evaluation(args.framework, args.test_config, pass_work_dir, pass_log_path) + run_evaluation(args.framework, args.test_config, work_dir, log_path) # --- Step 4: Analysis --- - next_round_models = set() + next_pass_incorrect_models = set() if 
task_controller.task_scheduler["post_analysis"]:
         tolerance = (
             args.tolerance[0] if isinstance(args.tolerance, list) else args.tolerance
         )
         print(f"\n--- Phase 3: Analysis (tolerance={tolerance}) ---")
-        next_round_models = sorted(get_incorrect_models(tolerance, pass_log_path))
-        original_model_paths = set(
-            [
-                model_name
-                for subgraph_path in next_round_models
-                for model_name, _ in [
-                    extract_model_name_and_subgraph_idx(subgraph_path)
-                ]
-            ]
+        next_pass_incorrect_models = sorted(get_incorrect_models(tolerance, log_path))
+        num_original_models = count_unique_original_models(next_pass_incorrect_models)
+        decompose_config.update_running_states(
+            current_pass_id + 1,
+            num_incorrect_models=num_original_models,
+            incorrect_models=list(next_pass_incorrect_models),
         )
-
-        running_states[f"pass_{current_pass_id + 1}"] = {
-            "num_incorrect_models": len(original_model_paths),
-            "incorrect_models": list(next_round_models),
-        }
-
         print(
-            f"[Analysis] Found {len(next_round_models)} incorrect subgraphs ({len(original_model_paths)} original models)."
+            f"[Analysis] Found {len(next_pass_incorrect_models)} incorrect subgraphs ({num_original_models} original models)."
         )
-        for idx, model_path in enumerate(next_round_models):
+        for idx, model_path in enumerate(next_pass_incorrect_models):
             print(f"- [{idx}] {model_path}")
-
-        print_summary_and_suggestion(next_round_models, max_subgraph_size)
+        print_summary_and_suggestion(next_pass_incorrect_models, max_subgraph_size)
 
     # --- Step 5: Save States ---
-    config = DecomposeConfig(
-        max_subgraph_size=max_subgraph_size,
-        incorrect_models=list(next_round_models),
-        tasks_map=tasks_map,
-        running_states=running_states,
-    )
-    config.save(pass_work_dir)
+    decompose_config.incorrect_models = list(next_pass_incorrect_models)
+    decompose_config.save(work_dir)
 
 
 if __name__ == "__main__":

From d7c91a265923c3c195a485099cbb2228f4b6e296 Mon Sep 17 00:00:00 2001
From: Liu Yiqun
Date: Wed, 10 Dec 2025 11:25:58 +0800
Subject: [PATCH 6/7] Optimize codes.

---
 .../subgraph_decompose_and_evaluation_step.py | 55 +++++++------------
 1 file changed, 20 insertions(+), 35 deletions(-)

diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py
index 5a45d20d2..56bd7282e 100755
--- a/graph_net/subgraph_decompose_and_evaluation_step.py
+++ b/graph_net/subgraph_decompose_and_evaluation_step.py
@@ -279,27 +279,18 @@ def run_evaluation(
         ), f"[ERROR] test failed for {samples_dir}, please check the log."
 
 
-def reconstruct_subgraph_size(split_positions: List[int]) -> List[list]:
-    """Reconstructs subgraph size based on sorted split positions."""
-    deduplicated_splits = sorted(set(split_positions))
-
-    subgraph_size = [
-        deduplicated_splits[i : i + 2] for i in range(len(deduplicated_splits) - 1)
-    ]
-    return subgraph_size
-
-
-def calculate_split_positions_for_subgraph(subgraph_range, max_subgraph_size):
-    assert isinstance(subgraph_range, (list, tuple)) and len(subgraph_range) == 2
-
-    # subgraph_size: the start and end position in original model.
-    start_pos, end_pos = subgraph_range
-    end_pos = kMaxGraphSize if end_pos == float("inf") else end_pos
+def reconstruct_split_positions_for_subgraph(
+    split_positions, subgraph_idx, max_subgraph_size
+):
+    assert (
+        subgraph_idx < len(split_positions) - 1
+    ), f"subgraph_idx {subgraph_idx} is out of bounds of split_positions: {split_positions}." 
- split_positions = set( + start_pos, end_pos = split_positions[subgraph_idx : subgraph_idx + 2] + new_split_positions = set( range(start_pos, end_pos + max_subgraph_size - 1, max_subgraph_size) ) - return sorted(list(set(split_positions))) + return sorted(list(new_split_positions)) def generate_initial_tasks(args): @@ -310,17 +301,14 @@ def generate_initial_tasks(args): tasks_map = {} max_subgraph_size = args.max_subgraph_size + initial_split_positions = reconstruct_split_positions_for_subgraph( + [0, kMaxGraphSize], 0, max_subgraph_size + ) for model_path in initial_failures: model_name = get_model_name_with_subgraph_tag(model_path) - - initial_range = [0, kMaxGraphSize] - initial_splits = calculate_split_positions_for_subgraph( - initial_range, max_subgraph_size - ) - tasks_map[model_name] = { "original_path": model_path, - "split_positions": initial_splits, + "split_positions": initial_split_positions, } running_states = { @@ -355,19 +343,14 @@ def generate_successor_tasks(base_output_dir, current_pass_id): for subgraph_path in sorted(prev_config.incorrect_models): model_name, subgraph_idx = extract_model_name_and_subgraph_idx(subgraph_path) + print(f"{subgraph_path=}") assert model_name in prev_tasks_map pre_task_for_model = prev_tasks_map[model_name] prev_split_positions = pre_task_for_model.get("split_positions", []) - subgraph_ranges = reconstruct_subgraph_size(prev_split_positions) - - assert subgraph_idx < len( - subgraph_ranges - ), f"subgraph_idx {subgraph_idx} is out of bounds for {model_name} (previous split_positions: {prev_split_positions})" - - split_positions = calculate_split_positions_for_subgraph( - subgraph_ranges[subgraph_idx], max_subgraph_size + split_positions = reconstruct_split_positions_for_subgraph( + prev_split_positions, subgraph_idx, max_subgraph_size ) if model_name not in tasks_map: tasks_map[model_name] = { @@ -381,6 +364,7 @@ def generate_successor_tasks(base_output_dir, current_pass_id): tasks_map[model_name]["split_positions"] = list( sorted(set(merged_split_positions)) ) + print(f"{tasks_map=}") return tasks_map, max_subgraph_size, prev_config.running_states @@ -409,6 +393,7 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir): ) sys.exit(0) + sys.exit(0) return tasks_map, max_subgraph_size, running_states @@ -450,8 +435,8 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa split_positions = task_info["split_positions"] if not split_positions or len(split_positions) < 2: continue - new_split_positions = calculate_split_positions_for_subgraph( - split_positions[0:2], max_subgraph_size + new_split_positions = reconstruct_split_positions_for_subgraph( + split_positions, 0, max_subgraph_size ) task_info["split_positions"] = new_split_positions else: From 7cfd4ebd63f7bcc1d4592c11d34e62390eb2f5a1 Mon Sep 17 00:00:00 2001 From: Liu Yiqun Date: Wed, 10 Dec 2025 14:48:34 +0800 Subject: [PATCH 7/7] Support fixed-start method. 
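
With --method fixed-start, the split list is always anchored at node 0 and
only the prefix [0, split_positions[1]] is handed to the decomposer, so a
pass re-tests one leading segment of the original graph instead of
recursively splitting every failing subgraph; when a model produces no
failing subgraph 0, the next pass probes subgraph index 1. The helper is
also generalized to refine several subgraph indices at once. A standalone
sketch of the new range logic, with made-up numbers:

    def reconstruct_split_positions_for_subgraphs(split_positions, idxs, max_size):
        idxs = [idxs] if isinstance(idxs, int) else idxs
        out = []
        for i in idxs:
            start, end = split_positions[i : i + 2]
            out += list(range(start, end + max_size, max_size))
        return sorted(set(out))

    # Refining both subgraphs of [0, 8, 16] at granularity 4:
    assert reconstruct_split_positions_for_subgraphs([0, 8, 16], [0, 1], 4) == [
        0, 4, 8, 12, 16,
    ]

Note that run_decomposer_for_multi_models and execute_decomposition_phase
still hard-code method = "fixed-start" internally, so the new --method flag
currently only steers task generation.
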
--- .../subgraph_decompose_and_evaluation_step.py | 122 ++++++++++++------ 1 file changed, 80 insertions(+), 42 deletions(-) diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py index 56bd7282e..c296a6539 100755 --- a/graph_net/subgraph_decompose_and_evaluation_step.py +++ b/graph_net/subgraph_decompose_and_evaluation_step.py @@ -111,8 +111,9 @@ def _print(self): @dataclass class DecomposeConfig: + method: str + tolerance: int | List[int] max_subgraph_size: int = -1 - incorrect_models: List[str] = field(default_factory=list) tasks_map: Dict[str, Union[int, str, list, dict]] = field(default_factory=dict) running_states: Dict[str, Union[int, str, list, dict]] = field(default_factory=dict) @@ -139,6 +140,11 @@ def load(self, work_dir): def get_config_path(self, work_dir) -> str: return os.path.join(work_dir, "decompose_config.json") + def get_incorrect_models(self, pass_id): + pass_key = get_pass_name(pass_id) + assert pass_key in self.running_states + return self.running_states[pass_key]["incorrect_models"] + def update_running_states(self, pass_id, **kwargs): pass_key = get_pass_name(pass_id) if self.running_states.get(pass_key, None) is None: @@ -229,8 +235,13 @@ def run_decomposer_for_multi_models( ) for model_name, task_info in tasks_map.items(): original_path = task_info["original_path"] - split_positions = sorted(list(task_info["split_positions"])) + + method = "fixed-start" + if method == "fixed-start": + assert len(split_positions) >= 3, f"{split_positions=}" + split_positions = [0, split_positions[1]] + rectified_model_path = get_rectfied_model_path(original_path) assert os.path.exists( rectified_model_path @@ -279,18 +290,22 @@ def run_evaluation( ), f"[ERROR] test failed for {samples_dir}, please check the log." -def reconstruct_split_positions_for_subgraph( - split_positions, subgraph_idx, max_subgraph_size +def reconstruct_split_positions_for_subgraphs( + split_positions, subgraph_idxs, max_subgraph_size ): - assert ( - subgraph_idx < len(split_positions) - 1 - ), f"subgraph_idx {subgraph_idx} is out of bounds of split_positions: {split_positions}." + subgraph_idxs = [subgraph_idxs] if isinstance(subgraph_idxs, int) else subgraph_idxs - start_pos, end_pos = split_positions[subgraph_idx : subgraph_idx + 2] - new_split_positions = set( - range(start_pos, end_pos + max_subgraph_size - 1, max_subgraph_size) - ) - return sorted(list(new_split_positions)) + new_split_positions = [] + for subgraph_idx in subgraph_idxs: + assert ( + subgraph_idx < len(split_positions) - 1 + ), f"subgraph_idx {subgraph_idx} is out of bounds of split_positions: {split_positions}." 
+ + start_pos, end_pos = split_positions[subgraph_idx : subgraph_idx + 2] + new_split_positions = new_split_positions + list( + range(start_pos, end_pos + max_subgraph_size, max_subgraph_size) + ) + return sorted(list(set(new_split_positions))) def generate_initial_tasks(args): @@ -299,9 +314,9 @@ def generate_initial_tasks(args): initial_failures = get_ranged_incorrect_models(args.tolerance, args.log_file) tasks_map = {} - max_subgraph_size = args.max_subgraph_size + max_subgraph_size = min(args.max_subgraph_size, kMaxGraphSize // 2) - initial_split_positions = reconstruct_split_positions_for_subgraph( + initial_split_positions = reconstruct_split_positions_for_subgraphs( [0, kMaxGraphSize], 0, max_subgraph_size ) for model_path in initial_failures: @@ -328,42 +343,61 @@ def extract_model_name_and_subgraph_idx(subgraph_path): return model_name, subgraph_idx -def generate_successor_tasks(base_output_dir, current_pass_id): +def collect_incorrect_subgraph_idxs(args, target_model_names, incorrect_models): + model_name2subgraph_idxs = {} + for subgraph_path in sorted(incorrect_models): + model_name, subgraph_idx = extract_model_name_and_subgraph_idx(subgraph_path) + print(f"{subgraph_path=}") + print(f"{model_name=}, {subgraph_idx=}") + assert model_name in target_model_names, f"{model_name=}, {subgraph_idx=}" + + if model_name not in model_name2subgraph_idxs: + model_name2subgraph_idxs[model_name] = [] + model_name2subgraph_idxs[model_name].append(subgraph_idx) + + if args.method == "fixed-start": + print(model_name2subgraph_idxs) + for model_name in target_model_names: + if model_name not in model_name2subgraph_idxs: + model_name2subgraph_idxs[model_name] = [1] + else: + assert len( + model_name2subgraph_idxs[model_name] + ) == 1 and model_name2subgraph_idxs[model_name] == [0] + return model_name2subgraph_idxs + + +def generate_successor_tasks(args, base_output_dir, current_pass_id): """Generates tasks for Pass > 0 based on previous pass results.""" prev_pass_dir = get_decompose_workspace_path(base_output_dir, current_pass_id - 1) print(f"[Init] Resuming from Pass_{current_pass_id - 1} (Dir: {prev_pass_dir})...") prev_config = DecomposeConfig.load(prev_pass_dir) max_subgraph_size = prev_config.max_subgraph_size // 2 - if not prev_config.incorrect_models: + incorrect_models = prev_config.get_incorrect_models(current_pass_id) + if args.method != "fixed-start" and not incorrect_models: return {}, max_subgraph_size, prev_config.running_states tasks_map = {} prev_tasks_map = prev_config.tasks_map - for subgraph_path in sorted(prev_config.incorrect_models): - model_name, subgraph_idx = extract_model_name_and_subgraph_idx(subgraph_path) - print(f"{subgraph_path=}") + target_model_names = list(prev_tasks_map.keys()) + model_name2subgraph_idxs = collect_incorrect_subgraph_idxs( + args, target_model_names, incorrect_models + ) - assert model_name in prev_tasks_map + for model_name, subgraph_idxs in model_name2subgraph_idxs.items(): pre_task_for_model = prev_tasks_map[model_name] prev_split_positions = pre_task_for_model.get("split_positions", []) - split_positions = reconstruct_split_positions_for_subgraph( - prev_split_positions, subgraph_idx, max_subgraph_size + split_positions = reconstruct_split_positions_for_subgraphs( + prev_split_positions, subgraph_idxs, max_subgraph_size ) - if model_name not in tasks_map: - tasks_map[model_name] = { - "original_path": pre_task_for_model["original_path"], - "split_positions": list(sorted(split_positions)), - } - else: - merged_split_positions = ( - 
tasks_map[model_name]["split_positions"] + split_positions
-            )
-            tasks_map[model_name]["split_positions"] = list(
-                sorted(set(merged_split_positions))
-            )
+
+        tasks_map[model_name] = {
+            "original_path": pre_task_for_model["original_path"],
+            "split_positions": split_positions,
+        }
     print(f"{tasks_map=}")
 
     return tasks_map, max_subgraph_size, prev_config.running_states
 
 
@@ -374,7 +408,7 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
         tasks_map, max_subgraph_size, running_states = generate_initial_tasks(args)
     else:
         tasks_map, max_subgraph_size, running_states = generate_successor_tasks(
-            base_output_dir, current_pass_id
+            args, base_output_dir, current_pass_id
         )
 
@@ -393,7 +427,6 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
         )
         sys.exit(0)
 
-    sys.exit(0)
     return tasks_map, max_subgraph_size, running_states
 
 
@@ -402,6 +435,7 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa
 
     failed_decomposition = []
     need_decompose = True if len(tasks_map) > 0 else False
+    method = "fixed-start"
 
     while need_decompose:
         decomposed_samples_dir = os.path.join(
@@ -426,6 +460,7 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa
             not failed_decomposition
             and num_decomposed_samples == len(tasks_map)
             and max_subgraph_size > 1
+            and method != "fixed-start"
         ):
             need_decompose = True
             shutil.rmtree(decomposed_samples_dir)
@@ -435,7 +470,7 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa
             split_positions = task_info["split_positions"]
             if not split_positions or len(split_positions) < 2:
                 continue
-            new_split_positions = reconstruct_split_positions_for_subgraph(
+            new_split_positions = reconstruct_split_positions_for_subgraphs(
                 split_positions, 0, max_subgraph_size
             )
             task_info["split_positions"] = new_split_positions
@@ -458,8 +493,7 @@ def count_unique_original_models(incorrect_models):
     return len(original_model_paths)
 
 
-def print_summary_and_suggestion(next_round_models, max_subgraph_size):
-    """Print suggestion/result."""
+def print_summary_and_suggestion(args, next_round_models, max_subgraph_size):
     print("\n" + "=" * 80)
     if next_round_models and max_subgraph_size > 1:
         print(f">>> [SUGGESTION] Issues remain (Count: {len(next_round_models)}).")
@@ -485,6 +519,8 @@ def main(args):
         args, current_pass_id, base_output_dir
     )
     decompose_config = DecomposeConfig(
+        method=args.method,
+        tolerance=args.tolerance,
         max_subgraph_size=max_subgraph_size,
         tasks_map=tasks_map,
         running_states=running_states,
     )
@@ -517,7 +553,6 @@ def main(args):
         run_evaluation(args.framework, args.test_config, work_dir, log_path)
 
     # --- Step 4: Analysis ---
-    next_pass_incorrect_models = set()
     if task_controller.task_scheduler["post_analysis"]:
         tolerance = (
             args.tolerance[0] if isinstance(args.tolerance, list) else args.tolerance
         )
         print(f"\n--- Phase 3: Analysis (tolerance={tolerance}) ---")
         next_pass_incorrect_models = sorted(get_incorrect_models(tolerance, log_path))
         num_original_models = count_unique_original_models(next_pass_incorrect_models)
         decompose_config.update_running_states(
             current_pass_id + 1,
             num_incorrect_models=num_original_models,
             incorrect_models=list(next_pass_incorrect_models),
         )
+
         print(
             f"[Analysis] Found {len(next_pass_incorrect_models)} incorrect subgraphs ({num_original_models} original models)." 
) for idx, model_path in enumerate(next_pass_incorrect_models): print(f"- [{idx}] {model_path}") - print_summary_and_suggestion(next_pass_incorrect_models, max_subgraph_size) + print_summary_and_suggestion( + args, next_pass_incorrect_models, max_subgraph_size + ) # --- Step 5: Save States --- - decompose_config.incorrect_models = list(next_pass_incorrect_models) decompose_config.save(work_dir) @@ -550,6 +587,7 @@ def main(args): parser.add_argument( "--test-config", type=str, required=True, help="Base64 encoded test config" ) + parser.add_argument("--method", type=str, required=True) parser.add_argument( "--tolerance", type=int,