| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- import sys
- import re
- import binascii
- from collections import Counter, defaultdict
- import struct
- import os.path
def parse_hex_stream(file_path):
    """Read a text file of hex digits and return them as one normalized string.

    All whitespace (spaces, tabs, line breaks) is removed and the digits are
    lower-cased, because the rest of the analysis matches lowercase literals
    such as ``4d454c04`` and ``fffb``.

    Args:
        file_path: Path of the text file holding the hex dump.

    Returns:
        The whitespace-free, lowercase hex string, or ``None`` if the file
        could not be opened or decoded (the error is printed to stdout).
    """
    try:
        with open(file_path, 'r') as f:
            content = f.read()
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: undecodable bytes
        # (UnicodeDecodeError) or an invalid path string.
        print(f"Error reading file: {e}")
        return None

    # Strip whitespace first, then normalize case so 'FFFB' and 'fffb' are
    # treated identically by the pattern matching done downstream.
    return re.sub(r'\s+', '', content).lower()
def identify_packets(hex_stream):
    """Split a hex stream into packets, each beginning at a MEL header.

    A packet starts at every occurrence of ``4d454c04`` ("MEL\\x04" in ASCII)
    and runs up to the next occurrence, or to the end of the stream for the
    last one. Anything before the first header is discarded.

    Args:
        hex_stream: String of hex digits with no whitespace.

    Returns:
        List of packet substrings in stream order; empty if no header is found.
    """
    # Match case-insensitively so a dump written with upper-case hex digits is
    # split exactly like a lower-case one instead of yielding no packets.
    header_re = re.compile(r'4d454c04', re.IGNORECASE)
    starts = [match.start() for match in header_re.finditer(hex_stream)]

    # Each packet ends where the next one starts; the last ends at the stream end.
    ends = starts[1:] + [len(hex_stream)]
    return [hex_stream[start:end] for start, end in zip(starts, ends)]
def analyze_packet_structure(packet):
    """Slice one hex-encoded packet into its assumed fields and check its checksum.

    The field layout is a working guess (offsets are in hex characters):
    ``[0:8]`` header, ``[8:12]`` version/type, ``[12:16]`` possibly a sequence
    number, ``[16:20]`` possibly flags, ``[20:28]`` length field,
    ``[28:-4]`` payload, ``[-4:]`` checksum.

    Args:
        packet: Hex string for a single packet, starting at its header.

    Returns:
        A dict with the sliced fields, ``data_length`` and ``total_bytes`` in
        bytes, and ``checksum_match``. Packets shorter than 20 hex characters
        yield ``{"error": "Packet too short", "total_bytes": ...}`` so callers
        that aggregate sizes can still read ``total_bytes``.
    """
    total_bytes = len(packet) // 2

    if len(packet) < 20:  # Not enough characters for the fixed header fields
        return {"error": "Packet too short", "total_bytes": total_bytes}

    data = packet[28:-4]
    checksum = packet[-4:]  # Last 2 bytes / 4 hex chars

    # Placeholder checksum: low 16 bits of CRC-32 over everything before the
    # checksum. The real algorithm still has to be determined.
    try:
        calculated_checksum = binascii.crc32(bytes.fromhex(packet[:-4])) & 0xFFFF
    except ValueError:
        # Odd number of hex digits or a non-hex character: the packet cannot
        # be decoded, so the checksum cannot match.
        checksum_match = False
    else:
        checksum_match = f"{calculated_checksum:04x}" == checksum.lower()

    return {
        "header": packet[:8],
        "version": packet[8:12],
        "sequence": packet[12:16],
        "flags": packet[16:20],
        "length_field": packet[20:28],
        "data_length": len(data) // 2,  # Byte count
        "checksum": checksum,
        "checksum_match": checksum_match,
        "total_bytes": total_bytes
    }
def detect_duplicates(packets):
    """Find packets that are immediately followed by an identical packet.

    Args:
        packets: Sequence of packet strings in stream order.

    Returns:
        A dict with ``duplicate_indices`` (index of the first packet of each
        identical adjacent pair), ``duplicate_count`` and
        ``duplicate_percentage`` (count relative to the number of packets,
        0 when there are no packets).
    """
    repeated_at = [
        index
        for index, (current, following) in enumerate(zip(packets, packets[1:]))
        if current == following
    ]
    repeat_total = len(repeated_at)

    if packets:
        share = (repeat_total / len(packets)) * 100
    else:
        share = 0

    return {
        "duplicate_count": repeat_total,
        "duplicate_indices": repeated_at,
        "duplicate_percentage": share
    }
def guess_codec(packets, file_path):
    """Guess the audio codec, quality and bitrate from packet contents.

    Heuristics used:
      * the quality label is derived from well-known substrings of
        ``file_path``;
      * the codec label is set when the most common 24-character packet
        prefix starts with ``4d454c0409010``;
      * the payload (``packet[28:-4]``) of the first 10 packets is scanned
        for byte-aligned ``ff fb`` sync words (MPEG-1 Layer III, no CRC), and
        the first header with a usable bitrate/sample-rate index supplies the
        detected bitrate and sample rate;
      * if no bitrate is detected, a range is guessed from the average
        packet size.

    Args:
        packets: List of hex-string packets.
        file_path: Path of the analysed file; only inspected for substrings.

    Returns:
        A dict with keys ``likely_codec``, ``quality_setting``,
        ``most_common_header``, ``codec_guess_from_size``,
        ``average_packet_size_bytes``, ``mp3_frame_sync_found``,
        ``mp3_likelihood``, ``detected_bitrate_kbps`` and
        ``detected_sample_rate_hz`` (the last two are ``None`` when no valid
        frame header was found).
    """
    # MPEG-1 Layer III bitrate table (kbps). Index 0 is "free format" and
    # index 15 is forbidden, so neither gives a usable bitrate.
    bitrates = [None, 32, 40, 48, 56, 64, 80, 96, 112, 128,
                160, 192, 224, 256, 320, None]
    # MPEG-1 sample rates (Hz); index 3 is reserved.
    sample_rates = [44100, 48000, 32000, None]

    headers = Counter(packet[:24] for packet in packets)
    most_common_header = headers.most_common(1)[0][0] if headers else "Unknown"

    codec = "Unknown"
    quality = "Unknown"

    if "400hz-sine-wave" in file_path or "400hz-square-wave" in file_path:
        quality = "High Quality"
    elif "audio-stream" in file_path:
        quality = "Normal Quality"

    # The system is known to ship the LAME encoder binary.
    if most_common_header.lower().startswith("4d454c0409010"):
        codec = "LAME MP3 (packaged in MEL Audio Format)"

    mp3_frame_sync_count = 0
    potential_bitrate = None
    potential_sample_rate = None

    for packet in packets[:10]:
        # Skip header and checksum; lower-case so upper-case dumps match too.
        data_portion = packet[28:-4].lower()

        # Two hex characters make one byte, so only matches at even offsets
        # are real 0xFF 0xFB byte pairs; odd offsets straddle byte boundaries.
        sync_positions = [
            m.start() for m in re.finditer(r'fffb', data_portion)
            if m.start() % 2 == 0
        ]
        mp3_frame_sync_count += len(sync_positions)

        if potential_bitrate is not None:
            # Keep the first valid header found; later packets only add to
            # the sync count.
            continue

        for pos in sync_positions:
            header_hex = data_portion[pos:pos + 8]
            if len(header_hex) < 8:
                continue  # Frame header truncated by the end of the payload
            try:
                header_bytes = bytes.fromhex(header_hex)
            except ValueError:
                continue  # Non-hex characters inside the candidate header

            # Third header byte: high nibble is the bitrate index, the next
            # two bits are the sample-rate index.
            bitrate = bitrates[(header_bytes[2] >> 4) & 0x0F]
            sample_rate = sample_rates[(header_bytes[2] >> 2) & 0x03]

            if bitrate is not None and sample_rate is not None:
                potential_bitrate = bitrate
                potential_sample_rate = sample_rate
                break

    # Evaluate if this is likely MP3 based on how many sync words were seen.
    if mp3_frame_sync_count > 5:
        mp3_likelihood = "High"
    elif mp3_frame_sync_count > 0:
        mp3_likelihood = "Medium"
    else:
        mp3_likelihood = "Low"

    # Average packet size in bytes (two hex characters per byte).
    avg_packet_size = sum(len(p) for p in packets) / (2 * len(packets)) if packets else 0

    if potential_bitrate:
        codec_guess = f"LAME MP3 ({potential_bitrate}kbps)"
    elif 1000 <= avg_packet_size <= 1500:
        codec_guess = "LAME MP3 (48-64kbps)"
    elif avg_packet_size > 1500:
        codec_guess = "LAME MP3 (96-128kbps or higher)"
    else:
        codec_guess = "LAME MP3 (low bitrate)"

    return {
        "likely_codec": codec,
        "quality_setting": quality,
        "most_common_header": most_common_header,
        "codec_guess_from_size": codec_guess,
        "average_packet_size_bytes": avg_packet_size,
        "mp3_frame_sync_found": mp3_frame_sync_count > 0,
        "mp3_likelihood": mp3_likelihood,
        "detected_bitrate_kbps": potential_bitrate,
        "detected_sample_rate_hz": potential_sample_rate
    }
def detect_repetition_pattern(packets):
    """Look for periodic repetition in the packet sequence.

    Args:
        packets: List of packet strings in stream order.

    Returns:
        ``{"pattern": "Not enough packets to detect pattern"}`` for fewer than
        four packets. Otherwise a dict with ``alternating_duplicates`` (True
        when every even-indexed packet equals the one two positions later)
        and ``repeats_every_n`` (the smallest period n, from 2 up to but not
        including ``min(10, len(packets) // 2)``, for which every packet
        equals the packet n positions later, or ``None``).
    """
    count = len(packets)
    if count < 4:
        return {"pattern": "Not enough packets to detect pattern"}

    # Compare packets 0/2, 2/4, 4/6, ... pairwise.
    every_other_matches = all(
        left == right for left, right in zip(packets[::2], packets[2::2])
    )

    # First period for which the sequence equals itself shifted by that amount.
    candidate_periods = range(2, min(10, count // 2))
    period = next(
        (
            step
            for step in candidate_periods
            if all(a == b for a, b in zip(packets, packets[step:]))
        ),
        None,
    )

    return {
        "alternating_duplicates": every_other_matches,
        "repeats_every_n": period
    }
def extract_timestamps(packets):
    """Try to read a 32-bit value from each packet as a candidate timestamp.

    The field position (hex characters 24-31) is an assumption and would need
    to be adjusted once the real packet layout is known.

    Args:
        packets: Iterable of hex-string packets.

    Returns:
        A list with one entry per packet: the field parsed as a base-16
        integer, or ``None`` when the packet is too short to contain the full
        8-character field or the field is not valid hex.
    """
    timestamps = []
    for packet in packets:
        potential_timestamp = packet[24:32]

        # A truncated field would parse as a different, smaller number, so
        # treat anything shorter than 8 hex characters as missing.
        if len(potential_timestamp) < 8:
            timestamps.append(None)
            continue

        try:
            timestamps.append(int(potential_timestamp, 16))
        except ValueError:
            timestamps.append(None)

    return timestamps
def calculate_total_duration(packets, sample_rate=44100):
    """Estimate the total audio duration, in seconds, of the packet stream.

    This is a rough estimate: every byte-aligned ``ff fb`` sync word in a
    packet payload (``packet[28:-4]``) is counted as one MP3 frame of 1152
    samples. If no sync word is found at all, the number of distinct packets
    is used as the frame count instead (one frame per unique packet).

    Args:
        packets: List of hex-string packets.
        sample_rate: Sample rate in Hz. ``None`` or 0 falls back to 44100 so
            a caller that could not detect the rate still gets an estimate.

    Returns:
        Estimated duration in seconds, or 0 when there are no packets.
    """
    if not packets:
        return 0

    if not sample_rate:
        sample_rate = 44100

    samples_per_frame = 1152  # Standard for MPEG-1 Layer III

    # Two hex characters make one byte, so only even offsets are real
    # 0xFF 0xFB byte pairs.
    frame_count = sum(
        1
        for packet in packets
        for m in re.finditer(r'fffb', packet[28:-4].lower())
        if m.start() % 2 == 0
    )

    # If no frames can be detected, fall back to a conservative
    # one-frame-per-unique-packet estimate.
    if frame_count == 0:
        frame_count = len(set(packets))

    total_samples = frame_count * samples_per_frame
    return total_samples / sample_rate
def analyze_audio_stream(file_path):
    """Run the complete analysis pipeline on one hex-dump file.

    Reads the file, splits it into packets and combines the results of the
    individual analysers (structure, duplicates, codec guess, repetition,
    timestamps, duration) into a single report.

    Args:
        file_path: Path of the text file containing the hex stream.

    Returns:
        ``{"error": ...}`` if the file could not be read (or is empty) or no
        packet header was found. Otherwise a dict with the keys
        ``file_name``, ``total_packets``, ``unique_packets``,
        ``packet_lengths``, ``average_packet_length``, ``duplicates``,
        ``codec_info``, ``repetition_pattern``, ``timestamp_pattern``,
        ``estimated_duration_seconds``, ``paired_packet_pattern`` and
        ``lame_tag_found``.
    """
    hex_stream = parse_hex_stream(file_path)
    if not hex_stream:
        return {"error": "Failed to parse hex stream"}

    packets = identify_packets(hex_stream)
    if not packets:
        return {"error": "No valid packets identified"}

    packet_analyses = [analyze_packet_structure(p) for p in packets]
    # A too-short packet produces an error result that may lack
    # "total_bytes"; fall back to the size computed from the hex length so a
    # single truncated packet cannot abort the whole analysis.
    packet_lengths = [
        analysis.get("total_bytes", len(packet) // 2)
        for packet, analysis in zip(packets, packet_analyses)
    ]

    # Group by packet lengths to detect patterns
    most_common_lengths = Counter(packet_lengths).most_common(3)

    duplicates = detect_duplicates(packets)
    codec_info = guess_codec(packets, file_path)
    repetition = detect_repetition_pattern(packets)
    timestamps = extract_timestamps(packets)

    # The key is always present and holds None when nothing was detected, so
    # a .get() default would never apply; use "or" to fall back to 44100 Hz.
    sample_rate = codec_info.get("detected_sample_rate_hz") or 44100
    duration = calculate_total_duration(packets, sample_rate)

    # Compare packets in consecutive, non-overlapping pairs (0/1, 2/3, ...).
    pairs = [packets[i] == packets[i + 1] for i in range(0, len(packets) - 1, 2)]
    pairs_percentage = sum(pairs) / len(pairs) * 100 if pairs else 0

    # Look for an encoder tag in the payload of the first few packets:
    # "LAME" (4c414d45) or "Lavf" (4c617666) in hex.
    lame_tag_found = any(
        "4c414d45" in payload or "4c617666" in payload
        for payload in (packet[28:-4].lower() for packet in packets[:5])
    )

    return {
        "file_name": os.path.basename(file_path),
        "total_packets": len(packets),
        "unique_packets": len(set(packets)),
        "packet_lengths": most_common_lengths,
        "average_packet_length": sum(packet_lengths) / len(packet_lengths) if packet_lengths else 0,
        "duplicates": duplicates,
        "codec_info": codec_info,
        "repetition_pattern": repetition,
        "timestamp_pattern": "Available" if any(timestamps) else "Not found",
        "estimated_duration_seconds": duration,
        "paired_packet_pattern": f"{pairs_percentage:.1f}% of packets appear in identical pairs",
        "lame_tag_found": lame_tag_found
    }
def _print_analysis_report(analysis):
    """Print the human-readable summary for one successful analysis result."""
    print(f"File: {analysis['file_name']}")
    print(f"Total packets: {analysis['total_packets']}")
    print(f"Unique packets: {analysis['unique_packets']}")
    print(f"Most common packet lengths (bytes): {analysis['packet_lengths']}")
    print(f"Average packet length: {analysis['average_packet_length']:.2f} bytes")
    print(f"Duplicates: {analysis['duplicates']['duplicate_count']} ({analysis['duplicates']['duplicate_percentage']:.1f}%)")
    print(f"Likely codec: {analysis['codec_info']['likely_codec']}")
    print(f"Quality setting: {analysis['codec_info']['quality_setting']}")
    print(f"Codec estimate: {analysis['codec_info']['codec_guess_from_size']}")
    print(f"MP3 likelihood: {analysis['codec_info'].get('mp3_likelihood', 'Unknown')}")

    # Bitrate and sample rate lines appear only when a value was detected.
    if analysis['codec_info'].get('detected_bitrate_kbps'):
        print(f"Detected bitrate: {analysis['codec_info']['detected_bitrate_kbps']} kbps")
    if analysis['codec_info'].get('detected_sample_rate_hz'):
        print(f"Detected sample rate: {analysis['codec_info']['detected_sample_rate_hz']} Hz")

    print(f"LAME tag found: {'Yes' if analysis.get('lame_tag_found', False) else 'No'}")
    print(f"Repetition pattern: {analysis['repetition_pattern']}")
    print(f"Estimated duration: {analysis['estimated_duration_seconds']:.2f} seconds")
    print(f"Packet pairing: {analysis['paired_packet_pattern']}")


def main():
    """Analyze every file named on the command line and print a report for each.

    Prints a usage line and returns when no file argument is given. A file
    whose analysis reports an error gets a single error line instead of a
    report, and processing continues with the next file.
    """
    targets = sys.argv[1:]
    if not targets:
        print("Usage: python audio_analyzer.py <audio_file.txt> [audio_file2.txt] ...")
        return

    for file_path in targets:
        print(f"\nAnalyzing: {file_path}")
        print("-" * 50)

        analysis = analyze_audio_stream(file_path)

        if "error" in analysis:
            print(f"Error: {analysis['error']}")
        else:
            _print_analysis_report(analysis)
# Run the command-line analyzer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|