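"""Brute-force exploration of simple 8-bit checksum algorithms.

Reads sample files whose lines look like ``<hex payload> = <hex checksum>``
(e.g. ``0A1B2C3D = 7F``) and tries a collection of candidate checksum
functions over byte ranges of each payload to find which method and slice
reproduce the expected checksum. Run with a file or directory path; see
``--help`` for the optional byte-level and large-scale analysis modes.
"""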
import re
import sys
import os
import argparse
from typing import List, Tuple, Callable, Dict, Generator, Optional
from collections import defaultdict, Counter
import json
import time
from itertools import islice
import math
import random


# --- This is pure AI Slop ---
def checksum_sum(data: bytes) -> int:
    return sum(data) % 256


def checksum_xor(data: bytes) -> int:
    result = 0
    for b in data:
        result ^= b
    return result


def checksum_sum_shifted(data: bytes, shift: int) -> int:
    return sum((b << shift) & 0xFF for b in data) % 256


def checksum_xor_shifted(data: bytes, shift: int) -> int:
    result = 0
    for b in data:
        result ^= (b << shift) & 0xFF
    return result


def checksum_weighted_sum(data: bytes) -> int:
    return sum((i + 1) * b for i, b in enumerate(data)) % 256


def checksum_alt_sum_xor(data: bytes) -> int:
    s = sum(data)
    x = 0
    for i, b in enumerate(data):
        if i % 2 == 0:
            x ^= b
        else:
            s ^= b
    return (s + x) % 256


def checksum_bit_flip_sum(data: bytes) -> int:
    return sum(b ^ 0xFF for b in data) % 256
# --- Input Parser ---
def parse_input_file_lines(filepath: str) -> Tuple[List[Tuple[bytes, int]], Dict]:
    samples = []
    total_lines = 0
    with open(filepath, "r") as f:
        for line in f:
            total_lines += 1
            match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip())
            if match:
                hex_data = bytes.fromhex(match.group(1))
                checksum = int(match.group(2), 16)
                samples.append((hex_data, checksum))
    # Return samples and metadata
    return samples, {"total_lines": total_lines, "valid_samples": len(samples)}
# --- Enhanced Input Parser for Large Files ---
def parse_input_file_lines_batched(filepath: str, batch_size: int = 1000) -> Generator[List[Tuple[bytes, int]], None, Dict]:
    """
    Parse a large input file in batches to avoid memory issues.
    Returns a generator that yields batches of samples.
    """
    samples = []
    total_lines = 0
    valid_samples = 0
    try:
        with open(filepath, "r") as f:
            for line in f:
                total_lines += 1
                match = re.match(r'([0-9a-fA-F]+)\s*=\s*([0-9a-fA-F]{1,2})', line.strip())
                if match:
                    hex_data = bytes.fromhex(match.group(1))
                    checksum = int(match.group(2), 16)
                    samples.append((hex_data, checksum))
                    valid_samples += 1
                    # Yield a batch when it reaches the batch size
                    if len(samples) >= batch_size:
                        yield samples
                        samples = []
    except Exception as e:
        print(f"Error reading file: {e}")
    # Yield any remaining samples
    if samples:
        yield samples
    # Return metadata about the entire file; for a generator this value is only
    # visible as StopIteration.value after exhaustion, not via next().
    return {"total_lines": total_lines, "valid_samples": valid_samples}
# --- Brute Force Evaluation ---
def bruteforce_all_methods(samples: List[Tuple[bytes, int]], label_prefix="", file_metadata=None) -> List[Tuple[str, int, int, str]]:
    methods: List[Tuple[str, Callable[[bytes], int]]] = [
        ("SUM", checksum_sum),
        ("XOR", checksum_xor),
        ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)),
        ("SUM<<2", lambda d: checksum_sum_shifted(d, 2)),
        ("XOR<<1", lambda d: checksum_xor_shifted(d, 1)),
        ("XOR<<2", lambda d: checksum_xor_shifted(d, 2)),
        ("WEIGHTED_SUM", checksum_weighted_sum),
        ("ALT_SUM_XOR", checksum_alt_sum_xor),
        ("BIT_FLIP_SUM", checksum_bit_flip_sum)
    ]
    seen = set()
    matches = []
    sample_methods = defaultdict(list)  # Track methods that work for each sample
    for sample_index, (data, expected) in enumerate(samples):
        length = len(data)
        sample_success = []  # Track successful methods for this sample
        for start in range(length):
            for end in range(start + 1, length + 1):
                sliced = data[start:end]
                label = f"[{start}:{end}]"
                for name, func in methods:
                    try:
                        result = func(sliced)
                        method_id = f"{name}{label}"
                        key = (sample_index, method_id, label_prefix)
                        if result == expected and key not in seen:
                            seen.add(key)
                            matches.append((method_id, sample_index + 1, expected, label_prefix))
                            sample_success.append((name, start, end))
                    except Exception:
                        continue
        # Store methods that work for this sample
        if sample_success:
            sample_methods[sample_index] = sample_success
    # Calculate consistency scores if we have enough samples
    if len(samples) > 1 and sample_methods:
        consistency_analysis = analyze_consistency(sample_methods, len(samples))
        matches.append(("CONSISTENCY_DATA", 0, 0, json.dumps(consistency_analysis)))
    # Add file metadata for reporting
    if file_metadata:
        file_name = file_metadata.get("file", "unknown")
        matches.append(("FILE_METADATA", file_name, 0, json.dumps(file_metadata)))
    return matches
# --- Consistency Analysis ---
def analyze_consistency(sample_methods: Dict[int, List[Tuple[str, int, int]]], total_samples: int) -> Dict:
    """Analyze which methods work consistently across different samples."""
    method_consistency = defaultdict(int)
    range_consistency = defaultdict(int)
    method_range_consistency = defaultdict(int)
    # Count how many samples each method/range works for
    for sample_idx, methods in sample_methods.items():
        seen_methods = set()
        seen_ranges = set()
        seen_method_ranges = set()
        for method, start, end in methods:
            if method not in seen_methods:
                seen_methods.add(method)
                method_consistency[method] += 1
            range_key = f"{start}:{end}"
            if range_key not in seen_ranges:
                seen_ranges.add(range_key)
                range_consistency[range_key] += 1
            method_range_key = f"{method}[{start}:{end}]"
            if method_range_key not in seen_method_ranges:
                seen_method_ranges.add(method_range_key)
                method_range_consistency[method_range_key] += 1
    # Calculate consistency percentages
    method_scores = {method: count / total_samples * 100 for method, count in method_consistency.items()}
    range_scores = {range_key: count / total_samples * 100 for range_key, count in range_consistency.items()}
    method_range_scores = {mr: count / total_samples * 100 for mr, count in method_range_consistency.items()}
    # Find the most consistent options
    best_methods = sorted(method_scores.items(), key=lambda x: x[1], reverse=True)[:5]
    best_ranges = sorted(range_scores.items(), key=lambda x: x[1], reverse=True)[:5]
    best_method_ranges = sorted(method_range_scores.items(), key=lambda x: x[1], reverse=True)[:5]
    return {
        "best_methods": best_methods,
        "best_ranges": best_ranges,
        "best_method_ranges": best_method_ranges,
        "total_samples": total_samples
    }
# --- Pattern Recognition ---
def analyze_patterns(matches: List[Tuple[str, int, int, str]]) -> Dict:
    patterns = {
        "methods": Counter(),
        "ranges": Counter(),
        "start_positions": Counter(),
        "end_positions": Counter(),
        "lengths": Counter()
    }
    for method_id, _, _, _ in matches:
        # Extract method name and range from method_id (e.g., "SUM[0:5]")
        method_parts = re.match(r'([A-Z_<>0-9]+)\[(\d+):(\d+)\]', method_id)
        if method_parts:
            method_name, start, end = method_parts.groups()
            start_pos, end_pos = int(start), int(end)
            byte_range = f"[{start}:{end}]"
            length = end_pos - start_pos
            patterns["methods"][method_name] += 1
            patterns["ranges"][byte_range] += 1
            patterns["start_positions"][start_pos] += 1
            patterns["end_positions"][end_pos] += 1
            patterns["lengths"][length] += 1
    return patterns
# --- Result Display ---
def print_results_with_summary(all_matches: List[Tuple[str, int, int, str]], per_file=False, insights=None, show_full=False):
    """Print results with optional detailed analysis"""
    # Extract consistency data and file metadata
    consistency_data = {}
    file_metadata = {}
    filtered_matches = []
    for match in all_matches:
        if match[0] == "CONSISTENCY_DATA" and match[3]:
            try:
                file_data = match[3]
                consistency_data[file_data] = json.loads(file_data)
            except Exception:
                pass
        elif match[0] == "FILE_METADATA" and match[3]:
            try:
                metadata = json.loads(match[3])
                file_name = match[1]  # Use the file name stored in match[1]
                file_metadata[file_name] = metadata
            except Exception as e:
                print(f"Error processing metadata: {e}")
        else:
            filtered_matches.append(match)
    all_matches = filtered_matches
    if not all_matches:
        print("❌ No matches found.")
        return
    # Always organize by file
    per_file_matches = defaultdict(list)
    for match in all_matches:
        per_file_matches[match[3]].append(match)
    # Per-file statistics and pattern analysis
    for file, matches in per_file_matches.items():
        # Get file metadata if available
        metadata = {}
        for meta_file, meta_data in file_metadata.items():
            if isinstance(meta_file, str) and file in meta_file:  # Ensure meta_file is a string
                metadata = meta_data
                break
        # Extract sample lines that matched successfully
        matched_lines = set(line for _, line, _, _ in matches)
        # Print file summary with line counts
        print(f"\n\n📄 Results for: {file}")
        if metadata:
            total_lines = metadata.get("total_lines", "?")
            valid_samples = metadata.get("valid_samples", len(matched_lines))
            success_rate = (len(matched_lines) / valid_samples * 100) if valid_samples > 0 else 0
            print(f"✅ Matches Found: {len(matched_lines)}/{valid_samples} samples " +
                  f"({success_rate:.1f}% success rate)")
            print(f"📝 Total file lines: {total_lines}, Valid samples: {valid_samples}")
        else:
            print(f"✅ Matches Found: {len(matches)}")
        # Only show individual matches if per_file flag is set AND full details are requested
        if per_file and show_full:
            for method_id, line, expected, _ in matches[:20]:  # Show only first 20 to avoid flooding
                print(f"Line {line:03d} | Method: {method_id:20s} | Expected: {expected:02X}")
            if len(matches) > 20:
                print(f"... and {len(matches) - 20} more matches")
        elif per_file:
            # In condensed mode, just show counts per line
            line_counts = Counter(line for _, line, _, _ in matches)
            print(f"Lines with matches: {', '.join(str(l) for l in sorted(line_counts.keys()))}")
            if len(line_counts) > 10:
                print(f"Total lines with matches: {len(line_counts)}")
        # Pattern analysis for this file
        patterns = analyze_patterns(matches)
        # Print top methods for this file
        print("\n📊 Most Successful Methods in this file:")
        for method, count in patterns["methods"].most_common(5):
            print(f"{method:<15} → {count} matches")
        if show_full:
            # Print top ranges for this file
            print("\n📏 Most Common Byte Ranges:")
            for range_str, count in patterns["ranges"].most_common(5):
                print(f"{range_str:<10} → {count} matches")
            # Print common start positions
            print("\n🔍 Common Start Positions:")
            for pos, count in patterns["start_positions"].most_common(5):
                print(f"Position {pos:<3} → {count} matches")
            # Print common end positions
            print("\n🔎 Common End Positions:")
            for pos, count in patterns["end_positions"].most_common(5):
                print(f"Position {pos:<3} → {count} matches")
            # Print common byte lengths
            print("\n📊 Common Byte Lengths:")
            for length, count in patterns["lengths"].most_common(5):
                print(f"{length} bytes → {count} matches")
            # Visual representation of match distribution
            if patterns["start_positions"] and patterns["end_positions"]:
                max_pos = max(max(patterns["end_positions"].keys()),
                              max(patterns["start_positions"].keys()))
                print("\n📈 Match Distribution (frequency by position):")
                scale = 30  # Reduced scale for more compact output
                max_count = max(max(patterns["start_positions"].values()),
                                max(patterns["end_positions"].values()))
                for pos in range(min(max_pos + 1, 40)):  # Limit to first 40 positions
                    start_count = patterns["start_positions"].get(pos, 0)
                    end_count = patterns["end_positions"].get(pos, 0)
                    start_bar = '█' * int((start_count / max_count) * scale) if start_count else ''
                    end_bar = '░' * int((end_count / max_count) * scale) if end_count else ''
                    print(f"{pos:2d}: {start_bar}|{end_bar}")
                print(" ███ = start positions, ░░░ = end positions")
        # Print byte-level insights for each sample if available
        if insights and show_full:
            file_insights = {k: v for k, v in insights.items() if k.startswith("sample_") and file in v.get("method", "")}
            if file_insights:
                print("\n🔬 Byte-Level Analysis:")
                for key, data in file_insights.items():
                    parts = key.split('_')
                    sample_id = parts[1] if len(parts) > 1 else "?"
                    print(f"\nSample {sample_id} with {data['method']}[{data['range']}]:")
                    # Show optimal byte changes
                    if data.get("optimal_changes"):
                        print("Optimal byte changes to achieve expected checksum:")
                        for pos, new_val in data["optimal_changes"]:
                            print(f" Change byte at position {pos} from 0x{data['contributions']['byte_contributions'][pos]['original_value']:02X} to 0x{new_val:02X}")
                    else:
                        print("No simple byte changes found to fix checksum")
    # Global summary (always show this part)
    print("\n\n📊 Global Summary of Most Successful Methods:")
    method_counts = defaultdict(int)
    for method_id, _, _, _ in all_matches:
        method_counts[method_id] += 1
    sorted_methods = sorted(method_counts.items(), key=lambda x: x[1], reverse=True)
    for method_id, count in sorted_methods[:5]:  # Reduced to top 5 for conciseness
        print(f"{method_id:<25} → {count} matches")
    # Show more detailed global pattern summary only in full mode
    if show_full:
        all_patterns = analyze_patterns(all_matches)
        print("\n📈 Global Pattern Summary:")
        print(f"Total unique methods found: {len(all_patterns['methods'])}")
        print(f"Total unique byte ranges: {len(all_patterns['ranges'])}")
        print(f"Most common method: {all_patterns['methods'].most_common(1)[0][0]} with {all_patterns['methods'].most_common(1)[0][1]} matches")
    # Print global consensus analysis at the end
    if consistency_data and show_full:
        print("\n\n🧩 Global Consensus Analysis")
        print("═══════════════════════════")
        print("Methods that work across multiple files:")
        # Collect global statistics from all files
        global_methods = Counter()
        global_ranges = Counter()
        global_method_ranges = Counter()
        for file_data in consistency_data.values():
            for method, score in file_data.get("best_methods", []):
                global_methods[method] += 1
            for range_key, score in file_data.get("best_ranges", []):
                global_ranges[range_key] += 1
            for mr, score in file_data.get("best_method_ranges", []):
                global_method_ranges[mr] += 1
        # Display methods that work across multiple files
        num_files = len(consistency_data)
        print(f"\n📊 Methods that work across multiple files (total files: {num_files}):")
        for method, count in global_methods.most_common(5):
            print(f"{method:<15} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")
        print("\n📏 Byte ranges that work across multiple files:")
        for range_key, count in global_ranges.most_common(5):
            print(f"[{range_key}] → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")
        print("\n🔍 Method+Range combinations that work across multiple files:")
        for mr, count in global_method_ranges.most_common(5):
            print(f"{mr:<20} → appears in top 5 for {count}/{num_files} files ({count/num_files*100:.1f}%)")
        # Generate a recommended approach
        if global_method_ranges:
            best_combo, count = global_method_ranges.most_common(1)[0]
            if count >= num_files * 0.5:  # If it works for at least half the files
                print(f"\n✅ Recommended global method: {best_combo}")
                print(f" This combination works in top 5 for {count}/{num_files} files")
            else:
                print("\n⚠️ No single method+range combination works reliably across most files")
                print(f" Best option ({best_combo}) only works in top 5 for {count}/{num_files} files")
        # Try to find patterns in the most successful methods
        if global_methods:
            best_method, method_count = global_methods.most_common(1)[0]
            print(f"\n💡 Consider using {best_method} with file-specific byte ranges")
            print(f" This algorithm appears in top 5 for {method_count}/{num_files} files")
# --- Advanced Checksum Algorithms ---
def checksum_weighted_sum_parametric(data: bytes, weight_start: float = 1.0, weight_step: float = 1.0) -> int:
    """Weighted sum with configurable starting weight and step"""
    return sum(int((weight_start + i * weight_step) * b) % 256 for i, b in enumerate(data)) % 256


def checksum_hybrid_sum_xor(data: bytes, weight: float = 0.5) -> int:
    """Hybrid checksum using weighted combination of sum and XOR"""
    sum_result = sum(data) % 256
    xor_result = 0
    for b in data:
        xor_result ^= b
    return int(weight * sum_result + (1 - weight) * xor_result) % 256


def checksum_adaptive_bit_flip_sum(data: bytes, flip_mask: int = 0xFF) -> int:
    """Bit flip sum with configurable flip mask"""
    return sum(b ^ flip_mask for b in data) % 256


def checksum_position_weighted_sum(data: bytes, position_weights: Optional[List[float]] = None) -> int:
    """Sum where each byte is weighted by its position in a specific pattern"""
    if position_weights is None:
        # Default to alternating weights
        position_weights = [1.0, 0.5]
    result = 0
    for i, b in enumerate(data):
        weight = position_weights[i % len(position_weights)]
        result = (result + int(b * weight)) % 256
    return result
def evaluate_targeted_algorithms(samples: List[Tuple[bytes, int]], label_prefix="") -> List[Tuple[str, int, int, str]]:
    """Run a more focused test on the most promising algorithms with fine-tuned parameters"""
    # Based on consensus, focus testing on these methods with more parameter variations
    matches = []
    seen = set()
    # Set up parameter variations for testing
    bit_flip_masks = [0xFF, 0xF0, 0x0F, 0xCC, 0x55, 0xAA]
    hybrid_weights = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
    weight_steps = [0.9, 1.0, 1.1, 1.2, 1.5]
    pos_weight_patterns = [
        [1.0, 0.5],  # Alternating
        [1.0, 1.0, 0.5],  # Every third byte gets half weight
        [1.0, 0.75, 0.5, 0.25]  # Descending weights
    ]
    # Process each sample with focused algorithms
    for sample_index, (data, expected) in enumerate(samples):
        length = len(data)
        # Instead of trying every possible byte range, focus on the most promising ranges
        # based on global patterns from previous analysis
        ranges_to_try = []
        # Focus on common start positions from global analysis: 0-5 and specific ranges
        for start in [0, 1, 2, 3, 4, 5]:
            # Try full data range
            ranges_to_try.append((start, length))
            # Try common end points (from previous runs)
            for end_offset in [0, 1, 2, 4, 8]:
                if length - end_offset > start + 1:  # Ensure valid range
                    ranges_to_try.append((start, length - end_offset))
        # Add specific ranges that were successful in multiple files
        specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)]
        for start, end in specific_ranges:
            if start < length and end <= length and start < end:
                ranges_to_try.append((start, end))
        # Process the focused ranges with our most promising algorithms
        for start, end in ranges_to_try:
            sliced = data[start:end]
            label = f"[{start}:{end}]"
            # Test standard checksum methods that showed promise
            methods = [
                ("WEIGHTED_SUM", checksum_weighted_sum),
                ("ALT_SUM_XOR", checksum_alt_sum_xor),
                ("BIT_FLIP_SUM", checksum_bit_flip_sum),
                ("SUM<<1", lambda d: checksum_sum_shifted(d, 1))
            ]
            # Test the standard methods
            for name, func in methods:
                try:
                    result = func(sliced)
                    method_id = f"{name}{label}"
                    key = (sample_index, method_id, label_prefix)
                    if result == expected and key not in seen:
                        seen.add(key)
                        matches.append((method_id, sample_index + 1, expected, label_prefix))
                except Exception:
                    continue
            # Test advanced parametric methods
            for mask in bit_flip_masks:
                try:
                    result = checksum_adaptive_bit_flip_sum(sliced, mask)
                    method_id = f"BIT_FLIP_SUM({mask:02X}){label}"
                    key = (sample_index, method_id, label_prefix)
                    if result == expected and key not in seen:
                        seen.add(key)
                        matches.append((method_id, sample_index + 1, expected, label_prefix))
                except Exception:
                    continue
            for weight in hybrid_weights:
                try:
                    result = checksum_hybrid_sum_xor(sliced, weight)
                    method_id = f"HYBRID_SUM_XOR({weight:.1f}){label}"
                    key = (sample_index, method_id, label_prefix)
                    if result == expected and key not in seen:
                        seen.add(key)
                        matches.append((method_id, sample_index + 1, expected, label_prefix))
                except Exception:
                    continue
            for step in weight_steps:
                try:
                    result = checksum_weighted_sum_parametric(sliced, 1.0, step)
                    method_id = f"WEIGHTED_SUM_STEP({step:.1f}){label}"
                    key = (sample_index, method_id, label_prefix)
                    if result == expected and key not in seen:
                        seen.add(key)
                        matches.append((method_id, sample_index + 1, expected, label_prefix))
                except Exception:
                    continue
            for i, pattern in enumerate(pos_weight_patterns):
                try:
                    result = checksum_position_weighted_sum(sliced, pattern)
                    method_id = f"POS_WEIGHT_{i+1}{label}"
                    key = (sample_index, method_id, label_prefix)
                    if result == expected and key not in seen:
                        seen.add(key)
                        matches.append((method_id, sample_index + 1, expected, label_prefix))
                except Exception:
                    continue
    return matches
# --- Byte Change Correlation Analysis ---
def analyze_byte_value_correlations(samples: List[Tuple[bytes, int]], max_samples: int = 1000) -> Dict:
    """
    Analyze how changing specific bytes correlates with changes in the checksum.
    This helps understand the "sensitivity" of the checksum to specific byte positions.
    """
    # Sample if we have too many samples to process
    if len(samples) > max_samples:
        print(f"Sampling {max_samples} out of {len(samples)} for correlation analysis")
        samples = random.sample(samples, max_samples)
    # Initialize data structures for correlation analysis
    bytes_by_position = defaultdict(list)
    checksums_by_position_value = defaultdict(list)
    correlations = {}
    position_weights = {}
    # Gather data by byte position
    max_length = max(len(data) for data, _ in samples)
    print(f"Analyzing correlations for {len(samples)} samples with max length {max_length}")
    # Track all byte values and checksums by position
    for data, checksum in samples:
        for pos, value in enumerate(data):
            bytes_by_position[pos].append(value)
            checksums_by_position_value[(pos, value)].append(checksum)
    # Calculate correlation strength for each position
    for pos in range(max_length):
        pos_values = bytes_by_position.get(pos, [])
        if len(pos_values) <= 1:
            continue
        # Create value-to-checksum mapping and analyze patterns
        value_impact = {}
        # Group by unique values at this position
        unique_values = set(pos_values)
        if len(unique_values) <= 1:
            continue
        # Analyze how changes in this position correlate with checksums
        for val in unique_values:
            checksums = checksums_by_position_value.get((pos, val), [])
            if checksums:
                avg_checksum = sum(checksums) / len(checksums)
                value_impact[val] = avg_checksum
        # If we have enough data, calculate correlation metrics
        if len(value_impact) >= 2:
            # Look for linear relationships
            xy_pairs = [(val, cs) for val, cs in value_impact.items()]
            correlation = calculate_correlation_coefficient(xy_pairs)
            # Look for bit-level patterns (XOR, bit flips)
            bit_patterns = analyze_bit_patterns(value_impact)
            correlations[pos] = {
                "strength": abs(correlation),
                "direction": "positive" if correlation >= 0 else "negative",
                "unique_values": len(unique_values),
                "sample_count": len(pos_values),
                "bit_patterns": bit_patterns
            }
            # Calculate a rough "weight" for this position in checksum calculations
            pos_weight = abs(correlation) * (len(unique_values) / 256)
            position_weights[pos] = pos_weight
    # Sort positions by correlation strength
    sorted_positions = sorted(correlations.keys(), key=lambda p: correlations[p]["strength"], reverse=True)
    significant_positions = sorted_positions[:10]  # Most influential positions
    # Build response
    return {
        "significant_positions": significant_positions,
        "position_correlations": {p: correlations[p] for p in significant_positions},
        "position_weights": {p: position_weights[p] for p in position_weights if p in significant_positions},
        "analyzed_samples": len(samples),
        "max_length": max_length
    }
def calculate_correlation_coefficient(pairs: List[Tuple[int, int]]) -> float:
    """Calculate Pearson's correlation coefficient between byte values and checksums."""
    if len(pairs) < 2:
        return 0.0
    x_vals = [p[0] for p in pairs]
    y_vals = [p[1] for p in pairs]
    n = len(pairs)
    # Calculate means
    x_mean = sum(x_vals) / n
    y_mean = sum(y_vals) / n
    # Calculate correlation coefficient
    numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_vals, y_vals))
    denominator_x = sum((x - x_mean) ** 2 for x in x_vals)
    denominator_y = sum((y - y_mean) ** 2 for y in y_vals)
    if denominator_x == 0 or denominator_y == 0:
        return 0.0
    return numerator / math.sqrt(denominator_x * denominator_y)
def analyze_bit_patterns(value_impact: Dict[int, float]) -> Dict:
    """
    Analyze bit-level patterns in how byte changes affect checksums.
    Identifies patterns like "flipping bit 3 adds 8 to checksum" etc.
    """
    bit_influences = [0.0] * 8  # Influence of each bit position
    # Calculate average impact when each bit is set vs unset
    bit_set_checksums = [[] for _ in range(8)]
    bit_unset_checksums = [[] for _ in range(8)]
    for value, checksum in value_impact.items():
        # Analyze each bit
        for bit_pos in range(8):
            bit_mask = 1 << bit_pos
            if value & bit_mask:  # Bit is set
                bit_set_checksums[bit_pos].append(checksum)
            else:  # Bit is unset
                bit_unset_checksums[bit_pos].append(checksum)
    # Calculate average difference per bit
    for bit_pos in range(8):
        set_avg = sum(bit_set_checksums[bit_pos]) / len(bit_set_checksums[bit_pos]) if bit_set_checksums[bit_pos] else 0
        unset_avg = sum(bit_unset_checksums[bit_pos]) / len(bit_unset_checksums[bit_pos]) if bit_unset_checksums[bit_pos] else 0
        if set_avg and unset_avg:
            influence = set_avg - unset_avg
            bit_influences[bit_pos] = influence
    # Determine the bit pattern type
    min_influence = min(abs(b) for b in bit_influences)
    max_influence = max(abs(b) for b in bit_influences)
    pattern_types = {
        "xor_like": all(abs(bit_influences[i]) >= 0.5 for i in range(8)),
        "additive": all(bit_influences[i] >= 0 for i in range(8)),
        "subtractive": all(bit_influences[i] <= 0 for i in range(8)),
        "weighted": max_influence / (min_influence if min_influence else 1) > 3,
    }
    return {
        "bit_influences": {i: bit_influences[i] for i in range(8)},
        "pattern_type": next((ptype for ptype, matches in pattern_types.items() if matches), "mixed"),
        "most_influential_bit": bit_influences.index(max(bit_influences, key=abs))
    }
def find_optimal_byte_changes(data: bytes, checksum_func: Callable, expected: int) -> List[Tuple[int, int]]:
    """
    Find the minimal set of byte changes needed to achieve the expected checksum.
    Returns a list of (position, new_value) tuples.
    """
    base_checksum = checksum_func(data)
    if base_checksum == expected:
        return []  # No changes needed
    # Try changing bytes to match target checksum using sensitivity information
    # First try single byte changes - this is much faster and most likely case
    for i in range(len(data)):
        modified = bytearray(data)
        target_diff = (expected - base_checksum) % 256
        # Try calculating what value this position should have
        if checksum_func == checksum_sum:
            # For sum, we can directly calculate needed value
            new_val = (data[i] + target_diff) % 256
            modified[i] = new_val
            if checksum_func(bytes(modified)) == expected:
                return [(i, new_val)]
        elif checksum_func == checksum_xor:
            # For XOR, direct calculation also works
            new_val = data[i] ^ (base_checksum ^ expected)
            modified[i] = new_val
            if checksum_func(bytes(modified)) == expected:
                return [(i, new_val)]
        else:
            # For other algorithms, try incremental changes or use binary search
            best_value = None
            best_diff = 256
            # Check common values first, then do a smarter search if needed
            for test_val in [0, 1, 0xFF, expected, data[i] ^ 0xFF]:
                if test_val == data[i]:
                    continue
                modified[i] = test_val
                new_checksum = checksum_func(bytes(modified))
                if new_checksum == expected:
                    return [(i, test_val)]
                diff = abs((new_checksum - expected) % 256)
                if diff < best_diff:
                    best_diff = diff
                    best_value = test_val
            # If we got close, try a more focused search around the promising value
            if best_diff < 50 and best_value is not None:
                for offset in range(-10, 11):
                    test_val = (best_value + offset) % 256
                    if test_val == data[i]:
                        continue
                    modified[i] = test_val
                    new_checksum = checksum_func(bytes(modified))
                    if new_checksum == expected:
                        return [(i, test_val)]
    # If single byte changes don't work, try strategic two-byte changes
    # For performance, we'll limit this to nearby byte combinations
    for i in range(len(data)):
        for j in range(i + 1, min(i + 8, len(data))):  # Try up to 7 bytes ahead
            for i_adj in [-1, 1]:
                for j_adj in [-1, 1]:
                    modified = bytearray(data)
                    modified[i] = (data[i] + i_adj) % 256
                    modified[j] = (data[j] + j_adj) % 256
                    if checksum_func(bytes(modified)) == expected:
                        return [(i, modified[i]), (j, modified[j])]
    return []
# --- Large-Scale File Analysis ---
def analyze_large_file(filepath: str, max_samples=1000) -> Dict:
    """Analyze a large file efficiently by processing it in batches."""
    start_time = time.time()
    print(f"Starting large-scale analysis of {filepath}...")
    # Process the file in batches to handle large files
    batch_gen = parse_input_file_lines_batched(filepath, batch_size=1000)
    # First batch will be used for detailed analysis
    first_batch = next(batch_gen, [])
    if not first_batch:
        print("No valid samples found in file.")
        return {}
    # Perform initial algorithm identification on the first batch
    print(f"Identifying potential checksum algorithms on first {len(first_batch)} samples...")
    matches = bruteforce_all_methods(first_batch, label_prefix=os.path.basename(filepath))
    # Extract the most promising algorithms and ranges
    patterns = analyze_patterns([m for m in matches if m[0] != "CONSISTENCY_DATA"])
    top_methods = patterns["methods"].most_common(3)
    top_ranges = patterns["ranges"].most_common(3)
    # Combining top methods with top ranges for focused analysis
    focused_analysis = []
    method_func_map = {
        "SUM": checksum_sum,
        "XOR": checksum_xor,
        "SUM<<1": lambda d: checksum_sum_shifted(d, 1),
        "SUM<<2": lambda d: checksum_sum_shifted(d, 2),
        "XOR<<1": lambda d: checksum_xor_shifted(d, 1),
        "XOR<<2": lambda d: checksum_xor_shifted(d, 2),
        "WEIGHTED_SUM": checksum_weighted_sum,
        "ALT_SUM_XOR": checksum_alt_sum_xor,
        "BIT_FLIP_SUM": checksum_bit_flip_sum
    }
    # Collect a sample of data for correlation analysis
    correlation_samples = first_batch.copy()
    # Check more batches if we need more samples for correlation analysis
    batches_processed = 1
    while len(correlation_samples) < max_samples:
        batch = next(batch_gen, None)
        if batch is None:
            break
        correlation_samples.extend(batch[:max_samples - len(correlation_samples)])
        batches_processed += 1
        if batches_processed >= 10:  # Limit to 10 batches for performance
            break
    # Perform correlation analysis
    print(f"Performing byte correlation analysis on {len(correlation_samples)} samples...")
    correlations = analyze_byte_value_correlations(correlation_samples, max_samples=max_samples)
    # Test the most likely algorithms on the significant byte positions
    print("Testing algorithm-position combinations...")
    for method_name, _ in top_methods:
        for range_str, _ in top_ranges:
            range_parts = range_str.strip('[]').split(':')
            if len(range_parts) == 2:
                start, end = int(range_parts[0]), int(range_parts[1])
                method_func = method_func_map.get(method_name)
                if method_func:
                    success_count = 0
                    for data, expected in correlation_samples[:100]:  # Test on first 100 samples
                        if len(data) >= end:
                            result = method_func(data[start:end])
                            if result == expected:
                                success_count += 1
                    success_rate = success_count / min(100, len(correlation_samples))
                    focused_analysis.append({
                        "method": method_name,
                        "range": f"[{start}:{end}]",
                        "success_rate": success_rate,
                        "success_count": success_count
                    })
    # Sort by success rate
    focused_analysis.sort(key=lambda x: x["success_rate"], reverse=True)
    # Find byte positions that most strongly influence the checksum
    influential_positions = correlations["significant_positions"][:5]
    elapsed_time = time.time() - start_time
    return {
        "file_name": os.path.basename(filepath),
        "samples_analyzed": len(correlation_samples),
        "elapsed_time": elapsed_time,
        "top_methods": [m[0] for m in top_methods],
        "top_ranges": [r[0] for r in top_ranges],
        "focused_analysis": focused_analysis[:5],
        "influential_positions": influential_positions,
        "position_correlations": {str(p): correlations["position_correlations"][p] for p in influential_positions},
        "byte_pattern_summary": summarize_byte_patterns(correlations),
    }
def summarize_byte_patterns(correlations: Dict) -> Dict:
    """Summarize patterns in byte correlations to help understand the checksum algorithm."""
    if not correlations or "position_correlations" not in correlations:
        return {}
    # Identify patterns in how byte positions affect the checksum
    positions = correlations.get("significant_positions", [])
    if not positions:
        return {}
    # Count pattern types to identify algorithm characteristics
    pattern_types = Counter()
    for pos in positions:
        if pos in correlations["position_correlations"]:
            bit_patterns = correlations["position_correlations"][pos].get("bit_patterns", {})
            pattern_type = bit_patterns.get("pattern_type", "unknown")
            pattern_types[pattern_type] += 1
    # Algorithm characteristics based on patterns
    primary_pattern = pattern_types.most_common(1)[0][0] if pattern_types else "unknown"
    algorithm_characteristics = {
        "xor_like": "XOR-based algorithm (position-independent)",
        "additive": "Sum-based algorithm (position-independent)",
        "subtractive": "Subtraction-based algorithm (unusual)",
        "weighted": "Weighted algorithm (position-dependent)",
        "mixed": "Mixed algorithm (complex checksum)"
    }
    # Check position importance distribution
    pos_weights = correlations.get("position_weights", {})
    weight_values = list(pos_weights.values())
    weight_variance = 0
    if weight_values:
        mean_weight = sum(weight_values) / len(weight_values)
        weight_variance = sum((w - mean_weight) ** 2 for w in weight_values) / len(weight_values)
    position_dependent = weight_variance > 0.05
    return {
        "dominant_pattern": primary_pattern,
        "likely_algorithm_type": algorithm_characteristics.get(primary_pattern, "Unknown algorithm type"),
        "position_dependent": position_dependent,
        "weight_variance": weight_variance,
        "recommendation": get_algorithm_recommendation(primary_pattern, position_dependent)
    }
def get_algorithm_recommendation(pattern_type: str, position_dependent: bool) -> str:
    """Get a recommendation for checksum algorithm based on correlation analysis."""
    if pattern_type == "xor_like" and not position_dependent:
        return "XOR-based checksum recommended"
    elif pattern_type == "xor_like" and position_dependent:
        return "Position-dependent XOR (shifted XOR) recommended"
    elif pattern_type == "additive" and not position_dependent:
        return "Simple sum checksum recommended"
    elif pattern_type == "additive" and position_dependent:
        return "Weighted sum checksum recommended"
    elif pattern_type == "weighted":
        return "Complex weighted checksum recommended"
    else:
        return "Mixed or complex algorithm recommended, try ALT_SUM_XOR or custom hybrid"
def print_large_file_analysis(analysis: Dict):
    """Print the results of large-file analysis in a readable format."""
    print("\n📊 Large File Analysis Results")
    print("═══════════════════════════")
    print(f"File: {analysis.get('file_name', 'Unknown')}")
    print(f"Samples analyzed: {analysis.get('samples_analyzed', 0)}")
    print(f"Analysis time: {analysis.get('elapsed_time', 0):.2f} seconds")
    # Print the top methods and ranges
    print("\n🔍 Top Checksum Methods:")
    for method in analysis.get('top_methods', []):
        print(f" • {method}")
    print("\n📏 Top Byte Ranges:")
    for range_str in analysis.get('top_ranges', []):
        print(f" • {range_str}")
    # Print the focused analysis results
    print("\n✅ Best Method+Range Combinations:")
    for combo in analysis.get('focused_analysis', []):
        print(f" • {combo['method']}{combo['range']} → {combo['success_rate']*100:.1f}% success rate ({combo['success_count']} samples)")
    # Print the byte pattern summary
    pattern_summary = analysis.get('byte_pattern_summary', {})
    if pattern_summary:
        print("\n🧠 Algorithm Characteristics:")
        print(f" Dominant pattern: {pattern_summary.get('dominant_pattern', 'Unknown')}")
        print(f" Likely algorithm: {pattern_summary.get('likely_algorithm_type', 'Unknown')}")
        print(f" Position dependent: {'Yes' if pattern_summary.get('position_dependent', False) else 'No'}")
        print(f"\n💡 Recommendation: {pattern_summary.get('recommendation', 'Unknown')}")
    # Print influential byte positions
    print("\n🔢 Most Influential Byte Positions:")
    positions = analysis.get('influential_positions', [])
    pos_correlations = analysis.get('position_correlations', {})
    for pos in positions:
        pos_str = str(pos)
        if pos_str in pos_correlations:
            info = pos_correlations[pos_str]
            print(f" • Position {pos}: {info['strength']:.3f} correlation strength, " +
                  f"{info['direction']} correlation, {info['unique_values']} unique values")
            # Print bit patterns if available
            bit_patterns = info.get("bit_patterns", {})
            if bit_patterns:
                most_influential_bit = bit_patterns.get("most_influential_bit", 0)
                print(f" Most influential bit: {most_influential_bit} (bit {7 - most_influential_bit} from left)")
# --- Enhanced Folder Processing ---
def process_folder_with_limits(folder_path: str, max_total_samples: int = 1000) -> List[Tuple[bytes, int]]:
    """
    Process files in a folder with a limit on total samples.
    Returns a list of samples up to the specified limit.
    """
    all_samples = []
    files_processed = 0
    samples_collected = 0
    print(f"Processing folder with limit of {max_total_samples} samples...")
    for file in os.listdir(folder_path):
        if file.endswith(".txt"):
            full_path = os.path.join(folder_path, file)
            try:
                samples, file_meta = parse_input_file_lines(full_path)
                # Take only what we need to stay under max_total_samples
                remaining = max_total_samples - len(all_samples)
                if remaining <= 0:
                    break
                if len(samples) > remaining:
                    print(f"Taking {remaining} of {len(samples)} samples from {file}")
                    samples = samples[:remaining]
                else:
                    print(f"Taking all {len(samples)} samples from {file}")
                all_samples.extend(samples)
                files_processed += 1
                samples_collected += len(samples)
                # Stop if we've reached our limit
                if len(all_samples) >= max_total_samples:
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")
    print(f"Processed {files_processed} files, collected {samples_collected} samples")
    return all_samples
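# --- Byte Contribution Helpers (reconstructed) ---
# NOTE: analyze_byte_contributions and print_byte_analysis are called by
# evaluate_with_byte_analysis and the --byte-analysis path below, but no
# definition for them exists in this file. The minimal sketches here are
# reconstructed purely from how their results are consumed (the
# "byte_contributions", "original_value" and "max_impact" keys); the original
# implementations may have computed contributions differently.
def analyze_byte_contributions(data: bytes, checksum_func: Callable[[bytes], int], expected: int) -> Dict:
    """Estimate how much each byte position can move the checksum (sketch)."""
    byte_contributions = {}
    base = checksum_func(data)
    for pos in range(len(data)):
        max_impact = 0
        # Probe a few representative substitute values rather than all 256
        for test_val in (0x00, 0x55, 0xAA, 0xFF, data[pos] ^ 0xFF):
            if test_val == data[pos]:
                continue
            modified = bytearray(data)
            modified[pos] = test_val
            impact = abs(checksum_func(bytes(modified)) - base)
            max_impact = max(max_impact, impact)
        byte_contributions[pos] = {
            "original_value": data[pos],
            "max_impact": max_impact,
        }
    return {"byte_contributions": byte_contributions, "base_checksum": base, "expected": expected}


def print_byte_analysis(data: bytes, contributions: Dict, method_name: str):
    """Print a per-byte impact table for a matched method (sketch)."""
    print(f"Byte contributions for {method_name} over {len(data)} bytes:")
    for pos, info in contributions.get("byte_contributions", {}).items():
        print(f"  [{pos:02d}] 0x{info['original_value']:02X} → max impact {info['max_impact']}")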
# --- Main ---
def main():
    # Create argument parser
    parser = argparse.ArgumentParser(description='Analyze checksum algorithms in files.')
    parser.add_argument('path', help='Path to file or directory to analyze')
    parser.add_argument('--full', action='store_true', help='Show detailed output with all analyses')
    parser.add_argument('--byte-analysis', action='store_true', help='Perform byte-level contribution analysis')
    parser.add_argument('--large', action='store_true', help='Perform large-scale analysis optimized for big files')
    parser.add_argument('--max-samples', type=int, default=1000,
                        help='Maximum number of samples for intensive analyses (byte-level and large-scale)')
    args = parser.parse_args()
    path = args.path
    show_full = args.full
    perform_byte_analysis = args.byte_analysis
    large_analysis = args.large
    max_samples = args.max_samples
    all_matches = []
    byte_insights = {}
    if os.path.isdir(path):
        # Standard brute force - process all samples without limits
        print("Phase 1: Running standard brute force analysis...")
        for file in os.listdir(path):
            if file.endswith(".txt"):
                full_path = os.path.join(path, file)
                try:
                    parsed_samples, file_meta = parse_input_file_lines(full_path)
                    # Process all samples for standard analysis
                    match_results = bruteforce_all_methods(
                        parsed_samples,
                        label_prefix=file,
                        file_metadata={"file": file, **file_meta}
                    )
                    all_matches.extend(match_results)
                except Exception as e:
                    print(f"Error processing {file}: {e}")
        # Display standard results
        print_results_with_summary(all_matches, per_file=True, show_full=show_full)
        if perform_byte_analysis:
            # Limit to max_samples for the intensive byte-level analysis
            print(f"\n\nPhase 2: Running byte-level contribution analysis (limit: {max_samples} samples)...")
            files_analyzed = 0
            total_samples_analyzed = 0
            for file in list(os.listdir(path)):
                # Stop if we've hit our sample limit or analyzed enough files
                if total_samples_analyzed >= max_samples or files_analyzed >= 3:
                    break
                if file.endswith(".txt"):
                    full_path = os.path.join(path, file)
                    try:
                        parsed_samples, file_meta = parse_input_file_lines(full_path)
                        if not parsed_samples:
                            print(f"⚠️ No valid samples found in {file}")
                            continue
                        # Determine how many samples to take from this file
                        samples_remaining = max_samples - total_samples_analyzed
                        if samples_remaining <= 0:
                            break
                        samples_to_analyze = parsed_samples
                        if len(parsed_samples) > samples_remaining:
                            print(f"Limiting to {samples_remaining} samples from {file}")
                            samples_to_analyze = parsed_samples[:samples_remaining]
                        else:
                            print(f"Analyzing all {len(parsed_samples)} samples from {file}")
                        total_samples_analyzed += len(samples_to_analyze)
                        files_analyzed += 1
                        print(f"\n📄 Analyzing file: {file} ({len(samples_to_analyze)} samples)")
                        match_results, file_insights = evaluate_with_byte_analysis(
                            samples_to_analyze,
                            label_prefix=f"BYTE_ANALYSIS_{file}",
                            detailed=True
                        )
                        if not file_insights:
                            print(f"⚠️ No byte-level insights found for {file}")
                        byte_insights.update(file_insights)
                    except Exception as e:
                        print(f"⚠️ Error analyzing {file}: {e}")
            print(f"\nCompleted byte-level analysis on {total_samples_analyzed} samples from {files_analyzed} files")
            # Overall summary
            print("\n\n🧬 Byte Contribution Analysis Summary")
            print("═════════════════════════════════════")
            print(f"Total samples analyzed: {len(byte_insights)}")
            print("Methods with most influence on checksums:")
            # Collect statistics on which methods have highest average impact
            method_impacts = defaultdict(list)
            for key, data in byte_insights.items():
                if "contributions" in data:
                    # Get average of max impacts across all bytes
                    impacts = [info["max_impact"] for info in data["contributions"]["byte_contributions"].values()]
                    if impacts:
                        avg_impact = sum(impacts) / len(impacts)
                        method_impacts[data["method"]].append(avg_impact)
            # Show average impact by method
            for method, impacts in method_impacts.items():
                if impacts:
                    avg = sum(impacts) / len(impacts)
                    print(f"{method:<15} → Avg impact: {avg:.1f}")
    elif os.path.isfile(path):
        parsed_samples, file_meta = parse_input_file_lines(path)
        file_name = os.path.basename(path)
        match_results = bruteforce_all_methods(
            parsed_samples,
            label_prefix=file_name,
            file_metadata={"file": file_name, **file_meta}
        )
        all_matches.extend(match_results)
        # Display results
        print_results_with_summary(all_matches, per_file=True, show_full=show_full)
        if perform_byte_analysis and parsed_samples:
            print("\nRunning byte-level contribution analysis...")
            try:
                match_results, file_insights = evaluate_with_byte_analysis(
                    parsed_samples,  # Pass just the samples list
                    label_prefix=f"BYTE_ANALYSIS_{os.path.basename(path)}",
                    detailed=True
                )
                # Print just the first sample's analysis as an example
                if file_insights:
                    key = next(iter(file_insights))
                    data = file_insights[key]
                    sample_id = key.split('_')[1] if len(key.split('_')) > 1 else "?"
                    method_name = data["method"]
                    range_str = data["range"]
                    # Get original sample data
                    if int(sample_id) <= len(parsed_samples):
                        data_bytes, expected = parsed_samples[int(sample_id) - 1]
                        start, end = map(int, data["range"].split(':'))
                        sliced_data = data_bytes[start:end]
                        print(f"\nByte analysis for Sample {sample_id} using {method_name}[{range_str}]")
                        print_byte_analysis(sliced_data, data["contributions"], method_name)
            except Exception as e:
                print(f"⚠️ Error during byte analysis: {e}")
    # Phase 3: optional large-scale analysis
    if os.path.isdir(path):
        if large_analysis:
            print(f"\n\nPerforming large-scale file analysis (limit: {max_samples} samples per file)...")
            files_analyzed = 0
            for file in list(os.listdir(path)):
                if files_analyzed >= 5:  # Limit to 5 files for performance
                    break
                if file.endswith(".txt"):
                    full_path = os.path.join(path, file)
                    try:
                        analysis = analyze_large_file(full_path, max_samples=max_samples)
                        print_large_file_analysis(analysis)
                        files_analyzed += 1
                    except Exception as e:
                        print(f"⚠️ Error during large file analysis of {file}: {e}")
    elif os.path.isfile(path):
        if large_analysis:
            try:
                analysis = analyze_large_file(path, max_samples=max_samples)
                print_large_file_analysis(analysis)
            except Exception as e:
                print(f"⚠️ Error during large file analysis: {e}")
def evaluate_with_byte_analysis(samples: List[Tuple[bytes, int]], label_prefix="", detailed=False) -> Tuple[List, Dict]:
    """Analyze which methods work and provide byte-level insights"""
    matches = []
    seen = set()
    byte_insights = {}
    # Most promising methods based on previous analysis
    methods = [
        ("WEIGHTED_SUM", checksum_weighted_sum),
        ("ALT_SUM_XOR", checksum_alt_sum_xor),
        ("BIT_FLIP_SUM", checksum_bit_flip_sum),
        ("SUM<<1", lambda d: checksum_sum_shifted(d, 1)),
        ("HYBRID_SUM_XOR(0.5)", lambda d: checksum_hybrid_sum_xor(d, 0.5)),
        ("BIT_FLIP_SUM(AA)", lambda d: checksum_adaptive_bit_flip_sum(d, 0xAA))
    ]
    for sample_index, (data, expected) in enumerate(samples[:5]):  # Limit to first 5 samples for performance
        length = len(data)
        # Focus on the most promising ranges
        ranges_to_try = []
        # Add the specific ranges that were most successful in our analysis
        specific_ranges = [(3, 30), (4, 31), (5, 8), (5, 9), (2, 11)]
        for start, end in specific_ranges:
            if start < length and end <= length and start < end:
                ranges_to_try.append((start, end))
        # Process each range with our methods
        for start, end in ranges_to_try:
            if end > start + 30:  # Skip very large ranges to keep analysis fast
                continue
            sliced = data[start:end]
            label = f"[{start}:{end}]"
            for name, func in methods:
                try:
                    result = func(sliced)
                    method_id = f"{name}{label}"
                    key = (sample_index, method_id, label_prefix)
                    if result == expected and key not in seen:
                        seen.add(key)
                        matches.append((method_id, sample_index + 1, expected, label_prefix))
                        # For matching methods, perform byte contribution analysis
                        if detailed:
                            print(f"Analyzing contributions for sample {sample_index+1}, method {method_id}...")
                            byte_contributions = analyze_byte_contributions(sliced, func, expected)
                            optimal_changes = find_optimal_byte_changes(sliced, func, expected)
                            # Store insights and also print them immediately
                            insights_key = f"sample_{sample_index+1}_{name}"
                            byte_insights[insights_key] = {
                                "contributions": byte_contributions,
                                "optimal_changes": optimal_changes,
                                "method": name,
                                "range": f"{start}:{end}",
                                "data": sliced  # Store the data slice itself for easier analysis
                            }
                            # Print analysis directly during collection for immediate feedback
                            print_byte_analysis(sliced, byte_contributions, method_id)
                            # If we found compensation values, print them
                            if optimal_changes:
                                print("\nSuggested byte changes:")
                                for pos, new_val in optimal_changes:
                                    print(f" Change byte at position {pos} from 0x{sliced[pos]:02X} to 0x{new_val:02X}")
                        # Once we've found and analyzed one matching method for a sample, move on
                        # to keep the output manageable
                        break
                except Exception:
                    continue
            # If we've already found and analyzed a method for this sample, move on
            if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()):
                break
        # If we've already found and analyzed a method for this sample, move on
        if any(k.startswith(f"sample_{sample_index+1}_") for k in byte_insights.keys()):
            continue
    return matches, byte_insights
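

# Run the CLI only when executed directly; main() is defined above, and calling
# it here ensures every helper (including evaluate_with_byte_analysis and the
# reconstructed byte-contribution sketches) exists before it runs.
# Example (hypothetical paths): python idiot.py samples/ --full --byte-analysis
if __name__ == "__main__":
    main()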