diff --git a/KanoWandGamma/kano_wand.py b/KanoWandGamma/kano_wand.py new file mode 100644 index 0000000..0bc21f6 --- /dev/null +++ b/KanoWandGamma/kano_wand.py @@ -0,0 +1,707 @@ +# name='kano_wand' +# description='Python module for interfacing with the Kano wand' +# author='Jesse Lieberg (@GammaGames)' +# url='https://github.com/GammaGames/kano_wand' + +from enum import Enum +import inspect +import numpy +import threading +import uuid + +from time import sleep + +class _INFO(Enum): + """Enum containing info UUIDs""" + SERVICE = '64A70010-F691-4B93-A6F4-0968F5B648F8' + ORGANIZATION_CHAR = '64A7000B-F691-4B93-A6F4-0968F5B648F8' + SOFTWARE_CHAR = '64A70013-F691-4B93-A6F4-0968F5B648F8' + HARDWARE_CHAR = '64A70001-F691-4B93-A6F4-0968F5B648F8' + +class _IO(Enum): + """Enum containing _IO UUIDs""" + SERVICE = '64A70012-F691-4B93-A6F4-0968F5B648F8' + BATTERY_CHAR = '64A70007-F691-4B93-A6F4-0968F5B648F8' + USER_BUTTON_CHAR = '64A7000D-F691-4B93-A6F4-0968F5B648F8' + VIBRATOR_CHAR = '64A70008-F691-4B93-A6F4-0968F5B648F8' + LED_CHAR = '64A70009-F691-4B93-A6F4-0968F5B648F8' + KEEP_ALIVE_CHAR = '64A7000F-F691-4B93-A6F4-0968F5B648F8' + +class _SENSOR(Enum): + """Enum containing sensor UUIDs""" + SERVICE = '64A70011-F691-4B93-A6F4-0968F5B648F8' + TEMP_CHAR = '64A70014-F691-4B93-A6F4-0968F5B648F8' + QUATERNIONS_CHAR = '64A70002-F691-4B93-A6F4-0968F5B648F8' + # RAW_CHAR = '64A7000A-F691-4B93-A6F4-0968F5B648F8' + # MOTION_CHAR = '64A7000C-F691-4B93-A6F4-0968F5B648F8' + MAGN_CALIBRATE_CHAR = '64A70021-F691-4B93-A6F4-0968F5B648F8' + QUATERNIONS_RESET_CHAR = '64A70004-F691-4B93-A6F4-0968F5B648F8' + +class PATTERN(Enum): + """Enum for wand vibration patterns""" + REGULAR = 1 + SHORT = 2 + BURST = 3 + LONG = 4 + SHORT_LONG = 5 + SHORT_SHORT = 6 + BIG_PAUSE = 7 + +class Wand(Peripheral, DefaultDelegate): + """A wand class to interact with the Kano wand + """ + + def __init__(self, device, debug=False): + """Create a new wand + + Arguments: + device {bluepy.ScanEntry} -- Device information + + Keyword Arguments: + debug {bool} -- Print debug messages (default: {False}) + """ + super().__init__(None) + # Meta stuff + self.debug = debug + self._dev = device + self.name = device.getValueText(9) + + if debug: + print("Wand: {}\n\rWand Mac: {}".format(self.name, device.addr)) + + # Notification stuff + self.connected = False + self._position_callbacks = {} + self._position_subscribed = False + self._button_callbacks = {} + self._button_subscribed = False + self._temperature_callbacks = {} + self._temperature_subscribed = False + self._battery_callbacks = {} + self._battery_subscribed = False + self._notification_thread = None + self._position_notification_handle = 41 + self._button_notification_handle = 33 + self._temp_notification_handle = 56 + self._battery_notification_handle = 23 + + def connect(self): + if self.debug: + print("Connecting to {}...".format(self.name)) + + super(Wand, self).connect(self._dev) + self._lock = threading.Lock() + self.connected = True + self.setDelegate(self) + self._info_service = self.getServiceByUUID(_INFO.SERVICE.value) + self._io_service = self.getServiceByUUID(_IO.SERVICE.value) + self._sensor_service = self.getServiceByUUID(_SENSOR.SERVICE.value) + + self.post_connect() + + if self.debug: + print("Connected to {}".format(self.name)) + + def post_connect(self): + """Do anything necessary after connecting + """ + pass + + def disconnect(self): + super().disconnect() + self.connected = False + self._position_subscribed = False + self._button_subscribed = False + self._temperature_subscribed = False + self._battery_subscribed = False + + self.post_disconnect() + + if self.debug: + print("Disconnected from {}".format(self.name)) + + def post_disconnect(self): + """Do anything necessary after disconnecting + """ + pass + + def get_organization(self): + """Get organization of device + + Returns {str} -- Organization name + """ + with self._lock: + if not hasattr(self, "_organization_handle"): + handle = self._info_service.getCharacteristics(_INFO.ORGANIZATION_CHAR.value)[0] + self._organization_handle = handle.getHandle() + return self.readCharacteristic(self._organization_handle).decode("utf-8") + + def get_software_version(self): + """Get software version + + Returns {str} -- Version number + """ + with self._lock: + if not hasattr(self, "_software_handle"): + handle = self._info_service.getCharacteristics(_INFO.SOFTWARE_CHAR.value)[0] + self._software_handle = handle.getHandle() + return self.readCharacteristic(self._software_handle).decode("utf-8") + + def get_hardware_version(self): + """Get hardware version + + Returns {str} -- Hardware version + """ + with self._lock: + if not hasattr(self, "_hardware_handle"): + handle = self._info_service.getCharacteristics(_INFO.HARDWARE_CHAR.value)[0] + self._hardware_handle = handle.getHandle() + return self.readCharacteristic(self._hardware_handle).decode("utf-8") + + def get_battery(self): + """Get battery level (currently only returns 0) + + Returns {str} -- Battery level + """ + with self._lock: + if not hasattr(self, "_battery_handle"): + handle = self._io_service.getCharacteristics(_IO.BATTERY_CHAR.value)[0] + self._battery_handle = handle.getHandle() + return self.readCharacteristic(self._battery_handle).decode("utf-8") + + def get_button(self): + """Get current button status + + Returns {bool} -- Button pressed status + """ + with self._lock: + if not hasattr(self, "_button_handle"): + handle = self._io_service.getCharacteristics(_IO.USER_BUTTON_CHAR.value)[0] + self._button_handle = handle.getHandle() + + data = self.readCharacteristic(self._button_handle) + return data[0] == 1 + + def get_temperature(self): + """Get temperature + + Returns {str} -- Battery level + """ + with self._lock: + if not hasattr(self, "_temperature_handle"): + handle = self._sensor_service.getCharacteristics(_SENSOR.TEMP_CHAR.value)[0] + self._temperature_handle = handle.getHandle() + return self.readCharacteristic(self._temperature_handle).decode("utf-8") + + def keep_alive(self): + """Keep the wand's connection active + + Returns {bytes} -- Status + """ + # Is not documented because it doesn't seem to work? + if self.debug: + print("Keeping wand alive.") + + with self._lock: + if not hasattr(self, "_alive_handle"): + handle = self._io_service.getCharacteristics(_IO.KEEP_ALIVE_CHAR.value)[0] + self._alive_handle = handle.getHandle() + return self.writeCharacteristic(self._alive_handle, bytes([1]), withResponse=True) + + def vibrate(self, pattern=PATTERN.REGULAR): + """Vibrate wand with pattern + + Keyword Arguments: + pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR}) + + Returns {bytes} -- Status + """ + with self._lock: + if isinstance(pattern, PATTERN): + message = [pattern.value] + else: + message = [pattern] + + if self.debug: + print("Setting LED to {}".format(message)) + + if not hasattr(self, "_vibrator_handle"): + handle = self._io_service.getCharacteristics(_IO.VIBRATOR_CHAR.value)[0] + self._vibrator_handle = handle.getHandle() + return self.writeCharacteristic(self._vibrator_handle, bytes(message), withResponse=True) + + def set_led(self, color="0x2185d0", on=True): + """Set the LED's color + + Keyword Arguments: + color {str} -- Color hex code (default: {"0x2185d0"}) + on {bool} -- Whether light is on or off (default: {True}) + + Returns {bytes} -- Status + """ + message = [] + if on: + message.append(1) + else: + message.append(0) + + # I got this from Kano's node module + color = int(color.replace("#", ""), 16) + r = (color >> 16) & 255 + g = (color >> 8) & 255 + b = color & 255 + rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3)) + message.append(rgb >> 8) + message.append(rgb & 0xff) + + if self.debug: + print("Setting LED to {}".format(message)) + + with self._lock: + if not hasattr(self, "_led_handle"): + handle = self._io_service.getCharacteristics(_IO.LED_CHAR.value)[0] + self._led_handle = handle.getHandle() + return self.writeCharacteristic(self._led_handle, bytes(message), withResponse=True) + + # SENSORS + def on(self, event, callback): + """Add an event listener + + Arguments: + event {str} -- Event type, "position", "button", "temp", or "battery" + callback {function} -- Callback function + + Returns {str} -- ID of the callback for removal later + """ + if self.debug: + print("Adding callback for {} notification...".format(event)) + + id = None + if event == "position": + id = uuid.uuid4() + self._position_callbacks[id] = callback + self.subscribe_position() + elif event == "button": + id = uuid.uuid4() + self._button_callbacks[id] = callback + self.subscribe_button() + elif event == "temp": + id = uuid.uuid4() + self._temperature_callbacks[id] = callback + self.subscribe_temperature() + elif event == "battery": + id = uuid.uuid4() + self._battery_callbacks[id] = callback + self.subscribe_battery() + + return id + + def off(self, uuid, continue_notifications=False): + """Remove a callback + + Arguments: + uuid {str} -- Remove a callback with its id + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + + Returns {bool} -- If removal was successful or not + """ + removed = False + if self._position_callbacks.get(uuid) != None: + removed = True + self._position_callbacks.pop(uuid) + if len(self._position_callbacks.values()) == 0: + self.unsubscribe_position(continue_notifications=continue_notifications) + elif self._button_callbacks.get(uuid) != None: + removed = True + self._button_callbacks.pop(uuid) + if len(self._button_callbacks.values()) == 0: + self.unsubscribe_button(continue_notifications=continue_notifications) + elif self._temperature_callbacks.get(uuid) != None: + removed = True + self._temperature_callbacks.pop(uuid) + if len(self._temperature_callbacks.values()) == 0: + self.unsubscribe_temperature(continue_notifications=continue_notifications) + elif self._battery_callbacks.get(uuid) != None: + removed = True + self._battery_callbacks.pop(uuid) + if len(self._battery_callbacks.values()) == 0: + self.unsubscribe_battery(continue_notifications=continue_notifications) + + if self.debug: + if removed: + print("Removed callback {}".format(uuid)) + else: + print("Could not remove callback {}".format(uuid)) + + return removed + + def subscribe_position(self): + """Subscribe to position notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to position notification") + + self._position_subscribed = True + with self._lock: + if not hasattr(self, "_position_handle"): + handle = self._sensor_service.getCharacteristics(_SENSOR.QUATERNIONS_CHAR.value)[0] + self._position_handle = handle.getHandle() + + self.writeCharacteristic(self._position_handle + 1, bytes([1, 0])) + self._start_notification_thread() + + def unsubscribe_position(self, continue_notifications=False): + """Unsubscribe to position notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from position notification") + + self._position_subscribed = continue_notifications + with self._lock: + if not hasattr(self, "_position_handle"): + handle = self._sensor_service.getCharacteristics(_SENSOR.QUATERNIONS_CHAR.value)[0] + self._position_handle = handle.getHandle() + + self.writeCharacteristic(self._position_handle + 1, bytes([0, 0])) + + def subscribe_button(self): + """Subscribe to button notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to button notification") + + self._button_subscribed = True + with self._lock: + if not hasattr(self, "_button_handle"): + handle = self._io_service.getCharacteristics(_IO.USER_BUTTON_CHAR.value)[0] + self._button_handle = handle.getHandle() + + self.writeCharacteristic(self._button_handle + 1, bytes([1, 0])) + self._start_notification_thread() + + def unsubscribe_button(self, continue_notifications=False): + """Unsubscribe to button notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from button notification") + + self._button_subscribed = continue_notifications + with self._lock: + if not hasattr(self, "_button_handle"): + handle = self._io_service.getCharacteristics(_IO.USER_BUTTON_CHAR.value)[0] + self._button_handle = handle.getHandle() + + self.writeCharacteristic(self._button_handle + 1, bytes([0, 0])) + + def subscribe_temperature(self): + """Subscribe to temperature notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to temperature notification") + + self._temperature_subscribed = True + with self._lock: + if not hasattr(self, "_temp_handle"): + handle = self._sensor_service.getCharacteristics(_SENSOR.TEMP_CHAR.value)[0] + self._temp_handle = handle.getHandle() + + self.writeCharacteristic(self._temp_handle + 1, bytes([1, 0])) + self._start_notification_thread() + + def unsubscribe_temperature(self, continue_notifications=False): + """Unsubscribe to temperature notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from temperature notification") + + self._temperature_subscribed = continue_notifications + with self._lock: + if not hasattr(self, "_temp_handle"): + handle = self._sensor_service.getCharacteristics(_SENSOR.TEMP_CHAR.value)[0] + self._temp_handle = handle.getHandle() + + self.writeCharacteristic(self._temp_handle + 1, bytes([0, 0])) + + def subscribe_battery(self): + """Subscribe to battery notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to battery notification") + + self._battery_subscribed = True + with self._lock: + if not hasattr(self, "_battery_handle"): + handle = self._io_service.getCharacteristics(_IO.BATTERY_CHAR .value)[0] + self._battery_handle = handle.getHandle() + + self.writeCharacteristic(self._battery_handle + 1, bytes([1, 0])) + self._start_notification_thread() + + def unsubscribe_battery(self, continue_notifications=False): + """Unsubscribe to battery notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from battery notification") + + self._battery_subscribed = continue_notifications + with self._lock: + if not hasattr(self, "_battery_handle"): + handle = self._io_service.getCharacteristics(_IO.BATTERY_CHAR .value)[0] + self._battery_handle = handle.getHandle() + + self.writeCharacteristic(self._battery_handle + 1, bytes([0, 0])) + + def _start_notification_thread(self): + try: + if self._notification_thread == None: + self.reset_position() + self._notification_thread = threading.Thread(target=self._notification_wait) + self._notification_thread.start() + except: + pass + + def _notification_wait(self): + if self.debug: + print("Notification thread started") + + while (self.connected and + (self._position_subscribed or + self._button_subscribed or + self._temperature_subscribed or + self._battery_subscribed)): + try: + if super().waitForNotifications(1): + continue + except: + continue + + if self.debug: + print("Notification thread stopped") + + def _on_position(self, data): + """Private function for position notification + + Arguments: + data {bytes} -- Data from device + """ + # I got part of this from Kano's node module and modified it + y = numpy.int16(numpy.uint16(int.from_bytes(data[0:2], byteorder='little'))) + x = -1 * numpy.int16(numpy.uint16(int.from_bytes(data[2:4], byteorder='little'))) + w = -1 * numpy.int16(numpy.uint16(int.from_bytes(data[4:6], byteorder='little'))) + z = numpy.int16(numpy.uint16(int.from_bytes(data[6:8], byteorder='little'))) + + if self.debug: + pitch = "Pitch: {}".format(z).ljust(16) + roll = "Roll: {}".format(w).ljust(16) + print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y)) + + self.on_position(x, y, z, w) + for callback in self._position_callbacks.values(): + callback(x, y, z, w) + + def on_position(self, roll, x, y, z): + """Function called on position notification + + Arguments: + x {int} -- X position of wand (Between -1000 and 1000) + y {int} -- Y position of wand (Between -1000 and 1000) + pitch {int} -- Pitch of wand (Between -1000 and 1000) + roll {int} -- Roll of wand (Between -1000 and 1000) + """ + pass + + def reset_position(self): + """Reset the quaternains of the wand + """ + handle = self._sensor_service.getCharacteristics(_SENSOR.QUATERNIONS_RESET_CHAR.value)[0].getHandle() + with self._lock: + self.writeCharacteristic(handle, bytes([1])) + + def _on_button(self, data): + """Private function for button notification + + Arguments: + data {bytes} -- Data from device + """ + val = data[0] == 1 + + if self.debug: + print("Button: {}".format(val)) + + self.on_button(val) + for callback in self._button_callbacks.values(): + callback(val) + + def on_button(self, value): + """Function called on button notification + + Arguments: + pressed {bool} -- If button is pressed + """ + pass + + def _on_temperature(self, data): + """Private function for temperature notification + + Arguments: + data {bytes} -- Data from device + """ + val = numpy.int16(numpy.uint16(int.from_bytes(data[0:2], byteorder='little'))) + + if self.debug: + print("Temperature: {}".format(val)) + + self.on_temperature(val) + for callback in self._temperature_callbacks.values(): + callback(val) + + def on_temperature(self, value): + """Function called on temperature notification + + Arguments: + value {int} -- Temperature of the wand + """ + pass + + def _on_battery(self, data): + """Private function for battery notification + + Arguments: + data {bytes} -- Data from device + """ + val = data[0] + + if self.debug: + print("Battery: {}".format(val)) + + self.on_battery(val) + for callback in self._battery_callbacks.values(): + callback(val) + + def on_battery(self, value): + """Function called on battery notification + + Arguments: + value {int} -- Battery level of the wand + """ + + def handleNotification(self, cHandle, data): + """Handle notifications subscribed to + + Arguments: + cHandle {int} -- Handle of notification + data {bytes} -- Data from device + """ + if cHandle == self._position_notification_handle: + self._on_position(data) + elif cHandle == self._button_notification_handle: + self._on_button(data) + elif cHandle == self._temp_notification_handle: + self._on_temperature(data) + elif cHandle == self._battery_notification_handle: + self._on_battery(data) + +class Shop(DefaultDelegate): + """A scanner class to connect to wands + """ + def __init__(self, wand_class=Wand, debug=False): + """Create a new scanner + + Keyword Arguments: + wand_class {class} -- Class to use when connecting to wand (default: {Wand}) + debug {bool} -- Print debug messages (default: {False}) + """ + super().__init__() + self.wand_class = wand_class + self.debug = debug + self._name = None + self._prefix = None + self._mac = None + self._scanner = Scanner().withDelegate(self) + + def scan(self, name=None, prefix="Kano-Wand", mac=None, timeout=1.0, connect=False): + """Scan for devices + + Keyword Arguments: + name {str} -- Name of the device to scan for (default: {None}) + prefix {str} -- Prefix of name of device to scan for (default: {"Kano-Wand"}) + mac {str} -- MAC Address of the device to scan for (default: {None}) + timeout {float} -- Timeout before returning from scan (default: {1.0}) + connect {bool} -- Connect to the wands automatically (default: {False}) + + Returns {Wand[]} -- Array of wand objects + """ + + if self.debug: + print("Scanning for {} seconds...".format(timeout)) + try: + name_check = not (name is None) + prefix_check = not (prefix is None) + mac_check = not (mac is None) + assert name_check or prefix_check or mac_check + except AssertionError as e: + print("Either a name, prefix, or mac address must be provided to find a wand") + raise e + + if name is not None: + self._name = name + elif prefix is not None: + self._prefix = prefix + elif mac is not None: + self._mac = mac + + self.wands = [] + self._scanner.scan(timeout) + if connect: + for wand in self.wands: + wand.connect() + return self.wands + + def handleDiscovery(self, device, isNewDev, isNewData): + """Check if the device matches + + Arguments: + device {bluepy.ScanEntry} -- Device data + isNewDev {bool} -- Whether the device is new + isNewData {bool} -- Whether the device has already been seen + """ + + if isNewDev: + # Perform initial detection attempt + mode = 0 + found = 0 + name = device.getValueText(9) + if self._name is not None: + mode += 1 + if name == self._name: + found += 1 + if self._prefix is not None: + mode += 1 + if name is not None and name.startswith(self._prefix): + found += 1 + if self._mac is not None: + mode += 1 + if device.addr == self._mac: + found += 1 + + if found >= mode: + self.wands.append(self.wand_class(device, debug=self.debug)) + elif self.debug: + if name != "None": + print("Mac: {}\tCommon Name: {}".format(device.addr, name)) + else: + print("Mac: {}".format(device.addr)) diff --git a/collect_data.py b/collect_data.py index 54d6ee9..5357776 100644 --- a/collect_data.py +++ b/collect_data.py @@ -5,88 +5,92 @@ import platform import uuid import pandas as pd from kano_wand.ble_client import KanoBLEClient -from kano_wand.constants import NINE_AXIS, ACCELEROMETER, BUTTON from time import sleep from tkinter import * +from bleak import discover """ This is used for reading and decoding values from t he Kano Harry Potter Coding Wand - Wand sends 25 updates to gyro and accel every second -- Wand sends 3 Dimensional data as 48 bit reverse marshalled integers -- Wand seems to send smaller single dimensional data in 16bit unsigned integers """ HEIGHT = 2 WIDTH = 35 -LOOP = asyncio.get_event_loop() -class SpellCollectionGUI: - def __init__(self, root, spells, kano_ble_client): - self.root = root - self.kano_ble_client = kano_ble_client - self.data_output_file = f"recording-session-{uuid.uuid4()}.csv" +# class SpellCollectionGUI: +# def __init__(self, root, spells, kano_ble_client): +# self.root = root +# self.kano_ble_client = kano_ble_client +# self.data_output_file = f"recording-session-{uuid.uuid4()}.csv" +# +# self.root = root +# self.root.title("Microphone Recorder") +# +# self.label = Label(root, text="Select a spell, and then press start to begin recording spell data!") +# self.label.pack() +# +# self.spell_variable = StringVar(root) +# self.spell_list = OptionMenu(root, self.spell_variable, +# *["", *[spell for spell in spells] +# ], command=self.select_spell) +# self.spell_list.config(height=HEIGHT, width=WIDTH) +# self.spell_list.pack() +# +# self.recording_button = Button(root, text="Start Recording", command=self.start_recording, height=HEIGHT, +# width=WIDTH) +# self.recording_button.pack() +# +# self.close_button = Button(root, text="Close", command=root.quit, height=HEIGHT, width=WIDTH) +# self.close_button.pack() +# +# self.spell = None +# self.root.after(0, self.kano_ble_client.connect, True) +# +# def select_spell(self, selected_value): +# self.spell = selected_value +# +# def start_recording(self): +# print("Started Recording") +# self.root.after(0) +# +# self.kano_ble_client.start_recieving_data() +# +# +# def select_spell(spells): +# try: +# spell = spells[int(input("Please select a spell.\n " + +# "\n".join([f"{s} - {spells[s]}" for s in range(len(spells))]) + +# "\nenter -1 to exit\n"))] +# print(f"You have selected {spell}") +# return spell +# except Exception as e: +# print(e) +# print("Selected spell is uncorrect") +# return None - self.root = root - self.root.title("Microphone Recorder") - - self.label = Label(root, text="Select a spell, and then press start to begin recording spell data!") - self.label.pack() - - self.spell_variable = StringVar(root) - self.spell_list = OptionMenu(root, self.spell_variable, - *["", *[spell for spell in spells] - ], command=self.select_spell) - self.spell_list.config(height=HEIGHT, width=WIDTH) - self.spell_list.pack() - - self.recording_button = Button(root, text="Start Recording", command=self.start_recording, height=HEIGHT, - width=WIDTH) - self.recording_button.pack() - - self.close_button = Button(root, text="Close", command=root.quit, height=HEIGHT, width=WIDTH) - self.close_button.pack() - - self.spell = None - self.root.after(0, self.kano_ble_client.connect, True) - - def select_spell(self, selected_value): - self.spell = selected_value - - def start_recording(self): - print("Started Recording") - self.root.after(0) - - self.kano_ble_client.start_recieving_data() - - -def select_spell(spells): - try: - spell = spells[int(input("Please select a spell.\n " + - "\n".join([f"{s} - {spells[s]}" for s in range(len(spells))]) + - "\nenter -1 to exit\n"))] - print(f"You have selected {spell}") - return spell - except Exception as e: - print(e) - print("Selected spell is uncorrect") - return None async def main(loop): - device_address = "D8:9B:12:D1:08:80" - - os.environ["PYTHONASYNCIODEBUG"] = str(1) - address = ( - device_address # <--- Change to your device's address here if you are using Windows or Linux - if platform.system() != "Darwin" - else "243E23AE-4A99-406C-B317-18F1BD7B4CBE" # <--- Change to your device's address here if you are using macOS - ) + # device_address = "D8:9B:12:D1:08:80" + # + # os.environ["PYTHONASYNCIODEBUG"] = str(1) + # address = ( + # device_address # <--- Change to your device's address here if you are using Windows or Linux + # if platform.system() != "Darwin" + # else "243E23AE-4A99-406C-B317-18F1BD7B4CBE" # <--- Change to your device's address here if you are using macOS + # ) # One method for connect # One for getting all data # One for getting data when button is pressed + devices = await discover() + for d in devices: + print(d) + print(d.name, d.address) + + spells = pd.read_csv("spells.csv")["spells"] - sensors = [NINE_AXIS, ACCELEROMETER, BUTTON] + kble = KanoBLEClient(address, loop) await kble.connect(True) await kble.start_recieving_data(sensors) @@ -98,8 +102,8 @@ async def main(loop): await kble.stop_recieving_data(sensors) print("finished main") - -df = None +# +# df = None diff --git a/kano-wand-2/constants.py b/kano-wand-2/constants.py new file mode 100644 index 0000000..f32b740 --- /dev/null +++ b/kano-wand-2/constants.py @@ -0,0 +1,38 @@ +from enum import Enum + +class INFO(Enum): + """Enum containing info UUIDs""" + SERVICE = '64A70010-F691-4B93-A6F4-0968F5B648F8' + ORGANIZATION_CHAR = '64A7000B-F691-4B93-A6F4-0968F5B648F8' + SOFTWARE_CHAR = '64A70013-F691-4B93-A6F4-0968F5B648F8' + HARDWARE_CHAR = '64A70001-F691-4B93-A6F4-0968F5B648F8' + +class IO(Enum): + """Enum containing _IO UUIDs""" + SERVICE = '64A70012-F691-4B93-A6F4-0968F5B648F8' + BATTERY_CHAR = '64A70007-F691-4B93-A6F4-0968F5B648F8' + USER_BUTTON_CHAR = '64A7000D-F691-4B93-A6F4-0968F5B648F8' + VIBRATOR_CHAR = '64A70008-F691-4B93-A6F4-0968F5B648F8' + LED_CHAR = '64A70009-F691-4B93-A6F4-0968F5B648F8' + KEEP_ALIVE_CHAR = '64A7000F-F691-4B93-A6F4-0968F5B648F8' + +class SENSOR(Enum): + """Enum containing sensor UUIDs""" + SERVICE = '64A70011-F691-4B93-A6F4-0968F5B648F8' + TEMP_CHAR = '64A70014-F691-4B93-A6F4-0968F5B648F8' + QUATERNIONS_CHAR = '64A70002-F691-4B93-A6F4-0968F5B648F8' + # RAW_CHAR = '64A7000A-F691-4B93-A6F4-0968F5B648F8' + # MOTION_CHAR = '64A7000C-F691-4B93-A6F4-0968F5B648F8' + MAGN_CALIBRATE_CHAR = '64A70021-F691-4B93-A6F4-0968F5B648F8' + QUATERNIONS_RESET_CHAR = '64A70004-F691-4B93-A6F4-0968F5B648F8' + + +class PATTERN(Enum): + """Enum for wand vibration patterns""" + REGULAR = 1 + SHORT = 2 + BURST = 3 + LONG = 4 + SHORT_LONG = 5 + SHORT_SHORT = 6 + BIG_PAUSE = 7 \ No newline at end of file diff --git a/kano-wand-2/shop.py b/kano-wand-2/shop.py new file mode 100644 index 0000000..2e17fa8 --- /dev/null +++ b/kano-wand-2/shop.py @@ -0,0 +1,97 @@ +from bleak import discover +from wand import Wand + +class Shop(object): + """A scanner class to connect to wands + """ + def __init__(self, wand_class=Wand, debug=False): + """Create a new scanner + + Keyword Arguments: + wand_class {class} -- Class to use when connecting to wand (default: {Wand}) + debug {bool} -- Print debug messages (default: {False}) + """ + self.wand_class = wand_class + self.debug = debug + self._name = None + self._prefix = None + self._mac = None + + async def scan(self, name=None, prefix="Kano-Wand", mac=None, timeout=2.0, connect=False): + """Scan for devices + + Keyword Arguments: + name {str} -- Name of the device to scan for (default: {None}) + prefix {str} -- Prefix of name of device to scan for (default: {"Kano-Wand"}) + mac {str} -- MAC Address of the device to scan for (default: {None}) + timeout {float} -- Timeout before returning from scan (default: {1.0}) + connect {bool} -- Connect to the wands automatically (default: {False}) + + Returns {Wand[]} -- Array of wand objects + """ + + if self.debug: + print("Scanning for {} seconds...".format(timeout)) + try: + name_check = not (name is None) + prefix_check = not (prefix is None) + mac_check = not (mac is None) + assert name_check or prefix_check or mac_check + except AssertionError as e: + print("Either a name, prefix, or mac address must be provided to find a wand") + raise e + + if name is not None: + self._name = name + elif prefix is not None: + self._prefix = prefix + elif mac is not None: + self._mac = mac + + self.wands = [] + + devices = await discover(timeout= timeout) + + #TODO: Add in wand filtering. + + # if isNewDev: + # # Perform initial detection attempt + # mode = 0 + # found = 0 + # name = device.getValueText(9) + # if self._name is not None: + # mode += 1 + # if name == self._name: + # found += 1 + # if self._prefix is not None: + # mode += 1 + # if name is not None and name.startswith(self._prefix): + # found += 1 + # if self._mac is not None: + # mode += 1 + # if device.addr == self._mac: + # found += 1 + # + # if found >= mode: + # self.wands.append(self.wand_class(device, debug=self.debug)) + # elif self.debug: + # if name != "None": + # print("Mac: {}\tCommon Name: {}".format(device.addr, name)) + # else: + # print("Mac: {}".format(device.addr)) + print(devices) + devices = list(filter(lambda x : x.name.startswith("kano"), devices)) + print(devices) + self.wands = [Wand(d.address, d.name) for d in devices] + # for d in devices: + # #Convert to device making here. + # print(d) + # print(d.name, d.address) + print(self.wands) + + #TODO: Process device list here for the shop - > not all BLE devices are necessary + # client = BleakClient(self.wand_address, loop=self.loop) + if connect: + for wand in self.wands: + wand.connect() + return self.wands diff --git a/kano-wand-2/wand.py b/kano-wand-2/wand.py new file mode 100644 index 0000000..ca291cd --- /dev/null +++ b/kano-wand-2/wand.py @@ -0,0 +1,608 @@ +import threading +import numpy as np + +from constants import * +from bleak import BleakClient +from uuid import uuid4 + + +class Wand(object): + """A wand class to interact with the Kano wand + """ + + def __init__(self, device_addr, name, debug=True): + """Create a new wand + + Arguments: + device {bluepy.ScanEntry} -- Device information + + Keyword Arguments: + debug {bool} -- Print debug messages (default: {False}) + """ + super().__init__(None) + self.debug = debug + self._dev = BleakClient(device_addr) + self.name = name + + if debug: + print("Wand: {}\n\rWand Mac: {}".format(self.name, self._dev.address)) + + # Notification stuff + self.connected = False + self._position_callbacks = {} + self._position_subscribed = False + self._button_callbacks = {} + self._button_subscribed = False + self._temperature_callbacks = {} + self._temperature_subscribed = False + self._battery_callbacks = {} + self._battery_subscribed = False + self._notification_thread = None + self._position_notification_handle = 41 + self._button_notification_handle = 33 + self._temp_notification_handle = 56 + self._battery_notification_handle = 23 + + async def connect(self): + if self.debug: + print("Connecting to {}...".format(self.name)) + + connected = await self._dev.connect() + if not connected: + raise Exception("ERROR NOT CONNECTED TO THE DEVICE") + + self.connected = True + self.post_connect() + + if self.debug: + print("Connected to {}".format(self.name)) + + def post_connect(self): + """Do anything necessary after connecting + """ + pass + + def disconnect(self): + + self._dev.disconnect() + self.connected = False + self.post_disconnect() + + if self.debug: + print("Disconnected from {}".format(self.name)) + + def post_disconnect(self): + """Do anything necessary after disconnecting + """ + pass + + # def get_organization(self): + # """Get organization of device + # + # Returns {str} -- Organization name + # """ + # # with self._lock: + # if not hasattr(self, "_organization_handle"): + # handle = self._info_service.getCharacteristics(INFO.ORGANIZATION_CHAR.value)[0] + # self._organization_handle = handle.getHandle() + # return self.readCharacteristic(self._organization_handle).decode("utf-8") + + # def get_software_version(self): + # + # """Get software version + # + # Returns {str} -- Version number + # """ + # #with self._lock: + # if not hasattr(self, "_software_handle"): + # handle = self._info_service.getCharacteristics(INFO.SOFTWARE_CHAR.value)[0] + # self._software_handle = handle.getHandle() + # return self.readCharacteristic(self._software_handle).decode("utf-8") + + # def get_hardware_version(self): + # + # """Get hardware version + # + # Returns {str} -- Hardware version + # """ + # #with self._lock: + # if not hasattr(self, "_hardware_handle"): + # handle = self._info_service.getCharacteristics(INFO.HARDWARE_CHAR.value)[0] + # self._hardware_handle = handle.getHandle() + # return self.readCharacteristic(self._hardware_handle).decode("utf-8") + # + # + # def get_battery(self): + # """Get battery level (currently only returns 0) + # + # Returns {str} -- Battery level + # """ + # # with self._lock: + # if not hasattr(self, "_battery_handle"): + # handle = self._io_service.getCharacteristics(IO.BATTERY_CHAR.value)[0] + # self._battery_handle = handle.getHandle() + # return self.readCharacteristic(self._battery_handle).decode("utf-8") + + + async def get_button(self): + """Get current button status + + Returns {bool} -- Button pressed status + """ + # with self._lock: + # if not hasattr(self, "_button_handle"): + # handle = self._io_service.getCharacteristics(IO.USER_BUTTON_CHAR.value)[0] + # self._button_handle = handle.getHandle() + # data = self.readCharacteristic(self._button_handle) + data = await self._dev.read_gatt_char(IO.USER_BUTTON_CHAR.value) + return data[0] == 1 + + + async def get_temperature(self): + """Get temperature + + Returns {str} -- Battery level + """ + # with self._lock: + # if not hasattr(self, "_temperature_handle"): + # handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0] + # self._temperature_handle = handle.getHandle() + return await self._dev.read_gatt_char(SENSOR.TEMP_CHAR.value) + + + async def keep_alive(self): + """Keep the wand's connection active + + Returns {bytes} -- Status + """ + # Is not documented because it doesn't seem to work? + if self.debug: + print("Keeping wand alive.") + + # with self._lock: + # if not hasattr(self, "_alive_handle"): + # handle = self._io_service.getCharacteristics(IO.KEEP_ALIVE_CHAR.value)[0] + # self._alive_handle = handle.getHandle() + return await self._dev.write_gatt_char(IO.KEEP_ALIVE_CHAR.value, bytes([1]), response=True) + # writeCharacteristic(self._alive_handle, bytes([1]), withResponse=True) + + + async def vibrate(self, pattern=PATTERN.REGULAR): + """Vibrate wand with pattern + + Keyword Arguments: + pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR}) + + Returns {bytes} -- Status + """ + # with self._lock: + if isinstance(pattern, PATTERN): + message = [pattern.value] + else: + message = [pattern] + + if self.debug: + print("Setting VibrationLed to {}".format(message)) + # + # if not hasattr(self, "_vibrator_handle"): + # handle = self._io_service.getCharacteristics(IO.VIBRATOR_CHAR.value)[0] + # self._vibrator_handle = handle.getHandle() + # self.writeCharacteristic(self._vibrator_handle, bytes(message), withResponse=True) + + return await self._dev.write_gatt_char(IO.VIBRATOR_CHAR.value, bytes(message), response=True) + + + async def set_led(self, color="0x2185d0", on=True): + """Set the LED's color + + Keyword Arguments: + color {str} -- Color hex code (default: {"0x2185d0"}) + on {bool} -- Whether light is on or off (default: {True}) + + Returns {bytes} -- Status + """ + message = [] + if on: + message.append(1) + else: + message.append(0) + + # I got this from Kano's node module + color = int(color.replace("#", ""), 16) + r = (color >> 16) & 255 + g = (color >> 8) & 255 + b = color & 255 + rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3)) + message.append(rgb >> 8) + message.append(rgb & 0xff) + + if self.debug: + print("Setting LED to {}".format(message)) + + # with self._lock: + # if not hasattr(self, "_led_handle"): + # handle = self._io_service.getCharacteristics(IO.LED_CHAR.value)[0] + # self._led_handle = handle.getHandle() + # self.writeCharacteristic(self._led_handle, bytes(message), withResponse=True) + return await self._dev.write_gatt_char(IO.LED_CHAR.value, bytes(message), response=True) + + + # SENSORS + # def on(self, event, callback): + # """Add an event listener + # + # Arguments: + # event {str} -- Event type, "position", "button", "temp", or "battery" + # callback {function} -- Callback function + # + # Returns {str} -- ID of the callback for removal later + # """ + # if self.debug: + # print("Adding callback for {} notification...".format(event)) + # + # id = None + # if event == "position": + # id = uuid4() + # self._position_callbacks[id] = callback + # self.subscribe_position() + # elif event == "button": + # id = uuid4() + # self._button_callbacks[id] = callback + # self.subscribe_button() + # elif event == "temp": + # id = uuid4() + # self._temperature_callbacks[id] = callback + # self.subscribe_temperature() + # elif event == "battery": + # id = uuid4() + # self._battery_callbacks[id] = callback + # self.subscribe_battery() + # + # return id + # + # + # def off(self, uuid, continue_notifications=False): + # """Remove a callback + # + # Arguments: + # uuid {str} -- Remove a callback with its id + # + # Keyword Arguments: + # continue_notifications {bool} -- Keep notification thread running (default: {False}) + # + # Returns {bool} -- If removal was successful or not + # """ + # removed = False + # if self._position_callbacks.get(uuid) != None: + # removed = True + # self._position_callbacks.pop(uuid) + # if len(self._position_callbacks.values()) == 0: + # self.unsubscribe_position(continue_notifications=continue_notifications) + # elif self._button_callbacks.get(uuid) != None: + # removed = True + # self._button_callbacks.pop(uuid) + # if len(self._button_callbacks.values()) == 0: + # self.unsubscribe_button(continue_notifications=continue_notifications) + # elif self._temperature_callbacks.get(uuid) != None: + # removed = True + # self._temperature_callbacks.pop(uuid) + # if len(self._temperature_callbacks.values()) == 0: + # self.unsubscribe_temperature(continue_notifications=continue_notifications) + # elif self._battery_callbacks.get(uuid) != None: + # removed = True + # self._battery_callbacks.pop(uuid) + # if len(self._battery_callbacks.values()) == 0: + # self.unsubscribe_battery(continue_notifications=continue_notifications) + # + # if self.debug: + # if removed: + # print("Removed callback {}".format(uuid)) + # else: + # print("Could not remove callback {}".format(uuid)) + # + # return removed + + + # def subscribe_position(self): + # """Subscribe to position notifications and start thread if necessary + # """ + # if self.debug: + # print("Subscribing to position notification") + # + # self._position_subscribed = True + # # with self._lock: + # if not hasattr(self, "_position_handle"): + # handle = self._sensor_service.getCharacteristics(SENSOR.QUATERNIONS_CHAR.value)[0] + # self._position_handle = handle.getHandle() + # + # self.writeCharacteristic(self._position_handle + 1, bytes([1, 0])) + # + # + # self._start_notification_thread() + # + # + # def unsubscribe_position(self, continue_notifications=False): + # """Unsubscribe to position notifications + # + # Keyword Arguments: + # continue_notifications {bool} -- Keep notification thread running (default: {False}) + # """ + # if self.debug: + # print("Unsubscribing from position notification") + # + # self._position_subscribed = continue_notifications + # # with self._lock: + # if not hasattr(self, "_position_handle"): + # handle = self._sensor_service.getCharacteristics(SENSOR.QUATERNIONS_CHAR.value)[0] + # self._position_handle = handle.getHandle() + # + # self.writeCharacteristic(self._position_handle + 1, bytes([0, 0])) + + + async def subscribe_button(self): + """Subscribe to button notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to button notification") + + self._button_subscribed = True + await self._dev.start_notify(IO.USER_BUTTON_CHAR.value, self._on_button) + # with self._lock: + # if not hasattr(self, "_button_handle"): + # handle = self._io_service.getCharacteristics(IO.USER_BUTTON_CHAR.value)[0] + # self._button_handle = handle.getHandle() + + # self.writeCharacteristic(self._button_handle + 1, bytes([1, 0])) + # self._start_notification_thread() + + + + async def unsubscribe_button(self, continue_notifications=False): + """Unsubscribe to button notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from button notification") + + self._button_subscribed = continue_notifications + # with self._lock: + # if not hasattr(self, "_button_handle"): + # handle = self._io_service.getCharacteristics(IO.USER_BUTTON_CHAR.value)[0] + # self._button_handle = handle.getHandle() + # + # self.writeCharacteristic(self._button_handle + 1, bytes([0, 0])) + await self._dev.stop_notify(IO.USER_BUTTON_CHAR.value) + + + # def subscribe_temperature(self): + # """Subscribe to temperature notifications and start thread if necessary + # """ + # if self.debug: + # print("Subscribing to temperature notification") + # + # self._temperature_subscribed = True + # # with self._lock: + # if not hasattr(self, "_temp_handle"): + # handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0] + # self._temp_handle = handle.getHandle() + # + # self.writeCharacteristic(self._temp_handle + 1, bytes([1, 0])) + # + # self._start_notification_thread() + # + # + # def unsubscribe_temperature(self, continue_notifications=False): + # """Unsubscribe to temperature notifications + # + # Keyword Arguments: + # continue_notifications {bool} -- Keep notification thread running (default: {False}) + # """ + # if self.debug: + # print("Unsubscribing from temperature notification") + # + # self._temperature_subscribed = continue_notifications + # # with self._lock: + # if not hasattr(self, "_temp_handle"): + # handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0] + # self._temp_handle = handle.getHandle() + # + # self.writeCharacteristic(self._temp_handle + 1, bytes([0, 0])) + # + # + # def subscribe_battery(self): + # """Subscribe to battery notifications and start thread if necessary + # """ + # if self.debug: + # print("Subscribing to battery notification") + # + # self._battery_subscribed = True + # # with self._lock: + # if not hasattr(self, "_battery_handle"): + # handle = self._io_service.getCharacteristics(IO.BATTERY_CHAR.value)[0] + # self._battery_handle = handle.getHandle() + # + # self.writeCharacteristic(self._battery_handle + 1, bytes([1, 0])) + # self._start_notification_thread() + # + # + # def unsubscribe_battery(self, continue_notifications=False): + # """Unsubscribe to battery notifications + # + # Keyword Arguments: + # continue_notifications {bool} -- Keep notification thread running (default: {False}) + # """ + # if self.debug: + # print("Unsubscribing from battery notification") + # + # self._battery_subscribed = continue_notifications + # # with self._lock: + # if not hasattr(self, "_battery_handle"): + # handle = self._io_service.getCharacteristics(IO.BATTERY_CHAR.value)[0] + # self._battery_handle = handle.getHandle() + # + # self.writeCharacteristic(self._battery_handle + 1, bytes([0, 0])) + # + # + # def _start_notification_thread(self): + # try: + # if self._notification_thread == None: + # self.reset_position() + # self._notification_thread = threading.Thread(target=self._notification_wait) + # self._notification_thread.start() + # except: + # pass + + + # def _notification_wait(self): + # if self.debug: + # print("Notification thread started") + # + # while (self.connected and + # (self._position_subscribed or + # self._button_subscribed or + # self._temperature_subscribed or + # self._battery_subscribed)): + # try: + # if super().waitForNotifications(1): + # continue + # except: + # continue + # + # if self.debug: + # print("Notification thread stopped") + + + def _on_position(self, data): + """Private function for position notification + + Arguments: + data {bytes} -- Data from device + """ + # I got part of this from Kano's node module and modified it + y = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little'))) + x = -1 * np.int16(np.uint16(int.from_bytes(data[2:4], byteorder='little'))) + w = -1 * np.int16(np.uint16(int.from_bytes(data[4:6], byteorder='little'))) + z = np.int16(np.uint16(int.from_bytes(data[6:8], byteorder='little'))) + + if self.debug: + pitch = "Pitch: {}".format(z).ljust(16) + roll = "Roll: {}".format(w).ljust(16) + print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y)) + + self.on_position(x, y, z, w) + for callback in self._position_callbacks.values(): + callback(x, y, z, w) + + + def on_position(self, roll, x, y, z): + """Function called on position notification + + Arguments: + x {int} -- X position of wand (Between -1000 and 1000) + y {int} -- Y position of wand (Between -1000 and 1000) + pitch {int} -- Pitch of wand (Between -1000 and 1000) + roll {int} -- Roll of wand (Between -1000 and 1000) + """ + pass + + + # def reset_position(self): + # """Reset the quaternains of the wand + # """ + # handle = self._sensor_service.getCharacteristics(SENSOR.QUATERNIONS_RESET_CHAR.value)[0].getHandle() + # # with self._lock: + # self.writeCharacteristic(handle, bytes([1])) + + + def _on_button(self, data): + """Private function for button notification + + Arguments: + data {bytes} -- Data from device + """ + val = data[0] == 1 + + if self.debug: + print("Button: {}".format(val)) + + self.on_button(val) + for callback in self._button_callbacks.values(): + callback(val) + + + def on_button(self, value): + """Function called on button notification + + Arguments: + pressed {bool} -- If button is pressed + """ + pass + + + def _on_temperature(self, data): + """Private function for temperature notification + + Arguments: + data {bytes} -- Data from device + """ + val = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little'))) + + if self.debug: + print("Temperature: {}".format(val)) + + self.on_temperature(val) + for callback in self._temperature_callbacks.values(): + callback(val) + + + def on_temperature(self, value): + """Function called on temperature notification + + Arguments: + value {int} -- Temperature of the wand + """ + pass + + + def _on_battery(self, data): + """Private function for battery notification + + Arguments: + data {bytes} -- Data from device + """ + val = data[0] + + if self.debug: + print("Battery: {}".format(val)) + + self.on_battery(val) + for callback in self._battery_callbacks.values(): + callback(val) + + + def on_battery(self, value): + """Function called on battery notification + + Arguments: + value {int} -- Battery level of the wand + """ + pass + + + def handleNotification(self, cHandle, data): + """Handle notifications subscribed to + + Arguments: + cHandle {int} -- Handle of notification + data {bytes} -- Data from device + """ + if cHandle == SENSOR.QUATERNIONS_CHAR.value: + self._on_position(data) + elif cHandle == IO.USER_BUTTON_CHAR.value: + self._on_button(data) + elif cHandle == SENSOR.TEMP_CHAR.value: + self._on_temperature(data) + elif cHandle == IO.BATTERY_CHAR.value: + self._on_battery(data) diff --git a/kano-wand-test.py b/kano-wand-test.py new file mode 100644 index 0000000..18dc3c1 --- /dev/null +++ b/kano-wand-test.py @@ -0,0 +1,65 @@ +from shop import Shop +from wand import Wand +from constants import * +import asyncio +import sys + +# Custom wand class extending the default wand +class MyWand(Wand): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.colors = ["#a333c8", "2185d0", "0x21ba45", "#fbbd08", "#f2711c", "#db2828"] + self.position_id = None + + # Do some functions after connecting + def post_connect(self): + print("Connected to {}".format(self.name)) + # # Vibrate the wand and set its color to red + # self.set_led(self.colors.pop()) + # # Subscribe to notifications + # self.subscribe_button() + + # Button callback, automatically called after connecting to wand + def on_button(self, pressed): + if pressed: + # Unsubscribe from the position callback + # NOTE You could pass `continue_notifications=True` + # to continue using the wand's `on_position` function + + self.set_led(self.colors.pop()) + # Disconnect if we run out of colors + if len(self.colors) == 0: + self.disconnect() + +async def main(): + + # Create a new wand scanner + shop = Shop(wand_class=MyWand, debug=True) + wands = [] + try: + # While we don't have any wands + while len(wands) == 0: + # Scan for wands and automatically connect + print("Scanning...") + wands = await shop.scan(connect=True) + # For each wand (Only tested with one) + for wand in wands: + # Vibrate the wand and set its color to red + wand.vibrate(PATTERN.BURST) + + # Callback for position + # def onPos(x, y, pitch, roll): + # pitch = "Pitch: {}".format(pitch).ljust(16) + # roll = "Roll: {}".format(roll).ljust(16) + # print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y)) + + # Add the event callback to the wand + # wand.position_id = wand.on("position", onPos) + + # Detect keyboard interrupt disconnect + except KeyboardInterrupt as e: + for wand in wands: + wand.disconnect() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/kano_wand/constants.py b/kano_wand/constants.py index cc34b4a..8b66bab 100644 --- a/kano_wand/constants.py +++ b/kano_wand/constants.py @@ -1,3 +1,5 @@ +from enum import Enum + BUTTON = 1 NINE_AXIS = 2 ACCELEROMETER = 3 @@ -15,3 +17,5 @@ CHARACTERISTIC_UUIDS = { TEMPERATURE:("64a70014-f691-4b93-a6f4-0968f5b648f8"), MAGNETOMETER:("64a70021-f691-4b93-a6f4-0968f5b648f8") } # <--- Change to the characteristic you want to enable notifications from. + +