diff --git a/graph_net/subgraph_decompose_and_evaluation_step.py b/graph_net/subgraph_decompose_and_evaluation_step.py
index 20ae6dbb2..c296a6539 100755
--- a/graph_net/subgraph_decompose_and_evaluation_step.py
+++ b/graph_net/subgraph_decompose_and_evaluation_step.py
@@ -7,7 +7,8 @@
 import argparse
 import subprocess
 import glob
-from typing import List, Set, Dict, Any, Union
+from dataclasses import dataclass, field
+from typing import List, Dict, Union
 from graph_net.analysis_util import get_incorrect_models
 from graph_net import path_utils

@@ -18,9 +19,16 @@ def convert_b64_string_to_json(b64str):
     return json.loads(base64.b64decode(b64str).decode("utf-8"))


+def convert_json_to_b64_string(config):
+    return base64.b64encode(json.dumps(config).encode()).decode()
+
+
+def get_pass_name(pass_id):
+    return f"pass_{pass_id}"
+
+
 def get_ranged_incorrect_models(tolerance_args: List[int], log_path: str) -> set:
-    if not os.path.exists(log_path):
-        return set()
+    assert os.path.exists(log_path)

     t_start = tolerance_args[0]
     models_start = set(get_incorrect_models(t_start, log_path))
@@ -31,14 +39,10 @@ def get_ranged_incorrect_models(tolerance_args: List[int], log_path: str) -> set
     t_end = tolerance_args[1]
     models_end = set(get_incorrect_models(t_end, log_path))

-    print(f"[Filter] Tolerance Range: {t_start} -> {t_end}")
     print(
-        f"[Filter] Fail({t_start}): {len(models_start)}, Fail({t_end}): {len(models_end)}"
+        f"[Init] number of incorrect models: {len(models_start)} (tolerance={t_start}) - {len(models_end)} (tolerance={t_end})"
     )
-
-    diff_set = models_start - models_end
-
-    return diff_set
+    return models_start - models_end


 class TaskController:
@@ -105,6 +109,56 @@ def _print(self):
         print()


+@dataclass
+class DecomposeConfig:
+    method: str
+    tolerance: int | List[int]
+    max_subgraph_size: int = -1
+    tasks_map: Dict[str, Union[int, str, list, dict]] = field(default_factory=dict)
+    running_states: Dict[str, Union[int, str, list, dict]] = field(default_factory=dict)
+
+    def save(self, work_dir):
+        """Save the current config to a JSON file."""
+        config_path = self.get_config_path(work_dir)
+
+        with open(config_path, "w") as f:
+            json.dump(self.__dict__, f, indent=4)
+        print(f"\n[INFO] Save state to: {config_path}")
+
+    @classmethod
+    def load(cls, work_dir):
+        """Initialize an object from a JSON file."""
+        config_path = cls.get_config_path(work_dir)
+        if not os.path.exists(config_path):
+            raise FileNotFoundError(f"Missing configuration file: {config_path}")
+
+        with open(config_path, "r") as f:
+            data = json.load(f)
+        return cls(**data)
+
+    @classmethod
+    def get_config_path(cls, work_dir) -> str:
+        return os.path.join(work_dir, "decompose_config.json")
+
+    def get_incorrect_models(self, pass_id):
+        pass_key = get_pass_name(pass_id)
+        assert pass_key in self.running_states
+        return self.running_states[pass_key]["incorrect_models"]
+
+    def update_running_states(self, pass_id, **kwargs):
+        pass_key = get_pass_name(pass_id)
+        if self.running_states.get(pass_key, None) is None:
+            self.running_states[pass_key] = {}
+
+        for key, value in kwargs.items():
+            assert key in [
+                "num_incorrect_models",
+                "incorrect_models",
+                "failed_decomposition_models",
+            ]
+            self.running_states[pass_key][key] = value
+
+
 def get_rectfied_model_path(model_path):
     graphnet_root = path_utils.get_graphnet_root()
     return os.path.join(graphnet_root, model_path.split("GraphNet/")[-1])
@@ -118,53 +172,10 @@ def count_samples(samples_dir):
     return num_samples


-def get_decompose_config_path(output_dir: str) -> str:
-    """Returns the full path to the decompose configuration file."""
-    return os.path.join(output_dir, "decompose_config.json")
-
-
 def get_decompose_workspace_path(output_dir, pass_id):
     return os.path.join(output_dir, f"pass_{pass_id}")


-def load_decompose_config(work_dir: str) -> Dict[str, Any]:
-    """Loads the configuration file from the previous pass."""
-    config_path = get_decompose_config_path(work_dir)
-
-    if not os.path.exists(config_path):
-        raise FileNotFoundError(f"Missing configuration file: {config_path}")
-    with open(config_path, "r") as f:
-        return json.load(f)
-
-
-def save_decompose_config(
-    workspace: str,
-    max_subgraph_size: int,
-    tasks_map: Dict[str, Union[int, str, list, dict]],
-    incorrect_paths: Union[List[str], Set[str]],
-    failed_decomposition_models: Union[List[str], Set[str]],
-):
-    """Saves the current state to a JSON file."""
-
-    tasks_map_copy = {}
-    for model_name, task_info in tasks_map.items():
-        tasks_map_copy[model_name] = {}
-        for key in ["original_path", "split_positions"]:
-            tasks_map_copy[model_name][key] = task_info[key]
-
-    config = {
-        "max_subgraph_size": max_subgraph_size,
-        "incorrect_models": list(incorrect_paths),
-        "tasks_map": tasks_map_copy,
-        "failed_decomposition_models": list(failed_decomposition_models),
-    }
-    config_path = get_decompose_config_path(workspace)
-
-    with open(config_path, "w") as f:
-        json.dump(config, f, indent=4)
-    print(f"\n[INFO] Save state to: {config_path}")
-
-
 def get_model_name_with_subgraph_tag(model_path):
     fields = model_path.rstrip("/").split(os.sep)
     pattern = r"^subgraph(_\d+)?$"
@@ -195,9 +206,7 @@ def run_decomposer_for_single_model(
             },
         },
     }
-    decorator_config_b64 = base64.b64encode(
-        json.dumps(decorator_config).encode()
-    ).decode()
+    decorator_config_b64 = convert_json_to_b64_string(decorator_config)
     print(
         f"[Decomposition] model_path: {model_path}, split_positions: {split_positions}"
     )
@@ -226,8 +235,13 @@ def run_decomposer_for_multi_models(
     )
     for model_name, task_info in tasks_map.items():
         original_path = task_info["original_path"]
-        split_positions = sorted(list(task_info["split_positions"]))
+        split_positions = sorted(task_info["split_positions"])
+        method = "fixed-start"
+        if method == "fixed-start":
+            assert len(split_positions) >= 3, f"{split_positions=}"
+            split_positions = [0, split_positions[1]]
+
         rectified_model_path = get_rectfied_model_path(original_path)
         assert os.path.exists(
             rectified_model_path
@@ -276,28 +290,22 @@ def run_evaluation(
         ), f"[ERROR] test failed for {samples_dir}, please check the log."


-def reconstruct_subgraph_size(split_positions: List[int]) -> List[list]:
-    """Reconstructs subgraph size based on sorted split positions."""
-    deduplicated_splits = list(dict.fromkeys(split_positions))
-
-    subgraph_size = [
-        [deduplicated_splits[i], deduplicated_splits[i + 1]]
-        for i in range(len(deduplicated_splits) - 1)
-    ]
-    return subgraph_size
-
-
-def calculate_split_positions_for_subgraph(subgraph_size, max_subgraph_size):
-    assert isinstance(subgraph_size, (list, tuple)) and len(subgraph_size) == 2
+def reconstruct_split_positions_for_subgraphs(
+    split_positions, subgraph_idxs, max_subgraph_size
+):
+    subgraph_idxs = [subgraph_idxs] if isinstance(subgraph_idxs, int) else subgraph_idxs

-    # subgraph_size: the start and end position in original model.
-    start_pos, end_pos = subgraph_size
-    end_pos = kMaxGraphSize if end_pos == float("inf") else end_pos
+    new_split_positions = []
+    for subgraph_idx in subgraph_idxs:
+        assert (
+            subgraph_idx < len(split_positions) - 1
+        ), f"subgraph_idx {subgraph_idx} is out of bounds of split_positions: {split_positions}."

-    split_positions = list(range(start_pos, end_pos + 1, max_subgraph_size))
-    if split_positions[-1] != end_pos:
-        split_positions.append(end_pos)
-    return sorted(list(set(split_positions)))
+        start_pos, end_pos = split_positions[subgraph_idx : subgraph_idx + 2]
+        new_split_positions = new_split_positions + list(
+            range(start_pos, end_pos + max_subgraph_size, max_subgraph_size)
+        )
+    return sorted(list(set(new_split_positions)))


 def generate_initial_tasks(args):
@@ -306,94 +314,108 @@
     initial_failures = get_ranged_incorrect_models(args.tolerance, args.log_file)
     tasks_map = {}
-    max_subgraph_size = args.max_subgraph_size
+    max_subgraph_size = min(args.max_subgraph_size, kMaxGraphSize // 2)
+    initial_split_positions = reconstruct_split_positions_for_subgraphs(
+        [0, kMaxGraphSize], 0, max_subgraph_size
+    )
     for model_path in initial_failures:
         model_name = get_model_name_with_subgraph_tag(model_path)
-
-        initial_range = [0, kMaxGraphSize]
-        initial_splits = calculate_split_positions_for_subgraph(
-            initial_range, max_subgraph_size
-        )
-
         tasks_map[model_name] = {
-            "subgraph_path": model_path,
             "original_path": model_path,
-            "split_positions": initial_splits,
+            "split_positions": initial_split_positions,
         }

-    for task in tasks_map.values():
-        task["split_positions"] = sorted(list(task["split_positions"]))
-
-    return tasks_map, max_subgraph_size
-
-
-def generate_refined_tasks(base_output_dir, current_pass_id):
+    running_states = {
+        "pass_0": {
+            "num_incorrect_models": len(initial_failures),
+            "incorrect_models": list(sorted(initial_failures)),
+        }
+    }
+    return tasks_map, max_subgraph_size, running_states
+
+
+def extract_model_name_and_subgraph_idx(subgraph_path):
+    # Parse model name and subgraph index
+    model_name_with_subgraph_idx = subgraph_path.rstrip("/").split(os.sep)[-1]
+    model_name = "_".join(model_name_with_subgraph_idx.split("_")[:-1])
+    subgraph_idx = int(model_name_with_subgraph_idx.split("_")[-1])
+    return model_name, subgraph_idx
+
+
+def collect_incorrect_subgraph_idxs(args, target_model_names, incorrect_models):
+    model_name2subgraph_idxs = {}
+    for subgraph_path in sorted(incorrect_models):
+        model_name, subgraph_idx = extract_model_name_and_subgraph_idx(subgraph_path)
+        print(f"{subgraph_path=}")
+        print(f"{model_name=}, {subgraph_idx=}")
+        assert model_name in target_model_names, f"{model_name=}, {subgraph_idx=}"
+
+        if model_name not in model_name2subgraph_idxs:
+            model_name2subgraph_idxs[model_name] = []
+        model_name2subgraph_idxs[model_name].append(subgraph_idx)
+
+    if args.method == "fixed-start":
+        print(model_name2subgraph_idxs)
+        for model_name in target_model_names:
+            if model_name not in model_name2subgraph_idxs:
+                model_name2subgraph_idxs[model_name] = [1]
+            else:
+                assert len(
+                    model_name2subgraph_idxs[model_name]
+                ) == 1 and model_name2subgraph_idxs[model_name] == [0]
+    return model_name2subgraph_idxs
+
+
+def generate_successor_tasks(args, base_output_dir, current_pass_id):
     """Generates tasks for Pass > 0 based on previous pass results."""
     prev_pass_dir = get_decompose_workspace_path(base_output_dir, current_pass_id - 1)
     print(f"[Init] Resuming from Pass_{current_pass_id - 1} (Dir: {prev_pass_dir})...")
-    prev_config = load_decompose_config(prev_pass_dir)
-    prev_incorrect_subgraphs = prev_config.get("incorrect_models", [])
-    prev_tasks_map = prev_config.get("tasks_map", {})
-
-    prev_max_subgraph_size = prev_config.get("max_subgraph_size")
-    max_subgraph_size = prev_max_subgraph_size // 2
-
-    if not prev_incorrect_subgraphs:
-        return {}, max_subgraph_size
+    prev_config = DecomposeConfig.load(prev_pass_dir)
+    max_subgraph_size = prev_config.max_subgraph_size // 2
+    incorrect_models = prev_config.get_incorrect_models(current_pass_id)
+    if args.method != "fixed-start" and not incorrect_models:
+        return {}, max_subgraph_size, prev_config.running_states

     tasks_map = {}
-    for subgraph_path in prev_incorrect_subgraphs:
-        # Parse model name and subgraph index
-        model_name_with_subgraph_idx = subgraph_path.rstrip("/").split(os.sep)[-1]
-        model_name = "_".join(model_name_with_subgraph_idx.split("_")[:-1])
-        subgraph_idx = int(model_name_with_subgraph_idx.split("_")[-1])
+    prev_tasks_map = prev_config.tasks_map

-        assert model_name in prev_tasks_map
+    target_model_names = list(prev_tasks_map.keys())
+    model_name2subgraph_idxs = collect_incorrect_subgraph_idxs(
+        args, target_model_names, incorrect_models
+    )
+
+    for model_name, subgraph_idxs in model_name2subgraph_idxs.items():
         pre_task_for_model = prev_tasks_map[model_name]
         prev_split_positions = pre_task_for_model.get("split_positions", [])
-        subgraph_ranges = reconstruct_subgraph_size(prev_split_positions)
-
-        assert subgraph_idx < len(
-            subgraph_ranges
-        ), f"subgraph_idx {subgraph_idx} is out of bounds for {model_name} (previous split_positions: {prev_split_positions})"
-
-        current_fail_range = subgraph_ranges[subgraph_idx]
-
-        new_splits = calculate_split_positions_for_subgraph(
-            current_fail_range, max_subgraph_size
+        split_positions = reconstruct_split_positions_for_subgraphs(
+            prev_split_positions, subgraph_idxs, max_subgraph_size
         )
-        if model_name not in tasks_map:
-            tasks_map[model_name] = {
-                "subgraph_path": subgraph_path,
-                "original_path": pre_task_for_model["original_path"],
-                "split_positions": set(new_splits),
-            }
-        else:
-            tasks_map[model_name]["split_positions"].update(new_splits)
-
-    for task in tasks_map.values():
-        task["split_positions"] = sorted(list(task["split_positions"]))
+        tasks_map[model_name] = {
+            "original_path": pre_task_for_model["original_path"],
+            "split_positions": split_positions,
+        }
+    print(f"{tasks_map=}")

-    return tasks_map, max_subgraph_size
+    return tasks_map, max_subgraph_size, prev_config.running_states


 def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
     if current_pass_id == 0:
-        tasks_map, max_subgraph_size = generate_initial_tasks(args)
+        tasks_map, max_subgraph_size, running_states = generate_initial_tasks(args)
     else:
-        tasks_map, max_subgraph_size = generate_refined_tasks(
-            base_output_dir, current_pass_id
+        tasks_map, max_subgraph_size, running_states = generate_successor_tasks(
+            args, base_output_dir, current_pass_id
         )

-    print(f"[INFO] initial max_subgraph_size: {max_subgraph_size}")
-    print(f"[INFO] number of incorrect models: {len(tasks_map)}")
-    for model_name, task_info in tasks_map.items():
+    print(f"[Init] initial max_subgraph_size: {max_subgraph_size}")
+    print(f"[Init] number of incorrect models: {len(tasks_map)}")
+    for idx, (model_name, task_info) in enumerate(tasks_map.items()):
         original_path = task_info["original_path"]
-        print(f"- {original_path}")
+        print(f"- [{idx}] {original_path}")

     if not tasks_map:
         print("[FINISHED] No models need processing.")
@@ -405,7 +427,7 @@ def prepare_tasks_and_verify(args, current_pass_id, base_output_dir):
         )
         sys.exit(0)

-    return tasks_map, max_subgraph_size
+    return tasks_map, max_subgraph_size, running_states


 def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspace):
@@ -413,6 +435,7 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa
     failed_decomposition = []

     need_decompose = True if len(tasks_map) > 0 else False
+    method = "fixed-start"

     while need_decompose:
         decomposed_samples_dir = os.path.join(
@@ -437,21 +460,20 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa
             not failed_decomposition
             and num_decomposed_samples == len(tasks_map)
             and max_subgraph_size > 1
+            and method != "fixed-start"
         ):
             need_decompose = True
             shutil.rmtree(decomposed_samples_dir)
             os.makedirs(decomposed_samples_dir, exist_ok=True)
             max_subgraph_size = max(1, max_subgraph_size // 2)
             for model_name, task_info in tasks_map.items():
-                splits = task_info["split_positions"]
-                if not splits or len(splits) < 2:
+                split_positions = task_info["split_positions"]
+                if not split_positions or len(split_positions) < 2:
                     continue
-                start_pos = splits[0]
-                first_segment_end = splits[1]
-                new_splits = calculate_split_positions_for_subgraph(
-                    [start_pos, first_segment_end], max_subgraph_size
+                new_split_positions = reconstruct_split_positions_for_subgraphs(
+                    split_positions, 0, max_subgraph_size
                 )
-                task_info["split_positions"] = new_splits
+                task_info["split_positions"] = new_split_positions
         else:
             need_decompose = False
             print()
@@ -462,8 +484,16 @@ def execute_decomposition_phase(max_subgraph_size, tasks_map, framework, workspa
     return tasks_map, failed_decomposition, max_subgraph_size


-def print_summary_and_suggestion(next_round_models, max_subgraph_size):
-    """Print suggestion/result."""
+def count_unique_original_models(incorrect_models):
+    original_model_paths = set(
+        model_name
+        for subgraph_path in incorrect_models
+        for model_name, _ in [extract_model_name_and_subgraph_idx(subgraph_path)]
+    )
+    return len(original_model_paths)
+
+
+def print_summary_and_suggestion(args, next_round_models, max_subgraph_size):
     print("\n" + "=" * 80)
     if next_round_models and max_subgraph_size > 1:
         print(f">>> [SUGGESTION] Issues remain (Count: {len(next_round_models)}).")
@@ -485,15 +515,21 @@ def main(args):
     print("=" * 80)

     # --- Step 1: Prepare Tasks and Workspace ---
-    tasks_map, max_subgraph_size = prepare_tasks_and_verify(
+    tasks_map, max_subgraph_size, running_states = prepare_tasks_and_verify(
         args, current_pass_id, base_output_dir
     )
-    pass_work_dir = get_decompose_workspace_path(base_output_dir, current_pass_id)
-    if not os.path.exists(pass_work_dir):
-        os.makedirs(pass_work_dir, exist_ok=True)
+    decompose_config = DecomposeConfig(
+        method=args.method,
+        tolerance=args.tolerance,
+        max_subgraph_size=max_subgraph_size,
+        tasks_map=tasks_map,
+        running_states=running_states,
+    )
+    work_dir = get_decompose_workspace_path(base_output_dir, current_pass_id)
+    if not os.path.exists(work_dir):
+        os.makedirs(work_dir, exist_ok=True)

     # --- Step 2: Decomposition ---
-    failed_decomposition = []
     if task_controller.task_scheduler["run_decomposer"]:
         print("\n--- Phase 1: Decomposition ---", flush=True)
         (
@@ -501,46 +537,46 @@ def main(args):
             failed_decomposition,
             max_subgraph_size,
         ) = execute_decomposition_phase(
-            max_subgraph_size, tasks_map, args.framework, pass_work_dir
+            max_subgraph_size, tasks_map, args.framework, work_dir
+        )
+        decompose_config.update_running_states(
+            current_pass_id, failed_decomposition_models=list(failed_decomposition)
         )
     else:
-        config = load_decompose_config(pass_work_dir)
-        max_subgraph_size = config["max_subgraph_size"]
-        failed_decomposition = config["failed_decomposition_models"]
-        tasks_map = config.get("tasks_map", {})
+        print("\n--- Phase 1: Decomposition (skipped) ---", flush=True)
+        decompose_config = DecomposeConfig.load(work_dir)

     # --- Step 3: Evaluation ---
-    pass_log_path = os.path.join(pass_work_dir, "batch_test_result.log")
+    log_path = os.path.join(work_dir, f"log_{task_controller.test_module_name}.txt")
     if task_controller.task_scheduler["run_evaluation"]:
-        print("\n--- Phase 2: Evaluation ---")
-        run_evaluation(args.framework, args.test_config, pass_work_dir, pass_log_path)
+        print(f"\n--- Phase 2: Evaluation ({task_controller.test_module_name}) ---")
+        run_evaluation(args.framework, args.test_config, work_dir, log_path)

     # --- Step 4: Analysis ---
-    next_round_models = set()
     if task_controller.task_scheduler["post_analysis"]:
-        print("\n--- Phase 3: Analysis ---")
-        analysis_tolerance = (
+        tolerance = (
             args.tolerance[0] if isinstance(args.tolerance, list) else args.tolerance
         )
-        next_round_models = get_incorrect_models(analysis_tolerance, pass_log_path)
+        print(f"\n--- Phase 3: Analysis (tolerance={tolerance}) ---")
+        next_pass_incorrect_models = sorted(get_incorrect_models(tolerance, log_path))
+        num_original_models = count_unique_original_models(next_pass_incorrect_models)
+        decompose_config.update_running_states(
+            current_pass_id + 1,
+            num_incorrect_models=num_original_models,
+            incorrect_models=list(next_pass_incorrect_models),
+        )

-        print(f"[Analysis] Found {len(next_round_models)} incorrect subgraphs.\n")
-        if len(next_round_models) > 0:
-            print("[DEBUG] List of detected incorrect models:")
-            for idx, model_path in enumerate(sorted(list(next_round_models))):
-                print(f" [{idx}] {model_path}")
-        else:
-            print("[DEBUG] No incorrect models detected.")
-        print_summary_and_suggestion(next_round_models, max_subgraph_size)
+        print(
+            f"[Analysis] Found {len(next_pass_incorrect_models)} incorrect subgraphs ({num_original_models} original models)."
+        )
+        for idx, model_path in enumerate(next_pass_incorrect_models):
+            print(f"- [{idx}] {model_path}")
+        print_summary_and_suggestion(
+            args, next_pass_incorrect_models, max_subgraph_size
+        )

     # --- Step 5: Save States ---
-    save_decompose_config(
-        pass_work_dir,
-        max_subgraph_size,
-        tasks_map,
-        next_round_models,
-        failed_decomposition,
-    )
+    decompose_config.save(work_dir)


 if __name__ == "__main__":
@@ -551,6 +587,7 @@ def main(args):
     parser.add_argument(
         "--test-config", type=str, required=True, help="Base64 encoded test config"
     )
+    parser.add_argument("--method", type=str, required=True)
     parser.add_argument(
         "--tolerance",
         type=int,
diff --git a/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py b/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py
index 9a01d6a58..2f14b05cb 100644
--- a/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py
+++ b/paddle_samples/PaddleX/Mask-RT-DETR-S/subgraph_14/input_meta.py
@@ -240,9 +240,7 @@ class Program_weight_tensor_data_21:
     shape = [1, 8400, 4]
     dtype = "float32"
     min_val = float("-4.36945")
-    max_val = float("inf")
-    mean = float("inf")
-    std = float("nan")
+    max_val = float("5.0")
     data = None