# audio_analyzer.py
import binascii
import os.path
import re
import struct
import sys
from collections import Counter, defaultdict
  7. def parse_hex_stream(file_path):
  8. """Parse hexadecimal stream from a text file."""
  9. try:
  10. with open(file_path, 'r') as f:
  11. content = f.read()
  12. # Remove any whitespace and line breaks
  13. content = re.sub(r'\s+', '', content)
  14. return content
  15. except Exception as e:
  16. print(f"Error reading file: {e}")
  17. return None
  18. def identify_packets(hex_stream):
  19. """Split the hex stream into individual packets based on MEL header pattern."""
  20. # Pattern is 4d454c04 which is "MEL\x04" in ASCII
  21. packet_pattern = r'4d454c04'
  22. # Find all positions of the pattern
  23. positions = [match.start() for match in re.finditer(packet_pattern, hex_stream)]
  24. packets = []
  25. for i in range(len(positions)):
  26. start = positions[i]
  27. # If this is the last pattern occurrence, go to end of stream
  28. end = positions[i+1] if i < len(positions) - 1 else len(hex_stream)
  29. packet = hex_stream[start:end]
  30. packets.append(packet)
  31. return packets
  32. def analyze_packet_structure(packet):
  33. """Analyze the structure of a single packet."""
  34. if len(packet) < 20: # Ensure packet has enough bytes for header
  35. return {"error": "Packet too short"}
  36. # Extract header components
  37. header = packet[:8] # MEL\x04
  38. version = packet[8:12] # Version or type
  39. sequence = packet[12:16] # Possibly sequence number
  40. flags = packet[16:20] # Possibly flags
  41. # Extract length fields (if they exist)
  42. length_field = packet[20:28]
  43. # Extract the data portion (minus the checksum)
  44. data = packet[28:-4]
  45. # Extract the checksum (last 2 bytes / 4 hex chars)
  46. checksum = packet[-4:]
  47. # Calculate expected checksum (simple CRC)
  48. # This is just a placeholder; actual checksum algorithm would need to be determined
  49. calculated_checksum = binascii.crc32(bytes.fromhex(packet[:-4])) & 0xFFFF
  50. checksum_match = hex(calculated_checksum)[2:].zfill(4) == checksum.lower()
  51. return {
  52. "header": header,
  53. "version": version,
  54. "sequence": sequence,
  55. "flags": flags,
  56. "length_field": length_field,
  57. "data_length": len(data) // 2, # Byte count
  58. "checksum": checksum,
  59. "checksum_match": checksum_match,
  60. "total_bytes": len(packet) // 2
  61. }
  62. def detect_duplicates(packets):
  63. """Detect duplicate packets in the stream."""
  64. duplicates = []
  65. for i in range(len(packets) - 1):
  66. if packets[i] == packets[i + 1]:
  67. duplicates.append(i)
  68. duplicate_percentage = (len(duplicates) / len(packets)) * 100 if packets else 0
  69. return {
  70. "duplicate_count": len(duplicates),
  71. "duplicate_indices": duplicates,
  72. "duplicate_percentage": duplicate_percentage
  73. }
  74. def guess_codec(packets, file_path):
  75. """Attempt to identify the audio codec based on packet patterns."""
  76. # Extract common headers or patterns
  77. headers = Counter([packet[:24] for packet in packets])
  78. most_common_header = headers.most_common(1)[0][0] if headers else "Unknown"
  79. # Check for known codec signatures
  80. codec = "Unknown"
  81. quality = "Unknown"
  82. if "400hz-sine-wave" in file_path:
  83. quality = "High Quality"
  84. elif "400hz-square-wave" in file_path:
  85. quality = "High Quality"
  86. elif "audio-stream" in file_path:
  87. quality = "Normal Quality"
  88. # Since we know the system uses the LAME encoder (binary shipped with software)
  89. if most_common_header.startswith("4d454c0409010"):
  90. codec = "LAME MP3 (packaged in MEL Audio Format)"
  91. # MP3 frame analysis
  92. mp3_frame_sync_count = 0
  93. potential_bitrate = None
  94. potential_sample_rate = None
  95. # Check each packet for MP3 headers (starting with 0xFF 0xFB for MPEG-1 Layer 3)
  96. for packet in packets[:min(10, len(packets))]: # Check first 10 packets
  97. data_portion = packet[28:-4] # Skip header and checksum
  98. # Look for MP3 frame sync patterns
  99. sync_positions = [m.start() for m in re.finditer(r'fffb', data_portion)]
  100. if sync_positions:
  101. mp3_frame_sync_count += len(sync_positions)
  102. # Try to extract bitrate and sample rate from first valid header
  103. for pos in sync_positions:
  104. if pos + 4 <= len(data_portion):
  105. try:
  106. header_bytes = bytes.fromhex(data_portion[pos:pos+8])
  107. # Extract bits 16-19 for bitrate index (0-based)
  108. bitrate_index = (header_bytes[2] >> 4) & 0x0F
  109. # Extract bits 20-21 for sample rate index
  110. sample_rate_index = (header_bytes[2] >> 2) & 0x03
  111. # MPEG-1 Layer 3 bitrate table (kbps): 0 is free format
  112. bitrates = [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 192, 224, 256, 320]
  113. # MPEG-1 sample rates: 44100, 48000, 32000 Hz
  114. sample_rates = [44100, 48000, 32000, 0] # 0 is reserved
  115. if bitrate_index > 0 and sample_rate_index < 3: # Valid indices
  116. potential_bitrate = bitrates[bitrate_index]
  117. potential_sample_rate = sample_rates[sample_rate_index]
  118. break
  119. except:
  120. pass # Skip if unable to parse header
  121. # Evaluate if this is likely MP3 based on frame sync patterns
  122. mp3_likelihood = "High" if mp3_frame_sync_count > 5 else "Medium" if mp3_frame_sync_count > 0 else "Low"
  123. # Check for stream characteristics that might indicate codec/bitrate
  124. avg_packet_size = sum(len(p) for p in packets) / (2 * len(packets)) if packets else 0
  125. if potential_bitrate:
  126. codec_guess = f"LAME MP3 ({potential_bitrate}kbps)"
  127. elif 1000 <= avg_packet_size <= 1500:
  128. codec_guess = "LAME MP3 (48-64kbps)"
  129. elif avg_packet_size > 1500:
  130. codec_guess = "LAME MP3 (96-128kbps or higher)"
  131. else:
  132. codec_guess = "LAME MP3 (low bitrate)"
  133. return {
  134. "likely_codec": codec,
  135. "quality_setting": quality,
  136. "most_common_header": most_common_header,
  137. "codec_guess_from_size": codec_guess,
  138. "average_packet_size_bytes": avg_packet_size,
  139. "mp3_frame_sync_found": mp3_frame_sync_count > 0,
  140. "mp3_likelihood": mp3_likelihood,
  141. "detected_bitrate_kbps": potential_bitrate,
  142. "detected_sample_rate_hz": potential_sample_rate
  143. }
  144. def detect_repetition_pattern(packets):
  145. """Analyze if packets are sent in repeating patterns (beyond simple duplication)."""
  146. if len(packets) < 4:
  147. return {"pattern": "Not enough packets to detect pattern"}
  148. # Check if every second packet is a repeat
  149. alternate_duplicates = all(packets[i] == packets[i+2] for i in range(0, len(packets)-2, 2))
  150. # Check for more complex patterns
  151. repeats_every_n = None
  152. for n in range(2, min(10, len(packets) // 2)):
  153. if all(packets[i] == packets[i+n] for i in range(len(packets)-n)):
  154. repeats_every_n = n
  155. break
  156. return {
  157. "alternating_duplicates": alternate_duplicates,
  158. "repeats_every_n": repeats_every_n
  159. }
  160. def extract_timestamps(packets):
  161. """Try to extract timestamp information from packets."""
  162. timestamps = []
  163. for i, packet in enumerate(packets):
  164. # This would need to be adjusted based on actual packet structure
  165. # Assuming timestamp might be in a specific position
  166. potential_timestamp = packet[24:32]
  167. try:
  168. # Try to interpret as a 32-bit timestamp
  169. ts_value = int(potential_timestamp, 16)
  170. timestamps.append(ts_value)
  171. except:
  172. timestamps.append(None)
  173. return timestamps
  174. def calculate_total_duration(packets, sample_rate=44100):
  175. """Estimate total audio duration based on packet analysis."""
  176. # This is a rough estimation and would need adjustment based on the actual codec
  177. if not packets:
  178. return 0
  179. # For MP3, we'll use a different approach since we now know it's LAME MP3
  180. # Assuming each packet contains a fixed number of samples
  181. samples_per_frame = 1152 # Standard for MP3
  182. # Count potential MP3 frames in the data
  183. frame_count = 0
  184. for packet in packets:
  185. data_portion = packet[28:-4] # Skip header and checksum
  186. # Look for MP3 frame sync patterns (0xFF 0xFB for MPEG-1 Layer 3)
  187. sync_positions = [m.start() for m in re.finditer(r'fffb', data_portion)]
  188. frame_count += len(sync_positions)
  189. # If we can't detect frames, fallback to packet-based estimation
  190. if frame_count == 0:
  191. # Total unique packets as a conservative estimate
  192. unique_packets = len(set(packets))
  193. # Estimate one frame per packet (conservative)
  194. frame_count = unique_packets
  195. # Estimate duration
  196. total_samples = frame_count * samples_per_frame
  197. duration_seconds = total_samples / sample_rate
  198. return duration_seconds
  199. def analyze_audio_stream(file_path):
  200. """Complete analysis of an audio stream file."""
  201. hex_stream = parse_hex_stream(file_path)
  202. if not hex_stream:
  203. return {"error": "Failed to parse hex stream"}
  204. packets = identify_packets(hex_stream)
  205. if not packets:
  206. return {"error": "No valid packets identified"}
  207. packet_analyses = [analyze_packet_structure(p) for p in packets]
  208. packet_lengths = [p["total_bytes"] for p in packet_analyses]
  209. # Group by packet lengths to detect patterns
  210. length_count = Counter(packet_lengths)
  211. most_common_lengths = length_count.most_common(3)
  212. duplicates = detect_duplicates(packets)
  213. codec_info = guess_codec(packets, file_path)
  214. repetition = detect_repetition_pattern(packets)
  215. timestamps = extract_timestamps(packets)
  216. # Use detected sample rate if available, otherwise default to 44100
  217. sample_rate = codec_info.get("detected_sample_rate_hz", 44100)
  218. duration = calculate_total_duration(packets, sample_rate)
  219. # Analyze duplicated packets pattern
  220. pairs = []
  221. for i in range(0, len(packets)-1, 2):
  222. if i+1 < len(packets):
  223. are_identical = packets[i] == packets[i+1]
  224. pairs.append(are_identical)
  225. pairs_percentage = sum(pairs)/len(pairs)*100 if pairs else 0
  226. # Extract LAME tag info if present for VBR and encoding quality
  227. lame_version = None
  228. lame_tag_found = False
  229. vbr_method = None
  230. # Look for LAME tag in first few packets
  231. for packet in packets[:min(5, len(packets))]:
  232. data_portion = packet[28:-4] # Skip header and checksum
  233. # Look for "LAME" or "Lavf" strings in hex
  234. if "4c414d45" in data_portion.lower(): # "LAME" in hex
  235. lame_tag_found = True
  236. # Additional LAME tag parsing could be added here
  237. elif "4c617666" in data_portion.lower(): # "Lavf" in hex (LAVF container format)
  238. lame_tag_found = True
  239. return {
  240. "file_name": os.path.basename(file_path),
  241. "total_packets": len(packets),
  242. "unique_packets": len(set(packets)),
  243. "packet_lengths": most_common_lengths,
  244. "average_packet_length": sum(packet_lengths) / len(packet_lengths) if packet_lengths else 0,
  245. "duplicates": duplicates,
  246. "codec_info": codec_info,
  247. "repetition_pattern": repetition,
  248. "timestamp_pattern": "Available" if any(timestamps) else "Not found",
  249. "estimated_duration_seconds": duration,
  250. "paired_packet_pattern": f"{pairs_percentage:.1f}% of packets appear in identical pairs",
  251. "lame_tag_found": lame_tag_found
  252. }
  253. def main():
  254. if len(sys.argv) < 2:
  255. print("Usage: python audio_analyzer.py <audio_file.txt> [audio_file2.txt] ...")
  256. return
  257. for file_path in sys.argv[1:]:
  258. print(f"\nAnalyzing: {file_path}")
  259. print("-" * 50)
  260. analysis = analyze_audio_stream(file_path)
  261. if "error" in analysis:
  262. print(f"Error: {analysis['error']}")
  263. continue
  264. print(f"File: {analysis['file_name']}")
  265. print(f"Total packets: {analysis['total_packets']}")
  266. print(f"Unique packets: {analysis['unique_packets']}")
  267. print(f"Most common packet lengths (bytes): {analysis['packet_lengths']}")
  268. print(f"Average packet length: {analysis['average_packet_length']:.2f} bytes")
  269. print(f"Duplicates: {analysis['duplicates']['duplicate_count']} ({analysis['duplicates']['duplicate_percentage']:.1f}%)")
  270. print(f"Likely codec: {analysis['codec_info']['likely_codec']}")
  271. print(f"Quality setting: {analysis['codec_info']['quality_setting']}")
  272. print(f"Codec estimate: {analysis['codec_info']['codec_guess_from_size']}")
  273. print(f"MP3 likelihood: {analysis['codec_info'].get('mp3_likelihood', 'Unknown')}")
  274. if analysis['codec_info'].get('detected_bitrate_kbps'):
  275. print(f"Detected bitrate: {analysis['codec_info']['detected_bitrate_kbps']} kbps")
  276. if analysis['codec_info'].get('detected_sample_rate_hz'):
  277. print(f"Detected sample rate: {analysis['codec_info']['detected_sample_rate_hz']} Hz")
  278. print(f"LAME tag found: {'Yes' if analysis.get('lame_tag_found', False) else 'No'}")
  279. print(f"Repetition pattern: {analysis['repetition_pattern']}")
  280. print(f"Estimated duration: {analysis['estimated_duration_seconds']:.2f} seconds")
  281. print(f"Packet pairing: {analysis['paired_packet_pattern']}")
  282. if __name__ == "__main__":
  283. main()