# broadcast.py
  1. import socket
  2. import sys
  3. import struct
  4. import time
  5. import xml.etree.ElementTree as ET
  6. import threading
  7. import optparse
  8. from packet import Packet, CMD, itos
  9. parser = optparse.OptionParser()
  10. parser.add_option('-t', '--test', dest='test', action='store_true', help='Play a test tone (440, 880) on all clients in sequence (the last overlaps with the first of the next)')
  11. parser.add_option('-T', '--sync-test', dest='sync_test', action='store_true', help='Don\'t wait for clients to play tones properly--have them all test tone at the same time')
  12. parser.add_option('-q', '--quit', dest='quit', action='store_true', help='Instruct all clients to quit')
  13. parser.add_option('-f', '--factor', dest='factor', type='float', default=1.0, help='Rescale time by this factor (0<f<1 are faster; 0.5 is twice the speed, 2 is half)')
  14. parser.add_option('-r', '--route', dest='routes', action='append', help='Add a routing directive (see --route-help)')
  15. parser.add_option('-v', '--verbose', dest='verbose', action='store_true', help='Be verbose; dump events and actual time (can slow down performance!)')
  16. parser.add_option('--help-routes', dest='help_routes', action='store_true', help='Show help about routing directives')
  17. parser.set_defaults(routes=[])
  18. options, args = parser.parse_args()
  19. if options.help_routes:
  20. print '''Routes are a way of either exclusively or mutually binding certain streams to certain playback clients. They are especially fitting in heterogenous environments where some clients will outperform others in certain pitches or with certain parts.
  21. Routes are fully specified by:
  22. -The attribute to be routed on (either type "T", or UID "U")
  23. -The value of that attribute
  24. -The exclusivity of that route ("+" for inclusive, "-" for exclusive)
  25. -The stream group to be routed there.
  26. The syntax for that specification resembles the following:
  27. broadcast.py -r U:bass=+bass -r U:treble1,U:treble2=+treble -r T:BEEP=-beeps,-trk3,-trk5
  28. The specifier consists of a comma-separated list of attribute-colon-value pairs, followed by an equal sign. After this is a comma-separated list of exclusivities paired with the name of a stream group as specified in the file. The above example shows that stream groups "bass", "treble", and "beeps" will be routed to clients with UID "bass", "treble", and TYPE "BEEP" respectively. Additionally, TYPE "BEEP" will receive tracks 4 and 6 (indices 3 and 5) of the MIDI file (presumably split with -T), and that these three groups are exclusively to be routed to TYPE "BEEP" clients only (the broadcaster will drop the stream if no more are available), as opposed to the preference of the bass and treble groups, which may be routed onto other stream clients if they are available.'''
  29. exit()
  30. PORT = 13676
  31. factor = options.factor
  32. print 'Factor:', factor
  33. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  34. s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
  35. clients = []
  36. uid_groups = {}
  37. type_groups = {}
  38. s.sendto(str(Packet(CMD.PING)), ('255.255.255.255', PORT))
  39. s.settimeout(0.5)
  40. try:
  41. while True:
  42. data, src = s.recvfrom(4096)
  43. clients.append(src)
  44. except socket.timeout:
  45. pass
  46. print 'Clients:'
  47. for cl in clients:
  48. print cl,
  49. s.sendto(str(Packet(CMD.CAPS)), cl)
  50. data, _ = s.recvfrom(4096)
  51. pkt = Packet.FromStr(data)
  52. print 'ports', pkt.data[0],
  53. tp = itos(pkt.data[1])
  54. print 'type', tp,
  55. uid = ''.join([itos(i) for i in pkt.data[2:]]).rstrip('\x00')
  56. print 'uid', uid
  57. if uid == '':
  58. uid = None
  59. uid_groups.setdefault(uid, []).append(cl)
  60. type_groups.setdefault(tp, []).append(cl)
  61. if options.test:
  62. s.sendto(str(Packet(CMD.PLAY, 0, 250000, 440, 255)), cl)
  63. if not options.sync_test:
  64. time.sleep(0.25)
  65. s.sendto(str(Packet(CMD.PLAY, 0, 250000, 880, 255)), cl)
  66. if options.quit:
  67. s.sendto(str(Packet(CMD.QUIT)), cl)
  68. if options.test and options.sync_test:
  69. time.sleep(0.25)
  70. for cl in clients:
  71. s.sendto(str(Packet(CMD.PLAY, 0, 250000, 880, 255)), cl)
  72. if options.test or options.quit:
  73. print uid_groups
  74. print type_groups
  75. exit()
  76. try:
  77. iv = ET.parse(args[0]).getroot()
  78. except IOError:
  79. print 'Bad file'
  80. exit()
  81. notestreams = iv.findall("./streams/stream[@type='ns']")
  82. groups = set([ns.get('group') for ns in notestreams if 'group' in ns.keys()])
  83. print len(notestreams), 'notestreams'
  84. print len(clients), 'clients'
  85. print len(groups), 'groups'
  86. class Route(object):
  87. def __init__(self, fattr, fvalue, group, excl=False):
  88. if fattr == 'U':
  89. self.map = uid_groups
  90. elif fattr == 'T':
  91. self.map = type_groups
  92. else:
  93. raise ValueError('Not a valid attribute specifier: %r'%(fattr,))
  94. self.value = fvalue
  95. if group not in groups:
  96. raise ValueError('Not a present group: %r'%(group,))
  97. self.group = group
  98. self.excl = excl
  99. @classmethod
  100. def Parse(cls, s):
  101. fspecs, _, grpspecs = map(lambda x: x.strip(), s.partition('='))
  102. fpairs = []
  103. ret = []
  104. for fspec in [i.strip() for i in fspecs.split(',')]:
  105. fattr, _, fvalue = map(lambda x: x.strip(), fspec.partition(':'))
  106. fpairs.append((fattr, fvalue))
  107. for part in [i.strip() for i in grpspecs.split(',')]:
  108. for fattr, fvalue in fpairs:
  109. if part[0] == '+':
  110. ret.append(Route(fattr, fvalue, part[1:], False))
  111. elif part[0] == '-':
  112. ret.append(Route(fattr, fvalue, part[1:], True))
  113. else:
  114. raise ValueError('Not an exclusivity: %r'%(part[0],))
  115. return ret
  116. def __repr__(self):
  117. return '<Route of %r to %s:%s>'%(self.group, ('U' if self.map is uid_groups else 'T'), self.value)
  118. class RouteSet(object):
  119. def __init__(self, clis=None):
  120. if clis is None:
  121. clis = clients
  122. self.clients = clis
  123. self.routes = []
  124. def Route(self, stream):
  125. grp = stream.get('group')
  126. if options.verbose:
  127. print 'Routing', grp, '...'
  128. excl = False
  129. for route in self.routes:
  130. if route.group == grp:
  131. if options.verbose:
  132. print 'Matches route', route
  133. excl = excl or route.excl
  134. matches = filter(lambda x, route=route: route.Apply(x), self.clients)
  135. if matches:
  136. if options.verbose:
  137. print 'Using client', matches[0]
  138. self.clients.remove(matches[0])
  139. return matches[0]
  140. print 'No matches, moving on...'
  141. if excl:
  142. if options.verbose:
  143. print 'Exclusively routed, no route matched.'
  144. return None
  145. if not self.clients:
  146. if options.verbose:
  147. print 'Out of clients, no route matched.'
  148. return None
  149. cli = self.clients.pop(0)
  150. if options.verbose:
  151. print 'Default route to', cli
  152. return cli
  153. routeset = RouteSet()
  154. for rspec in options.routes:
  155. routeset.routes.extend(Route.Parse(rspec))
  156. if options.verbose:
  157. print 'All routes:'
  158. for route in routeset.routes:
  159. print route
  160. class NSThread(threading.Thread):
  161. def wait_for(self, t):
  162. if t <= 0:
  163. return
  164. time.sleep(t)
  165. def run(self):
  166. nsq, cl = self._Thread__args
  167. for note in nsq:
  168. ttime = float(note.get('time'))
  169. pitch = int(note.get('pitch'))
  170. vel = int(note.get('vel'))
  171. dur = factor*float(note.get('dur'))
  172. while time.time() - BASETIME < factor*ttime:
  173. self.wait_for(factor*ttime - (time.time() - BASETIME))
  174. s.sendto(str(Packet(CMD.PLAY, int(dur), int((dur*1000000)%1000000), int(440.0 * 2**((pitch-69)/12.0)), vel*2)), cl)
  175. if options.verbose:
  176. print (time.time() - BASETIME), cl, ': PLAY', pitch, dur, vel
  177. self.wait_for(dur - ((time.time() - BASETIME) - factor*ttime))
  178. if options.verbose:
  179. print '% 6.5f'%(time.time() - BASETIME,), cl, ': DONE'
  180. threads = []
  181. for ns in notestreams:
  182. cli = routeset.Route(ns)
  183. if cli:
  184. nsq = ns.findall('note')
  185. threads.append(NSThread(args=(nsq, clients.pop(0))))
  186. BASETIME = time.time()
  187. for thr in threads:
  188. thr.start()
  189. for thr in threads:
  190. thr.join()