commit 915faf38fefe8b833005ecb2d4c3d006c786cbd5 Author: Lucas Oskorep Date: Mon Apr 20 11:36:20 2020 -0500 Adding in project diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..3754756 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,17 @@ +MIT License +Copyright (c) 2018 YOUR NAME +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/kanowandasync/__init__.py b/kanowandasync/__init__.py new file mode 100644 index 0000000..d4c987c --- /dev/null +++ b/kanowandasync/__init__.py @@ -0,0 +1,3 @@ +from kanowandasync.shop import Shop +from kanowandasync.wand import Wand +from kanowandasync.constants import * \ No newline at end of file diff --git a/kanowandasync/constants.py b/kanowandasync/constants.py new file mode 100644 index 0000000..7d6aca7 --- /dev/null +++ b/kanowandasync/constants.py @@ -0,0 +1,38 @@ +from enum import Enum + +class INFO(Enum): + """Enum containing info UUIDs""" + SERVICE = '64a70010-f691-4b93-a6f4-0968f5b648f8' + ORGANIZATION_CHAR = '64a7000b-f691-4b93-a6f4-0968f5b648f8' + SOFTWARE_CHAR = '64a70013-f691-4b93-a6f4-0968f5b648f8' + HARDWARE_CHAR = '64a70001-f691-4b93-a6f4-0968f5b648f8' + +class IO(Enum): + """Enum containing _IO UUIDs""" + SERVICE = '64a70012-f691-4b93-a6f4-0968f5b648f8' + BATTERY_CHAR = '64a70007-f691-4b93-a6f4-0968f5b648f8' + USER_BUTTON_CHAR = '64a7000d-f691-4b93-a6f4-0968f5b648f8' + VIBRATOR_CHAR = '64a70008-f691-4b93-a6f4-0968f5b648f8' + LED_CHAR = '64a70009-f691-4b93-a6f4-0968f5b648f8' + KEEP_ALIVE_CHAR = '64a7000f-f691-4b93-a6f4-0968f5b648f8' + +class SENSOR(Enum): + """Enum containing sensor UUIDs""" + SERVICE = '64a70011-f691-4b93-a6f4-0968f5b648f8' + TEMP_CHAR = '64a70014-f691-4b93-a6f4-0968f5b648f8' + QUATERNIONS_CHAR = '64a70002-f691-4b93-a6f4-0968f5b648f8' + # RAW_CHAR = '64a7000a-f691-4b93-a6f4-0968f5b648f8' + # MOTION_CHAR = '64a7000c-f691-4b93-a6f4-0968f5b648f8' + MAGN_CALIBRATE_CHAR = '64a70021-f691-4b93-a6f4-0968f5b648f8' + QUATERNIONS_RESET_CHAR = '64a70004-f691-4b93-a6f4-0968f5b648f8' + + +class PATTERN(Enum): + """Enum for wand vibration patterns""" + REGULAR = 1 + SHORT = 2 + BURST = 3 + LONG = 4 + SHORT_LONG = 5 + SHORT_SHORT = 6 + BIG_PAUSE = 7 \ No newline at end of file diff --git a/kanowandasync/shop.py b/kanowandasync/shop.py new file mode 100644 index 0000000..ff3ba09 --- /dev/null +++ b/kanowandasync/shop.py @@ -0,0 +1,64 @@ +from bleak import discover +from .wand import Wand + +class Shop(object): + """ + A scanner class to connect to wands + """ + def __init__(self, shop_loop, wand_class=Wand, debug=False): + """ + Create a new scanner + + Keyword Arguments: + wand_class {class} -- Class to use when connecting to wand (default: {Wand}) + debug {bool} -- Print debug messages (default: {False}) + """ + self.shop_loop = shop_loop + self.wand_class = wand_class + self.debug = debug + self._name = None + self._prefix = None + self._mac = None + + async def scan(self, prefix="Kano-Wand", mac=None, timeout=2.0, connect=False): + """ + Scan for devices + + Keyword Arguments: + name {str} -- Name of the device to scan for (default: {None}) + prefix {str} -- Prefix of name of device to scan for (default: {"Kano-Wand"}) + mac {str} -- MAC Address of the device to scan for (default: {None}) + timeout {float} -- Timeout before returning from scan (default: {1.0}) + connect {bool} -- Connect to the wands automatically (default: {False}) + + Returns {Wand[]} -- Array of wand objects + """ + + if self.debug: + print("Scanning for {} seconds...".format(timeout)) + try: + prefix_check = not (prefix is None) + mac_check = not (mac is None) + assert prefix_check or mac_check + except AssertionError as e: + print("Either a name, prefix, or mac address must be provided to find a wand") + raise e + if prefix is not None: + self._prefix = prefix + elif mac is not None: + self._mac = mac + self.wands = [] + devices = await discover(timeout= timeout) + print(devices) + if self._prefix: + devices = list(filter(lambda x : x.name.startswith(self._prefix), devices)) + if self._mac: + devices = list(filter(lambda x : x.address == self.mac, devices)) + print(devices) + + self.wands = [self.wand_class(d.address, d.name, self.shop_loop) for d in devices] + print(self.wands) + if connect: + for wand in self.wands: + await wand.connect() + return self.wands diff --git a/kanowandasync/wand.py b/kanowandasync/wand.py new file mode 100644 index 0000000..d32c62d --- /dev/null +++ b/kanowandasync/wand.py @@ -0,0 +1,422 @@ +import asyncio +import numpy as np + +from .constants import * +from bleak import BleakClient + + +class Wand(object): + """ + A wand class to interact with the Kano wand + """ + + def __init__(self, device_addr, name, bot_loop, debug=True): + """ + Create a new wand + + Arguments: + device {bluepy.ScanEntry} -- Device information + + Keyword Arguments: + debug {bool} -- Print debug messages (default: {False}) + """ + self.debug = debug + self._dev = BleakClient(device_addr) + self.name = name + self.bot_loop = bot_loop + + if debug: + print("Wand: {}\n\rWand Mac: {}".format(self.name, self._dev.address)) + + # Notification stuff + self.connected = False + self._position_callbacks = {} + self._position_subscribed = False + self._button_callbacks = {} + self._button_subscribed = False + self._temperature_callbacks = {} + self._temperature_subscribed = False + self._battery_callbacks = {} + self._battery_subscribed = False + self._notification_thread = None + self._position_notification_handle = 41 + self._button_notification_handle = 33 + self._temp_notification_handle = 56 + self._battery_notification_handle = 23 + + async def connect(self): + if self.debug: + print("Connecting to {}...".format(self.name)) + + connected = await self._dev.connect() + if not connected: + raise Exception("ERROR NOT CONNECTED TO THE DEVICE") + + self.connected = True + await self.post_connect() + + if self.debug: + print("Connected to {}".format(self.name)) + + async def post_connect(self): + """ + Do anything necessary after connecting + """ + pass + + async def disconnect(self): + + await self._dev.disconnect() + self.connected = False + await self.post_disconnect() + + if self.debug: + print("Disconnected from {}".format(self.name)) + + async def post_disconnect(self): + """ + Do anything necessary after disconnecting + """ + pass + + async def get_organization(self): + """ + Get organization of device + Returns {str} -- Organization name + """ + result = await self._dev.read_gatt_char(INFO.ORGANIZATION_CHAR.value) + return result.decode("utf-8") + + async def get_software_version(self): + """ + Get software version + Returns {str} -- Version number + """ + result = await self._dev.read_gatt_char(INFO.SOFTWARE_CHAR.value) + return result.decode("utf-8") + + async def get_hardware_version(self): + + """ + Get hardware version + Returns {str} -- Hardware version + """ + result = await self._dev.read_gatt_char(INFO.HARDWARE_CHAR.value) + return result.decode("utf-8") + + async def get_battery(self): + """ + Get battery level (currently only returns 0) + Returns {str} -- Battery level + """ + result = await self._dev.read_gatt_char(IO.BATTERY_CHAR.value) + print(f"battery is {result}") + print(f"battery is {result.decode('utf-8')}") + return result.decode("utf-8") + + async def get_button(self): + """ + Get current button status + Returns {bool} -- Button pressed status + """ + data = await self._dev.read_gatt_char(IO.USER_BUTTON_CHAR.value) + return data[0] == 1 + + async def get_temperature(self): + """ + Get temperature + + Returns {str} -- Battery level + """ + # with self._lock: + # if not hasattr(self, "_temperature_handle"): + # handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0] + # self._temperature_handle = handle.getHandle() + result = await self._dev.read_gatt_char(SENSOR.TEMP_CHAR.value) + print(f"temp is {result}") + return result + + async def keep_alive(self): + """ + Keep the wand's connection active + + Returns {bytes} -- Status + """ + if self.debug: + print("Keeping wand alive.") + return await self._dev.write_gatt_char(IO.KEEP_ALIVE_CHAR.value, bytes([1]), response=True) + + async def vibrate(self, pattern=PATTERN.REGULAR): + """ + Vibrate wand with pattern + + Keyword Arguments: + pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR}) + + Returns {bytes} -- Status + """ + if isinstance(pattern, PATTERN): + message = [pattern.value] + else: + message = [pattern] + if self.debug: + print("Setting Vibration to {}".format(message)) + return await self._dev.write_gatt_char(IO.VIBRATOR_CHAR.value, bytes(message), response=True) + + async def set_led(self, color="0x2185d0", on=True): + """ + Set the LED's color + + Keyword Arguments: + color {str} -- Color hex code (default: {"0x2185d0"}) + on {bool} -- Whether light is on or off (default: {True}) + + Returns {bytes} -- Status + """ + message = [] + if on: + message.append(1) + else: + message.append(0) + + color = int(color.replace("#", ""), 16) + r = (color >> 16) & 255 + g = (color >> 8) & 255 + b = color & 255 + rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3)) + message.append(rgb >> 8) + message.append(rgb & 0xff) + + if self.debug: + print("Setting LED to {}".format(message)) + return await self._dev.write_gatt_char(IO.LED_CHAR.value, bytes(message), response=True) + + async def subscribe_position(self): + """ + Subscribe to position notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to position notification") + self._position_subscribed = True + await self._dev.start_notify(SENSOR.QUATERNIONS_CHAR.value, self.handle_notification) + + async def unsubscribe_position(self, continue_notifications=False): + """ + Unsubscribe to position notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from position notification") + + self._position_subscribed = continue_notifications + await self._dev.stop_notify(SENSOR.QUATERNIONS_CHAR.value) + + async def subscribe_button(self): + """ + Subscribe to button notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to button notification") + + self._button_subscribed = True + await self._dev.start_notify(IO.USER_BUTTON_CHAR.value, self.handle_notification) + + async def unsubscribe_button(self, continue_notifications=False): + """ + Unsubscribe to button notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from button notification") + self._button_subscribed = continue_notifications + await self._dev.stop_notify(IO.USER_BUTTON_CHAR.value) + + async def subscribe_temperature(self): + """ + Subscribe to temperature notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to temperature notification") + + self._temperature_subscribed = True + await self._dev.start_notify(SENSOR.TEMP_CHAR.value, self.handle_notification) + + async def unsubscribe_temperature(self, continue_notifications=False): + """ + Unsubscribe to temperature notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from temperature notification") + + self._temperature_subscribed = continue_notifications + await self._dev.stop_notify(SENSOR.TEMP_CHAR.value) + + # + # + async def subscribe_battery(self): + """ + Subscribe to battery notifications and start thread if necessary + """ + if self.debug: + print("Subscribing to battery notification") + + self._battery_subscribed = True + await self._dev.start_notify(IO.BATTERY_CHAR.value, self.handle_notification) + + async def unsubscribe_battery(self, continue_notifications=False): + """ + Unsubscribe to battery notifications + + Keyword Arguments: + continue_notifications {bool} -- Keep notification thread running (default: {False}) + """ + if self.debug: + print("Unsubscribing from battery notification") + + self._battery_subscribed = continue_notifications + await self._dev.stop_notify(IO.BATTERY_CHAR.value) + + async def _on_position(self, data): + """ + Private function for position notification + + Arguments: + data {bytes} -- Data from device + """ + # I got part of this from Kano's node module and modified it + y = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little'))) + x = -1 * np.int16(np.uint16(int.from_bytes(data[2:4], byteorder='little'))) + w = -1 * np.int16(np.uint16(int.from_bytes(data[4:6], byteorder='little'))) + z = np.int16(np.uint16(int.from_bytes(data[6:8], byteorder='little'))) + + if self.debug: + pitch = "Pitch: {}".format(z).ljust(16) + roll = "Roll: {}".format(w).ljust(16) + # print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y)) + + await self.on_position(x, y, z, w) + for callback in self._position_callbacks.values(): + await callback(x, y, z, w) + + async def on_position(self, x, y, pitch, roll): + """ + Function called on position notification + + Arguments: + x {int} -- X position of wand (Between -1000 and 1000) + y {int} -- Y position of wand (Between -1000 and 1000) + pitch {int} -- Pitch of wand (Between -1000 and 1000) + roll {int} -- Roll of wand (Between -1000 and 1000) + """ + pass + + async def reset_position(self): + """ + Reset the quaternains of the wand + """ + if self.debug: + print("resetting the quarternion position") + return await self._dev.write_gatt_char(SENSOR.QUATERNIONS_RESET_CHAR.value, bytes([1]), response=True) + + async def _on_button(self, data): + """ + Private function for button notification + + Arguments: + data {bytes} -- Data from device + """ + val = data[0] == 1 + + if self.debug: + print("Button: {}".format(val)) + + await self.on_button(val) + for callback in self._button_callbacks.values(): + await callback(val) + + async def on_button(self, value): + """ + Function called on button notification + + Arguments: + pressed {bool} -- If button is pressed + """ + pass + + async def _on_temperature(self, data): + """ + Private function for temperature notification + + Arguments: + data {bytes} -- Data from device + """ + val = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little'))) + + if self.debug: + print("Temperature: {}".format(val)) + + await self.on_temperature(val) + for callback in self._temperature_callbacks.values(): + await callback(val) + + async def on_temperature(self, value): + """ + Function called on temperature notification + + Arguments: + value {int} -- Temperature of the wand + """ + pass + + async def _on_battery(self, data): + """ + Private function for battery notification + + Arguments: + data {bytes} -- Data from device + """ + val = data[0] + + if self.debug: + print("Battery: {}".format(val)) + print("subscribing to the") + + await self.on_battery(val) + for callback in self._battery_callbacks.values(): + await callback(val) + + async def on_battery(self, value): + """ + Function called on battery notification + + Arguments: + value {int} -- Battery level of the wand + """ + pass + + def handle_notification(self, sender, data): + """ + Handle notifications subscribed to + + Arguments: + cHandle {int} -- Handle of notification + data {bytes} -- Data from device + """ + future = None + if sender == SENSOR.QUATERNIONS_CHAR.value: + future = asyncio.run_coroutine_threadsafe(self._on_position(data), self.bot_loop) + elif sender == IO.USER_BUTTON_CHAR.value: + future = asyncio.run_coroutine_threadsafe(self._on_button(data), self.bot_loop) + elif sender == SENSOR.TEMP_CHAR.value: + future = asyncio.run_coroutine_threadsafe(self._on_temperature(data), self.bot_loop) + elif sender == IO.BATTERY_CHAR.value: + future = asyncio.run_coroutine_threadsafe(self._on_battery(data), self.bot_loop) + if future != None: + future.result() diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..9d5f797 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +# Inside of setup.cfg +[metadata] +description-file = README.md \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..4eedb43 --- /dev/null +++ b/setup.py @@ -0,0 +1,32 @@ +from distutils.core import setup +setup( + name = 'kanowandasync', # How you named your package folder (MyLib) + packages = ['kanowandasync'], # Chose the same as "name" + version = '0.1', # Start with a small number and increase it with every change you make + license='MIT', # Chose a license from here: https://help.github.com/articles/licensing-a-repository + description = + """ + This package allows users to access the BLE capabilities of their Kano coding wand using an asynchronous python API. + + API design taken from - https://github.com/GammaGames/kano_wand and then adapted and updated for asynchronous python. + + New version is asynchronous and in addition is not cross playform due to its usage of pybluez instead of BLEAK for BLE access. + """, # Give a short description about your library + author = 'Lucas Oskorep', # Type in your name + author_email = 'lucas.oskorep@gmail.com', # Type in your E-Mail + url = '', # Provide either the link to your github or to your website + download_url = 'https://github.com/user/reponame/archive/v_01.tar.gz', # I explain this later on + keywords = ['smarthome', 'smartwand', 'smart home', 'smart wand', 'wand', 'kano', 'kit', 'kano wand kit', 'async', 'kano wand async'], # Keywords that define your package best + install_requires=[ # I get to this in a second + 'bleak>=0.5.0', + ], + classifiers=[ + 'Development Status :: 3 - Alpha', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package + 'Intended Audience :: Developers', # Define that your audience are developers + 'Topic :: Software Development :: Build Tools', + 'License :: OSI Approved :: MIT License', # Again, pick a license + 'Programming Language :: Python :: 3', #Specify which pyhton versions that you want to support + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + ], +) \ No newline at end of file