From ad5102f334720b54c549d698a790f45bd096ccf0 Mon Sep 17 00:00:00 2001 From: Shy Date: Mon, 10 Apr 2017 15:32:53 +0200 Subject: [PATCH] Various modifications --- c4ctrl.py | 596 +++++++++++++++++++++++++++--------------------------- 1 file changed, 302 insertions(+), 294 deletions(-) diff --git a/c4ctrl.py b/c4ctrl.py index bf7bbd3..5012b05 100755 --- a/c4ctrl.py +++ b/c4ctrl.py @@ -13,24 +13,25 @@ Dependencies: """ import sys +from random import choice # For client_id generation class C4Interface(): """ Interaction with AutoC4, the C4 home automation system. """ - from uuid import uuid4 - broker = "autoc4.labor.koeln.ccc.de" port = 1883 qos = 2 retain = True # Generate a (sufficiently) unique client id - client_id = "c4ctrl-" + uuid4().hex + client_id = "c4ctrl-" + "".join( + choice("0123456789abcdefABCDEF") for unused in range(16) + ) debug = False def push(self, message, topic=None, retain=None): """ Send a message to the MQTT broker. - + message may be a byte encoded payload or a list of either dict()s or tuples()s. If message is a byte encoded payload, topic= must be given. dict()s and tuple()s should lool like this: @@ -60,7 +61,7 @@ class C4Interface(): ) message.remove(item) message.append(new_item) - + if self.debug: return print("[DEBUG] inhibited messages:", message, file=sys.stderr) @@ -84,7 +85,7 @@ class C4Interface(): def pull(self, topic=[]): """ Return the state of a topic. - + topic may be a list of topics or a single topic given as string. Returns a paho message object or list of message objects. """ @@ -140,6 +141,225 @@ class C4Interface(): self.push(payload, topic="club/shutdown", retain=False) +class Kitchenlight: + """ Interface to the Kitchenlight and its functions. """ + + _END = "little" # Kitchenlight endianess + + def __init__(self, topic="kitchenlight/change_screen", + powertopic="power/wohnzimmer/kitchenlight", + autopower=True): + self.topic = topic # Kitchenlight topic + self.powertopic = powertopic # Topic for power on + self.autopower = autopower # Power on on every mode change? + + def _switch(self, data, poweron=False, poweroff=False): + """ Send commands via a C4Interface to the MQTT broker. """ + + if self.autopower or poweron or poweroff: + c4 = C4Interface() + command = [] + command.append({ + "topic" : self.topic, + "payload" : data }) + if poweroff: + command.append({ + "topic" : self.powertopic, + "payload" : b'\x00'}) + elif self.autopower or poweron: + command.append({ + "topic" : self.powertopic, + "payload" : b'\x01'}) + c4.push(command) + else: + c4 = C4Interface() + c4.push(data, topic=self.topic) + + def list_available(self): + """ Print a list of available Kitchenlight modes. """ + + print("Available Kitchenlight modes (options are optional):") + print(""" + off turn off Kitchenlight + checker[,DELAY[,COLOR_1[,COLOR_2]]] Checker + matrix[,LINES] Matrix + mood[,1|2] (1=Colorwheel, 2=Random) Moodlight + oc[,DELAY] Open Chaos + pacman Pacman + sine Sine + text[,TEXT[,DELAY]] Text + flood Flood + clock Clock""") + + def set_mode(self, mode, opts=[]): + """ Switch to given mode. """ + + mode = mode.lower() + if mode == "off": + return self.empty() + if mode == "checker": + return self.checker(*opts) + if mode == "matrix": + return self.matrix(*opts) + if mode == "mood": + return self.moodlight(*opts) + if mode == "oc": + return self.openchaos(*opts) + if mode == "pacman": + return self.pacman() + if mode == "sine": + return self.sine() + if mode == "text": + return self.text(*opts) + if mode == "flood": + return self.flood() + if mode == "clock": + return self.clock() + print("Error: unknown Kitchenlight mode {}!".format(mode)) + return False + + def empty(self): + """ Set to mode "empty" and turn off Kitchenlight. """ + + # Screen 0 + d = int(0).to_bytes(4, self._END) + self._switch(d, poweroff=True) + + def checker(self, delay=500, colA="0000ff", colB="00ff00"): + """ Set to mode "checker". + + delay = delay in ms (default 500) + colA = first color (default 0000ff) + colB = second color (default 00ff00) """ + + # Kind of a hack: lets treat the two colors as DMX lights + ca = Dmx("checker/a", colA.lstrip('#')) + cb = Dmx("checker/b", colB.lstrip('#')) + d = bytearray(20) + v = memoryview(d) + # Screen 1 + v[0:4] = int(1).to_bytes(4, self._END) + # Delay + v[4:8] = int(delay).to_bytes(4, self._END) + # ColorA R/G/B + v[8:10] = int(ca.color[0:2], base=16).to_bytes(2, self._END) + v[10:12] = int(ca.color[2:4], base=16).to_bytes(2, self._END) + v[12:14] = int(ca.color[4:6], base=16).to_bytes(2, self._END) + # ColorB R/G/B + v[14:16] = int(cb.color[0:2], base=16).to_bytes(2, self._END) + v[16:18] = int(cb.color[2:4], base=16).to_bytes(2, self._END) + v[18:20] = int(cb.color[4:6], base=16).to_bytes(2, self._END) + self._switch(d) + + def matrix(self, lines=8): + """ Set to mode "matrix". + + lines (>0, <32) = number of lines (default 8) """ + + if int(lines) > 31: lines = 31 # Maximal line count + d = bytearray(8) + v = memoryview(d) + # Screen 2 + v[0:4] = int(2).to_bytes(4, self._END) + v[4:8] = int(lines).to_bytes(4, self._END) + self._switch(d) + + def moodlight(self, mode=1): + """ Set to mode "moodlight". + + mode [1|2] = colorwheel(1) or random(2) """ + + if mode == 1: # Mode "Colorwheel" + d = bytearray(19) + v = memoryview(d) + # Screen 3 + v[0:4] = int(3).to_bytes(4, self._END) + # Mode + v[4:5] = int(mode).to_bytes(1, self._END) + # Step + v[5:9] = int(1).to_bytes(4, self._END) + # Fade delay + v[9:13] = int(10).to_bytes(4, self._END) + # Pause + v[13:17] = int(10000).to_bytes(4, self._END) + # Hue step + v[17:19] = int(30).to_bytes(2, self._END) + else: # Mode "Random" + d = bytearray(17) + v = memoryview(d) + # Screen 3 + v[0:4] = int(3).to_bytes(4, self._END) + # Mode + v[4:5] = int(mode).to_bytes(1, self._END) + # Step + v[5:9] = int(1).to_bytes(4, self._END) + # Fade delay + v[9:13] = int(10).to_bytes(4, self._END) + # Pause + v[13:17] = int(10000).to_bytes(4, self._END) + self._switch(d) + + def openchaos(self, delay=1000): + """ Set to mode "openchaos". + + delay = delay in milliseconds (default 1000) """ + + d = bytearray(8) + v = memoryview(d) + # Screen 4 + v[0:4] = int(4).to_bytes(4, self._END) + v[4:8] = int(delay).to_bytes(4, self._END) + self._switch(d) + + def pacman(self): + """ Set to mode "pacman". """ + + # Screen 5 + d = int(5).to_bytes(4, self._END) + self._switch(d) + + def sine(self): + """ Set to mode "sine". """ + + # Screen 6 + d = int(6).to_bytes(4, self._END) + self._switch(d) + + # Screen 7 is Strobo, which is disabled because it seems to do harm to + # the Kitchenlight. Evil strobo. + + def text(self, text="Hello World", delay=250): + """ Set to mode "text". + + text (str < 256 bytes) = text to display (default "Hello World") + delay = delay in milliseconds (default 250) """ + + text = text.encode("ascii", "ignore") + if len(text) > 256: # Maximum text length + print("Warning: text length must not exceed 256 characters!", file=sys.stderr) + text = text[:256] + d = bytearray(8 + len(text) + 1) + v = memoryview(d) + # Screen 8 + v[0:4] = int(8).to_bytes(4, self._END) + v[4:8] = int(delay).to_bytes(4, self._END) + v[8:8 + len(text)] = text + v[len(d) - 1:len(d)] = bytes(1) + self._switch(d) + + def flood(self): + """ Set to mode "flood". """ + # Screen 9 + d = int(9).to_bytes(4, self._END) + self._switch(d) + + def clock(self): + """ Set to mode "clock". """ + # Screen 11 + d = int(11).to_bytes(4, self._END) + self._switch(d) + + class Dmx: """ Abstraction of the 3 channel LED cans in the club. """ @@ -205,11 +425,13 @@ class C4Room: def __init__(self): self.c4 = C4Interface() - self.switch_state = "" # State of switches in the like of str("0010") + # get_switch_state() will store its result and a timestamp to reduce + # requests to the broker + self._switch_state = ("", 0.0) def _interactive_light_switch(self): """ Interactively ask for input. - + Returns str(userinput). Will not write to stdout if sys.stdin is no tty. """ @@ -219,8 +441,8 @@ class C4Room: for level in range(len(self.switches)): print((level * '|') + ",- " + self.switches[level][0]) - self.switch_state = self.get_switch_state() - print(self.switch_state) # Present current state + switch_state = self.get_switch_state() + print(switch_state) # Present current state try: userinput = sys.stdin.readline().rstrip('\n') @@ -230,8 +452,19 @@ class C4Room: return userinput - def get_switch_state(self): - """ Returns current state of switches as a string of 1s and 0s. """ + def get_switch_state(self, max_age=5): + """ Returns current state of switches as a string of 1s and 0s. + + max_age specifies how old (in seconds) a cached responce from a + previously done request may be before it is considered outdated. """ + + from time import time + + # We store switch states in self._switch_state to reduce requests to + # the broker. If this variable is neither empty nor too old, use it! + if self._switch_state[0] != "": + if time() - self._switch_state[1] <= max_age: + return self._switch_state[0] state = "" req = [] @@ -249,6 +482,7 @@ class C4Room: file=sys.stderr) state = '0' * len(self.switches) + self._switch_state = (state, time()) return state def light_switch(self, userinput=""): @@ -259,36 +493,49 @@ class C4Room: userinput = self._interactive_light_switch() if userinput == "": return + # Let's support some geeky binary operations! mode = 'n' # n = normal, a = AND, o = OR if not userinput.isdecimal(): - if userinput[0] == '&' and userinput[1:].isdecimal(): + if userinput[0] == '&' and userinput[1:].strip().isdecimal(): # AND operator, applied later after doing some more validation - userinput = userinput[1:] + userinput = userinput[1:].strip() mode = 'a' - elif userinput[0] == '|' and userinput[1:].isdecimal(): + + elif userinput[0] == '|' and userinput[1:].strip().isdecimal(): # OR operator, applied later after doing some more validation - userinput = userinput[1:] + userinput = userinput[1:].strip() mode = 'o' - elif userinput == ">>" or userinput == "<<": - # Left and right shift - if not self.switch_state: - self.switch_state = self.get_switch_state() - if userinput == ">>": + + elif (userinput[:2] == ">>" or userinput[:2] == "<<") \ + and (userinput[2:].strip() == "" or userinput[2:].strip().isdecimal()): + # Left or right shift + # How far shall we shift? + if userinput[2:].strip().isdecimal(): + shift_by = int(userinput[2:]) + else: + shift_by = 1 + + # Retrieve the current state of switches + switch_state = self.get_switch_state() + if userinput[:2] == ">>": # Right shift. '[2:]' removes the leading 'b0...'. - new_state = bin(int(self.switch_state, base=2) >> 1)[2:] + new_state = bin(int(switch_state, base=2) >> shift_by)[2:] else: # Left shift. '[2:]' removes the leading 'b0...'. - new_state = bin(int(self.switch_state, base=2) << 1)[2:] + new_state = bin(int(switch_state, base=2) << shift_by)[2:] # Cut any exceeding leftmost bits new_state = new_state[-len(self.switches):] # Pad with leading zeroes userinput = new_state.rjust(len(self.switches), '0') + else: - print("You're not paying attention!", file=sys.stderr) + # Oh no, input contained non-decimal characters which we could + # not parse. :( + print("Error: could not parse input!", file=sys.stderr) return if len(userinput) != len(self.switches): - # First try to convert from integer if userinput's length doesn't + # First try to convert from decimal if userinput's length doesn't # match if len(bin(int(userinput))) <= len(self.switches)+2: # ^ +2 because bin() returns strings like 'b0...' @@ -297,7 +544,7 @@ class C4Room: userinput = binary.rjust(len(self.switches), '0') else: print("Error: wrong number of digits (expected {}, got {})!".format( - len(self.switches), len(userinput))) + len(self.switches), len(userinput)), file=sys.stderr) return False # Now that everything special is expanded it's time to check if @@ -308,21 +555,19 @@ class C4Room: return False if mode == 'a': # AND operator - if not self.switch_state: - self.switch_state = self.get_switch_state() + switch_state = self.get_switch_state() userinput = "".join(map(lambda x, y: str(int(x) & int(y)), - userinput, self.switch_state)) + userinput, switch_state)) elif mode == 'o': # OR operator - if not self.switch_state: - self.switch_state = self.get_switch_state() + switch_state = self.get_switch_state() userinput = "".join(map(lambda x, y: str(int(x) | int(y)), - userinput, self.switch_state)) + userinput, switch_state)) command=[] for i in range(len(self.switches)): - # If we know their state, skip switches which are unchanged - if self.switch_state: - if self.switch_state[i] == userinput[i]: continue + # Skip unchanged switches if we happen to know their state + if "switch_state" in dir(): + if switch_state[i] == userinput[i]: continue command.append({ "topic" : self.switches[i][1], @@ -336,11 +581,11 @@ class C4Room: command = [] for light in self.lights: - if colorscheme.color_for(light.topic): + if colorscheme.get_color_for(light.topic): # Update internal state of this Dmx object, so we can query # .payload later - light.set_color(colorscheme.color_for(light.topic)) + light.set_color(colorscheme.get_color_for(light.topic)) if magic: # Send color to ghost instead of the "real" light @@ -446,225 +691,6 @@ class Keller(C4Room): lights = () -class Kitchenlight: - """ Interface to the Kitchenlight and its functions. """ - - _END = "little" # Kitchenlight endianess - - def __init__(self, topic="kitchenlight/change_screen", - powertopic="power/wohnzimmer/kitchenlight", - autopower=True): - self.topic = topic # Kitchenlight topic - self.powertopic = powertopic # Topic for power on - self.autopower = autopower # Power on on every mode change? - - def _switch(self, data, poweron=False, poweroff=False): - """ Send commands via a C4Interface to the MQTT broker. """ - - if self.autopower or poweron or poweroff: - c4 = C4Interface() - command = [] - command.append({ - "topic" : self.topic, - "payload" : data }) - if poweroff: - command.append({ - "topic" : self.powertopic, - "payload" : b'\x00'}) - elif self.autopower or poweron: - command.append({ - "topic" : self.powertopic, - "payload" : b'\x01'}) - c4.push(command) - else: - c4 = C4Interface() - c4.push(data, topic=self.topic) - - def list_available(self): - """ Print a list of available Kitchenlight modes. """ - - print("Available Kitchenlight modes (options are optional):") - print(""" - off turn off Kitchenlight - checker[,DELAY[,COLOR_1[,COLOR_2]]] Checker - matrix[,LINES] Matrix - mood[,1|2] (1=Colorwheel, 2=Random) Moodlight - oc[,DELAY] Open Chaos - pacman Pacman - sine Sine - text[,TEXT[,DELAY]] Text - flood Flood - clock Clock""") - - def set_mode(self, mode, opts=[]): - """ Switch to given mode. """ - - mode = mode.lower() - if mode == "off": - return self.empty() - if mode == "checker": - return self.checker(*opts) - if mode == "matrix": - return self.matrix(*opts) - if mode == "mood": - return self.moodlight(*opts) - if mode == "oc": - return self.openchaos(*opts) - if mode == "pacman": - return self.pacman() - if mode == "sine": - return self.sine() - if mode == "text": - return self.text(*opts) - if mode == "flood": - return self.flood() - if mode == "clock": - return self.clock() - print("Error: unknown Kitchenlight mode {}!".format(mode)) - return False - - def empty(self): - """ Set to mode "empty" and turn off Kitchenlight. """ - - # Screen 0 - d = int(0).to_bytes(4, self._END) - self._switch(d, poweroff=True) - - def checker(self, delay=500, colA="0000ff", colB="00ff00"): - """ Set to mode "checker". - - delay = delay in ms (default 500) - colA = first color (default 0000ff) - colB = second color (default 00ff00) """ - - # Kind of a hack: lets treat the two colors as DMX lights - ca = Dmx("checker/a", colA.lstrip('#')) - cb = Dmx("checker/b", colB.lstrip('#')) - d = bytearray(20) - v = memoryview(d) - # Screen 1 - v[0:4] = int(1).to_bytes(4, self._END) - # Delay - v[4:8] = int(delay).to_bytes(4, self._END) - # ColorA R/G/B - v[8:10] = int(ca.color[0:2], base=16).to_bytes(2, self._END) - v[10:12] = int(ca.color[2:4], base=16).to_bytes(2, self._END) - v[12:14] = int(ca.color[4:6], base=16).to_bytes(2, self._END) - # ColorB R/G/B - v[14:16] = int(cb.color[0:2], base=16).to_bytes(2, self._END) - v[16:18] = int(cb.color[2:4], base=16).to_bytes(2, self._END) - v[18:20] = int(cb.color[4:6], base=16).to_bytes(2, self._END) - self._switch(d) - - def matrix(self, lines=8): - """ Set to mode "matrix". - - lines (>0, <32) = number of lines (default 8) """ - - if int(lines) > 31: lines = 31 # Maximal line count - d = bytearray(8) - v = memoryview(d) - # Screen 2 - v[0:4] = int(2).to_bytes(4, self._END) - v[4:8] = int(lines).to_bytes(4, self._END) - self._switch(d) - - def moodlight(self, mode=1): - """ Set to mode "moodlight". - - mode [1|2] = colorwheel(1) or random(2) """ - - if mode == 1: # Mode "Colorwheel" - d = bytearray(19) - v = memoryview(d) - # Screen 3 - v[0:4] = int(3).to_bytes(4, self._END) - # Mode - v[4:5] = int(mode).to_bytes(1, self._END) - # Step - v[5:9] = int(1).to_bytes(4, self._END) - # Fade delay - v[9:13] = int(10).to_bytes(4, self._END) - # Pause - v[13:17] = int(10000).to_bytes(4, self._END) - # Hue step - v[17:19] = int(30).to_bytes(2, self._END) - else: # Mode "Random" - d = bytearray(17) - v = memoryview(d) - # Screen 3 - v[0:4] = int(3).to_bytes(4, self._END) - # Mode - v[4:5] = int(mode).to_bytes(1, self._END) - # Step - v[5:9] = int(1).to_bytes(4, self._END) - # Fade delay - v[9:13] = int(10).to_bytes(4, self._END) - # Pause - v[13:17] = int(10000).to_bytes(4, self._END) - self._switch(d) - - def openchaos(self, delay=1000): - """ Set to mode "openchaos". - - delay = delay in milliseconds (default 1000) """ - - d = bytearray(8) - v = memoryview(d) - # Screen 4 - v[0:4] = int(4).to_bytes(4, self._END) - v[4:8] = int(delay).to_bytes(4, self._END) - self._switch(d) - - def pacman(self): - """ Set to mode "pacman". """ - - # Screen 5 - d = int(5).to_bytes(4, self._END) - self._switch(d) - - def sine(self): - """ Set to mode "sine". """ - - # Screen 6 - d = int(6).to_bytes(4, self._END) - self._switch(d) - - # Screen 7 is Strobo, which is disabled because it seems to do harm to - # the Kitchenlight. Evil strobo. - - def text(self, text="Hello World", delay=250): - """ Set to mode "text". - - text (str < 256 bytes) = text to display (default "Hello World") - delay = delay in milliseconds (default 250) """ - - text = text.encode("ascii", "ignore") - if len(text) > 256: # Maximum text length - print("Warning: text length must not exceed 256 characters!", file=sys.stderr) - text = text[:256] - d = bytearray(8 + len(text) + 1) - v = memoryview(d) - # Screen 8 - v[0:4] = int(8).to_bytes(4, self._END) - v[4:8] = int(delay).to_bytes(4, self._END) - v[8:8 + len(text)] = text - v[len(d) - 1:len(d)] = bytes(1) - self._switch(d) - - def flood(self): - """ Set to mode "flood". """ - # Screen 9 - d = int(9).to_bytes(4, self._END) - self._switch(d) - - def clock(self): - """ Set to mode "clock". """ - # Screen 11 - d = int(11).to_bytes(4, self._END) - self._switch(d) - - class ColorScheme: """ Abstraction of a colorscheme. """ @@ -692,7 +718,7 @@ class ColorScheme: return self.from_file(autoinit) def __bool__(self): - # Return true if color_for has a chance to present anything useful + # Return true if get_color_for has a chance to present anything useful if self.mapping: return True if self.single_color: return True if self.return_random_color: return True @@ -764,44 +790,20 @@ class ColorScheme: color += hex(ch)[2:]*2 return color - def _single_color(self): - # Check if there is a range in the color string. If yes, replace it - # by a random color. - if self.single_color.find('-', 1, -1) == -1: - return self.single_color + def get_color_for(self, topic): + """ Returns color for topic. - from random import randint - color = "" - for i in range(len(self.single_color)): - if self.single_color[i] != '-': - try: - if self.single_color[i-1] == '-': - continue - elif self.single_color[i+1] == '-': - continue - except IndexError: pass - color += self.single_color[i] - else: - f, t = self.single_color[i-1], self.single_color[i+1] - color += hex(randint(int(f, base=16), int(t, base=16)))[2:] + Returns the color (in hexadecimal notation) this ColorScheme + associates with for the given topic. """ - return color - - def color_for(self, topic): - """ Returns the color (in hexadecimal notation) this ColorScheme assumes - for the given topic. """ - - # We need to take care not to return colors for both "normal" topics - # and masters, as setting masters would override other settings. - # If this ColorScheme has been read from a file though, we asssume that - # the user has taken care of this and apply what we are told to apply. if self.mapping: if topic in self.mapping.keys(): return self.mapping[topic] elif self.single_color: - if not self._topic_is_master(topic): - return self._single_color() + return self.single_color elif self.return_random_color: + # We need to take care not to return colors for both "normal" and + # master topics if not self._topic_is_master(topic): return self._random_color() # Fallback @@ -856,7 +858,7 @@ class ColorScheme: def from_color(self, color): """ Derive ColorScheme from a single hex color. """ - self.single_color = color.lstrip('#').strip('-') + self.single_color = color.lstrip('#') def from_random(self): """ Derive ColorScheme from random colors. """ @@ -884,8 +886,10 @@ class ColorScheme: def store(self, name): """ Store the current state of all lights as preset. """ - # First of all, refuse to override virtual presets - if name in self._virtual_presets: + # Refuse to save under a name used by virtual presets. Let's also + # refuse to save as "config" or "c4ctrl.conf", as we may use one these + # file names in the future. + if name in self._virtual_presets or name in ["config", "c4ctrl.conf"]: print("I'm sorry Dave. I'm afraid I can't do that. The name \"{}\" \ is reserved. Please choose a different one.".format(name)) return False @@ -894,11 +898,15 @@ is reserved. Please choose a different one.".format(name)) fd = sys.stdout else: import os - cfg_dir = self._get_cfg_dir(create=True) # Create config dir if missing + # Put preset in our config directory, create it if necessary + cfg_dir = self._get_cfg_dir(create=True) + # Strip any path elements + name = os.path.split(name)[1] fn = os.path.join(cfg_dir, name) + try: - fd = open(fn, 'xt') + fd = open(fn, 'xt') # x = new file (writing), t = text mode except FileExistsError: print("A preset with this name already exists, overwrite? [y/N]", end=' ', flush=True) @@ -1001,7 +1009,7 @@ class RemotePresets: def _expand_preset_name(self, name, rooms, available): """ Returns a valid preset name expanded from the given name. - + Takes care to match only presets which are available for all rooms specified.