#!/usr/bin/env python3
# ____ __
# / __// /
# / / / /___ ____
# \ \__\_ / / __/____ __ _
# \___\ |_| / / |_ _| \ |
# \ \__ | || / |_
# \___\|_||_\_\___\
#
# c4ctrl: A command line client for Autoc4.
#
# Author: Shy
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <https://www.gnu.org/licenses/>.
"""
A command line client for Autoc4, the home automation system of the C4.
Run 'c4ctrl -h' for usage information.
Dependencies:
* Paho Python Client
(available from https://github.com/eclipse/paho.mqtt.python).
* Some parts will work on UNIX-like operating systems only.
"""
import sys
from random import choice # for client_id generation.
class C4Interface: # {{{1
    """ Interaction with AutoC4, the C4 home automation system.

    All settings below are class attributes; they apply to every instance
    unless they are overridden on the instance itself. """

    broker = "autoc4.labor.koeln.ccc.de"
    port = 1883
    qos = 0
    retain = True
    # Generate a (sufficiently) unique client id.
    client_id = "c4ctrl-" + "".join(
        choice("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
        for unused in range(16))
    # If true, nothing is sent to the broker. Requests are printed to stderr
    # instead.
    debug = False

    def on_permission_error(self, error):
        """ Called when catching a PermissionDenied exception while connecting.

        Prints a hint and the error to stderr, then exits with status 1. """

        print("Error: You don't have permission to connect to the broker.", file=sys.stderr)
        print("Maybe you're not connected to the internal C4 network?", file=sys.stderr)
        print(error, file=sys.stderr)
        sys.exit(1)

    def on_os_error(self, error):
        """ Called when catching a OSError exception while connecting.

        Prints the error to stderr, then exits with status 1. """

        print("Error: unable to open a network socket.", file=sys.stderr)
        print(error, file=sys.stderr)
        sys.exit(1)

    def push(self, message, topic=None, retain=None):
        """ Send a message to the MQTT broker.

        message may be a byte encoded payload or a list of either dict()s
        or tuples()s. If message is a byte encoded payload, topic= must be
        given. dict()s and tuple()s should look like:
            dict("topic": str(topic), "payload": bytes(payload))
            tuple(str(topic), bytes(payload))

        topic is also used as fallback for tuples with an empty topic.
        retain defaults to the value of self.retain.

        The list passed in as message and the items in it are left
        untouched. Returns None. """

        # Skip empty messages.
        if message == [] or message == "":
            return

        # Set defaults.
        if retain is None:
            retain = self.retain

        if isinstance(message, list):
            # Add <qos> and <retain> to every message. We build a new list
            # so the order of messages is kept and the caller's data is not
            # modified.
            batch = []
            for item in message:
                if isinstance(item, dict):
                    item = dict(item, qos=self.qos, retain=retain)
                elif isinstance(item, tuple):
                    item = (
                        item[0] or topic,   # topic
                        item[1],            # payload
                        self.qos,           # qos
                        retain              # retain
                    )
                batch.append(item)

            if self.debug:
                print("[DEBUG] inhibited messages:", batch, file=sys.stderr)
                return

            # Imported late, so debug mode works without paho installed.
            from paho.mqtt import publish

            try:
                publish.multiple(batch,
                                 hostname=self.broker,
                                 port=self.port,
                                 client_id=self.client_id)
            # PermissionError is a subclass of OSError and must come first.
            except PermissionError as error:
                self.on_permission_error(error)
            except OSError as error:
                self.on_os_error(error)

        else: # Message is not a list.
            if self.debug:
                print("[DEBUG] inhibited message to '{}': '{}'".format(
                    topic, message), file=sys.stderr)
                return

            # Imported late, so debug mode works without paho installed.
            from paho.mqtt import publish

            try:
                publish.single(topic,
                               payload=message,
                               qos=self.qos,
                               retain=retain,
                               hostname=self.broker,
                               port=self.port,
                               client_id=self.client_id)
            except PermissionError as error:
                self.on_permission_error(error)
            except OSError as error:
                self.on_os_error(error)

    def pull(self, topic=None):
        """ Return the state of a topic.

        topic may be a list of topics or a single topic given as string.
        Returns a paho message object or list of message objects. Returns
        None if there is nothing to query and an empty list in debug mode. """

        # Skip empty queries.
        if topic is None or topic == []:
            return

        # Convert topics of type string to a single item list.
        if isinstance(topic, str):
            topic = [topic]

        if self.debug:
            print("[DEBUG] inhibited query for:", topic, file=sys.stderr)
            return []

        # Imported late, so debug mode works without paho installed.
        from paho.mqtt import subscribe

        try:
            return subscribe.simple(topic,
                                    msg_count=len(topic),
                                    qos=self.qos,
                                    hostname=self.broker,
                                    port=self.port,
                                    client_id=self.client_id)
        except PermissionError as error:
            self.on_permission_error(error)
        except OSError as error:
            self.on_os_error(error)

    def status(self):
        """ Returns current status (string "open" or "closed") of the club. """

        club_status = self.pull("club/status")

        if self.debug:
            # pull() did not return a message in debug mode, so we use a
            # fake payload to prevent errors.
            print("[DEBUG] Warning: handing over fake data to allow for further execution!",
                  file=sys.stderr)
            payload = b'\x00'
        else:
            payload = club_status.payload

        if payload == b'\x01':
            return "open"
        return "closed"

    def open_gate(self):
        """ Open the gate. """

        # The message carries no payload and is not retained.
        self.push(None, topic="club/gate", retain=False)

    def shutdown(self, force=False):
        """ Invoke the shutdown routine.

        force selects the payload: 0x44 if true, 0x00 otherwise. """

        payload = b'\x44' if force else b'\x00'
        self.push(payload, topic="club/shutdown", retain=False)
# }}}1
class Kitchenlight: # {{{1
    """ Interface to the Kitchenlight and its functions.

    Every mode is selected by sending a binary payload to the Kitchenlight
    topic. A payload starts with the screen number as a 4 byte integer
    which may be followed by mode specific options. """

    # List of available modes.
    modes = [
        "off",
        "checker",
        "matrix",
        "mood",
        "oc",
        "pacman",
        "sine",
        "text",
        "flood",
        "clock"
    ]
    _END = "little" # Kitchenlight endianess.

    def __init__(self, topic="kitchenlight/change_screen",
                 powertopic="power/wohnzimmer/kitchenlight",
                 autopower=True):
        self.topic = topic # Kitchenlight topic.
        self.powertopic = powertopic # Topic for power on.
        self.autopower = autopower # Power on on every mode change?

    def _uint(self, value, size=4):
        """ Return value as integer of size bytes in Kitchenlight byte order. """

        return int(value).to_bytes(size, self._END)

    def _switch(self, data, poweron=False, poweroff=False):
        """ Send commands via a C4Interface to the MQTT broker.

        data is the payload for the Kitchenlight topic. A message to the
        power topic is added if poweroff, poweron or self.autopower is set.
        poweroff takes precedence over the other two. """

        c4 = C4Interface()

        if not (self.autopower or poweron or poweroff):
            # No power switching required, send the mode change only.
            c4.push(data, topic=self.topic)
            return

        command = [{
            "topic" : self.topic,
            "payload" : data }]
        command.append({
            "topic" : self.powertopic,
            "payload" : b'\x00' if poweroff else b'\x01'})
        c4.push(command)

    def _expand_mode_name(self, name):
        """ Expand an abbreviated mode name.

        Returns name if it is a known mode, else the first mode starting
        with name. Returns name unchanged if there is no such mode. """

        # Return exact match.
        if name in self.modes:
            return name

        for mode in self.modes:
            if mode.startswith(name):
                return mode

        # Fallback.
        return name

    def list_available(self):
        """ Print a list of available Kitchenlight modes. """

        print(
            "Available Kitchenlight modes (options are optional):\n\n"
            " off turn off Kitchenlight\n"
            " checker [DELAY] [COLOR_1] [COLOR_2] Checker\n"
            " matrix [LINES] Matrix\n"
            " mood [1|2] (1=Colorwheel, 2=Random) Moodlight\n"
            " oc [DELAY] Open Chaos\n"
            " pacman Pacman\n"
            " sine Sine\n"
            " text [TEXT] [DELAY] Text\n"
            " flood Flood\n"
            " clock Clock\n"
        )

    def set_mode(self, mode, opts=None):
        """ Switch to given mode.

        mode may be abbreviated and is not case sensitive. opts is a list
        of options which is handed over to modes accepting options.
        Returns False if mode is unknown. """

        if opts is None:
            opts = []

        mode = self._expand_mode_name(mode.lower())

        # Modes accepting options.
        with_opts = {
            "checker" : self.checker,
            "matrix" : self.matrix,
            "mood" : self.moodlight,
            "oc" : self.openchaos,
            "text" : self.text
        }
        # Modes without options. Options given are ignored.
        without_opts = {
            "off" : self.empty,
            "pacman" : self.pacman,
            "sine" : self.sine,
            "flood" : self.flood,
            "clock" : self.clock
        }

        if mode in with_opts:
            return with_opts[mode](*opts)
        if mode in without_opts:
            return without_opts[mode]()

        print("Error: unknown Kitchenlight mode {}!".format(mode), file=sys.stderr)
        return False

    def empty(self):
        """ Set to mode "empty" and turn off Kitchenlight. """

        # Screen 0
        d = self._uint(0)
        self._switch(d, poweroff=True)

    def checker(self, delay=500, colA="0000ff", colB="00ff00"):
        """ Set to mode "checker".

        delay = delay in ms (default 500)
        colA = first color (default 0000ff)
        colB = second color (default 00ff00) """

        # Kind of a hack: lets treat the two colors as DMX lights. This way
        # we get the expansion and padding of color codes for free.
        ca = Dmx("checker/a", colA.lstrip('#'))
        cb = Dmx("checker/b", colB.lstrip('#'))

        # Screen 1 and delay
        d = bytearray(self._uint(1) + self._uint(delay))
        # ColorA R/G/B followed by ColorB R/G/B, 2 bytes per channel.
        for color in (ca.color, cb.color):
            for offset in (0, 2, 4):
                d += self._uint(int(color[offset:offset + 2], base=16), 2)
        self._switch(d)

    def matrix(self, lines=8):
        """ Set to mode "matrix".

        lines (>0, <32) = number of lines (default 8) """

        if int(lines) > 31: lines = 31 # Maximal line count.

        # Screen 2
        d = bytearray(self._uint(2) + self._uint(lines))
        self._switch(d)

    def moodlight(self, mode=1):
        """ Set to mode "moodlight".

        mode [1|2] = colorwheel(1) or random(2) """

        # Options given on the command line arrive as strings.
        mode = int(mode)

        d = bytearray(
            self._uint(3)           # Screen 3
            + self._uint(mode, 1)   # Mode
            + self._uint(1)         # Step
            + self._uint(10)        # Fade delay
            + self._uint(10000))    # Pause
        if mode == 1: # Mode "Colorwheel".
            d += self._uint(30, 2)  # Hue step
        self._switch(d)

    def openchaos(self, delay=1000):
        """ Set to mode "openchaos".

        delay = delay in milliseconds (default 1000). """

        # Screen 4
        d = bytearray(self._uint(4) + self._uint(delay))
        self._switch(d)

    def pacman(self):
        """ Set to mode "pacman". """

        # Screen 5
        d = self._uint(5)
        self._switch(d)

    def sine(self):
        """ Set to mode "sine". """

        # Screen 6
        d = self._uint(6)
        self._switch(d)

    # Screen 7 is Strobo, which is disabled because it seems to do harm to
    # the Kitchenlight. Evil strobo.

    def text(self, text="Hello World", delay=250):
        """ Set to mode "text".

        text (str < 256 bytes) = text to display (default "Hello World").
        delay = delay in milliseconds (default 250). """

        # Characters which are not part of ASCII are dropped.
        text = text.encode("ascii", "ignore")
        if len(text) > 255: # Maximum text length.
            print("Warning: text length must not exceed 255 characters!", file=sys.stderr)
            text = text[:255]

        # Screen 8, delay and the text followed by a null byte.
        d = bytearray(self._uint(8) + self._uint(delay) + text + bytes(1))
        self._switch(d)

    def flood(self):
        """ Set to mode "flood". """

        # Screen 9
        d = self._uint(9)
        self._switch(d)

    def clock(self):
        """ Set to mode "clock". """

        # Screen 11
        d = self._uint(11)
        self._switch(d)
# }}}1
class Dmx: # {{{1
    """ Abstraction of the 3 channel LED cans.

    An instance provides these attributes:
        topic = the topic given to the constructor.
        color = color as hex string of the same length as template.
        payload = color as bytearray.
        is_master = True if topic ends with "/master". """

    # 3 bytes for color, one each for red, green and blue.
    template = "000000"

    def __init__(self, topic, color=None):
        self.topic = topic
        self.set_color(color or self.template)
        self.is_master = topic.endswith("/master")

    def _pad_color(self, color):
        """ Merge hex color values or payloads into the template.

        Expand 4 bit hex code notation (eg. #f0f) and pad with template
        to get a fitting payload for this kind of light. """

        size = len(self.template)

        if len(color) > size:
            # Silently truncate bytes exceeding template length.
            return color[:size]

        # Expand 3 char codes and codes of half the required length.
        # Yet, let's presume that a 6-char code is alway meant to be
        # interpreted as a color and should never be expanded.
        if len(color) != 6 and (len(color) == 3 or len(color) * 2 == size):
            color = "".join(char*2 for char in color)

        if len(color) == size: # Nothing more to do.
            return color

        # Add padding.
        return color + self.template[len(color):]

    def set_color(self, color):
        """ Set color (hex) for this instance.

        The color is then available via its color variable. The matching
        payload is stored in the payload variable. bytearray.fromhex()
        raises ValueError if color is not a valid hex string. """

        color = self._pad_color(color)

        self.color = color
        self.payload = bytearray.fromhex(color)
# }}}1
class Dmx4(Dmx): # {{{1
    """ Abstraction of the 4 channel LED cans. """

    # Color (3 bytes) followed by brightness (1 byte).
    template = "000000" + "ff"
# }}}1
class Dmx7(Dmx): # {{{1
    """ Abstraction of the 7 channel LED cans. """

    # Color (3 bytes), followed by special functions (3 bytes) and
    # brightness (1 byte).
    template = "000000" + "000000" + "ff"
# }}}1
class C4Room: # {{{1
""" Methods of rooms in the club. """
def __init__(self):
    # Cache used by get_switch_state() to reduce requests to the broker.
    # Holds the last known state and the time it was retrieved.
    self._switch_state = ("", 0.0)
    # Connection to the broker.
    self.c4 = C4Interface()
def _interactive_light_switch(self):
""" Interactively ask for input.
Returns str(userinput). Will not write to stdout if sys.stdin is
no tty. """
if sys.stdin.isatty():
print("[{}]".format(self.name))
print("Please enter 0 or 1 for every light:")
for level in range(len(self.switches)):
print((level * '|') + ",- " + self.switches[level][0])
switch_state = self.get_switch_state()
print(switch_state) # Present current state.
try:
userinput = sys.stdin.readline().rstrip('\n')
except KeyboardInterrupt:
print("\rInterrupted by user.")
return ""
return userinput
def get_switch_state(self, max_age=5):
    """ Returns current state of switches as a string of 1s and 0s.

    max_age specifies how old (in seconds) a cached response from a
    previously done request may be before it is considered outdated.

    The digits are in the order of self.switches. Switches the broker
    did not report a state for are left out. In debug mode the result
    consists of zeroes only. """

    from time import time

    # We store switch states in self._switch_state to reduce requests to
    # the broker. If this variable is neither empty nor too old, use it!
    cached_state, cached_at = self._switch_state
    if cached_state != "" and time() - cached_at <= max_age:
        return cached_state

    response = self.c4.pull([switch[1] for switch in self.switches])

    # pull() returns None for empty queries and a single message object
    # instead of a list if only one topic was requested.
    if response is None:
        response = []
    elif not isinstance(response, list):
        response = [response]

    # Map topics to payloads. If a topic was reported more than once the
    # last message wins, so there is at most one digit per switch.
    payloads = {}
    for message in response:
        payloads[message.topic] = message.payload

    state = "".join(
        str(int.from_bytes(payloads[switch[1]], sys.byteorder))
        for switch in self.switches if switch[1] in payloads)

    if self.c4.debug:
        print("[DEBUG] Warning: handing over fake data to allow for further execution!",
              file=sys.stderr)
        state = '0' * len(self.switches)

    self._switch_state = (state, time())
    return state
def light_switch(self, userinput=""):
    """ Switch lamps in a room on or off.

    userinput is asked for interactively if empty. It may be one of:
        a string of 0s and 1s, one digit per switch,
        a decimal number which is converted to binary if its binary
            representation fits the number of switches,
        "-" to print the current state without changing anything,
        "&", "|" or "^" followed by one of the above to combine it
            with the current state (AND, OR, XOR),
        ">>" or "<<", optionally followed by a number, to shift the
            current state (by 1 if no number is given).

    Returns the result of C4Interface.push(), False if the input had
    the wrong number of digits or invalid digits and None if there was
    no input or it could not be parsed. """

    if not userinput:
        # Derive user input from stdin.
        userinput = self._interactive_light_switch()
        if userinput == "": return

    count = len(self.switches)

    # Let's support some geeky binary operations!
    operators = {
        '&' : lambda x, y: x & y,   # AND
        '|' : lambda x, y: x | y,   # OR
        '^' : lambda x, y: x ^ y    # XOR
    }
    combine = None      # Operator to apply, None in normal mode.
    switch_state = None # Current state, retrieved only if required.

    if not userinput.isdecimal():
        operand = userinput[1:].strip()
        shift_arg = userinput[2:].strip()

        if userinput == '-':
            print(self.get_switch_state())
            return

        elif userinput[0] in operators and operand.isdecimal():
            # Binary operator, applied later after doing some more
            # validation.
            combine = operators[userinput[0]]
            userinput = operand

        elif userinput[:2] in (">>", "<<") \
                and (shift_arg == "" or shift_arg.isdecimal()):
            # Left or right shift
            # How far shall we shift?
            shift_by = int(shift_arg) if shift_arg else 1

            # Retrieve the current state of switches.
            switch_state = self.get_switch_state()
            current = int(switch_state, base=2)
            if userinput[:2] == ">>":
                shifted = current >> shift_by
            else:
                shifted = current << shift_by

            # '[2:]' removes the leading '0b'.
            new_state = bin(shifted)[2:]
            # Cut any exceeding leftmost bits.
            new_state = new_state[-count:]
            # Pad with leading zeroes.
            userinput = new_state.rjust(count, '0')

        else:
            # Oh no, input contained non-decimal characters which we could
            # not parse. :(
            print("Error: could not parse input!", file=sys.stderr)
            return

    if len(userinput) != count:
        # First try to convert from decimal if userinput's length doesn't
        # match.
        if len(bin(int(userinput))) <= count + 2:
            # ^ +2 because bin() returns strings like '0b...'.
            # Convert to binary and pad with leading zeroes.
            userinput = format(int(userinput), "0{}b".format(count))
        else:
            print("Error: wrong number of digits (expected {}, got {})!".format(
                count, len(userinput)), file=sys.stderr)
            return False

    # Now that everything special is expanded it's time to check if
    # userinput really consists of 1s and 0s only.
    for digit in userinput:
        if digit not in "01":
            print("Error: invalid digit: " + digit, file=sys.stderr)
            return False

    if combine is not None:
        # Apply the operator digit by digit.
        switch_state = self.get_switch_state()
        userinput = "".join(
            str(combine(int(x), int(y)))
            for x, y in zip(userinput, switch_state))

    command = []
    for i, switch in enumerate(self.switches):
        # Skip unchanged switches if we happen to know their state.
        if switch_state is not None and switch_state[i] == userinput[i]:
            continue

        command.append({
            "topic" : switch[1],
            "payload" : bytes([int(userinput[i])])
        })

    return self.c4.push(command)
def set_colorscheme(self, colorscheme, magic):
""" Apply colorscheme to the LED Cans in this room. """
command = []
for light in self.lights:
if colorscheme.get_color_for(light.topic):
# Update internal state of this Dmx object, so we can query
#