diff --git a/graph_net/filter_fault_model_path.py b/graph_net/filter_fault_model_path.py
new file mode 100644
index 000000000..eb5555a94
--- /dev/null
+++ b/graph_net/filter_fault_model_path.py
@@ -0,0 +1,105 @@
+import os
+import json
+import base64
+from pathlib import Path
+from typing import Callable
+from graph_net_bench.calculate_es_scores import calculate_es_scores_for_each_model_path
+from graph_net.fault_locator.fault_detector import has_fault_at
+from graph_net.declare_config_mixin import DeclareConfigMixin
+
+
+class FaultModelPathFilter(DeclareConfigMixin):
+    def __init__(self, config=None):
+        self.init_config(config)
+
+    def declare_config(
+        self,
+        log_file_path: str,
+        output_txt_file_path: str,
+        model_path_prefix: str,
+        graph_net_log_prompt: str = "graph-net-test-compiler-log",
+        tolerance: int = 0,
+        interpretation_type: str = "default",
+        negative_speedup_penalty: float = 0.0,
+        fpdb: float = 0.1,
+        enable_aggregation_mode: bool = True,
+    ):
+        """
+        Configuration for filtering models with numerical faults.
+        """
+        pass
+
+    def _get_model_path_extractor(self) -> Callable[[str], str | None]:
+        """
+        Creates an extractor that looks for:
+        '{graph_net_log_prompt} [Processing] {model_path}'
+        """
+        prompt = self.config.get("graph_net_log_prompt", "graph-net-test-compiler-log")
+        header = f"{prompt} [Processing]"
+
+        def extractor(line: str) -> str | None:
+            if line.startswith(header):
+                parts = line.split()
+                # Example: "graph-net-test-compiler-log [Processing] /path/to/model"
+                # Index 0: prompt, Index 1: [Processing], Index 2: model_path
+                if len(parts) >= 3:
+                    return parts[2].strip()
+            return None
+
+        return extractor
+
+    def __call__(self):
+        # 1. Load log content
+        log_path = self.config["log_file_path"]
+        log_content = Path(log_path).read_text(encoding="utf-8")
+
+        # 2. Extract scores grouped by model_path
+        # Uses the logic: cumsum -> groupby -> calculate_es_scores_for_log_contents
+        path2es_scores = calculate_es_scores_for_each_model_path(
+            log_content=log_content,
+            get_model_path_for_each_log_line=self._get_model_path_extractor(),
+            interpretation_type=self.config.get("interpretation_type", "default"),
+            negative_speedup_penalty=self.config.get("negative_speedup_penalty", 0.0),
+            fpdb=self.config.get("fpdb", 0.1),
+            enable_aggregation_mode=self.config.get("enable_aggregation_mode", True),
+        )
+
+        # 3. Filter for faults and convert to relative paths
+        faulty_relative_paths = []
+        prefix = self.config["model_path_prefix"]
+        tolerance = self.config.get("tolerance", 0)
+
+        for full_path, es_scores in path2es_scores.items():
+            # Imported/Existing function check
+            if has_fault_at(es_scores, tolerance):
+                # Ensure we output paths relative to the model_path_prefix
+                rel_path = os.path.relpath(full_path, prefix)
+                faulty_relative_paths.append(rel_path)
+
+        # 4. Save results (one model_path per line)
+        output_path = self.config["output_txt_file_path"]
+        os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+        with open(output_path, "w", encoding="utf-8") as f:
+            for path in faulty_relative_paths:
+                f.write(f"{path}\n")
+
+        print(f"[Success] Identified {len(faulty_relative_paths)} faulty models.")
+        print(f"[File] Saved to: {output_path}")
+
+
+if __name__ == "__main__":
+    import sys
+
+    # Expecting a single argument: base64 encoded JSON string
+    if len(sys.argv) > 1:
+        try:
+            encoded_config = sys.argv[1]
+            decoded_json = base64.b64decode(encoded_config).decode("utf-8")
+            config_dict = json.loads(decoded_json)
+
+            filter_app = FaultModelPathFilter(config_dict)
+            filter_app()
+        except Exception as e:
+            print(f"Filter execution failed: {e}")
+            sys.exit(1)
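For reference, the extractor added above keys on the fixed "[Processing]" marker that follows the log prompt. A minimal standalone sketch of the log-line shape it consumes (the sample path below is illustrative, not taken from the patch):

    # Shape of a matching log line; parts[2] is what the extractor returns.
    line = "graph-net-test-compiler-log [Processing] /workspace/GraphNet/samples/foo/bar"
    parts = line.split()
    # parts[0]: prompt, parts[1]: "[Processing]", parts[2]: model_path
    assert parts[2] == "/workspace/GraphNet/samples/foo/bar"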
diff --git a/graph_net/test/fault_model_path_filter_test.py b/graph_net/test/fault_model_path_filter_test.py
new file mode 100644
index 000000000..63fe94c62
--- /dev/null
+++ b/graph_net/test/fault_model_path_filter_test.py
@@ -0,0 +1,68 @@
+import unittest
+import os
+import shutil
+from graph_net.filter_fault_model_path import FaultModelPathFilter
+
+
+class TestFaultModelPathFilter(unittest.TestCase):
+    def setUp(self):
+        # Locate the root of the project
+        self.graph_net_root = "/workspace/GraphNet"
+
+        # Paths based on your requirements
+        self.log_file = os.path.join(
+            self.graph_net_root,
+            "graph_net/test/data_calculate_es_scores/evaluation.log",
+        )
+        self.output_txt = "/tmp/workspace_fault_filter/faulty_models.txt"
+
+        # Clean up output directory
+        if os.path.exists(os.path.dirname(self.output_txt)):
+            shutil.rmtree(os.path.dirname(self.output_txt))
+        os.makedirs(os.path.dirname(self.output_txt), exist_ok=True)
+
+        # Initialize the filter with your requested config
+        self.config = {
+            "log_file_path": self.log_file,
+            "output_txt_file_path": self.output_txt,
+            "model_path_prefix": self.graph_net_root,
+            "tolerance": 0,  # Default behavior
+            "graph_net_log_prompt": "graph-net-test-compiler-log",  # Default behavior
+        }
+        self.filter_op = FaultModelPathFilter(self.config)
+
+    def test_filter_execution(self):
+        """
+        Verify that the filter correctly parses the log and writes relative paths.
+        """
+        print(f"\n[Test] Filtering log: {self.log_file}")
+
+        # Execute the filter
+        self.filter_op()
+
+        # 1. Verify the output file exists
+        self.assertTrue(
+            os.path.exists(self.output_txt), "Output text file was not created."
+        )
+
+        # 2. Read the results and verify content
+        with open(self.output_txt, "r") as f:
+            lines = [line.strip() for line in f.readlines() if line.strip()]
+
+        print(f"[Debug] Found {len(lines)} faulty models in output.")
+        for line in lines:
+            print(f"  - {line}")
+
+        # 3. Basic Validation
+        # Check that paths are indeed relative (should not start with /workspace/GraphNet)
+        for path in lines:
+            self.assertFalse(
+                path.startswith(self.graph_net_root),
+                f"Path '{path}' should be relative to prefix.",
+            )
+        # Check for a sample expected path segment if you know one from evaluation.log
+        # e.g., self.assertIn("samples/ultralytics/yolov3-tinyu", path)
+
+
+if __name__ == "__main__":
+    unittest.main()
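The test above drives the filter in-process with a config dict, while the module's __main__ block takes a single base64-encoded JSON argument. A sketch of an equivalent out-of-process driver (key names mirror declare_config; the paths are placeholders, not values from this patch):

    import base64
    import json
    import subprocess

    config = {
        "log_file_path": "/tmp/evaluation.log",            # placeholder paths
        "output_txt_file_path": "/tmp/faulty_models.txt",
        "model_path_prefix": "/workspace/GraphNet",
        "tolerance": 0,
    }
    encoded = base64.b64encode(json.dumps(config).encode("utf-8")).decode("utf-8")
    subprocess.run(
        ["python3", "-m", "graph_net.filter_fault_model_path", encoded], check=True
    )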
diff --git a/graph_net/test/filter_fault_model_path_test.sh b/graph_net/test/filter_fault_model_path_test.sh
new file mode 100755
index 000000000..8699dcb53
--- /dev/null
+++ b/graph_net/test/filter_fault_model_path_test.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+
+# Dynamically locate the graph_net package root for the log file path
+GRAPH_NET_ROOT=$(python3 -c "import graph_net; import os; print(os.path.dirname(graph_net.__file__))")
+
+# LOG_FILE needs to be dynamic to find the data on any machine
+LOG_FILE="$GRAPH_NET_ROOT/test/data_calculate_es_scores/evaluation.log"
+
+# Static prefix as recorded in the logs
+MODEL_PREFIX="/workspace/GraphNet"
+OUTPUT_FILE="/tmp/workspace_fault_filter/faulty_models.txt"
+
+echo "[Info] Running FaultModelPathFilter..."
+
+# Inline execution with base64 encoded config
+python3 -m graph_net.filter_fault_model_path \
+    "$(base64 -w 0 <
 dict
     return verified_es_values
 
-def main(args):
+def calculate_es_scores_for_each_model_path(
+    log_content: str,
+    get_model_path_for_each_log_line: Callable[[str], str | None],
+    interpretation_type: str = "default",
+    negative_speedup_penalty: float = 0.0,
+    fpdb: float = 0.1,
+    enable_aggregation_mode: bool = True,
+) -> dict[str, dict[int, float]]:
+    """
+    Groups log content by model path using accumulate and groupby,
+    then calculates ES scores for each group.
+    """
+    lines = log_content.splitlines()
+
+    # 1. Get f(line) = 1 if get_model_path_for_each_log_line(line) is not None else 0
+    line_indicators = [
+        1 if get_model_path_for_each_log_line(line) is not None else 0 for line in lines
+    ]
+
+    # 2. Use itertools.accumulate to get the cumulative sum (cumsum) of indicators
+    cum_sums = list(itertools.accumulate(line_indicators))
+
+    # 3. Use itertools.groupby to group lines based on the cumsum
+    # 4. Adjust results to get log_contents grouped by model_path
+    model_paths = []
+    log_contents_list = []
+
+    for _, group in itertools.groupby(zip(cum_sums, lines), key=lambda x: x[0]):
+        group_lines = [item[1] for item in group]
+        if not group_lines:
+            continue
+
+        # Extract the model_path from the first line of the group
+        path = get_model_path_for_each_log_line(group_lines[0])
+        if path is not None:
+            model_paths.append(path)
+            # Combine the lines belonging to this model path back into a single string
+            log_contents_list.append("\n".join(group_lines))
+
+    assert len(model_paths) == len(log_contents_list)
+
+    # 5. Call the lower-level function calculate_es_scores_for_log_contents
+    # Note: This function returns list[dict[int, float]] based on your instruction
+    es_scores_list = calculate_es_scores_for_log_contents(
+        log_contents=log_contents_list,
+        interpretation_type=interpretation_type,
+        negative_speedup_penalty=negative_speedup_penalty,
+        fpdb=fpdb,
+        enable_aggregation_mode=enable_aggregation_mode,
+    )
+
+    # 6. Organize return results with model_path as key
+    # Returns dict[str, dict[int, float]]
+    return {model_paths[i]: es_scores_list[i] for i in range(len(model_paths))}
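Steps 1 to 4 above implement a "start a new group at every header line" split. A self-contained sketch of the same accumulate/groupby idiom on a tiny made-up log (only the "[Processing]" prefix is taken from the patch; the payload lines are hypothetical):

    import itertools

    header = "graph-net-test-compiler-log [Processing]"
    lines = [
        "graph-net-test-compiler-log [Processing] /m1",
        "some metric line for /m1",
        "graph-net-test-compiler-log [Processing] /m2",
        "some metric line for /m2",
    ]
    cum_sums = list(itertools.accumulate(1 if ln.startswith(header) else 0 for ln in lines))
    groups = [
        [line for _, line in group]
        for _, group in itertools.groupby(zip(cum_sums, lines), key=lambda x: x[0])
    ]
    # Two groups, each starting at its own [Processing] header.
    assert [len(g) for g in groups] == [2, 2]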
+
+
+def calculate_es_scores_for_log_contents(
+    log_contents: list[str],
+    interpretation_type: str = "default",
+    negative_speedup_penalty: float = 0.0,
+    fpdb: float = 0.1,
+    enable_aggregation_mode: bool = True,
+) -> list[dict[int, float]]:
+    """
+    Wraps raw log contents into temporary files to compute Error Sign (ES) scores.
+    """
+
+    def _write_logs_to_dir(target_dir: str):
+        """Write each log content into tmp_dir/{i}.log."""
+        for i, content in enumerate(log_contents):
+            (Path(target_dir) / f"{i}.log").write_text(content, encoding="utf-8")
+
+    with TemporaryDirectory() as tmp_dir:
+        # Step 1: Create a temporary directory and write logs
+        _write_logs_to_dir(tmp_dir)
+
+        # Step 2: Get index_str2es_scores from the file-based utility
+        index_str2es_scores = calculate_es_scores_for_log_file_or_dir(
+            benchmark_path=tmp_dir,
+            interpretation_type=interpretation_type,
+            negative_speedup_penalty=negative_speedup_penalty,
+            fpdb=fpdb,
+            enable_aggregation_mode=enable_aggregation_mode,
+        )
+
+        # Step 3: Convert keys of index_str2es_scores to int to get index2es_scores
+        index2es_scores = {int(k): v for k, v in index_str2es_scores.items()}
+
+        # Step 4: Convert index2es_scores to list to get es_scores_list
+        # Ensuring the order matches the original log_contents indices
+        es_scores_list = [index2es_scores[i] for i in range(len(log_contents))]
+
+        # Step 5: Return es_scores_list
+        return es_scores_list
+
+
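Steps 3 and 4 above exist because the file-based helper keys its results by folder name, which for the tmp_dir/{i}.log layout comes back as stringified indices in no guaranteed order. A small sketch with hypothetical scores showing why the reordering matters:

    # Hypothetical scanner output: string keys, arbitrary order.
    index_str2es_scores = {"1": {0: 0.5}, "0": {0: 1.0}}
    index2es_scores = {int(k): v for k, v in index_str2es_scores.items()}
    es_scores_list = [index2es_scores[i] for i in range(len(index2es_scores))]
    assert es_scores_list == [{0: 1.0}, {0: 0.5}]  # back in log_contents order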
+def calculate_es_scores_for_log_file_or_dir(
+    benchmark_path: str,
+    interpretation_type: str = "default",
+    negative_speedup_penalty: float = 0.0,
+    fpdb: float = 0.1,
+    enable_aggregation_mode: bool = True,
+):
     # 1. Scan folders to get data
-    all_results = analysis_util.scan_all_folders(args.benchmark_path)
+    all_results = analysis_util.scan_all_folders(benchmark_path)
     if not all_results:
         print("No valid data found. Exiting.")
-        return
+        return {}
 
     # 2. Calculate scores for each curve and verify aggregated/microscopic consistency
     all_es_scores = {}
     all_aggregated_results = {}
     positive_tolerance_interpretation = get_positive_tolerance_interpretation(
-        args.interpretation_type
+        interpretation_type
     )
 
     for folder_name, samples in all_results.items():
@@ -185,8 +291,8 @@ def main(args):
 
         es_scores = analysis_util.calculate_scores(
             samples,
-            p=args.negative_speedup_penalty,
-            b=args.fpdb,
+            p=negative_speedup_penalty,
+            b=fpdb,
             type="ESt",
             positive_tolerance_interpretation=positive_tolerance_interpretation,
         )
@@ -195,14 +301,14 @@ def main(args):
         all_es_scores[folder_name] = es_scores
 
         # Verify aggregated/microscopic consistency if aggregation mode is enabled
-        if args.enable_aggregation_mode:
+        if enable_aggregation_mode:
            # Calculate aggregated results and attach to es_scores
            aggregated_results = (
                verify_aggregated_params.verify_es_constructor_params_across_tolerances(
                    samples,
                    folder_name,
-                    negative_speedup_penalty=args.negative_speedup_penalty,
-                    fpdb=args.fpdb,
+                    negative_speedup_penalty=negative_speedup_penalty,
+                    fpdb=fpdb,
                    positive_tolerance_interpretation=positive_tolerance_interpretation,
                )
            )
@@ -222,7 +328,17 @@ def main(args):
                 es_scores_wrapper, folder_name
             )
             all_es_scores[folder_name] = verified_es_values
+    return all_es_scores
+
 
+def main(args):
+    all_es_scores = calculate_es_scores_for_log_file_or_dir(
+        benchmark_path=args.benchmark_path,
+        interpretation_type=args.interpretation_type,
+        negative_speedup_penalty=args.negative_speedup_penalty,
+        fpdb=args.fpdb,
+        enable_aggregation_mode=args.enable_aggregation_mode,
+    )
     assert len(all_es_scores) == 1
     with open(args.output_json_file_path, "w") as f:
         json.dump(next(iter(all_es_scores.items()))[1], f, indent=4)
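With this refactor, main() is reduced to argument unpacking plus the existing single-folder JSON dump, while other callers such as FaultModelPathFilter can use the helper directly. A hedged sketch of that programmatic use, assuming the module path implied by the import in filter_fault_model_path.py and a placeholder benchmark directory:

    from graph_net_bench.calculate_es_scores import calculate_es_scores_for_log_file_or_dir

    all_es_scores = calculate_es_scores_for_log_file_or_dir(
        benchmark_path="/tmp/my_benchmark_logs",  # placeholder directory
        negative_speedup_penalty=0.0,
        fpdb=0.1,
        enable_aggregation_mode=True,
    )
    for folder_name, es_scores in all_es_scores.items():
        # es_scores is a dict[int, float] per the new type hints.
        print(folder_name, es_scores)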