Various enhancements

This commit is contained in:
Shy 2017-04-08 16:26:24 +02:00
parent 80186534dc
commit a4310ec1c7

200
c4ctrl.py
View file

@ -1,4 +1,6 @@
#!/usr/bin/env python3
#
# Author: Shy
"""
A command line client for Autoc4, the home automation system of the C4.
@ -10,9 +12,6 @@ Dependencies:
(available from https://github.com/eclipse/paho.mqtt.python)
"""
__date__ = "07 April 2017"
__author__ = "Shy"
import sys
@ -25,39 +24,41 @@ class C4Interface():
retain = True
debug = False
def __init__(self, topic=None):
# Set a default topic
if topic: self.topic = topic
def push(self, message, topic=None, retain=None):
""" Send a message to the MQTT broker.
message may a byte encoded payload or a list of byte encoded
payloads. """
message may be a byte encoded payload or a list of either dict()s
or tuples()s. If message is a byte encoded payload, topic= must be
given. dict()s and tuple()s should lool like this:
dict("topic": str(topic), "payload": bytes(payload))
tuple(str(topic), bytes(payload)) """
from paho.mqtt import publish
# Overwrite defaults
if topic: self.topic = topic
if retain != None: self.retain = retain
# Skip empty messages
if message == [] or message == "": return
# Set defaults
if retain == None: retain = self.retain
if type(message) == list:
# Add <qos> and <retain> to every message
for item in message.copy():
if type(item) == dict:
item["qos"] = self.qos
item["retain"] = self.retain
item["retain"] = retain
elif type(item) == tuple:
new_item = (
item[0] or self.topic, # topic
item[0] or topic, # topic
item[1], # payload
self.qos, # qos
self.retain # retain
retain # retain
)
message.remove(item)
message.append(new_item)
if self.debug: return print("[DEBUG] inhibited messages:", message)
if self.debug: return print("[DEBUG] inhibited messages:",
message, file=sys.stderr)
publish.multiple(message,
hostname=self.broker,
@ -66,29 +67,32 @@ class C4Interface():
else: # Message is not a list
if self.debug:
return print("[DEBUG] inhibited message to '{}': '{}'".format(
self.topic, message))
topic, message), file=sys.stderr)
publish.single(self.topic,
publish.single(topic,
payload=message,
qos=self.qos,
retain=self.retain,
retain=retain,
hostname=self.broker,
port=self.port)
def pull(self, topic=[]):
""" Return the state of a topic.
topic may be a list of topics or a single topic given as string. """
topic may be a list of topics or a single topic given as string.
Returns a paho message object or list of message objects. """
from paho.mqtt import subscribe
topic = topic or self.topic
# <topic> must be a list
# topic must be a list
if type(topic) == str:
topic = [topic]
# Skip empty queries
if topic == []: return
if self.debug:
print("[DEBUG] inhibited query for:", topic)
print("[DEBUG] inhibited query for:", topic, file=sys.stderr)
return []
return subscribe.simple(topic,
@ -100,15 +104,16 @@ class C4Interface():
def status(self):
""" Returns current status (string "open" or "closed") of the club. """
st = self.pull("club/status")
club_status = self.pull("club/status")
# Produce fake result to prevent errors if in debug mode
# Create a fake result to prevent errors if in debug mode
if C4Interface.debug:
print("[DEBUG] Warning: handing over fake data to allow further execution!")
class st: pass
st.payload = b'\x00'
print("[DEBUG] Warning: handing over fake data to allow for further execution!",
file=sys.stderr)
class club_status: pass
club_status.payload = b'\x00'
if st.payload == b'\x01':
if club_status.payload == b'\x01':
return "open"
else:
return "closed"
@ -131,6 +136,7 @@ class C4Interface():
class Dmx:
""" Abstraction of the 3 channel LED cans in the club. """
# 3 bytes for color, one each for red, green and blue
template = "000000"
def __init__(self, topic, color=None):
@ -139,20 +145,20 @@ class Dmx:
self.is_master = topic.rfind("/master") == len(topic)-7 # 7 = len("/master")
def _pad_color(self, color):
""" Merge hex color value into hex template.
""" Merge hex color values or payloads into the template.
Expand 4 bit hex code notation (eg. #f0f) and pad with template
to get a fitting payload for this kind of light. """
Expand 4 bit hex code notation (eg. #f0f) and pad with template. """
if len(color) > len(self.template):
# Silently truncate
# Silently truncate bytes exceeding template length
return color[:len(self.template)]
# Expand 3 char codes and codes of half the required length.
# Yet, lets presume that a 6-char code should never be expanded.
# Yet, let's presume that a 6-char code is alway meant to be
# interpreted as a color and should never be expanded.
if len(color) != 6 and len(color) == 3 or len(color) == (len(self.template) / 2):
expanded = ""
for c in color:
expanded += c*2
color = expanded
color = "".join(char*2 for char in color)
if len(color) == len(self.template): # Nothing more to do
return color
@ -165,6 +171,7 @@ class Dmx:
""" Set color (hex) for this instance.
The color is then available via its color variable. """
color = self._pad_color(color)
self.color = color
@ -174,12 +181,15 @@ class Dmx:
class Dmx4(Dmx):
""" Abstraction of the 4 channel LED cans in the club. """
# 3 bytes for color plus 1 byte for brightness
template = "000000ff"
class Dmx7(Dmx):
""" Abstraction of the 7 channel LED cans in the club. """
# 3 bytes for color, another 3 bytes for special functions and 1 byte
# for brightness
template = "000000000000ff"
@ -191,7 +201,10 @@ class C4Room:
self.switch_state = "" # State of switches in the like of str("0010")
def _interactive_light_switch(self):
""" Interactively ask for input. Returns str(userinput). """
""" Interactively ask for input.
Returns str(userinput). Will not write to stdout if sys.stdin is
no tty. """
if sys.stdin.isatty():
print("[{}]".format(self.name))
@ -211,18 +224,24 @@ class C4Room:
return userinput
def _get_state(self):
""" Returns current state of switches as a str() of 1s and 0s. """
""" Returns current state of switches as a string of 1s and 0s. """
state = ""
req = []
for topic in self.switches:
req.append(topic[1])
responce = self.c4.pull(req)
for sw in self.switches:
for r in responce:
if r.topic == sw[1]:
state += str(int.from_bytes(r.payload, byteorder="little"))
if C4Interface.debug:
print("[DEBUG] Warning: handing over fake data to allow for further execution!",
file=sys.stderr)
state = '0' * len(self.switches)
return state
def light_switch(self, userinput=""):
@ -278,9 +297,13 @@ class C4Room:
command=[]
for i in range(len(self.switches)):
# If we know their state, skip switches which are unchanged
if self.switch_state:
if self.switch_state[i] == userinput[i]: continue
command.append({
"topic" : self.switches[i][1],
"payload" : bytearray([int(userinput[i])])
"payload" : bytes([int(userinput[i])])
})
return self.c4.push(command)
@ -288,7 +311,7 @@ class C4Room:
def set_colorscheme(self, colorscheme, magic):
""" Apply colorscheme to the LED Cans in this room. """
cmd = []
command = []
for light in self.lights:
if colorscheme.color_for(light.topic):
@ -301,23 +324,24 @@ class C4Room:
# Generate the ghost topic for topic
ghost = "ghosts" + light.topic[light.topic.find('/'):]
cmd.append({
command.append({
"topic" : ghost,
"payload" : light.payload
})
else:
# Send data to the real lanterns, not fluffyd.
cmd.append({
command.append({
"topic" : light.topic,
"payload" : light.payload
})
if cmd == []: return
# Nothing to do. May happen if a preset defines no color for a room.
if command == []: return
if magic: # Do not retain "magic" messages
return self.c4.push(cmd, retain=False)
return self.c4.push(command, retain=False)
else:
return self.c4.push(cmd)
return self.c4.push(command)
class Wohnzimmer(C4Room):
@ -400,22 +424,9 @@ class Keller(C4Room):
class Kitchenlight:
""" The Kitchenlight. """
""" Interface to the Kitchenlight and its functions. """
_END = "little" # Endianess
_available_modes = """
off turn off
checker[,DELAY[,COLOR_1[,COLOR_2]]]
Checker
matrix[,LINES] Matrix
mood[,MODE] (1=Colorwheel, 2=Random)
Moodlight
oc[,DELAY] Open Chaos
pacman Pacman
sine Sine
text[,TEXT[,DELAY]] Text
flood Flood
clock Clock"""
_END = "little" # Kitchenlight endianess
def __init__(self, topic="kitchenlight/change_screen",
powertopic="power/wohnzimmer/kitchenlight",
@ -428,26 +439,42 @@ class Kitchenlight:
""" Send commands via a C4Interface to the MQTT broker. """
if self.autopower or poweron or poweroff:
c4 = C4Interface(self.topic)
cmd = []
cmd.append({
c4 = C4Interface()
command = []
command.append({
"topic" : self.topic,
"payload" : data })
if poweroff:
cmd.append({
command.append({
"topic" : self.powertopic,
"payload" : b'\x00'})
elif self.autopower or poweron:
cmd.append({
command.append({
"topic" : self.powertopic,
"payload" : b'\x01'})
c4.push(cmd)
c4.push(command)
else:
c4 = C4Interface(self.topic)
c4.push(data)
c4 = C4Interface()
c4.push(data, topic=self.topic)
def list_available(self):
""" Print a list of available Kitchenlight modes. """
print("Available Kitchenlight modes (options are optional):")
print("""
off turn off Kitchenlight
checker[,DELAY[,COLOR_1[,COLOR_2]]] Checker
matrix[,LINES] Matrix
mood[,1|2] (1=Colorwheel, 2=Random) Moodlight
oc[,DELAY] Open Chaos
pacman Pacman
sine Sine
text[,TEXT[,DELAY]] Text
flood Flood
clock Clock""")
def set_mode(self, mode, opts=[]):
"""Switch to given mode."""
""" Switch to given mode. """
mode = mode.lower()
if mode == "off":
@ -458,7 +485,7 @@ class Kitchenlight:
return self.matrix(*opts)
if mode == "mood":
return self.moodlight(*opts)
if mode == "oc" or mode == "openchaos":
if mode == "oc":
return self.openchaos(*opts)
if mode == "pacman":
return self.pacman()
@ -836,7 +863,8 @@ class ColorScheme:
# First of all, refuse to override virtual presets
if name in self._virtual_presets:
print("I'm sorry Dave. I'm afraid I can't do that. The name \"{}\" is reserved. Please choose a different one.".format(name))
print("I'm sorry Dave. I'm afraid I can't do that. The name \"{}\" \
is reserved. Please choose a different one.".format(name))
return False
if name == '-':
@ -1047,7 +1075,8 @@ class RemotePresets:
available = self.query_available(rooms.copy())
# Produce some fake data to prevent KeyErrors if in debug mode
if C4Interface.debug:
print("[DEBUG] Warning: handing over fake data to allow further execution!")
print("[DEBUG] Warning: handing over fake data to allow for further execution!",
file=sys.stderr)
available = {
"global" : [preset],
"wohnzimmer" : [preset],
@ -1079,7 +1108,8 @@ if __name__ == "__main__":
description="Command line client for AutoC4.")
parser.add_argument(
"-d", "--debug", action="store_true",
help="display what would be send to the MQTT broker, but do not actually connect")
help="display what would be send to the MQTT broker, but do not \
actually connect")
# Various club functions
group_fn = parser.add_argument_group(title="various functions")
@ -1104,7 +1134,9 @@ if __name__ == "__main__":
# Ambient control
group_cl = parser.add_argument_group(title="ambient color control",
description="PRESET may be either a preset name (which may be abbreviated), '#' followed by a color value in hex notation (eg. \"#ff0066\") or '-' to read from stdin.")
description="PRESET may be either a preset name (which may be \
abbreviated), '#' followed by a color value in hex notation (eg. \
\"#ff0066\") or '-' to read from stdin.")
group_cl.add_argument(
"-w", "--wohnzimmer", type=str, dest="w_color", metavar="PRESET",
help="apply local colorscheme PRESET to Wohnzimmer")
@ -1116,7 +1148,7 @@ if __name__ == "__main__":
help="apply local colorscheme PRESET to Fnordcenter")
group_cl.add_argument(
"-m", "--magic", action="store_true",
help="use fluffyd to change colors")
help="EXPERIMENTAL: use fluffyd to change colors")
group_cl.add_argument(
"-l", "--list-presets", action="store_true",
help="list locally available presets")
@ -1126,7 +1158,9 @@ if __name__ == "__main__":
# Switch control
group_sw = parser.add_argument_group(title="light switch control",
description="BINARY_CODE is a string of 0s or 1s for every light in the room. Accepts integers also. Will show some information and ask for input if omitted.")
description="BINARY_CODE is a string of 0s or 1s for every light in the \
room. Accepts decimals also. May be prepended by '&' or '^' as AND or \
OR operators. Will show some information and ask for input if omitted.")
group_sw.add_argument(
"-W", nargs='?', dest="w_switch", const="", metavar="BINARY_CODE",
help="switch lights in Wohnzimmer on/off")
@ -1142,13 +1176,16 @@ if __name__ == "__main__":
# Remote presets
group_rp = parser.add_argument_group(title="remote preset functions",
description="Available room names are \"wohnzimmer\", \"plenar\", \"fnord\" and \"keller\". Preset and room names may be abbreviated.")
description="Available room names are \"wohnzimmer\", \"plenar\", \
\"fnord\" and \"keller\". Preset and room names may be abbreviated.")
group_rp.add_argument(
"-r", "--remote-preset", type=str, metavar="PRESET[:ROOM[,ROOM,...]]",
help="activate remote PRESET for ROOM(s).")
help="activate remote PRESET for ROOM(s). Activates preset globally \
if ROOM is omitted.")
group_rp.add_argument(
"-R", "--list-remote", nargs='?', const="global", metavar="ROOM",
help="list remote presets for ROOM")
help="list remote presets for ROOM. Will list global presets if ROOM \
is omitted.")
args = parser.parse_args()
# Debug, gate, status and shutdown
@ -1167,8 +1204,7 @@ if __name__ == "__main__":
# Kitchenlight
if args.list_kl_modes:
print("Available Kitchenlight modes (options are optional):")
print(Kitchenlight._available_modes)
Kitchenlight().list_available()
if args.kl_mode:
kl = Kitchenlight()
mode = args.kl_mode.split(',')