#!/usr/bin/env python3
"""Send melody, alarm and stop commands to a Bodet Harmony PSA system.

Commands are built as binary "MEL" packets and sent over multicast UDP.
"""

import argparse
import socket
import sys
import time
from typing import Dict, List, Optional, Tuple

# Default configuration values
DEFAULT_MULTICAST_ADDR = "239.192.0.1"
DEFAULT_PORT = 1681
DEFAULT_TTL = 2
DEFAULT_VOLUME = 3
DEFAULT_REPEATS = 0  # 0 means infinite

# Command codes (hex strings, converted with bytes.fromhex when packed)
CMD_MELODY = "3001"
CMD_ALARM = "5001"
CMD_STOP = "5002"

# Header constants
MEL_HEADER = "4d454c"  # "MEL" in hex
START_MARKER = "0100"
END_MARKER = "0100"

# Limits shared by the encoder, the parsers and the validators
MAX_ZONE = 96  # highest addressable zone number
ZONE_FIELD_BYTES = 12  # 96 zones / 8 zones per byte
MAX_MELODY_ID = 20

# Maps for user-friendly selection
MELODY_MAP = {
    # Standard melodies
    "Westminster": 1,
    "BimBam": 2,
    "Gong": 3,
    "CanCan": 4,
    "SingingBird": 5,
    "Violin": 6,
    "Trumpet": 7,
    "Piano": 8,
    "Telephone": 9,
    "Circus": 10,
    # Add more melodies as needed
}


# Common functions (borrowed from mp3_psa_streamer.py)
def compute_psa_checksum(data: bytes) -> bytes:
    """Compute the PSA checksum for packet data.

    Each byte is added to its zero-based offset, truncated to 16 bits and
    XOR-folded into the running value, which starts at 0.

    Args:
        data: The packet bytes to checksum.

    Returns:
        The 2-byte checksum, big-endian.
    """
    checksum = 0x0000  # Starting seed value
    for offset, byte in enumerate(data):
        checksum ^= (byte + offset) & 0xFFFF
    return checksum.to_bytes(2, 'big')


def setup_multicast_socket(ttl=DEFAULT_TTL):
    """Create a UDP socket configured for sending multicast datagrams.

    Args:
        ttl: Value for the IP_MULTICAST_TTL socket option.

    Returns:
        The configured socket. The caller is responsible for closing it.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
    except Exception:
        # Do not leak the descriptor when the option cannot be applied.
        sock.close()
        raise
    return sock


def send_multicast_packet(sock, packet, addr, port, send_twice=True):
    """Send a packet to the multicast group, optionally twice.

    Args:
        sock: Socket to send on.
        packet: Bytes to send.
        addr: Destination address.
        port: Destination UDP port.
        send_twice: When true, the same packet is sent a second time after a
            10 ms pause.

    Returns:
        True when every send succeeded, False when a send failed (the error
        is printed).
    """
    try:
        sock.sendto(packet, (addr, port))
        if send_twice:
            time.sleep(0.01)  # Small delay between duplicates
            sock.sendto(packet, (addr, port))
        return True
    except (OSError, OverflowError) as e:
        # OSError covers network/address failures; OverflowError is what
        # sendto raises for a port outside 0-65535.
        print(f"Error sending packet: {e}")
        return False


# New functions for the CLI tool
def encode_zones(zones: List[int]) -> str:
    """Encode zone numbers (1-96) into a 12-byte bitfield as a hex string.

    Each byte represents 8 zones, with the most significant bit being the
    lowest zone number (zone 1 is bit 7 of byte 0, zone 96 is bit 0 of
    byte 11). Zone numbers outside 1-96 are silently ignored.

    Args:
        zones: Zone numbers to mark as selected.

    Returns:
        A 24-character lowercase hex string.
    """
    bitfield = bytearray(ZONE_FIELD_BYTES)
    for zone in zones:
        if 1 <= zone <= MAX_ZONE:
            byte_index, bit_index = divmod(zone - 1, 8)
            bitfield[byte_index] |= 0x80 >> bit_index
    return bitfield.hex()


def create_command_packet(
    cmd: str,
    zones: List[int],
    sequence: int = 0,
    melody_id: Optional[int] = None,
    volume: Optional[int] = None,
    repeats: Optional[int] = None
) -> bytes:
    """Create a PSA command packet.

    Packet layout: "MEL" header, 2-byte big-endian length, start marker,
    sequence byte followed by 0xFF, command code, zone field, metadata, and
    the 2-byte checksum of everything before it.

    Args:
        cmd: Command code as a hex string (CMD_MELODY, CMD_ALARM, CMD_STOP).
        zones: Zone numbers to target. Ignored for CMD_STOP.
        sequence: Sequence number; must fit in one byte.
        melody_id: Melody to play (1-20). Required unless cmd is CMD_STOP.
        volume: Volume (1-8). Required unless cmd is CMD_STOP.
        repeats: Repeat count (0-255). Required unless cmd is CMD_STOP.

    Returns:
        The complete packet including its trailing checksum.

    Raises:
        ValueError: If a melody/alarm parameter is missing or out of range.
        OverflowError: If sequence does not fit in a single unsigned byte.
    """
    # Sequence is one byte followed by a constant 0xFF pad byte.
    seq_bytes = sequence.to_bytes(1, 'little') + b'\xff'
    cmd_bytes = bytes.fromhex(cmd)

    if cmd == CMD_STOP:
        # Stop always uses this fixed all-ones pattern regardless of input.
        # NOTE(review): this is 10 bytes, not the 12 bytes encode_zones
        # produces - confirm against the protocol before changing it.
        zone_info = bytes.fromhex("FFFFFFFFFFFFFFFFFFFF")
        # No end marker for stop commands
        metadata = bytes.fromhex("0F")
    else:
        zone_info = bytes.fromhex(encode_zones(zones))

        if melody_id is None or volume is None or repeats is None:
            raise ValueError("Melody ID, volume, and repeats are required for melody/alarm commands")
        if not 1 <= melody_id <= MAX_MELODY_ID:
            raise ValueError("Melody ID must be between 1 and 20")
        if not 1 <= volume <= 8:
            raise ValueError("Volume must be between 1 and 8")
        if not 0 <= repeats <= 255:
            raise ValueError("Repeats must be between 0 and 255")

        metadata = b"".join((
            bytes.fromhex("0001"),  # Standard fixed field
            volume.to_bytes(1, 'big'),
            repeats.to_bytes(1, 'big'),
            b'\x01',
            melody_id.to_bytes(1, 'big'),
            bytes.fromhex(END_MARKER),
        ))

    # The length field counts sequence, command, zone field and metadata; it
    # excludes the header, the length field itself, the start marker and the
    # checksum.
    body = seq_bytes + cmd_bytes + zone_info + metadata
    length_bytes = len(body).to_bytes(2, 'big')

    packet = (
        bytes.fromhex(MEL_HEADER)
        + length_bytes
        + bytes.fromhex(START_MARKER)
        + body
    )
    return packet + compute_psa_checksum(packet)


def list_melodies():
    """Print the known melody names, ordered by melody ID."""
    print("\nAvailable Melody Names:")
    print("-" * 25)
    for name, melody_id in sorted(MELODY_MAP.items(), key=lambda item: item[1]):
        print(f"{melody_id:2d}: {name}")
    print("\nNote: You can also enter melody ID numbers directly.")


def parse_zones(zone_arg: str) -> List[int]:
    """Parse a zone argument into a sorted list of unique zone numbers.

    Entries are separated by commas and/or whitespace. Each entry is either
    a single zone number or an inclusive range such as "1-5". Entries that
    cannot be parsed or fall outside 1-96 are skipped with a printed warning.

    Args:
        zone_arg: The raw argument text, e.g. "1,2,3" or "1-5 8".

    Returns:
        Sorted zone numbers without duplicates; empty when nothing is valid.
    """
    zones: List[int] = []
    if not zone_arg:
        return zones

    for part in zone_arg.replace(',', ' ').split():
        if '-' in part:
            # Range such as "1-5"; anything not of the form int-int fails
            # the unpacking or the int conversion and is reported below.
            try:
                start, end = map(int, part.split('-'))
            except ValueError:
                print(f"Warning: Could not parse zone range {part}")
                continue
            if 1 <= start <= end <= MAX_ZONE:
                zones.extend(range(start, end + 1))
            else:
                print(f"Warning: Invalid zone range {part} (must be 1-{MAX_ZONE})")
        else:
            try:
                zone = int(part)
            except ValueError:
                print(f"Warning: Could not parse zone {part}")
                continue
            if 1 <= zone <= MAX_ZONE:
                zones.append(zone)
            else:
                print(f"Warning: Zone {zone} out of range (must be 1-{MAX_ZONE})")

    return sorted(set(zones))


def parse_melody(melody_arg: str) -> int:
    """Parse a melody argument into a melody ID.

    The argument is first tried as an integer ID (1-20), then as a
    case-insensitive name from MELODY_MAP. Anything else falls back to
    melody 1 with a printed warning.

    Args:
        melody_arg: A melody ID or melody name.

    Returns:
        The resolved melody ID, or 1 when the argument is not usable.
    """
    try:
        melody_id = int(melody_arg)
    except ValueError:
        # Not a number: try to match by name (case insensitive).
        wanted = melody_arg.strip().lower()
        for name, mapped_id in MELODY_MAP.items():
            if name.lower() == wanted:
                return mapped_id
        print(f"Warning: Unknown melody '{melody_arg}', using default (1)")
        return 1

    if 1 <= melody_id <= MAX_MELODY_ID:
        return melody_id
    print(f"Warning: Melody ID {melody_id} out of range, using default (1)")
    return 1


def _add_playback_options(subparser, melody_help: str) -> None:
    """Add the melody/volume/repeats options shared by melody and alarm."""
    subparser.add_argument("-m", "--melody", default="Westminster",
                           help=melody_help)
    subparser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
                           help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
    subparser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
                           help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its melody/alarm/stop subcommands."""
    parser = argparse.ArgumentParser(description="Send commands to Bodet Harmony PSA system")

    # Common arguments
    parser.add_argument("-a", "--addr", default=DEFAULT_MULTICAST_ADDR,
                        help=f"Multicast address (default: {DEFAULT_MULTICAST_ADDR})")
    parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
                        help=f"UDP port (default: {DEFAULT_PORT})")
    parser.add_argument("-z", "--zones", default="8",
                        help="Zones to target (e.g., '1,2,3' or '1-5 8')")
    parser.add_argument("-s", "--single", action="store_true",
                        help="Send packet only once (default: send twice)")
    parser.add_argument("-l", "--list-melodies", action="store_true",
                        help="List available melody names")

    # One subcommand per command type
    subparsers = parser.add_subparsers(dest="command", help="Command to send")
    _add_playback_options(
        subparsers.add_parser("melody", help="Play a melody"),
        "Melody name or ID (1-20)",
    )
    _add_playback_options(
        subparsers.add_parser("alarm", help="Play an alarm"),
        "Alarm melody name or ID (1-20)",
    )
    subparsers.add_parser("stop", help="Stop all audio")
    return parser


def main():
    """Parse the command line, build the requested packet and send it.

    Returns:
        1 when building or sending the command failed; None otherwise
        (the entry point maps None to exit status 0).
    """
    parser = _build_parser()
    args = parser.parse_args()

    # Just list melodies if requested
    if args.list_melodies:
        list_melodies()
        return

    # Ensure a command was specified
    if not args.command:
        parser.print_help()
        return

    try:
        zones = parse_zones(args.zones)
        if not zones and args.command != "stop":
            print("No valid zones specified, using zone 8")
            zones = [8]

        if args.command == "stop":
            packet = create_command_packet(CMD_STOP, zones)
            print("Sending stop command to all zones")
        else:
            # "melody" and "alarm" differ only in the command code.
            melody_id = parse_melody(args.melody)
            cmd_code = CMD_MELODY if args.command == "melody" else CMD_ALARM
            packet = create_command_packet(
                cmd_code,
                zones,
                melody_id=melody_id,
                volume=args.volume,
                repeats=args.repeats
            )
            print(f"Sending {args.command} {melody_id} to zones {zones}, "
                  f"volume={args.volume}, repeats={args.repeats}")

        # Open the socket only once there is a packet to send, and close it
        # on every path.
        with setup_multicast_socket() as sock:
            success = send_multicast_packet(
                sock, packet, args.addr, args.port, not args.single
            )
    except Exception as e:
        # Top-level boundary: report the problem and signal failure through
        # the exit status instead of a traceback.
        print(f"Error: {e}")
        return 1

    if not success:
        print("Failed to send command.")
        return 1

    print("Command sent successfully!")


if __name__ == "__main__":
    sys.exit(main() or 0)