| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- #!/usr/bin/env python3
- import socket
- import argparse
- import time
- import sys
- from typing import List, Dict, Tuple, Optional
- # Default configuration values
- DEFAULT_MULTICAST_ADDR = "239.192.0.1"
- DEFAULT_PORT = 1681
- DEFAULT_TTL = 2
- DEFAULT_VOLUME = 3
- DEFAULT_REPEATS = 0 # 0 means infinite
- # Command codes
- CMD_MELODY = "3001"
- CMD_ALARM = "5001"
- CMD_STOP = "5002"
- # Header constants
- MEL_HEADER = "4d454c" # "MEL" in hex
- START_MARKER = "0100"
- END_MARKER = "0100"
- # Maps for user-friendly selection
- MELODY_MAP = {
- # Standard melodies
- "Westminster": 1,
- "BimBam": 2,
- "Gong": 3,
- "CanCan": 4,
- "SingingBird": 5,
- "Violin": 6,
- "Trumpet": 7,
- "Piano": 8,
- "Telephone": 9,
- "Circus": 10,
- # Add more melodies as needed
- }
- # Common functions (borrowed from mp3_psa_streamer.py)
- def compute_psa_checksum(data: bytes) -> bytes:
- """Compute PSA checksum for packet data."""
- var_e = 0x0000 # Starting seed value
- for i in range(len(data)):
- var_e ^= (data[i] + i) & 0xFFFF
- return var_e.to_bytes(2, 'big') # 2-byte checksum, big-endian
- def setup_multicast_socket(ttl=DEFAULT_TTL):
- """Set up a socket for sending multicast UDP."""
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
- sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
- return sock
- def send_multicast_packet(sock, packet, addr, port, send_twice=True):
- """Send a packet to the multicast group, optionally twice."""
- try:
- sock.sendto(packet, (addr, port))
- if send_twice:
- time.sleep(0.01) # Small delay between duplicates
- sock.sendto(packet, (addr, port))
- return True
- except Exception as e:
- print(f"Error sending packet: {e}")
- return False
- # New functions for the CLI tool
- def encode_zones(zones: List[int]) -> str:
- """
- Encode zone numbers (1-100) into a 12-byte hex string (24 characters).
- Each byte represents 8 zones, with MSB being the lowest zone number.
- """
- # Initialize 96 bits (12 bytes) of zeros
- zone_bits = [0] * 96
-
- for zone in zones:
- if 1 <= zone <= 96: # Ensure zone is in valid range
- # Calculate bit position (zero-indexed)
- # Zone 1 = bit 0, Zone 2 = bit 1, etc.
- bit_pos = zone - 1
-
- # Set the bit
- zone_bits[bit_pos] = 1
-
- # Convert bits to bytes
- zone_bytes = bytearray()
- for i in range(0, 96, 8):
- byte_val = 0
- for j in range(8):
- if i + j < 96: # Avoid index out of range
- byte_val |= (zone_bits[i + j] << (7 - j))
- zone_bytes.append(byte_val)
-
- # Convert to hex string
- return zone_bytes.hex()
- def create_command_packet(
- cmd: str,
- zones: List[int],
- sequence: int = 0,
- melody_id: Optional[int] = None,
- volume: Optional[int] = None,
- repeats: Optional[int] = None
- ) -> bytes:
- """Create a PSA command packet."""
- # Convert sequence to bytes (2 bytes, little endian with FF padding)
- seq_bytes = sequence.to_bytes(1, 'little') + b'\xff'
-
- # Encode zone info
- zone_info = bytes.fromhex(encode_zones(zones))
-
- # Command-specific handling
- if cmd == CMD_STOP:
- # For stop command, use all-zones pattern regardless of input
- zone_info = bytes.fromhex("FFFFFFFFFFFFFFFFFFFF")
- metadata = bytes.fromhex("0F")
- # No end marker for stop commands
- command_data = bytes.fromhex(MEL_HEADER) + seq_bytes + bytes.fromhex(cmd) + zone_info + metadata
- else:
- # For melody and alarm commands
- if melody_id is None or volume is None or repeats is None:
- raise ValueError("Melody ID, volume, and repeats are required for melody/alarm commands")
-
- # Check ranges
- if not 1 <= melody_id <= 20:
- raise ValueError("Melody ID must be between 1 and 20")
- if not 1 <= volume <= 8:
- raise ValueError("Volume must be between 1 and 8")
- if not 0 <= repeats <= 255:
- raise ValueError("Repeats must be between 0 and 255")
-
- # Build metadata
- fixed_field = "0001" # Standard fixed field
- metadata = bytes.fromhex(fixed_field) + \
- volume.to_bytes(1, 'big') + \
- repeats.to_bytes(1, 'big') + \
- b'\x01' + \
- melody_id.to_bytes(1, 'big') + \
- bytes.fromhex(END_MARKER)
-
- command_data = bytes.fromhex(MEL_HEADER) + \
- seq_bytes + \
- bytes.fromhex(cmd) + \
- zone_info + \
- metadata
-
- # Calculate payload length (excluding header and length field)
- # We need to add this back into the actual packet
- length = len(command_data) - 3 # subtract "MEL" header
- length_bytes = length.to_bytes(2, 'big')
-
- # Re-build with correct length
- packet = bytes.fromhex(MEL_HEADER) + length_bytes + \
- bytes.fromhex(START_MARKER) + seq_bytes + \
- bytes.fromhex(cmd) + zone_info + \
- (metadata if cmd != CMD_STOP else bytes.fromhex("0F"))
-
- # Calculate and append checksum
- checksum = compute_psa_checksum(packet)
- return packet + checksum
- def list_melodies():
- """Display available melodies."""
- print("\nAvailable Melody Names:")
- print("-" * 25)
- for name, id in sorted(MELODY_MAP.items(), key=lambda x: x[1]):
- print(f"{id:2d}: {name}")
- print("\nNote: You can also enter melody ID numbers directly.")
- def parse_zones(zone_arg: str) -> List[int]:
- """Parse zone argument into a list of zone numbers."""
- zones = []
-
- # Handle empty case
- if not zone_arg:
- return zones
-
- # Split by comma or space
- parts = zone_arg.replace(',', ' ').split()
-
- for part in parts:
- # Handle ranges like 1-5
- if '-' in part:
- try:
- start, end = map(int, part.split('-'))
- if 1 <= start <= end <= 96:
- zones.extend(range(start, end + 1))
- else:
- print(f"Warning: Invalid zone range {part} (must be 1-96)")
- except ValueError:
- print(f"Warning: Could not parse zone range {part}")
- # Handle single numbers
- else:
- try:
- zone = int(part)
- if 1 <= zone <= 96:
- zones.append(zone)
- else:
- print(f"Warning: Zone {zone} out of range (must be 1-96)")
- except ValueError:
- print(f"Warning: Could not parse zone {part}")
-
- # Remove duplicates and sort
- return sorted(set(zones))
- def parse_melody(melody_arg: str) -> int:
- """Parse melody argument into a melody ID."""
- # First check if it's a direct integer
- try:
- melody_id = int(melody_arg)
- if 1 <= melody_id <= 20:
- return melody_id
- else:
- print(f"Warning: Melody ID {melody_id} out of range, using default (1)")
- return 1
- except ValueError:
- # Try to match by name (case insensitive)
- melody_name = melody_arg.strip().lower()
- for name, id in MELODY_MAP.items():
- if name.lower() == melody_name:
- return id
-
- print(f"Warning: Unknown melody '{melody_arg}', using default (1)")
- return 1
- def main():
- # Set up the argument parser
- parser = argparse.ArgumentParser(description="Send commands to Bodet Harmony PSA system")
-
- # Common arguments
- parser.add_argument("-a", "--addr", default=DEFAULT_MULTICAST_ADDR,
- help=f"Multicast address (default: {DEFAULT_MULTICAST_ADDR})")
- parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
- help=f"UDP port (default: {DEFAULT_PORT})")
- parser.add_argument("-z", "--zones", default="8",
- help="Zones to target (e.g., '1,2,3' or '1-5 8')")
- parser.add_argument("-s", "--single", action="store_true",
- help="Send packet only once (default: send twice)")
- parser.add_argument("-l", "--list-melodies", action="store_true",
- help="List available melody names")
-
- # Create subparsers for different commands
- subparsers = parser.add_subparsers(dest="command", help="Command to send")
-
- # Melody command
- melody_parser = subparsers.add_parser("melody", help="Play a melody")
- melody_parser.add_argument("-m", "--melody", default="Westminster",
- help="Melody name or ID (1-20)")
- melody_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
- help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
- melody_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
- help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")
-
- # Alarm command
- alarm_parser = subparsers.add_parser("alarm", help="Play an alarm")
- alarm_parser.add_argument("-m", "--melody", default="Westminster",
- help="Alarm melody name or ID (1-20)")
- alarm_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
- help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
- alarm_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
- help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")
-
- # Stop command
- subparsers.add_parser("stop", help="Stop all audio")
-
- # Parse arguments
- args = parser.parse_args()
-
- # Just list melodies if requested
- if args.list_melodies:
- list_melodies()
- return
-
- # Ensure a command was specified
- if not args.command:
- parser.print_help()
- return
-
- # Set up the socket
- sock = setup_multicast_socket()
-
- try:
- # Parse zones
- zones = parse_zones(args.zones)
- if not zones and args.command != "stop":
- print("No valid zones specified, using zone 8")
- zones = [8]
-
- # Command-specific processing
- if args.command == "melody":
- melody_id = parse_melody(args.melody)
- cmd_code = CMD_MELODY
- packet = create_command_packet(
- cmd_code, zones,
- melody_id=melody_id,
- volume=args.volume,
- repeats=args.repeats
- )
- print(f"Sending melody {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}")
-
- elif args.command == "alarm":
- melody_id = parse_melody(args.melody)
- cmd_code = CMD_ALARM
- packet = create_command_packet(
- cmd_code, zones,
- melody_id=melody_id,
- volume=args.volume,
- repeats=args.repeats
- )
- print(f"Sending alarm {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}")
-
- elif args.command == "stop":
- cmd_code = CMD_STOP
- packet = create_command_packet(cmd_code, zones)
- print("Sending stop command to all zones")
-
- # Send the packet
- success = send_multicast_packet(sock, packet, args.addr, args.port, not args.single)
- if success:
- print("Command sent successfully!")
- else:
- print("Failed to send command.")
-
- except Exception as e:
- print(f"Error: {e}")
- return 1
- if __name__ == "__main__":
- sys.exit(main() or 0)
|