Various modifications
This commit is contained in:
parent
8489443ac5
commit
ad5102f334
1 changed files with 302 additions and 294 deletions
596
c4ctrl.py
596
c4ctrl.py
|
@ -13,24 +13,25 @@ Dependencies:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
from random import choice # For client_id generation
|
||||||
|
|
||||||
|
|
||||||
class C4Interface():
|
class C4Interface():
|
||||||
""" Interaction with AutoC4, the C4 home automation system. """
|
""" Interaction with AutoC4, the C4 home automation system. """
|
||||||
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
broker = "autoc4.labor.koeln.ccc.de"
|
broker = "autoc4.labor.koeln.ccc.de"
|
||||||
port = 1883
|
port = 1883
|
||||||
qos = 2
|
qos = 2
|
||||||
retain = True
|
retain = True
|
||||||
# Generate a (sufficiently) unique client id
|
# Generate a (sufficiently) unique client id
|
||||||
client_id = "c4ctrl-" + uuid4().hex
|
client_id = "c4ctrl-" + "".join(
|
||||||
|
choice("0123456789abcdefABCDEF") for unused in range(16)
|
||||||
|
)
|
||||||
debug = False
|
debug = False
|
||||||
|
|
||||||
def push(self, message, topic=None, retain=None):
|
def push(self, message, topic=None, retain=None):
|
||||||
""" Send a message to the MQTT broker.
|
""" Send a message to the MQTT broker.
|
||||||
|
|
||||||
message may be a byte encoded payload or a list of either dict()s
|
message may be a byte encoded payload or a list of either dict()s
|
||||||
or tuples()s. If message is a byte encoded payload, topic= must be
|
or tuples()s. If message is a byte encoded payload, topic= must be
|
||||||
given. dict()s and tuple()s should lool like this:
|
given. dict()s and tuple()s should lool like this:
|
||||||
|
@ -60,7 +61,7 @@ class C4Interface():
|
||||||
)
|
)
|
||||||
message.remove(item)
|
message.remove(item)
|
||||||
message.append(new_item)
|
message.append(new_item)
|
||||||
|
|
||||||
if self.debug: return print("[DEBUG] inhibited messages:",
|
if self.debug: return print("[DEBUG] inhibited messages:",
|
||||||
message, file=sys.stderr)
|
message, file=sys.stderr)
|
||||||
|
|
||||||
|
@ -84,7 +85,7 @@ class C4Interface():
|
||||||
|
|
||||||
def pull(self, topic=[]):
|
def pull(self, topic=[]):
|
||||||
""" Return the state of a topic.
|
""" Return the state of a topic.
|
||||||
|
|
||||||
topic may be a list of topics or a single topic given as string.
|
topic may be a list of topics or a single topic given as string.
|
||||||
Returns a paho message object or list of message objects. """
|
Returns a paho message object or list of message objects. """
|
||||||
|
|
||||||
|
@ -140,6 +141,225 @@ class C4Interface():
|
||||||
self.push(payload, topic="club/shutdown", retain=False)
|
self.push(payload, topic="club/shutdown", retain=False)
|
||||||
|
|
||||||
|
|
||||||
|
class Kitchenlight:
|
||||||
|
""" Interface to the Kitchenlight and its functions. """
|
||||||
|
|
||||||
|
_END = "little" # Kitchenlight endianess
|
||||||
|
|
||||||
|
def __init__(self, topic="kitchenlight/change_screen",
|
||||||
|
powertopic="power/wohnzimmer/kitchenlight",
|
||||||
|
autopower=True):
|
||||||
|
self.topic = topic # Kitchenlight topic
|
||||||
|
self.powertopic = powertopic # Topic for power on
|
||||||
|
self.autopower = autopower # Power on on every mode change?
|
||||||
|
|
||||||
|
def _switch(self, data, poweron=False, poweroff=False):
|
||||||
|
""" Send commands via a C4Interface to the MQTT broker. """
|
||||||
|
|
||||||
|
if self.autopower or poweron or poweroff:
|
||||||
|
c4 = C4Interface()
|
||||||
|
command = []
|
||||||
|
command.append({
|
||||||
|
"topic" : self.topic,
|
||||||
|
"payload" : data })
|
||||||
|
if poweroff:
|
||||||
|
command.append({
|
||||||
|
"topic" : self.powertopic,
|
||||||
|
"payload" : b'\x00'})
|
||||||
|
elif self.autopower or poweron:
|
||||||
|
command.append({
|
||||||
|
"topic" : self.powertopic,
|
||||||
|
"payload" : b'\x01'})
|
||||||
|
c4.push(command)
|
||||||
|
else:
|
||||||
|
c4 = C4Interface()
|
||||||
|
c4.push(data, topic=self.topic)
|
||||||
|
|
||||||
|
def list_available(self):
|
||||||
|
""" Print a list of available Kitchenlight modes. """
|
||||||
|
|
||||||
|
print("Available Kitchenlight modes (options are optional):")
|
||||||
|
print("""
|
||||||
|
off turn off Kitchenlight
|
||||||
|
checker[,DELAY[,COLOR_1[,COLOR_2]]] Checker
|
||||||
|
matrix[,LINES] Matrix
|
||||||
|
mood[,1|2] (1=Colorwheel, 2=Random) Moodlight
|
||||||
|
oc[,DELAY] Open Chaos
|
||||||
|
pacman Pacman
|
||||||
|
sine Sine
|
||||||
|
text[,TEXT[,DELAY]] Text
|
||||||
|
flood Flood
|
||||||
|
clock Clock""")
|
||||||
|
|
||||||
|
def set_mode(self, mode, opts=[]):
|
||||||
|
""" Switch to given mode. """
|
||||||
|
|
||||||
|
mode = mode.lower()
|
||||||
|
if mode == "off":
|
||||||
|
return self.empty()
|
||||||
|
if mode == "checker":
|
||||||
|
return self.checker(*opts)
|
||||||
|
if mode == "matrix":
|
||||||
|
return self.matrix(*opts)
|
||||||
|
if mode == "mood":
|
||||||
|
return self.moodlight(*opts)
|
||||||
|
if mode == "oc":
|
||||||
|
return self.openchaos(*opts)
|
||||||
|
if mode == "pacman":
|
||||||
|
return self.pacman()
|
||||||
|
if mode == "sine":
|
||||||
|
return self.sine()
|
||||||
|
if mode == "text":
|
||||||
|
return self.text(*opts)
|
||||||
|
if mode == "flood":
|
||||||
|
return self.flood()
|
||||||
|
if mode == "clock":
|
||||||
|
return self.clock()
|
||||||
|
print("Error: unknown Kitchenlight mode {}!".format(mode))
|
||||||
|
return False
|
||||||
|
|
||||||
|
def empty(self):
|
||||||
|
""" Set to mode "empty" and turn off Kitchenlight. """
|
||||||
|
|
||||||
|
# Screen 0
|
||||||
|
d = int(0).to_bytes(4, self._END)
|
||||||
|
self._switch(d, poweroff=True)
|
||||||
|
|
||||||
|
def checker(self, delay=500, colA="0000ff", colB="00ff00"):
|
||||||
|
""" Set to mode "checker".
|
||||||
|
|
||||||
|
delay = delay in ms (default 500)
|
||||||
|
colA = first color (default 0000ff)
|
||||||
|
colB = second color (default 00ff00) """
|
||||||
|
|
||||||
|
# Kind of a hack: lets treat the two colors as DMX lights
|
||||||
|
ca = Dmx("checker/a", colA.lstrip('#'))
|
||||||
|
cb = Dmx("checker/b", colB.lstrip('#'))
|
||||||
|
d = bytearray(20)
|
||||||
|
v = memoryview(d)
|
||||||
|
# Screen 1
|
||||||
|
v[0:4] = int(1).to_bytes(4, self._END)
|
||||||
|
# Delay
|
||||||
|
v[4:8] = int(delay).to_bytes(4, self._END)
|
||||||
|
# ColorA R/G/B
|
||||||
|
v[8:10] = int(ca.color[0:2], base=16).to_bytes(2, self._END)
|
||||||
|
v[10:12] = int(ca.color[2:4], base=16).to_bytes(2, self._END)
|
||||||
|
v[12:14] = int(ca.color[4:6], base=16).to_bytes(2, self._END)
|
||||||
|
# ColorB R/G/B
|
||||||
|
v[14:16] = int(cb.color[0:2], base=16).to_bytes(2, self._END)
|
||||||
|
v[16:18] = int(cb.color[2:4], base=16).to_bytes(2, self._END)
|
||||||
|
v[18:20] = int(cb.color[4:6], base=16).to_bytes(2, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def matrix(self, lines=8):
|
||||||
|
""" Set to mode "matrix".
|
||||||
|
|
||||||
|
lines (>0, <32) = number of lines (default 8) """
|
||||||
|
|
||||||
|
if int(lines) > 31: lines = 31 # Maximal line count
|
||||||
|
d = bytearray(8)
|
||||||
|
v = memoryview(d)
|
||||||
|
# Screen 2
|
||||||
|
v[0:4] = int(2).to_bytes(4, self._END)
|
||||||
|
v[4:8] = int(lines).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def moodlight(self, mode=1):
|
||||||
|
""" Set to mode "moodlight".
|
||||||
|
|
||||||
|
mode [1|2] = colorwheel(1) or random(2) """
|
||||||
|
|
||||||
|
if mode == 1: # Mode "Colorwheel"
|
||||||
|
d = bytearray(19)
|
||||||
|
v = memoryview(d)
|
||||||
|
# Screen 3
|
||||||
|
v[0:4] = int(3).to_bytes(4, self._END)
|
||||||
|
# Mode
|
||||||
|
v[4:5] = int(mode).to_bytes(1, self._END)
|
||||||
|
# Step
|
||||||
|
v[5:9] = int(1).to_bytes(4, self._END)
|
||||||
|
# Fade delay
|
||||||
|
v[9:13] = int(10).to_bytes(4, self._END)
|
||||||
|
# Pause
|
||||||
|
v[13:17] = int(10000).to_bytes(4, self._END)
|
||||||
|
# Hue step
|
||||||
|
v[17:19] = int(30).to_bytes(2, self._END)
|
||||||
|
else: # Mode "Random"
|
||||||
|
d = bytearray(17)
|
||||||
|
v = memoryview(d)
|
||||||
|
# Screen 3
|
||||||
|
v[0:4] = int(3).to_bytes(4, self._END)
|
||||||
|
# Mode
|
||||||
|
v[4:5] = int(mode).to_bytes(1, self._END)
|
||||||
|
# Step
|
||||||
|
v[5:9] = int(1).to_bytes(4, self._END)
|
||||||
|
# Fade delay
|
||||||
|
v[9:13] = int(10).to_bytes(4, self._END)
|
||||||
|
# Pause
|
||||||
|
v[13:17] = int(10000).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def openchaos(self, delay=1000):
|
||||||
|
""" Set to mode "openchaos".
|
||||||
|
|
||||||
|
delay = delay in milliseconds (default 1000) """
|
||||||
|
|
||||||
|
d = bytearray(8)
|
||||||
|
v = memoryview(d)
|
||||||
|
# Screen 4
|
||||||
|
v[0:4] = int(4).to_bytes(4, self._END)
|
||||||
|
v[4:8] = int(delay).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def pacman(self):
|
||||||
|
""" Set to mode "pacman". """
|
||||||
|
|
||||||
|
# Screen 5
|
||||||
|
d = int(5).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def sine(self):
|
||||||
|
""" Set to mode "sine". """
|
||||||
|
|
||||||
|
# Screen 6
|
||||||
|
d = int(6).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
# Screen 7 is Strobo, which is disabled because it seems to do harm to
|
||||||
|
# the Kitchenlight. Evil strobo.
|
||||||
|
|
||||||
|
def text(self, text="Hello World", delay=250):
|
||||||
|
""" Set to mode "text".
|
||||||
|
|
||||||
|
text (str < 256 bytes) = text to display (default "Hello World")
|
||||||
|
delay = delay in milliseconds (default 250) """
|
||||||
|
|
||||||
|
text = text.encode("ascii", "ignore")
|
||||||
|
if len(text) > 256: # Maximum text length
|
||||||
|
print("Warning: text length must not exceed 256 characters!", file=sys.stderr)
|
||||||
|
text = text[:256]
|
||||||
|
d = bytearray(8 + len(text) + 1)
|
||||||
|
v = memoryview(d)
|
||||||
|
# Screen 8
|
||||||
|
v[0:4] = int(8).to_bytes(4, self._END)
|
||||||
|
v[4:8] = int(delay).to_bytes(4, self._END)
|
||||||
|
v[8:8 + len(text)] = text
|
||||||
|
v[len(d) - 1:len(d)] = bytes(1)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def flood(self):
|
||||||
|
""" Set to mode "flood". """
|
||||||
|
# Screen 9
|
||||||
|
d = int(9).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
def clock(self):
|
||||||
|
""" Set to mode "clock". """
|
||||||
|
# Screen 11
|
||||||
|
d = int(11).to_bytes(4, self._END)
|
||||||
|
self._switch(d)
|
||||||
|
|
||||||
|
|
||||||
class Dmx:
|
class Dmx:
|
||||||
""" Abstraction of the 3 channel LED cans in the club. """
|
""" Abstraction of the 3 channel LED cans in the club. """
|
||||||
|
|
||||||
|
@ -205,11 +425,13 @@ class C4Room:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.c4 = C4Interface()
|
self.c4 = C4Interface()
|
||||||
self.switch_state = "" # State of switches in the like of str("0010")
|
# get_switch_state() will store its result and a timestamp to reduce
|
||||||
|
# requests to the broker
|
||||||
|
self._switch_state = ("", 0.0)
|
||||||
|
|
||||||
def _interactive_light_switch(self):
|
def _interactive_light_switch(self):
|
||||||
""" Interactively ask for input.
|
""" Interactively ask for input.
|
||||||
|
|
||||||
Returns str(userinput). Will not write to stdout if sys.stdin is
|
Returns str(userinput). Will not write to stdout if sys.stdin is
|
||||||
no tty. """
|
no tty. """
|
||||||
|
|
||||||
|
@ -219,8 +441,8 @@ class C4Room:
|
||||||
for level in range(len(self.switches)):
|
for level in range(len(self.switches)):
|
||||||
print((level * '|') + ",- " + self.switches[level][0])
|
print((level * '|') + ",- " + self.switches[level][0])
|
||||||
|
|
||||||
self.switch_state = self.get_switch_state()
|
switch_state = self.get_switch_state()
|
||||||
print(self.switch_state) # Present current state
|
print(switch_state) # Present current state
|
||||||
|
|
||||||
try:
|
try:
|
||||||
userinput = sys.stdin.readline().rstrip('\n')
|
userinput = sys.stdin.readline().rstrip('\n')
|
||||||
|
@ -230,8 +452,19 @@ class C4Room:
|
||||||
|
|
||||||
return userinput
|
return userinput
|
||||||
|
|
||||||
def get_switch_state(self):
|
def get_switch_state(self, max_age=5):
|
||||||
""" Returns current state of switches as a string of 1s and 0s. """
|
""" Returns current state of switches as a string of 1s and 0s.
|
||||||
|
|
||||||
|
max_age specifies how old (in seconds) a cached responce from a
|
||||||
|
previously done request may be before it is considered outdated. """
|
||||||
|
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
# We store switch states in self._switch_state to reduce requests to
|
||||||
|
# the broker. If this variable is neither empty nor too old, use it!
|
||||||
|
if self._switch_state[0] != "":
|
||||||
|
if time() - self._switch_state[1] <= max_age:
|
||||||
|
return self._switch_state[0]
|
||||||
|
|
||||||
state = ""
|
state = ""
|
||||||
req = []
|
req = []
|
||||||
|
@ -249,6 +482,7 @@ class C4Room:
|
||||||
file=sys.stderr)
|
file=sys.stderr)
|
||||||
state = '0' * len(self.switches)
|
state = '0' * len(self.switches)
|
||||||
|
|
||||||
|
self._switch_state = (state, time())
|
||||||
return state
|
return state
|
||||||
|
|
||||||
def light_switch(self, userinput=""):
|
def light_switch(self, userinput=""):
|
||||||
|
@ -259,36 +493,49 @@ class C4Room:
|
||||||
userinput = self._interactive_light_switch()
|
userinput = self._interactive_light_switch()
|
||||||
if userinput == "": return
|
if userinput == "": return
|
||||||
|
|
||||||
|
# Let's support some geeky binary operations!
|
||||||
mode = 'n' # n = normal, a = AND, o = OR
|
mode = 'n' # n = normal, a = AND, o = OR
|
||||||
if not userinput.isdecimal():
|
if not userinput.isdecimal():
|
||||||
if userinput[0] == '&' and userinput[1:].isdecimal():
|
if userinput[0] == '&' and userinput[1:].strip().isdecimal():
|
||||||
# AND operator, applied later after doing some more validation
|
# AND operator, applied later after doing some more validation
|
||||||
userinput = userinput[1:]
|
userinput = userinput[1:].strip()
|
||||||
mode = 'a'
|
mode = 'a'
|
||||||
elif userinput[0] == '|' and userinput[1:].isdecimal():
|
|
||||||
|
elif userinput[0] == '|' and userinput[1:].strip().isdecimal():
|
||||||
# OR operator, applied later after doing some more validation
|
# OR operator, applied later after doing some more validation
|
||||||
userinput = userinput[1:]
|
userinput = userinput[1:].strip()
|
||||||
mode = 'o'
|
mode = 'o'
|
||||||
elif userinput == ">>" or userinput == "<<":
|
|
||||||
# Left and right shift
|
elif (userinput[:2] == ">>" or userinput[:2] == "<<") \
|
||||||
if not self.switch_state:
|
and (userinput[2:].strip() == "" or userinput[2:].strip().isdecimal()):
|
||||||
self.switch_state = self.get_switch_state()
|
# Left or right shift
|
||||||
if userinput == ">>":
|
# How far shall we shift?
|
||||||
|
if userinput[2:].strip().isdecimal():
|
||||||
|
shift_by = int(userinput[2:])
|
||||||
|
else:
|
||||||
|
shift_by = 1
|
||||||
|
|
||||||
|
# Retrieve the current state of switches
|
||||||
|
switch_state = self.get_switch_state()
|
||||||
|
if userinput[:2] == ">>":
|
||||||
# Right shift. '[2:]' removes the leading 'b0...'.
|
# Right shift. '[2:]' removes the leading 'b0...'.
|
||||||
new_state = bin(int(self.switch_state, base=2) >> 1)[2:]
|
new_state = bin(int(switch_state, base=2) >> shift_by)[2:]
|
||||||
else:
|
else:
|
||||||
# Left shift. '[2:]' removes the leading 'b0...'.
|
# Left shift. '[2:]' removes the leading 'b0...'.
|
||||||
new_state = bin(int(self.switch_state, base=2) << 1)[2:]
|
new_state = bin(int(switch_state, base=2) << shift_by)[2:]
|
||||||
# Cut any exceeding leftmost bits
|
# Cut any exceeding leftmost bits
|
||||||
new_state = new_state[-len(self.switches):]
|
new_state = new_state[-len(self.switches):]
|
||||||
# Pad with leading zeroes
|
# Pad with leading zeroes
|
||||||
userinput = new_state.rjust(len(self.switches), '0')
|
userinput = new_state.rjust(len(self.switches), '0')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print("You're not paying attention!", file=sys.stderr)
|
# Oh no, input contained non-decimal characters which we could
|
||||||
|
# not parse. :(
|
||||||
|
print("Error: could not parse input!", file=sys.stderr)
|
||||||
return
|
return
|
||||||
|
|
||||||
if len(userinput) != len(self.switches):
|
if len(userinput) != len(self.switches):
|
||||||
# First try to convert from integer if userinput's length doesn't
|
# First try to convert from decimal if userinput's length doesn't
|
||||||
# match
|
# match
|
||||||
if len(bin(int(userinput))) <= len(self.switches)+2:
|
if len(bin(int(userinput))) <= len(self.switches)+2:
|
||||||
# ^ +2 because bin() returns strings like 'b0...'
|
# ^ +2 because bin() returns strings like 'b0...'
|
||||||
|
@ -297,7 +544,7 @@ class C4Room:
|
||||||
userinput = binary.rjust(len(self.switches), '0')
|
userinput = binary.rjust(len(self.switches), '0')
|
||||||
else:
|
else:
|
||||||
print("Error: wrong number of digits (expected {}, got {})!".format(
|
print("Error: wrong number of digits (expected {}, got {})!".format(
|
||||||
len(self.switches), len(userinput)))
|
len(self.switches), len(userinput)), file=sys.stderr)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Now that everything special is expanded it's time to check if
|
# Now that everything special is expanded it's time to check if
|
||||||
|
@ -308,21 +555,19 @@ class C4Room:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if mode == 'a': # AND operator
|
if mode == 'a': # AND operator
|
||||||
if not self.switch_state:
|
switch_state = self.get_switch_state()
|
||||||
self.switch_state = self.get_switch_state()
|
|
||||||
userinput = "".join(map(lambda x, y: str(int(x) & int(y)),
|
userinput = "".join(map(lambda x, y: str(int(x) & int(y)),
|
||||||
userinput, self.switch_state))
|
userinput, switch_state))
|
||||||
elif mode == 'o': # OR operator
|
elif mode == 'o': # OR operator
|
||||||
if not self.switch_state:
|
switch_state = self.get_switch_state()
|
||||||
self.switch_state = self.get_switch_state()
|
|
||||||
userinput = "".join(map(lambda x, y: str(int(x) | int(y)),
|
userinput = "".join(map(lambda x, y: str(int(x) | int(y)),
|
||||||
userinput, self.switch_state))
|
userinput, switch_state))
|
||||||
|
|
||||||
command=[]
|
command=[]
|
||||||
for i in range(len(self.switches)):
|
for i in range(len(self.switches)):
|
||||||
# If we know their state, skip switches which are unchanged
|
# Skip unchanged switches if we happen to know their state
|
||||||
if self.switch_state:
|
if "switch_state" in dir():
|
||||||
if self.switch_state[i] == userinput[i]: continue
|
if switch_state[i] == userinput[i]: continue
|
||||||
|
|
||||||
command.append({
|
command.append({
|
||||||
"topic" : self.switches[i][1],
|
"topic" : self.switches[i][1],
|
||||||
|
@ -336,11 +581,11 @@ class C4Room:
|
||||||
|
|
||||||
command = []
|
command = []
|
||||||
for light in self.lights:
|
for light in self.lights:
|
||||||
if colorscheme.color_for(light.topic):
|
if colorscheme.get_color_for(light.topic):
|
||||||
|
|
||||||
# Update internal state of this Dmx object, so we can query
|
# Update internal state of this Dmx object, so we can query
|
||||||
# <object>.payload later
|
# <object>.payload later
|
||||||
light.set_color(colorscheme.color_for(light.topic))
|
light.set_color(colorscheme.get_color_for(light.topic))
|
||||||
|
|
||||||
if magic:
|
if magic:
|
||||||
# Send color to ghost instead of the "real" light
|
# Send color to ghost instead of the "real" light
|
||||||
|
@ -446,225 +691,6 @@ class Keller(C4Room):
|
||||||
lights = ()
|
lights = ()
|
||||||
|
|
||||||
|
|
||||||
class Kitchenlight:
|
|
||||||
""" Interface to the Kitchenlight and its functions. """
|
|
||||||
|
|
||||||
_END = "little" # Kitchenlight endianess
|
|
||||||
|
|
||||||
def __init__(self, topic="kitchenlight/change_screen",
|
|
||||||
powertopic="power/wohnzimmer/kitchenlight",
|
|
||||||
autopower=True):
|
|
||||||
self.topic = topic # Kitchenlight topic
|
|
||||||
self.powertopic = powertopic # Topic for power on
|
|
||||||
self.autopower = autopower # Power on on every mode change?
|
|
||||||
|
|
||||||
def _switch(self, data, poweron=False, poweroff=False):
|
|
||||||
""" Send commands via a C4Interface to the MQTT broker. """
|
|
||||||
|
|
||||||
if self.autopower or poweron or poweroff:
|
|
||||||
c4 = C4Interface()
|
|
||||||
command = []
|
|
||||||
command.append({
|
|
||||||
"topic" : self.topic,
|
|
||||||
"payload" : data })
|
|
||||||
if poweroff:
|
|
||||||
command.append({
|
|
||||||
"topic" : self.powertopic,
|
|
||||||
"payload" : b'\x00'})
|
|
||||||
elif self.autopower or poweron:
|
|
||||||
command.append({
|
|
||||||
"topic" : self.powertopic,
|
|
||||||
"payload" : b'\x01'})
|
|
||||||
c4.push(command)
|
|
||||||
else:
|
|
||||||
c4 = C4Interface()
|
|
||||||
c4.push(data, topic=self.topic)
|
|
||||||
|
|
||||||
def list_available(self):
|
|
||||||
""" Print a list of available Kitchenlight modes. """
|
|
||||||
|
|
||||||
print("Available Kitchenlight modes (options are optional):")
|
|
||||||
print("""
|
|
||||||
off turn off Kitchenlight
|
|
||||||
checker[,DELAY[,COLOR_1[,COLOR_2]]] Checker
|
|
||||||
matrix[,LINES] Matrix
|
|
||||||
mood[,1|2] (1=Colorwheel, 2=Random) Moodlight
|
|
||||||
oc[,DELAY] Open Chaos
|
|
||||||
pacman Pacman
|
|
||||||
sine Sine
|
|
||||||
text[,TEXT[,DELAY]] Text
|
|
||||||
flood Flood
|
|
||||||
clock Clock""")
|
|
||||||
|
|
||||||
def set_mode(self, mode, opts=[]):
|
|
||||||
""" Switch to given mode. """
|
|
||||||
|
|
||||||
mode = mode.lower()
|
|
||||||
if mode == "off":
|
|
||||||
return self.empty()
|
|
||||||
if mode == "checker":
|
|
||||||
return self.checker(*opts)
|
|
||||||
if mode == "matrix":
|
|
||||||
return self.matrix(*opts)
|
|
||||||
if mode == "mood":
|
|
||||||
return self.moodlight(*opts)
|
|
||||||
if mode == "oc":
|
|
||||||
return self.openchaos(*opts)
|
|
||||||
if mode == "pacman":
|
|
||||||
return self.pacman()
|
|
||||||
if mode == "sine":
|
|
||||||
return self.sine()
|
|
||||||
if mode == "text":
|
|
||||||
return self.text(*opts)
|
|
||||||
if mode == "flood":
|
|
||||||
return self.flood()
|
|
||||||
if mode == "clock":
|
|
||||||
return self.clock()
|
|
||||||
print("Error: unknown Kitchenlight mode {}!".format(mode))
|
|
||||||
return False
|
|
||||||
|
|
||||||
def empty(self):
|
|
||||||
""" Set to mode "empty" and turn off Kitchenlight. """
|
|
||||||
|
|
||||||
# Screen 0
|
|
||||||
d = int(0).to_bytes(4, self._END)
|
|
||||||
self._switch(d, poweroff=True)
|
|
||||||
|
|
||||||
def checker(self, delay=500, colA="0000ff", colB="00ff00"):
|
|
||||||
""" Set to mode "checker".
|
|
||||||
|
|
||||||
delay = delay in ms (default 500)
|
|
||||||
colA = first color (default 0000ff)
|
|
||||||
colB = second color (default 00ff00) """
|
|
||||||
|
|
||||||
# Kind of a hack: lets treat the two colors as DMX lights
|
|
||||||
ca = Dmx("checker/a", colA.lstrip('#'))
|
|
||||||
cb = Dmx("checker/b", colB.lstrip('#'))
|
|
||||||
d = bytearray(20)
|
|
||||||
v = memoryview(d)
|
|
||||||
# Screen 1
|
|
||||||
v[0:4] = int(1).to_bytes(4, self._END)
|
|
||||||
# Delay
|
|
||||||
v[4:8] = int(delay).to_bytes(4, self._END)
|
|
||||||
# ColorA R/G/B
|
|
||||||
v[8:10] = int(ca.color[0:2], base=16).to_bytes(2, self._END)
|
|
||||||
v[10:12] = int(ca.color[2:4], base=16).to_bytes(2, self._END)
|
|
||||||
v[12:14] = int(ca.color[4:6], base=16).to_bytes(2, self._END)
|
|
||||||
# ColorB R/G/B
|
|
||||||
v[14:16] = int(cb.color[0:2], base=16).to_bytes(2, self._END)
|
|
||||||
v[16:18] = int(cb.color[2:4], base=16).to_bytes(2, self._END)
|
|
||||||
v[18:20] = int(cb.color[4:6], base=16).to_bytes(2, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def matrix(self, lines=8):
|
|
||||||
""" Set to mode "matrix".
|
|
||||||
|
|
||||||
lines (>0, <32) = number of lines (default 8) """
|
|
||||||
|
|
||||||
if int(lines) > 31: lines = 31 # Maximal line count
|
|
||||||
d = bytearray(8)
|
|
||||||
v = memoryview(d)
|
|
||||||
# Screen 2
|
|
||||||
v[0:4] = int(2).to_bytes(4, self._END)
|
|
||||||
v[4:8] = int(lines).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def moodlight(self, mode=1):
|
|
||||||
""" Set to mode "moodlight".
|
|
||||||
|
|
||||||
mode [1|2] = colorwheel(1) or random(2) """
|
|
||||||
|
|
||||||
if mode == 1: # Mode "Colorwheel"
|
|
||||||
d = bytearray(19)
|
|
||||||
v = memoryview(d)
|
|
||||||
# Screen 3
|
|
||||||
v[0:4] = int(3).to_bytes(4, self._END)
|
|
||||||
# Mode
|
|
||||||
v[4:5] = int(mode).to_bytes(1, self._END)
|
|
||||||
# Step
|
|
||||||
v[5:9] = int(1).to_bytes(4, self._END)
|
|
||||||
# Fade delay
|
|
||||||
v[9:13] = int(10).to_bytes(4, self._END)
|
|
||||||
# Pause
|
|
||||||
v[13:17] = int(10000).to_bytes(4, self._END)
|
|
||||||
# Hue step
|
|
||||||
v[17:19] = int(30).to_bytes(2, self._END)
|
|
||||||
else: # Mode "Random"
|
|
||||||
d = bytearray(17)
|
|
||||||
v = memoryview(d)
|
|
||||||
# Screen 3
|
|
||||||
v[0:4] = int(3).to_bytes(4, self._END)
|
|
||||||
# Mode
|
|
||||||
v[4:5] = int(mode).to_bytes(1, self._END)
|
|
||||||
# Step
|
|
||||||
v[5:9] = int(1).to_bytes(4, self._END)
|
|
||||||
# Fade delay
|
|
||||||
v[9:13] = int(10).to_bytes(4, self._END)
|
|
||||||
# Pause
|
|
||||||
v[13:17] = int(10000).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def openchaos(self, delay=1000):
|
|
||||||
""" Set to mode "openchaos".
|
|
||||||
|
|
||||||
delay = delay in milliseconds (default 1000) """
|
|
||||||
|
|
||||||
d = bytearray(8)
|
|
||||||
v = memoryview(d)
|
|
||||||
# Screen 4
|
|
||||||
v[0:4] = int(4).to_bytes(4, self._END)
|
|
||||||
v[4:8] = int(delay).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def pacman(self):
|
|
||||||
""" Set to mode "pacman". """
|
|
||||||
|
|
||||||
# Screen 5
|
|
||||||
d = int(5).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def sine(self):
|
|
||||||
""" Set to mode "sine". """
|
|
||||||
|
|
||||||
# Screen 6
|
|
||||||
d = int(6).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
# Screen 7 is Strobo, which is disabled because it seems to do harm to
|
|
||||||
# the Kitchenlight. Evil strobo.
|
|
||||||
|
|
||||||
def text(self, text="Hello World", delay=250):
|
|
||||||
""" Set to mode "text".
|
|
||||||
|
|
||||||
text (str < 256 bytes) = text to display (default "Hello World")
|
|
||||||
delay = delay in milliseconds (default 250) """
|
|
||||||
|
|
||||||
text = text.encode("ascii", "ignore")
|
|
||||||
if len(text) > 256: # Maximum text length
|
|
||||||
print("Warning: text length must not exceed 256 characters!", file=sys.stderr)
|
|
||||||
text = text[:256]
|
|
||||||
d = bytearray(8 + len(text) + 1)
|
|
||||||
v = memoryview(d)
|
|
||||||
# Screen 8
|
|
||||||
v[0:4] = int(8).to_bytes(4, self._END)
|
|
||||||
v[4:8] = int(delay).to_bytes(4, self._END)
|
|
||||||
v[8:8 + len(text)] = text
|
|
||||||
v[len(d) - 1:len(d)] = bytes(1)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def flood(self):
|
|
||||||
""" Set to mode "flood". """
|
|
||||||
# Screen 9
|
|
||||||
d = int(9).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
def clock(self):
|
|
||||||
""" Set to mode "clock". """
|
|
||||||
# Screen 11
|
|
||||||
d = int(11).to_bytes(4, self._END)
|
|
||||||
self._switch(d)
|
|
||||||
|
|
||||||
|
|
||||||
class ColorScheme:
|
class ColorScheme:
|
||||||
""" Abstraction of a colorscheme. """
|
""" Abstraction of a colorscheme. """
|
||||||
|
|
||||||
|
@ -692,7 +718,7 @@ class ColorScheme:
|
||||||
return self.from_file(autoinit)
|
return self.from_file(autoinit)
|
||||||
|
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
# Return true if color_for has a chance to present anything useful
|
# Return true if get_color_for has a chance to present anything useful
|
||||||
if self.mapping: return True
|
if self.mapping: return True
|
||||||
if self.single_color: return True
|
if self.single_color: return True
|
||||||
if self.return_random_color: return True
|
if self.return_random_color: return True
|
||||||
|
@ -764,44 +790,20 @@ class ColorScheme:
|
||||||
color += hex(ch)[2:]*2
|
color += hex(ch)[2:]*2
|
||||||
return color
|
return color
|
||||||
|
|
||||||
def _single_color(self):
|
def get_color_for(self, topic):
|
||||||
# Check if there is a range in the color string. If yes, replace it
|
""" Returns color for topic.
|
||||||
# by a random color.
|
|
||||||
if self.single_color.find('-', 1, -1) == -1:
|
|
||||||
return self.single_color
|
|
||||||
|
|
||||||
from random import randint
|
Returns the color (in hexadecimal notation) this ColorScheme
|
||||||
color = ""
|
associates with for the given topic. """
|
||||||
for i in range(len(self.single_color)):
|
|
||||||
if self.single_color[i] != '-':
|
|
||||||
try:
|
|
||||||
if self.single_color[i-1] == '-':
|
|
||||||
continue
|
|
||||||
elif self.single_color[i+1] == '-':
|
|
||||||
continue
|
|
||||||
except IndexError: pass
|
|
||||||
color += self.single_color[i]
|
|
||||||
else:
|
|
||||||
f, t = self.single_color[i-1], self.single_color[i+1]
|
|
||||||
color += hex(randint(int(f, base=16), int(t, base=16)))[2:]
|
|
||||||
|
|
||||||
return color
|
|
||||||
|
|
||||||
def color_for(self, topic):
|
|
||||||
""" Returns the color (in hexadecimal notation) this ColorScheme assumes
|
|
||||||
for the given topic. """
|
|
||||||
|
|
||||||
# We need to take care not to return colors for both "normal" topics
|
|
||||||
# and masters, as setting masters would override other settings.
|
|
||||||
# If this ColorScheme has been read from a file though, we asssume that
|
|
||||||
# the user has taken care of this and apply what we are told to apply.
|
|
||||||
if self.mapping:
|
if self.mapping:
|
||||||
if topic in self.mapping.keys():
|
if topic in self.mapping.keys():
|
||||||
return self.mapping[topic]
|
return self.mapping[topic]
|
||||||
elif self.single_color:
|
elif self.single_color:
|
||||||
if not self._topic_is_master(topic):
|
return self.single_color
|
||||||
return self._single_color()
|
|
||||||
elif self.return_random_color:
|
elif self.return_random_color:
|
||||||
|
# We need to take care not to return colors for both "normal" and
|
||||||
|
# master topics
|
||||||
if not self._topic_is_master(topic):
|
if not self._topic_is_master(topic):
|
||||||
return self._random_color()
|
return self._random_color()
|
||||||
# Fallback
|
# Fallback
|
||||||
|
@ -856,7 +858,7 @@ class ColorScheme:
|
||||||
def from_color(self, color):
|
def from_color(self, color):
|
||||||
""" Derive ColorScheme from a single hex color. """
|
""" Derive ColorScheme from a single hex color. """
|
||||||
|
|
||||||
self.single_color = color.lstrip('#').strip('-')
|
self.single_color = color.lstrip('#')
|
||||||
|
|
||||||
def from_random(self):
|
def from_random(self):
|
||||||
""" Derive ColorScheme from random colors. """
|
""" Derive ColorScheme from random colors. """
|
||||||
|
@ -884,8 +886,10 @@ class ColorScheme:
|
||||||
def store(self, name):
|
def store(self, name):
|
||||||
""" Store the current state of all lights as preset. """
|
""" Store the current state of all lights as preset. """
|
||||||
|
|
||||||
# First of all, refuse to override virtual presets
|
# Refuse to save under a name used by virtual presets. Let's also
|
||||||
if name in self._virtual_presets:
|
# refuse to save as "config" or "c4ctrl.conf", as we may use one these
|
||||||
|
# file names in the future.
|
||||||
|
if name in self._virtual_presets or name in ["config", "c4ctrl.conf"]:
|
||||||
print("I'm sorry Dave. I'm afraid I can't do that. The name \"{}\" \
|
print("I'm sorry Dave. I'm afraid I can't do that. The name \"{}\" \
|
||||||
is reserved. Please choose a different one.".format(name))
|
is reserved. Please choose a different one.".format(name))
|
||||||
return False
|
return False
|
||||||
|
@ -894,11 +898,15 @@ is reserved. Please choose a different one.".format(name))
|
||||||
fd = sys.stdout
|
fd = sys.stdout
|
||||||
else:
|
else:
|
||||||
import os
|
import os
|
||||||
cfg_dir = self._get_cfg_dir(create=True) # Create config dir if missing
|
|
||||||
|
|
||||||
|
# Put preset in our config directory, create it if necessary
|
||||||
|
cfg_dir = self._get_cfg_dir(create=True)
|
||||||
|
# Strip any path elements
|
||||||
|
name = os.path.split(name)[1]
|
||||||
fn = os.path.join(cfg_dir, name)
|
fn = os.path.join(cfg_dir, name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fd = open(fn, 'xt')
|
fd = open(fn, 'xt') # x = new file (writing), t = text mode
|
||||||
except FileExistsError:
|
except FileExistsError:
|
||||||
print("A preset with this name already exists, overwrite? [y/N]",
|
print("A preset with this name already exists, overwrite? [y/N]",
|
||||||
end=' ', flush=True)
|
end=' ', flush=True)
|
||||||
|
@ -1001,7 +1009,7 @@ class RemotePresets:
|
||||||
|
|
||||||
def _expand_preset_name(self, name, rooms, available):
|
def _expand_preset_name(self, name, rooms, available):
|
||||||
""" Returns a valid preset name expanded from the given name.
|
""" Returns a valid preset name expanded from the given name.
|
||||||
|
|
||||||
Takes care to match only presets which are available for all rooms
|
Takes care to match only presets which are available for all rooms
|
||||||
specified.
|
specified.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue