Implemented '|' and '&' ops for light switching

This commit is contained in:
Shy 2017-04-07 19:58:09 +02:00
parent 78dbe320e0
commit b643246d55

186
c4ctrl.py
View file

@ -1,23 +1,26 @@
#!/usr/bin/env python3
#
# c4ctrl: Command line client for AutoC4
#
# Author: Shy
"""
This is a command line client for Autoc4, the home automation system of the C4.
A command line client for Autoc4, the home automation system of the C4.
Some parts of it **may** be useful as python module for simple tasks.
May be useful as python module for simple tasks.
Dependencies:
Paho Python Client
(available from https://github.com/eclipse/paho.mqtt.python)
"""
__date__ = "07 April 2017"
__author__ = "Shy"
import sys
class C4Interface():
""" Interaction with the C4 home automation system. """
""" Interaction with AutoC4, the C4 home automation system. """
port = 1883
broker = "autoc4.labor.koeln.ccc.de"
port = 1883
qos = 2
retain = True
debug = False
@ -26,20 +29,21 @@ class C4Interface():
# Set a default topic
if topic: self.topic = topic
def push(self, cmd, topic=None, retain=None):
def push(self, message, topic=None, retain=None):
""" Send a message to the MQTT broker.
cmd may a byte encoded payload or a list of byte encoded
message may a byte encoded payload or a list of byte encoded
payloads. """
from paho.mqtt import publish
# Overwrite defaults
if topic: self.topic = topic
if retain != None: self.retain = retain
if type(cmd) == list:
if type(message) == list:
# Add <qos> and <retain> to every message
for item in cmd.copy():
for item in message.copy():
if type(item) == dict:
item["qos"] = self.qos
item["retain"] = self.retain
@ -50,22 +54,22 @@ class C4Interface():
self.qos, # qos
self.retain # retain
)
cmd.remove(item)
cmd.append(new_item)
message.remove(item)
message.append(new_item)
if self.debug: return print("[DEBUG] inhibited messages:", cmd)
if self.debug: return print("[DEBUG] inhibited messages:", message)
publish.multiple(cmd,
publish.multiple(message,
hostname=self.broker,
port=self.port)
else:
else: # Message is not a list
if self.debug:
return print("[DEBUG] inhibited message to '{}': '{}'".format(
self.topic, cmd))
self.topic, message))
publish.single(self.topic,
payload=cmd,
payload=message,
qos=self.qos,
retain=self.retain,
hostname=self.broker,
@ -75,7 +79,9 @@ class C4Interface():
""" Return the state of a topic.
topic may be a list of topics or a single topic given as string. """
from paho.mqtt import subscribe
topic = topic or self.topic
# <topic> must be a list
if type(topic) == str:
@ -93,6 +99,7 @@ class C4Interface():
def status(self):
""" Returns current status (string "open" or "closed") of the club. """
st = self.pull("club/status")
# Produce fake result to prevent errors if in debug mode
@ -108,15 +115,17 @@ class C4Interface():
def open_gate(self):
"""Open the gate."""
self.push(cmd=b'\x01', topic="club/gate", retain=False)
self.push(None, topic="club/gate", retain=False)
def shutdown(self, force=False):
"""Invoke the shutdown routine."""
if force:
payload = b'\x44'
else:
payload = b'\x00'
self.push(cmd=payload, topic="club/shutdown", retain=False)
self.push(payload, topic="club/shutdown", retain=False)
class Dmx:
@ -179,27 +188,19 @@ class C4Room:
def __init__(self):
self.c4 = C4Interface()
self.switch_state = "" # State of switches in the like of str("0010")
def _interactive_light_switch(self):
# Interactively ask for input
""" Interactively ask for input. Returns str(userinput). """
if sys.stdin.isatty():
print("[{}]".format(self.name))
print("Please enter 0 or 1 for every light:")
for level in range(len(self.switches)):
print((level * '|') + ",- " + self.switches[level][0])
# Current state of switches
state = ""
req = []
for t in self.switches:
req.append(t[1])
responce = self.c4.pull(req)
for sw in self.switches:
for r in responce:
if r.topic == sw[1]:
state = state + str(int.from_bytes(r.payload,
byteorder="little"))
print(state) # Present current state
self.switch_state = self._get_state()
print(self.switch_state) # Present current state
try:
userinput = sys.stdin.readline().rstrip('\n')
@ -209,42 +210,84 @@ class C4Room:
return userinput
def _get_state(self):
""" Returns current state of switches as a str() of 1s and 0s. """
state = ""
req = []
for topic in self.switches:
req.append(topic[1])
responce = self.c4.pull(req)
for sw in self.switches:
for r in responce:
if r.topic == sw[1]:
state += str(int.from_bytes(r.payload, byteorder="little"))
return state
def light_switch(self, userinput=""):
""" Switch lamps in a room on or off. """
if not userinput:
# Derive user input from stdin
userinput = self._interactive_light_switch()
if userinput == "": return
mode = 'n' # n = normal, a = AND, o = OR
if not userinput.isdecimal():
print("You're not paying attention!", file=sys.stderr)
return
if userinput[0] == '&' and userinput[1:].isdecimal():
# AND operator
userinput = userinput[1:]
mode = 'a'
elif userinput[0] == '|' and userinput[1:].isdecimal():
# OR operator
userinput = userinput[1:]
mode = 'o'
else:
print("You're not paying attention!", file=sys.stderr)
return
if len(userinput) != len(self.switches):
# First try to convert from integer if userinput's length doesn't
# match
if len(bin(int(userinput))) <= len(self.switches)+2:
# +2 because bin() returns something like 'b0...'
# Try to interpret as integer
binary = bin(int(userinput))[2:]
userinput = str(len(self.switches)*'0')[:-len(binary)] + binary
# ^ +2 because bin() returns strings like 'b0...'
binary = bin(int(userinput))[2:] # Strip leading 'b0'
# Pad with leading zeroes
userinput = binary.rjust(len(self.switches), '0')
else:
print("Error: wrong number of digits (expected {}, got {})!".format(
len(self.switches), len(userinput)))
return False
cmd=[]
for si in range(len(self.switches)):
if userinput[si] not in "01":
print("Error: invalid digit: " + userinput[si])
# Now that everything special is expanded it's time to check if
# userinput really consists of 1s and 0s only
for digit in userinput:
if digit not in "01":
print("Error: invalid digit: " + digit, file=sys.stderr)
return False
cmd.append({
"topic" : self.switches[si][1],
"payload" : bytearray([int(userinput[si])])
if mode == 'a': # AND operator
if not self.switch_state: self.switch_state = self._get_state()
userinput = "".join(map(lambda x, y: str(int(x) & int(y)),
userinput, self.switch_state))
elif mode == 'o': # OR operator
if not self.switch_state: self.switch_state = self._get_state()
userinput = "".join(map(lambda x, y: str(int(x) | int(y)),
userinput, self.switch_state))
command=[]
for i in range(len(self.switches)):
command.append({
"topic" : self.switches[i][1],
"payload" : bytearray([int(userinput[i])])
})
return self.c4.push(cmd)
return self.c4.push(command)
def set_colorscheme(self, colorscheme, magic):
""" Apply colorscheme to the LED Cans in this room. """
cmd = []
for light in self.lights:
if colorscheme.color_for(light.topic):
@ -383,6 +426,7 @@ class Kitchenlight:
def _switch(self, data, poweron=False, poweroff=False):
""" Send commands via a C4Interface to the MQTT broker. """
if self.autopower or poweron or poweroff:
c4 = C4Interface(self.topic)
cmd = []
@ -404,6 +448,7 @@ class Kitchenlight:
def set_mode(self, mode, opts=[]):
"""Switch to given mode."""
mode = mode.lower()
if mode == "off":
return self.empty()
@ -430,12 +475,18 @@ class Kitchenlight:
def empty(self):
""" Set to mode "empty" and turn off Kitchenlight. """
# Screen 0
d = int(0).to_bytes(4, self._END)
self._switch(d, poweroff=True)
def checker(self, delay=500, colA="0000ff", colB="00ff00"):
""" Set to mode "checker". """
""" Set to mode "checker".
delay = delay in ms (default 500)
colA = first color (default 0000ff)
colB = second color (default 00ff00) """
# Kind of a hack: lets treat the two colors as DMX lights
ca = Dmx("checker/a", colA.lstrip('#'))
cb = Dmx("checker/b", colB.lstrip('#'))
@ -456,7 +507,10 @@ class Kitchenlight:
self._switch(d)
def matrix(self, lines=8):
""" Set to mode "matrix". """
""" Set to mode "matrix".
lines (>0, <32) = number of lines (default 8) """
if int(lines) > 31: lines = 31 # Maximal line count
d = bytearray(8)
v = memoryview(d)
@ -466,7 +520,10 @@ class Kitchenlight:
self._switch(d)
def moodlight(self, mode=1):
""" Set to mode "moodlight". """
""" Set to mode "moodlight".
mode [1|2] = colorwheel(1) or random(2) """
if mode == 1: # Mode "Colorwheel"
d = bytearray(19)
v = memoryview(d)
@ -498,7 +555,10 @@ class Kitchenlight:
self._switch(d)
def openchaos(self, delay=1000):
""" Set to mode "openchaos". """
""" Set to mode "openchaos".
delay = delay in milliseconds (default 1000) """
d = bytearray(8)
v = memoryview(d)
# Screen 4
@ -508,21 +568,27 @@ class Kitchenlight:
def pacman(self):
""" Set to mode "pacman". """
# Screen 5
d = int(5).to_bytes(4, self._END)
self._switch(d)
def sine(self):
""" Set to mode "sine". """
# Screen 6
d = int(6).to_bytes(4, self._END)
self._switch(d)
# Screen 7 is Strobo, which is disabled because it seems to do harm to
# the Kitchenlight. Evil strobo!
# the Kitchenlight. Evil strobo.
def text(self, text="Hello World", delay=250):
""" Set to mode "text". """
""" Set to mode "text".
text (str < 256 bytes) = text to display (default "Hello World")
delay = delay in milliseconds (default 250) """
text = text.encode("ascii", "ignore")
if len(text) > 256: # Maximum text length
print("Warning: text length must not exceed 256 characters!", file=sys.stderr)
@ -739,15 +805,19 @@ class ColorScheme:
def from_color(self, color):
""" Derive ColorScheme from a single hex color. """
self.single_color = color.lstrip('#').strip('-')
def from_random(self):
""" Derive ColorScheme from random colors. """
self.return_random_color = True
def list_available(self):
""" List available presets. """
import os
cfg_dir = self._get_cfg_dir()
if not cfg_dir:
self.available = self._virtual_presets.copy()
@ -763,6 +833,7 @@ class ColorScheme:
def store(self, name):
""" Store the current state of all lights as preset. """
# First of all, refuse to override virtual presets
if name in self._virtual_presets:
print("I'm sorry Dave. I'm afraid I can't do that. The name \"{}\" is reserved. Please choose a different one.".format(name))
@ -821,6 +892,7 @@ class ColorScheme:
else:
fd.write("{} = {}\n".format(topic, color))
# Do not close stdout
if name != '-':
fd.close()
print("Wrote preset \"{}\"".format(name))
@ -865,6 +937,7 @@ class RemotePresets:
def _expand_room_name(self, name):
""" Returns a valid room name expanded from the given name. """
if name in self.map.keys():
# Return on exact match
return name
@ -922,6 +995,7 @@ class RemotePresets:
def query_available(self, rooms=["global"]):
""" Returns a dict of remotely available presets for [rooms]. """
import json
# Presets in "global" are available everywhere and should always be included
@ -1041,8 +1115,8 @@ if __name__ == "__main__":
"-f", "--fnordcenter", type=str, dest="f_color", metavar="PRESET",
help="apply local colorscheme PRESET to Fnordcenter")
group_cl.add_argument(
"-M", "--magic", action="store_true",
help="Use fluffyd to change colors.")
"-m", "--magic", action="store_true",
help="use fluffyd to change colors")
group_cl.add_argument(
"-l", "--list-presets", action="store_true",
help="list locally available presets")