diff options
-rw-r--r-- | host/host.py | 32 | ||||
-rw-r--r-- | host/matelight/config.py | 27 | ||||
-rw-r--r-- | host/matelight/host.py | 16 | ||||
-rw-r--r-- | host/matelight/listeners.py | 19 | ||||
-rw-r--r-- | host/matelight/queuemgr.py | 52 | ||||
-rw-r--r-- | host/matelight/renderers.py | 143 |
6 files changed, 257 insertions, 32 deletions
diff --git a/host/host.py b/host/host.py deleted file mode 100644 index b19dc7a..0000000 --- a/host/host.py +++ /dev/null @@ -1,32 +0,0 @@ -from pyusb import usb -import colorsys -import numpy as np - -CRATE_WIDTH = 5 -CRATE_HEIGHT = 4 -CRATES_X = 16 -CRATES_Y = 2 -DISPLAY_WIDTH = CRATES_X * CRATE_WIDTH -DISPLAY_HEIGHT = CRATES_Y * CRATE_HEIGHT -FRAME_SIZE = CRATE_WIDTH*CRATE_HEIGHT*3 - -dev = usb.core.find(idVendor=0x1cbe, idProduct=0x0003) - -def sendframe(framedata): - """ Send a frame to the display - - The argument contains a h * w array of 3-tuples of (r, g, b)-data - """ - def chunks(l, n): - for i in xrange(0, len(l), n): - yield l[i:i+n] - - for cx, cy in itertools.product(range(16), range(2)): - data = [ v for x in range(CRATE_WIDTH) for y in range(CRATE_HEIGHT) for v in framedata[cy*CRATE_HEIGHT + y][cx*CRATE_WIDTH + x] ] - if len(data) != FRAME_SIZE: - raise ValueError('Invalid frame data. Expected {} bytes, got {}.'.format(FRAME_SIZE, len(data))) - # Send framebuffer data - dev.write(0x01, bytes([0, x, y])+bytes(data)) - # Send latch command - dev.write(0x01, b'\x01') - diff --git a/host/matelight/config.py b/host/matelight/config.py new file mode 100644 index 0000000..d8cdde1 --- /dev/null +++ b/host/matelight/config.py @@ -0,0 +1,27 @@ + +# Hard timeout in seconds after which (approximately) the rendering of a single item will be cut off +RENDERER_TIMEOUT = 20.0 +# How long to show an image by default +DEFAULT_IMAGE_DURATION = 10.0 +# Default scrolling speed in pixels/second +DEFAULT_SCROLL_SPEED = 4 +# Pixels to leave blank between two letters +LETTER_SPACING = 1 + +# Display geometry +# ┌─────────┐ ┌──┬──┬──┬ ⋯ ┬──┬──┬──┐ +# │1 o o o 5│ │ 1│ │ │ │ │ │16│ +# │6 o o o o│ ├──┼──┼──┼ ⋯ ┼──┼──┼──┤ +# │o o o o o│ │17│ │ │ │ │ │32│ +# │o o o o20│ └──┴──┴──┴ ⋯ ┴──┴──┴──┘ +# └─────────┘ +CRATE_WIDTH = 5 +CRATE_HEIGHT = 4 +CRATES_X = 16 +CRATES_Y = 2 + +# Computed values +DISPLAY_WIDTH = CRATES_X * CRATE_WIDTH +DISPLAY_HEIGHT = CRATES_Y * CRATE_HEIGHT +FRAME_SIZE = DISPLAY_WIDTH*DISPLAY_HEIGHT*3 + diff --git a/host/matelight/host.py b/host/matelight/host.py new file mode 100644 index 0000000..461a4fa --- /dev/null +++ b/host/matelight/host.py @@ -0,0 +1,16 @@ +from pyusb import usb +import colorsys +import numpy as np + +dev = usb.core.find(idVendor=0x1cbe, idProduct=0x0003) + +def sendframe(framedata): + if not isinstance(framedata, np.array) or framedata.shape != (DISPLAY_WIDTH, DISPLAY_HEIGHT, 3) or framedata.dtype != np.int8: + raise ValueError('framedata must be a ({}, {}, 3)-numpy array of int8s'.format(DISPLAY_WIDTH, DISPLAY_HEIGHT)) + + for cx, cy in itertools.product(range(16), range(2)): + cratedata = framedata[cx*CRATE_WIDTH:(cx+1)*CRATE_WIDTH, cy*CRATE_HEIGHT:(cy+1)*CRATE_HEIGHT] + # Send framebuffer data + dev.write(0x01, bytes([0, x, y])+bytes(list(cratedata.flatten()))) + # Send latch command + dev.write(0x01, b'\x01') diff --git a/host/matelight/listeners.py b/host/matelight/listeners.py new file mode 100644 index 0000000..bb0ce72 --- /dev/null +++ b/host/matelight/listeners.py @@ -0,0 +1,19 @@ +from socketserver import * +import zlib +import struct + +class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass +class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass + +class MateLightUDPHandler(BaseRequestHandler): + def handle(self): + data = self.request[0].strip() + if len(data) != FRAME_SIZE+4: + raise ValueError('Invalid frame size: Expected {}, got {}'.format(FRAME_SIZE+4, len(frame))) + frame = data[:-4] + crc1, = struct.unpack('!I', data[-4:]) + crc2 = zlib.crc32(frame), + if crc1 != crc2: + raise ValueError('Invalid frame CRC checksum: Expected {}, got {}'.format(crc2, crc1)) + socket.sendto(b'ACK', self.client_address) + diff --git a/host/matelight/queuemgr.py b/host/matelight/queuemgr.py new file mode 100644 index 0000000..df75309 --- /dev/null +++ b/host/matelight/queuemgr.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +from renderers import TextRenderer, ImageRenderer +import host, config +import time, math + +score = lambda now, last, lifetime, priority, item: priority*math.log(now-last)/(10.0+item.duration) + +class FuzzyQueue: + def __init__(self, default): + self._default = default + self.put(default, 0.0, 0) + self._l = [] + + def put(self, item, priority=1.0, lifetime=0): + lifetime += time.time() + self._l.append((0, lifetime, priority, item)) + + def pop(self): + """ Get an item from the queue + + NOTE: This is *not* a regular pop, as it does not necessarily remove the item from the queue. + """ + now = time.time() + # Choose item based on last used and priority + _, index, (_, lifetime, priority, item) = max(sorted([(score(now, *v), i, v) for i, v in self._l])) + # If item's lifetime is exceeded, remove + if lifetime < now and item is not self._default: + del self._l[index] + # Otherwise, set item's last played time + self._l[index] = (now, lifetime, prioity, item) + # Finally, return + return item + +q = FuzzyQueue() + +def insert_text(text, priority=1.0, lifetime=0, escapes=True): + q.put(TextRenderer(text, escapes), priority, lifetime) + +def insert_image(image, priority=1.0, lifetime=0): + q.put(ImageRenderer(image), priority, lifetime) + +def render_thread(): + while True: + start = time.time() + for frame, delay in q.pop().frames(start): + then = time.time() + if then-start+delay > RENDERER_TIMEOUT: + break + sendframe(frame) + now = time.time() + time.sleep(min(RENDERER_TIMEOUT, delay - (now-then))) diff --git a/host/matelight/renderers.py b/host/matelight/renderers.py new file mode 100644 index 0000000..4508063 --- /dev/null +++ b/host/matelight/renderers.py @@ -0,0 +1,143 @@ +import numpy as np +try: + import re2 as re +except ImportError: + import re +from PIL import Image +from pixelterm import xtermcolors + +default_palette = [ + (0x00, 0x00, 0x00), # 0 normal colors + (0xcd, 0x00, 0x00), # 1 + (0x00, 0xcd, 0x00), # 2 + (0xcd, 0xcd, 0x00), # 3 + (0x00, 0x00, 0xee), # 4 + (0xcd, 0x00, 0xcd), # 5 + (0x00, 0xcd, 0xcd), # 6 + (0xe5, 0xe5, 0xe5), # 7 + (0x7f, 0x7f, 0x7f), # 8 bright colors + (0xff, 0x00, 0x00), # 9 + (0x00, 0xff, 0x00), # 10 + (0xff, 0xff, 0x00), # 11 + (0x5c, 0x5c, 0xff), # 12 + (0xff, 0x00, 0xff), # 13 + (0x00, 0xff, 0xff), # 14 + (0xff, 0xff, 0xff)] # 15 + +class CharGenerator: + def __init__(self, seq=None, lg=None, text=''): + settings = False, False, False, default_palette[8], default_palette[0] + if lg: + settings = lg,bold, lg.blink, lg.underscore, lg.fg, lg.bg + self.bold, self.blink, self.underscore, self.fg, self.bg = settings + self.text = text + self.parse_escape_sequence(seq) + + def parse_escape_sequence(seq): + codes = list(map(int, seq[2:-1].split(';'))) + fg, bg, reverse, i = self.fg, self.bg, False, 0 + while i<len(codes): + a = codes[i] + if a in [38, 48]: + if codes[i+1] == 5: + c = xtermcolors.xterm_colors[codes[i+2]] + fg, bg = (c, bg) if a == 38 else (fg, c) + i += 2 + elif a == 39: + fg = (0,0,0) + elif a == 49: + bg = (0,0,0) + elif a == 0: + fg, bg = (0,0,0), (0,0,0) + self.bold, self.blink, self.underscore = False, False, False + elif a in range(30, 38): + fg = default_palette[a-30] + elif a in range(90, 98): + fg = default_palette[a-90+8] + elif a in range(40, 48): + bg = default_palette[a-40] + elif a in range(100, 108): + bg = default_palette[a-100+8] + elif a == 7: + reverse = True + elif a == 5: + self.blink = True + elif a == 4: + self.underscore = True + elif a == 1: # Literally "bright", not bold. + self.bold = True + i += 1 + fg, bg = bg, fg if reverse else fg, bg + self.fg, self.bg = fg, bg + + def generate_char(self, c, now): + fg, bg = self.bg, self.bg if self.blink and now%1.0 < 0.3 else self.fg, self.bg + ... + + def generate(self, now): + chars = [self.generate_char(c, now) for c in self.text] + # This refers to inter-letter spacing + space = np.zeros((LETTER_SPACING, DISPLAY_HEIGHT, 3)) + spaces = [space]*(len(chars)-1) + everything = chars + spaces + everything[::2] = chars + everything[1::2] = spaces + return np.concatenate(everything) + +class TextRenderer: + def __init__(self, text, escapes=True): + """Renders text into a frame buffer + + "escapes" tells the renderer whether to interpret escape sequences (True) or not (False). + """ + generators = [] + current_generator = CharGenerator() + for esc, char in r'(\x1B\[[0-9;]+m)|(.)'.finditer(text): + if esc: + if current_generator.text != '': + generators.append(current_generator) + current_generator = CharGenerator(esc, current_generator) + elif char: + current_generator.text += char + self.generators = generators + [current_generator] + + def frames(self, start): + now = time.time() + zeros = [np.zeros((DISPLAY_WIDTH, DISPLAY_HEIGHT, 3))] + # Pad the array with one screen's worth of zeros on both sides so the text fully scrolls through. + raw = np.concatenate([zeros]+[g.generate(now) for g in generators]+[zeros]) + w,h,_ = raw.size + for i in range(DISPLAY_WIDTH+w, 0, -1): + frame = raw[i:i+DISPLAY_WIDTH, :, :] + yield frame, 1/DEFAULT_SCROLL_SPEED + raw = np.concatenate([zeros]+[g.generate(now) for g in generators]+[zeros]) + +class ImageRenderer: + def __new__(cls, image_data): + img = Image.open(io.BytesIO(image_data)) + self.img = img + + def frames(self): + img = self.img + palette = img.getpalette() + last_frame = Image.new("RGB", img.size) + # FIXME set delay to 1/10s if the image is animated, only use DEFAULT_IMAGE_DURATION for static images. + delay = img.info.get('duration', DEFAULT_IMAGE_DURATION*1000.0)/1000.0 + + for frame in ImageSequence.Iterator(img): + #This works around a known bug in Pillow + #See also: http://stackoverflow.com/questions/4904940/python-converting-gif-frames-to-png + frame.putpalette(palette) + c = frame.convert("RGB") + + if img.info['background'] != img.info['transparency']: + last_frame.paste(c, c) + else: + last_frame = c + + im = last_frame.copy() + im.thumbnail((DISPLAY_WIDTH, DISPLAY_HEIGHT), Image.NEAREST) + data = np.array(im.getdata(), dtype=np.int8) + data.reshape((DISPLAY_WIDTH, DISPLAY_HEIGHT, 3)) + yield data, delay + |