bodet_psa_cli.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. #!/usr/bin/env python3
  2. import socket
  3. import argparse
  4. import time
  5. import sys
  6. from typing import List, Dict, Tuple, Optional
  7. # Default configuration values
  8. DEFAULT_MULTICAST_ADDR = "239.192.0.1"
  9. DEFAULT_PORT = 1681
  10. DEFAULT_TTL = 2
  11. DEFAULT_VOLUME = 3
  12. DEFAULT_REPEATS = 0 # 0 means infinite
  13. # Command codes
  14. CMD_MELODY = "3001"
  15. CMD_ALARM = "5001"
  16. CMD_STOP = "5002"
  17. # Header constants
  18. MEL_HEADER = "4d454c" # "MEL" in hex
  19. START_MARKER = "0100"
  20. END_MARKER = "0100"
  21. # Maps for user-friendly selection
  22. MELODY_MAP = {
  23. # Standard melodies
  24. "Westminster": 1,
  25. "BimBam": 2,
  26. "Gong": 3,
  27. "CanCan": 4,
  28. "SingingBird": 5,
  29. "Violin": 6,
  30. "Trumpet": 7,
  31. "Piano": 8,
  32. "Telephone": 9,
  33. "Circus": 10,
  34. # Add more melodies as needed
  35. }
  36. # Common functions (borrowed from mp3_psa_streamer.py)
  37. def compute_psa_checksum(data: bytes) -> bytes:
  38. """Compute PSA checksum for packet data."""
  39. var_e = 0x0000 # Starting seed value
  40. for i in range(len(data)):
  41. var_e ^= (data[i] + i) & 0xFFFF
  42. return var_e.to_bytes(2, 'big') # 2-byte checksum, big-endian
  43. def setup_multicast_socket(ttl=DEFAULT_TTL):
  44. """Set up a socket for sending multicast UDP."""
  45. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
  46. sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
  47. return sock
  48. def send_multicast_packet(sock, packet, addr, port, send_twice=True):
  49. """Send a packet to the multicast group, optionally twice."""
  50. try:
  51. sock.sendto(packet, (addr, port))
  52. if send_twice:
  53. time.sleep(0.01) # Small delay between duplicates
  54. sock.sendto(packet, (addr, port))
  55. return True
  56. except Exception as e:
  57. print(f"Error sending packet: {e}")
  58. return False
  59. # New functions for the CLI tool
  60. def encode_zones(zones: List[int]) -> str:
  61. """
  62. Encode zone numbers (1-100) into a 12-byte hex string (24 characters).
  63. Each byte represents 8 zones, with MSB being the lowest zone number.
  64. """
  65. # Initialize 96 bits (12 bytes) of zeros
  66. zone_bits = [0] * 96
  67. for zone in zones:
  68. if 1 <= zone <= 96: # Ensure zone is in valid range
  69. # Calculate bit position (zero-indexed)
  70. # Zone 1 = bit 0, Zone 2 = bit 1, etc.
  71. bit_pos = zone - 1
  72. # Set the bit
  73. zone_bits[bit_pos] = 1
  74. # Convert bits to bytes
  75. zone_bytes = bytearray()
  76. for i in range(0, 96, 8):
  77. byte_val = 0
  78. for j in range(8):
  79. if i + j < 96: # Avoid index out of range
  80. byte_val |= (zone_bits[i + j] << (7 - j))
  81. zone_bytes.append(byte_val)
  82. # Convert to hex string
  83. return zone_bytes.hex()
  84. def create_command_packet(
  85. cmd: str,
  86. zones: List[int],
  87. sequence: int = 0,
  88. melody_id: Optional[int] = None,
  89. volume: Optional[int] = None,
  90. repeats: Optional[int] = None
  91. ) -> bytes:
  92. """Create a PSA command packet."""
  93. # Convert sequence to bytes (2 bytes, little endian with FF padding)
  94. seq_bytes = sequence.to_bytes(1, 'little') + b'\xff'
  95. # Encode zone info
  96. zone_info = bytes.fromhex(encode_zones(zones))
  97. # Command-specific handling
  98. if cmd == CMD_STOP:
  99. # For stop command, use all-zones pattern regardless of input
  100. zone_info = bytes.fromhex("FFFFFFFFFFFFFFFFFFFF")
  101. metadata = bytes.fromhex("0F")
  102. # No end marker for stop commands
  103. command_data = bytes.fromhex(MEL_HEADER) + seq_bytes + bytes.fromhex(cmd) + zone_info + metadata
  104. else:
  105. # For melody and alarm commands
  106. if melody_id is None or volume is None or repeats is None:
  107. raise ValueError("Melody ID, volume, and repeats are required for melody/alarm commands")
  108. # Check ranges
  109. if not 1 <= melody_id <= 20:
  110. raise ValueError("Melody ID must be between 1 and 20")
  111. if not 1 <= volume <= 8:
  112. raise ValueError("Volume must be between 1 and 8")
  113. if not 0 <= repeats <= 255:
  114. raise ValueError("Repeats must be between 0 and 255")
  115. # Build metadata
  116. fixed_field = "0001" # Standard fixed field
  117. metadata = bytes.fromhex(fixed_field) + \
  118. volume.to_bytes(1, 'big') + \
  119. repeats.to_bytes(1, 'big') + \
  120. b'\x01' + \
  121. melody_id.to_bytes(1, 'big') + \
  122. bytes.fromhex(END_MARKER)
  123. command_data = bytes.fromhex(MEL_HEADER) + \
  124. seq_bytes + \
  125. bytes.fromhex(cmd) + \
  126. zone_info + \
  127. metadata
  128. # Calculate payload length (excluding header and length field)
  129. # We need to add this back into the actual packet
  130. length = len(command_data) - 3 # subtract "MEL" header
  131. length_bytes = length.to_bytes(2, 'big')
  132. # Re-build with correct length
  133. packet = bytes.fromhex(MEL_HEADER) + length_bytes + \
  134. bytes.fromhex(START_MARKER) + seq_bytes + \
  135. bytes.fromhex(cmd) + zone_info + \
  136. (metadata if cmd != CMD_STOP else bytes.fromhex("0F"))
  137. # Calculate and append checksum
  138. checksum = compute_psa_checksum(packet)
  139. return packet + checksum
  140. def list_melodies():
  141. """Display available melodies."""
  142. print("\nAvailable Melody Names:")
  143. print("-" * 25)
  144. for name, id in sorted(MELODY_MAP.items(), key=lambda x: x[1]):
  145. print(f"{id:2d}: {name}")
  146. print("\nNote: You can also enter melody ID numbers directly.")
  147. def parse_zones(zone_arg: str) -> List[int]:
  148. """Parse zone argument into a list of zone numbers."""
  149. zones = []
  150. # Handle empty case
  151. if not zone_arg:
  152. return zones
  153. # Split by comma or space
  154. parts = zone_arg.replace(',', ' ').split()
  155. for part in parts:
  156. # Handle ranges like 1-5
  157. if '-' in part:
  158. try:
  159. start, end = map(int, part.split('-'))
  160. if 1 <= start <= end <= 96:
  161. zones.extend(range(start, end + 1))
  162. else:
  163. print(f"Warning: Invalid zone range {part} (must be 1-96)")
  164. except ValueError:
  165. print(f"Warning: Could not parse zone range {part}")
  166. # Handle single numbers
  167. else:
  168. try:
  169. zone = int(part)
  170. if 1 <= zone <= 96:
  171. zones.append(zone)
  172. else:
  173. print(f"Warning: Zone {zone} out of range (must be 1-96)")
  174. except ValueError:
  175. print(f"Warning: Could not parse zone {part}")
  176. # Remove duplicates and sort
  177. return sorted(set(zones))
  178. def parse_melody(melody_arg: str) -> int:
  179. """Parse melody argument into a melody ID."""
  180. # First check if it's a direct integer
  181. try:
  182. melody_id = int(melody_arg)
  183. if 1 <= melody_id <= 20:
  184. return melody_id
  185. else:
  186. print(f"Warning: Melody ID {melody_id} out of range, using default (1)")
  187. return 1
  188. except ValueError:
  189. # Try to match by name (case insensitive)
  190. melody_name = melody_arg.strip().lower()
  191. for name, id in MELODY_MAP.items():
  192. if name.lower() == melody_name:
  193. return id
  194. print(f"Warning: Unknown melody '{melody_arg}', using default (1)")
  195. return 1
  196. def main():
  197. # Set up the argument parser
  198. parser = argparse.ArgumentParser(description="Send commands to Bodet Harmony PSA system")
  199. # Common arguments
  200. parser.add_argument("-a", "--addr", default=DEFAULT_MULTICAST_ADDR,
  201. help=f"Multicast address (default: {DEFAULT_MULTICAST_ADDR})")
  202. parser.add_argument("-p", "--port", type=int, default=DEFAULT_PORT,
  203. help=f"UDP port (default: {DEFAULT_PORT})")
  204. parser.add_argument("-z", "--zones", default="8",
  205. help="Zones to target (e.g., '1,2,3' or '1-5 8')")
  206. parser.add_argument("-s", "--single", action="store_true",
  207. help="Send packet only once (default: send twice)")
  208. parser.add_argument("-l", "--list-melodies", action="store_true",
  209. help="List available melody names")
  210. # Create subparsers for different commands
  211. subparsers = parser.add_subparsers(dest="command", help="Command to send")
  212. # Melody command
  213. melody_parser = subparsers.add_parser("melody", help="Play a melody")
  214. melody_parser.add_argument("-m", "--melody", default="Westminster",
  215. help="Melody name or ID (1-20)")
  216. melody_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
  217. help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
  218. melody_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
  219. help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")
  220. # Alarm command
  221. alarm_parser = subparsers.add_parser("alarm", help="Play an alarm")
  222. alarm_parser.add_argument("-m", "--melody", default="Westminster",
  223. help="Alarm melody name or ID (1-20)")
  224. alarm_parser.add_argument("-v", "--volume", type=int, default=DEFAULT_VOLUME,
  225. help=f"Volume level (1-8, default: {DEFAULT_VOLUME})")
  226. alarm_parser.add_argument("-r", "--repeats", type=int, default=DEFAULT_REPEATS,
  227. help=f"Number of repeats (0=infinite, default: {DEFAULT_REPEATS})")
  228. # Stop command
  229. subparsers.add_parser("stop", help="Stop all audio")
  230. # Parse arguments
  231. args = parser.parse_args()
  232. # Just list melodies if requested
  233. if args.list_melodies:
  234. list_melodies()
  235. return
  236. # Ensure a command was specified
  237. if not args.command:
  238. parser.print_help()
  239. return
  240. # Set up the socket
  241. sock = setup_multicast_socket()
  242. try:
  243. # Parse zones
  244. zones = parse_zones(args.zones)
  245. if not zones and args.command != "stop":
  246. print("No valid zones specified, using zone 8")
  247. zones = [8]
  248. # Command-specific processing
  249. if args.command == "melody":
  250. melody_id = parse_melody(args.melody)
  251. cmd_code = CMD_MELODY
  252. packet = create_command_packet(
  253. cmd_code, zones,
  254. melody_id=melody_id,
  255. volume=args.volume,
  256. repeats=args.repeats
  257. )
  258. print(f"Sending melody {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}")
  259. elif args.command == "alarm":
  260. melody_id = parse_melody(args.melody)
  261. cmd_code = CMD_ALARM
  262. packet = create_command_packet(
  263. cmd_code, zones,
  264. melody_id=melody_id,
  265. volume=args.volume,
  266. repeats=args.repeats
  267. )
  268. print(f"Sending alarm {melody_id} to zones {zones}, volume={args.volume}, repeats={args.repeats}")
  269. elif args.command == "stop":
  270. cmd_code = CMD_STOP
  271. packet = create_command_packet(cmd_code, zones)
  272. print("Sending stop command to all zones")
  273. # Send the packet
  274. success = send_multicast_packet(sock, packet, args.addr, args.port, not args.single)
  275. if success:
  276. print("Command sent successfully!")
  277. else:
  278. print("Failed to send command.")
  279. except Exception as e:
  280. print(f"Error: {e}")
  281. return 1
  282. if __name__ == "__main__":
  283. sys.exit(main() or 0)