#!/usr/bin/env python # Matelight # Copyright (C) 2016 Sebastian Götte # Copyright (C) 2015 Uwe Kamper # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import socket import time import itertools import sys import re from contextlib import suppress import asyncio import threading import functools import operator import config import matelight import bdf import crap import secret_sauce def log(*args): print(time.strftime('\x1B[93m[%m-%d %H:%M:%S]\x1B[0m'), ' '.join(str(arg) for arg in args), '\x1B[0m') sys.stdout.flush() def addrcolor(addr): col = 16 + (functools.reduce(operator.xor, (int(e or '0') for e in re.split('[.:]', addr))) % 216) return '\x1B[38;5;{}m{}\x1B[0m'.format(col, addr) class TextRenderer: def __init__(self, text, title='default', checkwidth=False, font=bdf.unifont): self.text = text self.font = font (self.width, _), _testrender = font.compute_text_bounds(text), font.render_text(text, 0) self.title = title if self.width > config.max_marquee_width: raise ValueError() def __iter__(self): for i in range(-config.display_width, self.width): yield self.title, self.font.render_text(self.text, i) time.sleep(0.05) class MatelightTCPServer: def __init__(self, ip, port, loop): coro = asyncio.start_server(self.handle_conn, ip, port, loop=loop) server = loop.run_until_complete(coro) self.renderqueue = [] self.close = server.close def __iter__(self): q, self.renderqueue = self.renderqueue, [] for title, frame in itertools.chain(*q): yield title, frame async def handle_conn(self, reader, writer): line = (await reader.read(1024)).decode('UTF-8').strip() addr,*rest = writer.get_extra_info('peername') log('\x1B[95mText from\x1B[0m {}: {}\x1B[0m'.format(addrcolor(addr), line)) try: secret_sauce.check_spam(str(addr), line) renderer = TextRenderer(line, title='tcp:'+addr, checkwidth=True) except secret_sauce.SpamError as err: log('\x1B[91mMessage rejected from {}: {}'.format(addrcolor(addr), err)) writer.write(b'BLERGH!\n') except ValueError: # Text too long writer.write(b'TOO MUCH INFORMATION!\n') except RuntimeError: # Invalid escape etc. writer.write(b'STAHPTROLLINK?\n') else: self.renderqueue.append(renderer) writer.write(b'KTHXBYE!\n') await writer.drain() writer.close() def _fallbackiter(it, fallback): for fel in fallback: for el in it: yield el yield fel def defaulttexts(filename='default.lines'): with open(filename) as f: return itertools.chain.from_iterable(( TextRenderer(l[:-1].replace('\\x1B', '\x1B')) for l in f.readlines() )) if __name__ == '__main__': try: ml = matelight.Matelight(config.ml_usb_serial_match) except ValueError as e: print(e, 'Starting in headless mode.', file=sys.stderr) ml = None loop = asyncio.get_event_loop() tcp_server = MatelightTCPServer(config.tcp_addr, config.tcp_port, loop) udp_server = crap.CRAPServer(config.udp_addr, config.udp_port) forwarder = crap.CRAPClient(config.crap_fw_addr, config.crap_fw_port) if config.crap_fw_addr is not None else None async_thr = threading.Thread(target=loop.run_forever) async_thr.daemon = True async_thr.start() with suppress(KeyboardInterrupt): while True: for title, frame in _fallbackiter(udp_server, _fallbackiter(tcp_server, defaulttexts())): if ml: ml.sendframe(frame) if forwarder: forwarder.sendframe(frame) tcp_server.close() udp_server.close() forwarder.close()