Refactoring Code to mesh with the architecture set by https://github.com/GammaGames/kano_wand
Overall - moving from bluez to bleak will give us universal usage, and the rest of the ML side of this project can go on without a hitch.
This commit is contained in:
@@ -0,0 +1,707 @@
|
||||
# name='kano_wand'
|
||||
# description='Python module for interfacing with the Kano wand'
|
||||
# author='Jesse Lieberg (@GammaGames)'
|
||||
# url='https://github.com/GammaGames/kano_wand'
|
||||
|
||||
from enum import Enum
|
||||
import inspect
|
||||
import numpy
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from time import sleep
|
||||
|
||||
class _INFO(Enum):
|
||||
"""Enum containing info UUIDs"""
|
||||
SERVICE = '64A70010-F691-4B93-A6F4-0968F5B648F8'
|
||||
ORGANIZATION_CHAR = '64A7000B-F691-4B93-A6F4-0968F5B648F8'
|
||||
SOFTWARE_CHAR = '64A70013-F691-4B93-A6F4-0968F5B648F8'
|
||||
HARDWARE_CHAR = '64A70001-F691-4B93-A6F4-0968F5B648F8'
|
||||
|
||||
class _IO(Enum):
|
||||
"""Enum containing _IO UUIDs"""
|
||||
SERVICE = '64A70012-F691-4B93-A6F4-0968F5B648F8'
|
||||
BATTERY_CHAR = '64A70007-F691-4B93-A6F4-0968F5B648F8'
|
||||
USER_BUTTON_CHAR = '64A7000D-F691-4B93-A6F4-0968F5B648F8'
|
||||
VIBRATOR_CHAR = '64A70008-F691-4B93-A6F4-0968F5B648F8'
|
||||
LED_CHAR = '64A70009-F691-4B93-A6F4-0968F5B648F8'
|
||||
KEEP_ALIVE_CHAR = '64A7000F-F691-4B93-A6F4-0968F5B648F8'
|
||||
|
||||
class _SENSOR(Enum):
|
||||
"""Enum containing sensor UUIDs"""
|
||||
SERVICE = '64A70011-F691-4B93-A6F4-0968F5B648F8'
|
||||
TEMP_CHAR = '64A70014-F691-4B93-A6F4-0968F5B648F8'
|
||||
QUATERNIONS_CHAR = '64A70002-F691-4B93-A6F4-0968F5B648F8'
|
||||
# RAW_CHAR = '64A7000A-F691-4B93-A6F4-0968F5B648F8'
|
||||
# MOTION_CHAR = '64A7000C-F691-4B93-A6F4-0968F5B648F8'
|
||||
MAGN_CALIBRATE_CHAR = '64A70021-F691-4B93-A6F4-0968F5B648F8'
|
||||
QUATERNIONS_RESET_CHAR = '64A70004-F691-4B93-A6F4-0968F5B648F8'
|
||||
|
||||
class PATTERN(Enum):
|
||||
"""Enum for wand vibration patterns"""
|
||||
REGULAR = 1
|
||||
SHORT = 2
|
||||
BURST = 3
|
||||
LONG = 4
|
||||
SHORT_LONG = 5
|
||||
SHORT_SHORT = 6
|
||||
BIG_PAUSE = 7
|
||||
|
||||
class Wand(Peripheral, DefaultDelegate):
|
||||
"""A wand class to interact with the Kano wand
|
||||
"""
|
||||
|
||||
def __init__(self, device, debug=False):
|
||||
"""Create a new wand
|
||||
|
||||
Arguments:
|
||||
device {bluepy.ScanEntry} -- Device information
|
||||
|
||||
Keyword Arguments:
|
||||
debug {bool} -- Print debug messages (default: {False})
|
||||
"""
|
||||
super().__init__(None)
|
||||
# Meta stuff
|
||||
self.debug = debug
|
||||
self._dev = device
|
||||
self.name = device.getValueText(9)
|
||||
|
||||
if debug:
|
||||
print("Wand: {}\n\rWand Mac: {}".format(self.name, device.addr))
|
||||
|
||||
# Notification stuff
|
||||
self.connected = False
|
||||
self._position_callbacks = {}
|
||||
self._position_subscribed = False
|
||||
self._button_callbacks = {}
|
||||
self._button_subscribed = False
|
||||
self._temperature_callbacks = {}
|
||||
self._temperature_subscribed = False
|
||||
self._battery_callbacks = {}
|
||||
self._battery_subscribed = False
|
||||
self._notification_thread = None
|
||||
self._position_notification_handle = 41
|
||||
self._button_notification_handle = 33
|
||||
self._temp_notification_handle = 56
|
||||
self._battery_notification_handle = 23
|
||||
|
||||
def connect(self):
|
||||
if self.debug:
|
||||
print("Connecting to {}...".format(self.name))
|
||||
|
||||
super(Wand, self).connect(self._dev)
|
||||
self._lock = threading.Lock()
|
||||
self.connected = True
|
||||
self.setDelegate(self)
|
||||
self._info_service = self.getServiceByUUID(_INFO.SERVICE.value)
|
||||
self._io_service = self.getServiceByUUID(_IO.SERVICE.value)
|
||||
self._sensor_service = self.getServiceByUUID(_SENSOR.SERVICE.value)
|
||||
|
||||
self.post_connect()
|
||||
|
||||
if self.debug:
|
||||
print("Connected to {}".format(self.name))
|
||||
|
||||
def post_connect(self):
|
||||
"""Do anything necessary after connecting
|
||||
"""
|
||||
pass
|
||||
|
||||
def disconnect(self):
|
||||
super().disconnect()
|
||||
self.connected = False
|
||||
self._position_subscribed = False
|
||||
self._button_subscribed = False
|
||||
self._temperature_subscribed = False
|
||||
self._battery_subscribed = False
|
||||
|
||||
self.post_disconnect()
|
||||
|
||||
if self.debug:
|
||||
print("Disconnected from {}".format(self.name))
|
||||
|
||||
def post_disconnect(self):
|
||||
"""Do anything necessary after disconnecting
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_organization(self):
|
||||
"""Get organization of device
|
||||
|
||||
Returns {str} -- Organization name
|
||||
"""
|
||||
with self._lock:
|
||||
if not hasattr(self, "_organization_handle"):
|
||||
handle = self._info_service.getCharacteristics(_INFO.ORGANIZATION_CHAR.value)[0]
|
||||
self._organization_handle = handle.getHandle()
|
||||
return self.readCharacteristic(self._organization_handle).decode("utf-8")
|
||||
|
||||
def get_software_version(self):
|
||||
"""Get software version
|
||||
|
||||
Returns {str} -- Version number
|
||||
"""
|
||||
with self._lock:
|
||||
if not hasattr(self, "_software_handle"):
|
||||
handle = self._info_service.getCharacteristics(_INFO.SOFTWARE_CHAR.value)[0]
|
||||
self._software_handle = handle.getHandle()
|
||||
return self.readCharacteristic(self._software_handle).decode("utf-8")
|
||||
|
||||
def get_hardware_version(self):
|
||||
"""Get hardware version
|
||||
|
||||
Returns {str} -- Hardware version
|
||||
"""
|
||||
with self._lock:
|
||||
if not hasattr(self, "_hardware_handle"):
|
||||
handle = self._info_service.getCharacteristics(_INFO.HARDWARE_CHAR.value)[0]
|
||||
self._hardware_handle = handle.getHandle()
|
||||
return self.readCharacteristic(self._hardware_handle).decode("utf-8")
|
||||
|
||||
def get_battery(self):
|
||||
"""Get battery level (currently only returns 0)
|
||||
|
||||
Returns {str} -- Battery level
|
||||
"""
|
||||
with self._lock:
|
||||
if not hasattr(self, "_battery_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.BATTERY_CHAR.value)[0]
|
||||
self._battery_handle = handle.getHandle()
|
||||
return self.readCharacteristic(self._battery_handle).decode("utf-8")
|
||||
|
||||
def get_button(self):
|
||||
"""Get current button status
|
||||
|
||||
Returns {bool} -- Button pressed status
|
||||
"""
|
||||
with self._lock:
|
||||
if not hasattr(self, "_button_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.USER_BUTTON_CHAR.value)[0]
|
||||
self._button_handle = handle.getHandle()
|
||||
|
||||
data = self.readCharacteristic(self._button_handle)
|
||||
return data[0] == 1
|
||||
|
||||
def get_temperature(self):
|
||||
"""Get temperature
|
||||
|
||||
Returns {str} -- Battery level
|
||||
"""
|
||||
with self._lock:
|
||||
if not hasattr(self, "_temperature_handle"):
|
||||
handle = self._sensor_service.getCharacteristics(_SENSOR.TEMP_CHAR.value)[0]
|
||||
self._temperature_handle = handle.getHandle()
|
||||
return self.readCharacteristic(self._temperature_handle).decode("utf-8")
|
||||
|
||||
def keep_alive(self):
|
||||
"""Keep the wand's connection active
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
# Is not documented because it doesn't seem to work?
|
||||
if self.debug:
|
||||
print("Keeping wand alive.")
|
||||
|
||||
with self._lock:
|
||||
if not hasattr(self, "_alive_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.KEEP_ALIVE_CHAR.value)[0]
|
||||
self._alive_handle = handle.getHandle()
|
||||
return self.writeCharacteristic(self._alive_handle, bytes([1]), withResponse=True)
|
||||
|
||||
def vibrate(self, pattern=PATTERN.REGULAR):
|
||||
"""Vibrate wand with pattern
|
||||
|
||||
Keyword Arguments:
|
||||
pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR})
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
with self._lock:
|
||||
if isinstance(pattern, PATTERN):
|
||||
message = [pattern.value]
|
||||
else:
|
||||
message = [pattern]
|
||||
|
||||
if self.debug:
|
||||
print("Setting LED to {}".format(message))
|
||||
|
||||
if not hasattr(self, "_vibrator_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.VIBRATOR_CHAR.value)[0]
|
||||
self._vibrator_handle = handle.getHandle()
|
||||
return self.writeCharacteristic(self._vibrator_handle, bytes(message), withResponse=True)
|
||||
|
||||
def set_led(self, color="0x2185d0", on=True):
|
||||
"""Set the LED's color
|
||||
|
||||
Keyword Arguments:
|
||||
color {str} -- Color hex code (default: {"0x2185d0"})
|
||||
on {bool} -- Whether light is on or off (default: {True})
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
message = []
|
||||
if on:
|
||||
message.append(1)
|
||||
else:
|
||||
message.append(0)
|
||||
|
||||
# I got this from Kano's node module
|
||||
color = int(color.replace("#", ""), 16)
|
||||
r = (color >> 16) & 255
|
||||
g = (color >> 8) & 255
|
||||
b = color & 255
|
||||
rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3))
|
||||
message.append(rgb >> 8)
|
||||
message.append(rgb & 0xff)
|
||||
|
||||
if self.debug:
|
||||
print("Setting LED to {}".format(message))
|
||||
|
||||
with self._lock:
|
||||
if not hasattr(self, "_led_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.LED_CHAR.value)[0]
|
||||
self._led_handle = handle.getHandle()
|
||||
return self.writeCharacteristic(self._led_handle, bytes(message), withResponse=True)
|
||||
|
||||
# SENSORS
|
||||
def on(self, event, callback):
|
||||
"""Add an event listener
|
||||
|
||||
Arguments:
|
||||
event {str} -- Event type, "position", "button", "temp", or "battery"
|
||||
callback {function} -- Callback function
|
||||
|
||||
Returns {str} -- ID of the callback for removal later
|
||||
"""
|
||||
if self.debug:
|
||||
print("Adding callback for {} notification...".format(event))
|
||||
|
||||
id = None
|
||||
if event == "position":
|
||||
id = uuid.uuid4()
|
||||
self._position_callbacks[id] = callback
|
||||
self.subscribe_position()
|
||||
elif event == "button":
|
||||
id = uuid.uuid4()
|
||||
self._button_callbacks[id] = callback
|
||||
self.subscribe_button()
|
||||
elif event == "temp":
|
||||
id = uuid.uuid4()
|
||||
self._temperature_callbacks[id] = callback
|
||||
self.subscribe_temperature()
|
||||
elif event == "battery":
|
||||
id = uuid.uuid4()
|
||||
self._battery_callbacks[id] = callback
|
||||
self.subscribe_battery()
|
||||
|
||||
return id
|
||||
|
||||
def off(self, uuid, continue_notifications=False):
|
||||
"""Remove a callback
|
||||
|
||||
Arguments:
|
||||
uuid {str} -- Remove a callback with its id
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
|
||||
Returns {bool} -- If removal was successful or not
|
||||
"""
|
||||
removed = False
|
||||
if self._position_callbacks.get(uuid) != None:
|
||||
removed = True
|
||||
self._position_callbacks.pop(uuid)
|
||||
if len(self._position_callbacks.values()) == 0:
|
||||
self.unsubscribe_position(continue_notifications=continue_notifications)
|
||||
elif self._button_callbacks.get(uuid) != None:
|
||||
removed = True
|
||||
self._button_callbacks.pop(uuid)
|
||||
if len(self._button_callbacks.values()) == 0:
|
||||
self.unsubscribe_button(continue_notifications=continue_notifications)
|
||||
elif self._temperature_callbacks.get(uuid) != None:
|
||||
removed = True
|
||||
self._temperature_callbacks.pop(uuid)
|
||||
if len(self._temperature_callbacks.values()) == 0:
|
||||
self.unsubscribe_temperature(continue_notifications=continue_notifications)
|
||||
elif self._battery_callbacks.get(uuid) != None:
|
||||
removed = True
|
||||
self._battery_callbacks.pop(uuid)
|
||||
if len(self._battery_callbacks.values()) == 0:
|
||||
self.unsubscribe_battery(continue_notifications=continue_notifications)
|
||||
|
||||
if self.debug:
|
||||
if removed:
|
||||
print("Removed callback {}".format(uuid))
|
||||
else:
|
||||
print("Could not remove callback {}".format(uuid))
|
||||
|
||||
return removed
|
||||
|
||||
def subscribe_position(self):
|
||||
"""Subscribe to position notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to position notification")
|
||||
|
||||
self._position_subscribed = True
|
||||
with self._lock:
|
||||
if not hasattr(self, "_position_handle"):
|
||||
handle = self._sensor_service.getCharacteristics(_SENSOR.QUATERNIONS_CHAR.value)[0]
|
||||
self._position_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._position_handle + 1, bytes([1, 0]))
|
||||
self._start_notification_thread()
|
||||
|
||||
def unsubscribe_position(self, continue_notifications=False):
|
||||
"""Unsubscribe to position notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from position notification")
|
||||
|
||||
self._position_subscribed = continue_notifications
|
||||
with self._lock:
|
||||
if not hasattr(self, "_position_handle"):
|
||||
handle = self._sensor_service.getCharacteristics(_SENSOR.QUATERNIONS_CHAR.value)[0]
|
||||
self._position_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._position_handle + 1, bytes([0, 0]))
|
||||
|
||||
def subscribe_button(self):
|
||||
"""Subscribe to button notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to button notification")
|
||||
|
||||
self._button_subscribed = True
|
||||
with self._lock:
|
||||
if not hasattr(self, "_button_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.USER_BUTTON_CHAR.value)[0]
|
||||
self._button_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._button_handle + 1, bytes([1, 0]))
|
||||
self._start_notification_thread()
|
||||
|
||||
def unsubscribe_button(self, continue_notifications=False):
|
||||
"""Unsubscribe to button notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from button notification")
|
||||
|
||||
self._button_subscribed = continue_notifications
|
||||
with self._lock:
|
||||
if not hasattr(self, "_button_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.USER_BUTTON_CHAR.value)[0]
|
||||
self._button_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._button_handle + 1, bytes([0, 0]))
|
||||
|
||||
def subscribe_temperature(self):
|
||||
"""Subscribe to temperature notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to temperature notification")
|
||||
|
||||
self._temperature_subscribed = True
|
||||
with self._lock:
|
||||
if not hasattr(self, "_temp_handle"):
|
||||
handle = self._sensor_service.getCharacteristics(_SENSOR.TEMP_CHAR.value)[0]
|
||||
self._temp_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._temp_handle + 1, bytes([1, 0]))
|
||||
self._start_notification_thread()
|
||||
|
||||
def unsubscribe_temperature(self, continue_notifications=False):
|
||||
"""Unsubscribe to temperature notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from temperature notification")
|
||||
|
||||
self._temperature_subscribed = continue_notifications
|
||||
with self._lock:
|
||||
if not hasattr(self, "_temp_handle"):
|
||||
handle = self._sensor_service.getCharacteristics(_SENSOR.TEMP_CHAR.value)[0]
|
||||
self._temp_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._temp_handle + 1, bytes([0, 0]))
|
||||
|
||||
def subscribe_battery(self):
|
||||
"""Subscribe to battery notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to battery notification")
|
||||
|
||||
self._battery_subscribed = True
|
||||
with self._lock:
|
||||
if not hasattr(self, "_battery_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.BATTERY_CHAR .value)[0]
|
||||
self._battery_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._battery_handle + 1, bytes([1, 0]))
|
||||
self._start_notification_thread()
|
||||
|
||||
def unsubscribe_battery(self, continue_notifications=False):
|
||||
"""Unsubscribe to battery notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from battery notification")
|
||||
|
||||
self._battery_subscribed = continue_notifications
|
||||
with self._lock:
|
||||
if not hasattr(self, "_battery_handle"):
|
||||
handle = self._io_service.getCharacteristics(_IO.BATTERY_CHAR .value)[0]
|
||||
self._battery_handle = handle.getHandle()
|
||||
|
||||
self.writeCharacteristic(self._battery_handle + 1, bytes([0, 0]))
|
||||
|
||||
def _start_notification_thread(self):
|
||||
try:
|
||||
if self._notification_thread == None:
|
||||
self.reset_position()
|
||||
self._notification_thread = threading.Thread(target=self._notification_wait)
|
||||
self._notification_thread.start()
|
||||
except:
|
||||
pass
|
||||
|
||||
def _notification_wait(self):
|
||||
if self.debug:
|
||||
print("Notification thread started")
|
||||
|
||||
while (self.connected and
|
||||
(self._position_subscribed or
|
||||
self._button_subscribed or
|
||||
self._temperature_subscribed or
|
||||
self._battery_subscribed)):
|
||||
try:
|
||||
if super().waitForNotifications(1):
|
||||
continue
|
||||
except:
|
||||
continue
|
||||
|
||||
if self.debug:
|
||||
print("Notification thread stopped")
|
||||
|
||||
def _on_position(self, data):
|
||||
"""Private function for position notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
# I got part of this from Kano's node module and modified it
|
||||
y = numpy.int16(numpy.uint16(int.from_bytes(data[0:2], byteorder='little')))
|
||||
x = -1 * numpy.int16(numpy.uint16(int.from_bytes(data[2:4], byteorder='little')))
|
||||
w = -1 * numpy.int16(numpy.uint16(int.from_bytes(data[4:6], byteorder='little')))
|
||||
z = numpy.int16(numpy.uint16(int.from_bytes(data[6:8], byteorder='little')))
|
||||
|
||||
if self.debug:
|
||||
pitch = "Pitch: {}".format(z).ljust(16)
|
||||
roll = "Roll: {}".format(w).ljust(16)
|
||||
print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y))
|
||||
|
||||
self.on_position(x, y, z, w)
|
||||
for callback in self._position_callbacks.values():
|
||||
callback(x, y, z, w)
|
||||
|
||||
def on_position(self, roll, x, y, z):
|
||||
"""Function called on position notification
|
||||
|
||||
Arguments:
|
||||
x {int} -- X position of wand (Between -1000 and 1000)
|
||||
y {int} -- Y position of wand (Between -1000 and 1000)
|
||||
pitch {int} -- Pitch of wand (Between -1000 and 1000)
|
||||
roll {int} -- Roll of wand (Between -1000 and 1000)
|
||||
"""
|
||||
pass
|
||||
|
||||
def reset_position(self):
|
||||
"""Reset the quaternains of the wand
|
||||
"""
|
||||
handle = self._sensor_service.getCharacteristics(_SENSOR.QUATERNIONS_RESET_CHAR.value)[0].getHandle()
|
||||
with self._lock:
|
||||
self.writeCharacteristic(handle, bytes([1]))
|
||||
|
||||
def _on_button(self, data):
|
||||
"""Private function for button notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = data[0] == 1
|
||||
|
||||
if self.debug:
|
||||
print("Button: {}".format(val))
|
||||
|
||||
self.on_button(val)
|
||||
for callback in self._button_callbacks.values():
|
||||
callback(val)
|
||||
|
||||
def on_button(self, value):
|
||||
"""Function called on button notification
|
||||
|
||||
Arguments:
|
||||
pressed {bool} -- If button is pressed
|
||||
"""
|
||||
pass
|
||||
|
||||
def _on_temperature(self, data):
|
||||
"""Private function for temperature notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = numpy.int16(numpy.uint16(int.from_bytes(data[0:2], byteorder='little')))
|
||||
|
||||
if self.debug:
|
||||
print("Temperature: {}".format(val))
|
||||
|
||||
self.on_temperature(val)
|
||||
for callback in self._temperature_callbacks.values():
|
||||
callback(val)
|
||||
|
||||
def on_temperature(self, value):
|
||||
"""Function called on temperature notification
|
||||
|
||||
Arguments:
|
||||
value {int} -- Temperature of the wand
|
||||
"""
|
||||
pass
|
||||
|
||||
def _on_battery(self, data):
|
||||
"""Private function for battery notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = data[0]
|
||||
|
||||
if self.debug:
|
||||
print("Battery: {}".format(val))
|
||||
|
||||
self.on_battery(val)
|
||||
for callback in self._battery_callbacks.values():
|
||||
callback(val)
|
||||
|
||||
def on_battery(self, value):
|
||||
"""Function called on battery notification
|
||||
|
||||
Arguments:
|
||||
value {int} -- Battery level of the wand
|
||||
"""
|
||||
|
||||
def handleNotification(self, cHandle, data):
|
||||
"""Handle notifications subscribed to
|
||||
|
||||
Arguments:
|
||||
cHandle {int} -- Handle of notification
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
if cHandle == self._position_notification_handle:
|
||||
self._on_position(data)
|
||||
elif cHandle == self._button_notification_handle:
|
||||
self._on_button(data)
|
||||
elif cHandle == self._temp_notification_handle:
|
||||
self._on_temperature(data)
|
||||
elif cHandle == self._battery_notification_handle:
|
||||
self._on_battery(data)
|
||||
|
||||
class Shop(DefaultDelegate):
|
||||
"""A scanner class to connect to wands
|
||||
"""
|
||||
def __init__(self, wand_class=Wand, debug=False):
|
||||
"""Create a new scanner
|
||||
|
||||
Keyword Arguments:
|
||||
wand_class {class} -- Class to use when connecting to wand (default: {Wand})
|
||||
debug {bool} -- Print debug messages (default: {False})
|
||||
"""
|
||||
super().__init__()
|
||||
self.wand_class = wand_class
|
||||
self.debug = debug
|
||||
self._name = None
|
||||
self._prefix = None
|
||||
self._mac = None
|
||||
self._scanner = Scanner().withDelegate(self)
|
||||
|
||||
def scan(self, name=None, prefix="Kano-Wand", mac=None, timeout=1.0, connect=False):
|
||||
"""Scan for devices
|
||||
|
||||
Keyword Arguments:
|
||||
name {str} -- Name of the device to scan for (default: {None})
|
||||
prefix {str} -- Prefix of name of device to scan for (default: {"Kano-Wand"})
|
||||
mac {str} -- MAC Address of the device to scan for (default: {None})
|
||||
timeout {float} -- Timeout before returning from scan (default: {1.0})
|
||||
connect {bool} -- Connect to the wands automatically (default: {False})
|
||||
|
||||
Returns {Wand[]} -- Array of wand objects
|
||||
"""
|
||||
|
||||
if self.debug:
|
||||
print("Scanning for {} seconds...".format(timeout))
|
||||
try:
|
||||
name_check = not (name is None)
|
||||
prefix_check = not (prefix is None)
|
||||
mac_check = not (mac is None)
|
||||
assert name_check or prefix_check or mac_check
|
||||
except AssertionError as e:
|
||||
print("Either a name, prefix, or mac address must be provided to find a wand")
|
||||
raise e
|
||||
|
||||
if name is not None:
|
||||
self._name = name
|
||||
elif prefix is not None:
|
||||
self._prefix = prefix
|
||||
elif mac is not None:
|
||||
self._mac = mac
|
||||
|
||||
self.wands = []
|
||||
self._scanner.scan(timeout)
|
||||
if connect:
|
||||
for wand in self.wands:
|
||||
wand.connect()
|
||||
return self.wands
|
||||
|
||||
def handleDiscovery(self, device, isNewDev, isNewData):
|
||||
"""Check if the device matches
|
||||
|
||||
Arguments:
|
||||
device {bluepy.ScanEntry} -- Device data
|
||||
isNewDev {bool} -- Whether the device is new
|
||||
isNewData {bool} -- Whether the device has already been seen
|
||||
"""
|
||||
|
||||
if isNewDev:
|
||||
# Perform initial detection attempt
|
||||
mode = 0
|
||||
found = 0
|
||||
name = device.getValueText(9)
|
||||
if self._name is not None:
|
||||
mode += 1
|
||||
if name == self._name:
|
||||
found += 1
|
||||
if self._prefix is not None:
|
||||
mode += 1
|
||||
if name is not None and name.startswith(self._prefix):
|
||||
found += 1
|
||||
if self._mac is not None:
|
||||
mode += 1
|
||||
if device.addr == self._mac:
|
||||
found += 1
|
||||
|
||||
if found >= mode:
|
||||
self.wands.append(self.wand_class(device, debug=self.debug))
|
||||
elif self.debug:
|
||||
if name != "None":
|
||||
print("Mac: {}\tCommon Name: {}".format(device.addr, name))
|
||||
else:
|
||||
print("Mac: {}".format(device.addr))
|
||||
+68
-64
@@ -5,88 +5,92 @@ import platform
|
||||
import uuid
|
||||
import pandas as pd
|
||||
from kano_wand.ble_client import KanoBLEClient
|
||||
from kano_wand.constants import NINE_AXIS, ACCELEROMETER, BUTTON
|
||||
from time import sleep
|
||||
from tkinter import *
|
||||
from bleak import discover
|
||||
|
||||
"""
|
||||
This is used for reading and decoding values from t he Kano Harry Potter Coding Wand
|
||||
- Wand sends 25 updates to gyro and accel every second
|
||||
- Wand sends 3 Dimensional data as 48 bit reverse marshalled integers
|
||||
- Wand seems to send smaller single dimensional data in 16bit unsigned integers
|
||||
"""
|
||||
|
||||
HEIGHT = 2
|
||||
WIDTH = 35
|
||||
LOOP = asyncio.get_event_loop()
|
||||
|
||||
|
||||
class SpellCollectionGUI:
|
||||
def __init__(self, root, spells, kano_ble_client):
|
||||
self.root = root
|
||||
self.kano_ble_client = kano_ble_client
|
||||
self.data_output_file = f"recording-session-{uuid.uuid4()}.csv"
|
||||
# class SpellCollectionGUI:
|
||||
# def __init__(self, root, spells, kano_ble_client):
|
||||
# self.root = root
|
||||
# self.kano_ble_client = kano_ble_client
|
||||
# self.data_output_file = f"recording-session-{uuid.uuid4()}.csv"
|
||||
#
|
||||
# self.root = root
|
||||
# self.root.title("Microphone Recorder")
|
||||
#
|
||||
# self.label = Label(root, text="Select a spell, and then press start to begin recording spell data!")
|
||||
# self.label.pack()
|
||||
#
|
||||
# self.spell_variable = StringVar(root)
|
||||
# self.spell_list = OptionMenu(root, self.spell_variable,
|
||||
# *["", *[spell for spell in spells]
|
||||
# ], command=self.select_spell)
|
||||
# self.spell_list.config(height=HEIGHT, width=WIDTH)
|
||||
# self.spell_list.pack()
|
||||
#
|
||||
# self.recording_button = Button(root, text="Start Recording", command=self.start_recording, height=HEIGHT,
|
||||
# width=WIDTH)
|
||||
# self.recording_button.pack()
|
||||
#
|
||||
# self.close_button = Button(root, text="Close", command=root.quit, height=HEIGHT, width=WIDTH)
|
||||
# self.close_button.pack()
|
||||
#
|
||||
# self.spell = None
|
||||
# self.root.after(0, self.kano_ble_client.connect, True)
|
||||
#
|
||||
# def select_spell(self, selected_value):
|
||||
# self.spell = selected_value
|
||||
#
|
||||
# def start_recording(self):
|
||||
# print("Started Recording")
|
||||
# self.root.after(0)
|
||||
#
|
||||
# self.kano_ble_client.start_recieving_data()
|
||||
#
|
||||
#
|
||||
# def select_spell(spells):
|
||||
# try:
|
||||
# spell = spells[int(input("Please select a spell.\n " +
|
||||
# "\n".join([f"{s} - {spells[s]}" for s in range(len(spells))]) +
|
||||
# "\nenter -1 to exit\n"))]
|
||||
# print(f"You have selected {spell}")
|
||||
# return spell
|
||||
# except Exception as e:
|
||||
# print(e)
|
||||
# print("Selected spell is uncorrect")
|
||||
# return None
|
||||
|
||||
self.root = root
|
||||
self.root.title("Microphone Recorder")
|
||||
|
||||
self.label = Label(root, text="Select a spell, and then press start to begin recording spell data!")
|
||||
self.label.pack()
|
||||
|
||||
self.spell_variable = StringVar(root)
|
||||
self.spell_list = OptionMenu(root, self.spell_variable,
|
||||
*["", *[spell for spell in spells]
|
||||
], command=self.select_spell)
|
||||
self.spell_list.config(height=HEIGHT, width=WIDTH)
|
||||
self.spell_list.pack()
|
||||
|
||||
self.recording_button = Button(root, text="Start Recording", command=self.start_recording, height=HEIGHT,
|
||||
width=WIDTH)
|
||||
self.recording_button.pack()
|
||||
|
||||
self.close_button = Button(root, text="Close", command=root.quit, height=HEIGHT, width=WIDTH)
|
||||
self.close_button.pack()
|
||||
|
||||
self.spell = None
|
||||
self.root.after(0, self.kano_ble_client.connect, True)
|
||||
|
||||
def select_spell(self, selected_value):
|
||||
self.spell = selected_value
|
||||
|
||||
def start_recording(self):
|
||||
print("Started Recording")
|
||||
self.root.after(0)
|
||||
|
||||
self.kano_ble_client.start_recieving_data()
|
||||
|
||||
|
||||
def select_spell(spells):
|
||||
try:
|
||||
spell = spells[int(input("Please select a spell.\n " +
|
||||
"\n".join([f"{s} - {spells[s]}" for s in range(len(spells))]) +
|
||||
"\nenter -1 to exit\n"))]
|
||||
print(f"You have selected {spell}")
|
||||
return spell
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Selected spell is uncorrect")
|
||||
return None
|
||||
async def main(loop):
|
||||
device_address = "D8:9B:12:D1:08:80"
|
||||
|
||||
os.environ["PYTHONASYNCIODEBUG"] = str(1)
|
||||
address = (
|
||||
device_address # <--- Change to your device's address here if you are using Windows or Linux
|
||||
if platform.system() != "Darwin"
|
||||
else "243E23AE-4A99-406C-B317-18F1BD7B4CBE" # <--- Change to your device's address here if you are using macOS
|
||||
)
|
||||
# device_address = "D8:9B:12:D1:08:80"
|
||||
#
|
||||
# os.environ["PYTHONASYNCIODEBUG"] = str(1)
|
||||
# address = (
|
||||
# device_address # <--- Change to your device's address here if you are using Windows or Linux
|
||||
# if platform.system() != "Darwin"
|
||||
# else "243E23AE-4A99-406C-B317-18F1BD7B4CBE" # <--- Change to your device's address here if you are using macOS
|
||||
# )
|
||||
|
||||
# One method for connect
|
||||
# One for getting all data
|
||||
# One for getting data when button is pressed
|
||||
devices = await discover()
|
||||
for d in devices:
|
||||
print(d)
|
||||
print(d.name, d.address)
|
||||
|
||||
|
||||
|
||||
spells = pd.read_csv("spells.csv")["spells"]
|
||||
sensors = [NINE_AXIS, ACCELEROMETER, BUTTON]
|
||||
|
||||
kble = KanoBLEClient(address, loop)
|
||||
await kble.connect(True)
|
||||
await kble.start_recieving_data(sensors)
|
||||
@@ -98,8 +102,8 @@ async def main(loop):
|
||||
|
||||
await kble.stop_recieving_data(sensors)
|
||||
print("finished main")
|
||||
|
||||
df = None
|
||||
#
|
||||
# df = None
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
from enum import Enum
|
||||
|
||||
class INFO(Enum):
|
||||
"""Enum containing info UUIDs"""
|
||||
SERVICE = '64A70010-F691-4B93-A6F4-0968F5B648F8'
|
||||
ORGANIZATION_CHAR = '64A7000B-F691-4B93-A6F4-0968F5B648F8'
|
||||
SOFTWARE_CHAR = '64A70013-F691-4B93-A6F4-0968F5B648F8'
|
||||
HARDWARE_CHAR = '64A70001-F691-4B93-A6F4-0968F5B648F8'
|
||||
|
||||
class IO(Enum):
|
||||
"""Enum containing _IO UUIDs"""
|
||||
SERVICE = '64A70012-F691-4B93-A6F4-0968F5B648F8'
|
||||
BATTERY_CHAR = '64A70007-F691-4B93-A6F4-0968F5B648F8'
|
||||
USER_BUTTON_CHAR = '64A7000D-F691-4B93-A6F4-0968F5B648F8'
|
||||
VIBRATOR_CHAR = '64A70008-F691-4B93-A6F4-0968F5B648F8'
|
||||
LED_CHAR = '64A70009-F691-4B93-A6F4-0968F5B648F8'
|
||||
KEEP_ALIVE_CHAR = '64A7000F-F691-4B93-A6F4-0968F5B648F8'
|
||||
|
||||
class SENSOR(Enum):
|
||||
"""Enum containing sensor UUIDs"""
|
||||
SERVICE = '64A70011-F691-4B93-A6F4-0968F5B648F8'
|
||||
TEMP_CHAR = '64A70014-F691-4B93-A6F4-0968F5B648F8'
|
||||
QUATERNIONS_CHAR = '64A70002-F691-4B93-A6F4-0968F5B648F8'
|
||||
# RAW_CHAR = '64A7000A-F691-4B93-A6F4-0968F5B648F8'
|
||||
# MOTION_CHAR = '64A7000C-F691-4B93-A6F4-0968F5B648F8'
|
||||
MAGN_CALIBRATE_CHAR = '64A70021-F691-4B93-A6F4-0968F5B648F8'
|
||||
QUATERNIONS_RESET_CHAR = '64A70004-F691-4B93-A6F4-0968F5B648F8'
|
||||
|
||||
|
||||
class PATTERN(Enum):
|
||||
"""Enum for wand vibration patterns"""
|
||||
REGULAR = 1
|
||||
SHORT = 2
|
||||
BURST = 3
|
||||
LONG = 4
|
||||
SHORT_LONG = 5
|
||||
SHORT_SHORT = 6
|
||||
BIG_PAUSE = 7
|
||||
@@ -0,0 +1,97 @@
|
||||
from bleak import discover
|
||||
from wand import Wand
|
||||
|
||||
class Shop(object):
|
||||
"""A scanner class to connect to wands
|
||||
"""
|
||||
def __init__(self, wand_class=Wand, debug=False):
|
||||
"""Create a new scanner
|
||||
|
||||
Keyword Arguments:
|
||||
wand_class {class} -- Class to use when connecting to wand (default: {Wand})
|
||||
debug {bool} -- Print debug messages (default: {False})
|
||||
"""
|
||||
self.wand_class = wand_class
|
||||
self.debug = debug
|
||||
self._name = None
|
||||
self._prefix = None
|
||||
self._mac = None
|
||||
|
||||
async def scan(self, name=None, prefix="Kano-Wand", mac=None, timeout=2.0, connect=False):
|
||||
"""Scan for devices
|
||||
|
||||
Keyword Arguments:
|
||||
name {str} -- Name of the device to scan for (default: {None})
|
||||
prefix {str} -- Prefix of name of device to scan for (default: {"Kano-Wand"})
|
||||
mac {str} -- MAC Address of the device to scan for (default: {None})
|
||||
timeout {float} -- Timeout before returning from scan (default: {1.0})
|
||||
connect {bool} -- Connect to the wands automatically (default: {False})
|
||||
|
||||
Returns {Wand[]} -- Array of wand objects
|
||||
"""
|
||||
|
||||
if self.debug:
|
||||
print("Scanning for {} seconds...".format(timeout))
|
||||
try:
|
||||
name_check = not (name is None)
|
||||
prefix_check = not (prefix is None)
|
||||
mac_check = not (mac is None)
|
||||
assert name_check or prefix_check or mac_check
|
||||
except AssertionError as e:
|
||||
print("Either a name, prefix, or mac address must be provided to find a wand")
|
||||
raise e
|
||||
|
||||
if name is not None:
|
||||
self._name = name
|
||||
elif prefix is not None:
|
||||
self._prefix = prefix
|
||||
elif mac is not None:
|
||||
self._mac = mac
|
||||
|
||||
self.wands = []
|
||||
|
||||
devices = await discover(timeout= timeout)
|
||||
|
||||
#TODO: Add in wand filtering.
|
||||
|
||||
# if isNewDev:
|
||||
# # Perform initial detection attempt
|
||||
# mode = 0
|
||||
# found = 0
|
||||
# name = device.getValueText(9)
|
||||
# if self._name is not None:
|
||||
# mode += 1
|
||||
# if name == self._name:
|
||||
# found += 1
|
||||
# if self._prefix is not None:
|
||||
# mode += 1
|
||||
# if name is not None and name.startswith(self._prefix):
|
||||
# found += 1
|
||||
# if self._mac is not None:
|
||||
# mode += 1
|
||||
# if device.addr == self._mac:
|
||||
# found += 1
|
||||
#
|
||||
# if found >= mode:
|
||||
# self.wands.append(self.wand_class(device, debug=self.debug))
|
||||
# elif self.debug:
|
||||
# if name != "None":
|
||||
# print("Mac: {}\tCommon Name: {}".format(device.addr, name))
|
||||
# else:
|
||||
# print("Mac: {}".format(device.addr))
|
||||
print(devices)
|
||||
devices = list(filter(lambda x : x.name.startswith("kano"), devices))
|
||||
print(devices)
|
||||
self.wands = [Wand(d.address, d.name) for d in devices]
|
||||
# for d in devices:
|
||||
# #Convert to device making here.
|
||||
# print(d)
|
||||
# print(d.name, d.address)
|
||||
print(self.wands)
|
||||
|
||||
#TODO: Process device list here for the shop - > not all BLE devices are necessary
|
||||
# client = BleakClient(self.wand_address, loop=self.loop)
|
||||
if connect:
|
||||
for wand in self.wands:
|
||||
wand.connect()
|
||||
return self.wands
|
||||
@@ -0,0 +1,608 @@
|
||||
import threading
|
||||
import numpy as np
|
||||
|
||||
from constants import *
|
||||
from bleak import BleakClient
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
class Wand(object):
|
||||
"""A wand class to interact with the Kano wand
|
||||
"""
|
||||
|
||||
def __init__(self, device_addr, name, debug=True):
|
||||
"""Create a new wand
|
||||
|
||||
Arguments:
|
||||
device {bluepy.ScanEntry} -- Device information
|
||||
|
||||
Keyword Arguments:
|
||||
debug {bool} -- Print debug messages (default: {False})
|
||||
"""
|
||||
super().__init__(None)
|
||||
self.debug = debug
|
||||
self._dev = BleakClient(device_addr)
|
||||
self.name = name
|
||||
|
||||
if debug:
|
||||
print("Wand: {}\n\rWand Mac: {}".format(self.name, self._dev.address))
|
||||
|
||||
# Notification stuff
|
||||
self.connected = False
|
||||
self._position_callbacks = {}
|
||||
self._position_subscribed = False
|
||||
self._button_callbacks = {}
|
||||
self._button_subscribed = False
|
||||
self._temperature_callbacks = {}
|
||||
self._temperature_subscribed = False
|
||||
self._battery_callbacks = {}
|
||||
self._battery_subscribed = False
|
||||
self._notification_thread = None
|
||||
self._position_notification_handle = 41
|
||||
self._button_notification_handle = 33
|
||||
self._temp_notification_handle = 56
|
||||
self._battery_notification_handle = 23
|
||||
|
||||
async def connect(self):
|
||||
if self.debug:
|
||||
print("Connecting to {}...".format(self.name))
|
||||
|
||||
connected = await self._dev.connect()
|
||||
if not connected:
|
||||
raise Exception("ERROR NOT CONNECTED TO THE DEVICE")
|
||||
|
||||
self.connected = True
|
||||
self.post_connect()
|
||||
|
||||
if self.debug:
|
||||
print("Connected to {}".format(self.name))
|
||||
|
||||
def post_connect(self):
|
||||
"""Do anything necessary after connecting
|
||||
"""
|
||||
pass
|
||||
|
||||
def disconnect(self):
|
||||
|
||||
self._dev.disconnect()
|
||||
self.connected = False
|
||||
self.post_disconnect()
|
||||
|
||||
if self.debug:
|
||||
print("Disconnected from {}".format(self.name))
|
||||
|
||||
def post_disconnect(self):
|
||||
"""Do anything necessary after disconnecting
|
||||
"""
|
||||
pass
|
||||
|
||||
# def get_organization(self):
|
||||
# """Get organization of device
|
||||
#
|
||||
# Returns {str} -- Organization name
|
||||
# """
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_organization_handle"):
|
||||
# handle = self._info_service.getCharacteristics(INFO.ORGANIZATION_CHAR.value)[0]
|
||||
# self._organization_handle = handle.getHandle()
|
||||
# return self.readCharacteristic(self._organization_handle).decode("utf-8")
|
||||
|
||||
# def get_software_version(self):
|
||||
#
|
||||
# """Get software version
|
||||
#
|
||||
# Returns {str} -- Version number
|
||||
# """
|
||||
# #with self._lock:
|
||||
# if not hasattr(self, "_software_handle"):
|
||||
# handle = self._info_service.getCharacteristics(INFO.SOFTWARE_CHAR.value)[0]
|
||||
# self._software_handle = handle.getHandle()
|
||||
# return self.readCharacteristic(self._software_handle).decode("utf-8")
|
||||
|
||||
# def get_hardware_version(self):
|
||||
#
|
||||
# """Get hardware version
|
||||
#
|
||||
# Returns {str} -- Hardware version
|
||||
# """
|
||||
# #with self._lock:
|
||||
# if not hasattr(self, "_hardware_handle"):
|
||||
# handle = self._info_service.getCharacteristics(INFO.HARDWARE_CHAR.value)[0]
|
||||
# self._hardware_handle = handle.getHandle()
|
||||
# return self.readCharacteristic(self._hardware_handle).decode("utf-8")
|
||||
#
|
||||
#
|
||||
# def get_battery(self):
|
||||
# """Get battery level (currently only returns 0)
|
||||
#
|
||||
# Returns {str} -- Battery level
|
||||
# """
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_battery_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.BATTERY_CHAR.value)[0]
|
||||
# self._battery_handle = handle.getHandle()
|
||||
# return self.readCharacteristic(self._battery_handle).decode("utf-8")
|
||||
|
||||
|
||||
async def get_button(self):
|
||||
"""Get current button status
|
||||
|
||||
Returns {bool} -- Button pressed status
|
||||
"""
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_button_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.USER_BUTTON_CHAR.value)[0]
|
||||
# self._button_handle = handle.getHandle()
|
||||
# data = self.readCharacteristic(self._button_handle)
|
||||
data = await self._dev.read_gatt_char(IO.USER_BUTTON_CHAR.value)
|
||||
return data[0] == 1
|
||||
|
||||
|
||||
async def get_temperature(self):
|
||||
"""Get temperature
|
||||
|
||||
Returns {str} -- Battery level
|
||||
"""
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_temperature_handle"):
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0]
|
||||
# self._temperature_handle = handle.getHandle()
|
||||
return await self._dev.read_gatt_char(SENSOR.TEMP_CHAR.value)
|
||||
|
||||
|
||||
async def keep_alive(self):
|
||||
"""Keep the wand's connection active
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
# Is not documented because it doesn't seem to work?
|
||||
if self.debug:
|
||||
print("Keeping wand alive.")
|
||||
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_alive_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.KEEP_ALIVE_CHAR.value)[0]
|
||||
# self._alive_handle = handle.getHandle()
|
||||
return await self._dev.write_gatt_char(IO.KEEP_ALIVE_CHAR.value, bytes([1]), response=True)
|
||||
# writeCharacteristic(self._alive_handle, bytes([1]), withResponse=True)
|
||||
|
||||
|
||||
async def vibrate(self, pattern=PATTERN.REGULAR):
|
||||
"""Vibrate wand with pattern
|
||||
|
||||
Keyword Arguments:
|
||||
pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR})
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
# with self._lock:
|
||||
if isinstance(pattern, PATTERN):
|
||||
message = [pattern.value]
|
||||
else:
|
||||
message = [pattern]
|
||||
|
||||
if self.debug:
|
||||
print("Setting VibrationLed to {}".format(message))
|
||||
#
|
||||
# if not hasattr(self, "_vibrator_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.VIBRATOR_CHAR.value)[0]
|
||||
# self._vibrator_handle = handle.getHandle()
|
||||
# self.writeCharacteristic(self._vibrator_handle, bytes(message), withResponse=True)
|
||||
|
||||
return await self._dev.write_gatt_char(IO.VIBRATOR_CHAR.value, bytes(message), response=True)
|
||||
|
||||
|
||||
async def set_led(self, color="0x2185d0", on=True):
|
||||
"""Set the LED's color
|
||||
|
||||
Keyword Arguments:
|
||||
color {str} -- Color hex code (default: {"0x2185d0"})
|
||||
on {bool} -- Whether light is on or off (default: {True})
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
message = []
|
||||
if on:
|
||||
message.append(1)
|
||||
else:
|
||||
message.append(0)
|
||||
|
||||
# I got this from Kano's node module
|
||||
color = int(color.replace("#", ""), 16)
|
||||
r = (color >> 16) & 255
|
||||
g = (color >> 8) & 255
|
||||
b = color & 255
|
||||
rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3))
|
||||
message.append(rgb >> 8)
|
||||
message.append(rgb & 0xff)
|
||||
|
||||
if self.debug:
|
||||
print("Setting LED to {}".format(message))
|
||||
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_led_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.LED_CHAR.value)[0]
|
||||
# self._led_handle = handle.getHandle()
|
||||
# self.writeCharacteristic(self._led_handle, bytes(message), withResponse=True)
|
||||
return await self._dev.write_gatt_char(IO.LED_CHAR.value, bytes(message), response=True)
|
||||
|
||||
|
||||
# SENSORS
|
||||
# def on(self, event, callback):
|
||||
# """Add an event listener
|
||||
#
|
||||
# Arguments:
|
||||
# event {str} -- Event type, "position", "button", "temp", or "battery"
|
||||
# callback {function} -- Callback function
|
||||
#
|
||||
# Returns {str} -- ID of the callback for removal later
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Adding callback for {} notification...".format(event))
|
||||
#
|
||||
# id = None
|
||||
# if event == "position":
|
||||
# id = uuid4()
|
||||
# self._position_callbacks[id] = callback
|
||||
# self.subscribe_position()
|
||||
# elif event == "button":
|
||||
# id = uuid4()
|
||||
# self._button_callbacks[id] = callback
|
||||
# self.subscribe_button()
|
||||
# elif event == "temp":
|
||||
# id = uuid4()
|
||||
# self._temperature_callbacks[id] = callback
|
||||
# self.subscribe_temperature()
|
||||
# elif event == "battery":
|
||||
# id = uuid4()
|
||||
# self._battery_callbacks[id] = callback
|
||||
# self.subscribe_battery()
|
||||
#
|
||||
# return id
|
||||
#
|
||||
#
|
||||
# def off(self, uuid, continue_notifications=False):
|
||||
# """Remove a callback
|
||||
#
|
||||
# Arguments:
|
||||
# uuid {str} -- Remove a callback with its id
|
||||
#
|
||||
# Keyword Arguments:
|
||||
# continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
#
|
||||
# Returns {bool} -- If removal was successful or not
|
||||
# """
|
||||
# removed = False
|
||||
# if self._position_callbacks.get(uuid) != None:
|
||||
# removed = True
|
||||
# self._position_callbacks.pop(uuid)
|
||||
# if len(self._position_callbacks.values()) == 0:
|
||||
# self.unsubscribe_position(continue_notifications=continue_notifications)
|
||||
# elif self._button_callbacks.get(uuid) != None:
|
||||
# removed = True
|
||||
# self._button_callbacks.pop(uuid)
|
||||
# if len(self._button_callbacks.values()) == 0:
|
||||
# self.unsubscribe_button(continue_notifications=continue_notifications)
|
||||
# elif self._temperature_callbacks.get(uuid) != None:
|
||||
# removed = True
|
||||
# self._temperature_callbacks.pop(uuid)
|
||||
# if len(self._temperature_callbacks.values()) == 0:
|
||||
# self.unsubscribe_temperature(continue_notifications=continue_notifications)
|
||||
# elif self._battery_callbacks.get(uuid) != None:
|
||||
# removed = True
|
||||
# self._battery_callbacks.pop(uuid)
|
||||
# if len(self._battery_callbacks.values()) == 0:
|
||||
# self.unsubscribe_battery(continue_notifications=continue_notifications)
|
||||
#
|
||||
# if self.debug:
|
||||
# if removed:
|
||||
# print("Removed callback {}".format(uuid))
|
||||
# else:
|
||||
# print("Could not remove callback {}".format(uuid))
|
||||
#
|
||||
# return removed
|
||||
|
||||
|
||||
# def subscribe_position(self):
|
||||
# """Subscribe to position notifications and start thread if necessary
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Subscribing to position notification")
|
||||
#
|
||||
# self._position_subscribed = True
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_position_handle"):
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.QUATERNIONS_CHAR.value)[0]
|
||||
# self._position_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._position_handle + 1, bytes([1, 0]))
|
||||
#
|
||||
#
|
||||
# self._start_notification_thread()
|
||||
#
|
||||
#
|
||||
# def unsubscribe_position(self, continue_notifications=False):
|
||||
# """Unsubscribe to position notifications
|
||||
#
|
||||
# Keyword Arguments:
|
||||
# continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Unsubscribing from position notification")
|
||||
#
|
||||
# self._position_subscribed = continue_notifications
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_position_handle"):
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.QUATERNIONS_CHAR.value)[0]
|
||||
# self._position_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._position_handle + 1, bytes([0, 0]))
|
||||
|
||||
|
||||
async def subscribe_button(self):
|
||||
"""Subscribe to button notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to button notification")
|
||||
|
||||
self._button_subscribed = True
|
||||
await self._dev.start_notify(IO.USER_BUTTON_CHAR.value, self._on_button)
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_button_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.USER_BUTTON_CHAR.value)[0]
|
||||
# self._button_handle = handle.getHandle()
|
||||
|
||||
# self.writeCharacteristic(self._button_handle + 1, bytes([1, 0]))
|
||||
# self._start_notification_thread()
|
||||
|
||||
|
||||
|
||||
async def unsubscribe_button(self, continue_notifications=False):
|
||||
"""Unsubscribe to button notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from button notification")
|
||||
|
||||
self._button_subscribed = continue_notifications
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_button_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.USER_BUTTON_CHAR.value)[0]
|
||||
# self._button_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._button_handle + 1, bytes([0, 0]))
|
||||
await self._dev.stop_notify(IO.USER_BUTTON_CHAR.value)
|
||||
|
||||
|
||||
# def subscribe_temperature(self):
|
||||
# """Subscribe to temperature notifications and start thread if necessary
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Subscribing to temperature notification")
|
||||
#
|
||||
# self._temperature_subscribed = True
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_temp_handle"):
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0]
|
||||
# self._temp_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._temp_handle + 1, bytes([1, 0]))
|
||||
#
|
||||
# self._start_notification_thread()
|
||||
#
|
||||
#
|
||||
# def unsubscribe_temperature(self, continue_notifications=False):
|
||||
# """Unsubscribe to temperature notifications
|
||||
#
|
||||
# Keyword Arguments:
|
||||
# continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Unsubscribing from temperature notification")
|
||||
#
|
||||
# self._temperature_subscribed = continue_notifications
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_temp_handle"):
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0]
|
||||
# self._temp_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._temp_handle + 1, bytes([0, 0]))
|
||||
#
|
||||
#
|
||||
# def subscribe_battery(self):
|
||||
# """Subscribe to battery notifications and start thread if necessary
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Subscribing to battery notification")
|
||||
#
|
||||
# self._battery_subscribed = True
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_battery_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.BATTERY_CHAR.value)[0]
|
||||
# self._battery_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._battery_handle + 1, bytes([1, 0]))
|
||||
# self._start_notification_thread()
|
||||
#
|
||||
#
|
||||
# def unsubscribe_battery(self, continue_notifications=False):
|
||||
# """Unsubscribe to battery notifications
|
||||
#
|
||||
# Keyword Arguments:
|
||||
# continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
# """
|
||||
# if self.debug:
|
||||
# print("Unsubscribing from battery notification")
|
||||
#
|
||||
# self._battery_subscribed = continue_notifications
|
||||
# # with self._lock:
|
||||
# if not hasattr(self, "_battery_handle"):
|
||||
# handle = self._io_service.getCharacteristics(IO.BATTERY_CHAR.value)[0]
|
||||
# self._battery_handle = handle.getHandle()
|
||||
#
|
||||
# self.writeCharacteristic(self._battery_handle + 1, bytes([0, 0]))
|
||||
#
|
||||
#
|
||||
# def _start_notification_thread(self):
|
||||
# try:
|
||||
# if self._notification_thread == None:
|
||||
# self.reset_position()
|
||||
# self._notification_thread = threading.Thread(target=self._notification_wait)
|
||||
# self._notification_thread.start()
|
||||
# except:
|
||||
# pass
|
||||
|
||||
|
||||
# def _notification_wait(self):
|
||||
# if self.debug:
|
||||
# print("Notification thread started")
|
||||
#
|
||||
# while (self.connected and
|
||||
# (self._position_subscribed or
|
||||
# self._button_subscribed or
|
||||
# self._temperature_subscribed or
|
||||
# self._battery_subscribed)):
|
||||
# try:
|
||||
# if super().waitForNotifications(1):
|
||||
# continue
|
||||
# except:
|
||||
# continue
|
||||
#
|
||||
# if self.debug:
|
||||
# print("Notification thread stopped")
|
||||
|
||||
|
||||
def _on_position(self, data):
|
||||
"""Private function for position notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
# I got part of this from Kano's node module and modified it
|
||||
y = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little')))
|
||||
x = -1 * np.int16(np.uint16(int.from_bytes(data[2:4], byteorder='little')))
|
||||
w = -1 * np.int16(np.uint16(int.from_bytes(data[4:6], byteorder='little')))
|
||||
z = np.int16(np.uint16(int.from_bytes(data[6:8], byteorder='little')))
|
||||
|
||||
if self.debug:
|
||||
pitch = "Pitch: {}".format(z).ljust(16)
|
||||
roll = "Roll: {}".format(w).ljust(16)
|
||||
print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y))
|
||||
|
||||
self.on_position(x, y, z, w)
|
||||
for callback in self._position_callbacks.values():
|
||||
callback(x, y, z, w)
|
||||
|
||||
|
||||
def on_position(self, roll, x, y, z):
|
||||
"""Function called on position notification
|
||||
|
||||
Arguments:
|
||||
x {int} -- X position of wand (Between -1000 and 1000)
|
||||
y {int} -- Y position of wand (Between -1000 and 1000)
|
||||
pitch {int} -- Pitch of wand (Between -1000 and 1000)
|
||||
roll {int} -- Roll of wand (Between -1000 and 1000)
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
# def reset_position(self):
|
||||
# """Reset the quaternains of the wand
|
||||
# """
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.QUATERNIONS_RESET_CHAR.value)[0].getHandle()
|
||||
# # with self._lock:
|
||||
# self.writeCharacteristic(handle, bytes([1]))
|
||||
|
||||
|
||||
def _on_button(self, data):
|
||||
"""Private function for button notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = data[0] == 1
|
||||
|
||||
if self.debug:
|
||||
print("Button: {}".format(val))
|
||||
|
||||
self.on_button(val)
|
||||
for callback in self._button_callbacks.values():
|
||||
callback(val)
|
||||
|
||||
|
||||
def on_button(self, value):
|
||||
"""Function called on button notification
|
||||
|
||||
Arguments:
|
||||
pressed {bool} -- If button is pressed
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def _on_temperature(self, data):
|
||||
"""Private function for temperature notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little')))
|
||||
|
||||
if self.debug:
|
||||
print("Temperature: {}".format(val))
|
||||
|
||||
self.on_temperature(val)
|
||||
for callback in self._temperature_callbacks.values():
|
||||
callback(val)
|
||||
|
||||
|
||||
def on_temperature(self, value):
|
||||
"""Function called on temperature notification
|
||||
|
||||
Arguments:
|
||||
value {int} -- Temperature of the wand
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def _on_battery(self, data):
|
||||
"""Private function for battery notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = data[0]
|
||||
|
||||
if self.debug:
|
||||
print("Battery: {}".format(val))
|
||||
|
||||
self.on_battery(val)
|
||||
for callback in self._battery_callbacks.values():
|
||||
callback(val)
|
||||
|
||||
|
||||
def on_battery(self, value):
|
||||
"""Function called on battery notification
|
||||
|
||||
Arguments:
|
||||
value {int} -- Battery level of the wand
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def handleNotification(self, cHandle, data):
|
||||
"""Handle notifications subscribed to
|
||||
|
||||
Arguments:
|
||||
cHandle {int} -- Handle of notification
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
if cHandle == SENSOR.QUATERNIONS_CHAR.value:
|
||||
self._on_position(data)
|
||||
elif cHandle == IO.USER_BUTTON_CHAR.value:
|
||||
self._on_button(data)
|
||||
elif cHandle == SENSOR.TEMP_CHAR.value:
|
||||
self._on_temperature(data)
|
||||
elif cHandle == IO.BATTERY_CHAR.value:
|
||||
self._on_battery(data)
|
||||
@@ -0,0 +1,65 @@
|
||||
from shop import Shop
|
||||
from wand import Wand
|
||||
from constants import *
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
# Custom wand class extending the default wand
|
||||
class MyWand(Wand):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.colors = ["#a333c8", "2185d0", "0x21ba45", "#fbbd08", "#f2711c", "#db2828"]
|
||||
self.position_id = None
|
||||
|
||||
# Do some functions after connecting
|
||||
def post_connect(self):
|
||||
print("Connected to {}".format(self.name))
|
||||
# # Vibrate the wand and set its color to red
|
||||
# self.set_led(self.colors.pop())
|
||||
# # Subscribe to notifications
|
||||
# self.subscribe_button()
|
||||
|
||||
# Button callback, automatically called after connecting to wand
|
||||
def on_button(self, pressed):
|
||||
if pressed:
|
||||
# Unsubscribe from the position callback
|
||||
# NOTE You could pass `continue_notifications=True`
|
||||
# to continue using the wand's `on_position` function
|
||||
|
||||
self.set_led(self.colors.pop())
|
||||
# Disconnect if we run out of colors
|
||||
if len(self.colors) == 0:
|
||||
self.disconnect()
|
||||
|
||||
async def main():
|
||||
|
||||
# Create a new wand scanner
|
||||
shop = Shop(wand_class=MyWand, debug=True)
|
||||
wands = []
|
||||
try:
|
||||
# While we don't have any wands
|
||||
while len(wands) == 0:
|
||||
# Scan for wands and automatically connect
|
||||
print("Scanning...")
|
||||
wands = await shop.scan(connect=True)
|
||||
# For each wand (Only tested with one)
|
||||
for wand in wands:
|
||||
# Vibrate the wand and set its color to red
|
||||
wand.vibrate(PATTERN.BURST)
|
||||
|
||||
# Callback for position
|
||||
# def onPos(x, y, pitch, roll):
|
||||
# pitch = "Pitch: {}".format(pitch).ljust(16)
|
||||
# roll = "Roll: {}".format(roll).ljust(16)
|
||||
# print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y))
|
||||
|
||||
# Add the event callback to the wand
|
||||
# wand.position_id = wand.on("position", onPos)
|
||||
|
||||
# Detect keyboard interrupt disconnect
|
||||
except KeyboardInterrupt as e:
|
||||
for wand in wands:
|
||||
wand.disconnect()
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,3 +1,5 @@
|
||||
from enum import Enum
|
||||
|
||||
BUTTON = 1
|
||||
NINE_AXIS = 2
|
||||
ACCELEROMETER = 3
|
||||
@@ -15,3 +17,5 @@ CHARACTERISTIC_UUIDS = {
|
||||
TEMPERATURE:("64a70014-f691-4b93-a6f4-0968f5b648f8"),
|
||||
MAGNETOMETER:("64a70021-f691-4b93-a6f4-0968f5b648f8")
|
||||
} # <--- Change to the characteristic you want to enable notifications from.
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user