Adding in project

This commit is contained in:
Lucas Oskorep
2020-04-20 11:36:20 -05:00
commit 915faf38fe
8 changed files with 579 additions and 0 deletions
+17
View File
@@ -0,0 +1,17 @@
MIT License
Copyright (c) 2018 YOUR NAME
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
View File
+3
View File
@@ -0,0 +1,3 @@
from kanowandasync.shop import Shop
from kanowandasync.wand import Wand
from kanowandasync.constants import *
+38
View File
@@ -0,0 +1,38 @@
from enum import Enum
class INFO(Enum):
"""Enum containing info UUIDs"""
SERVICE = '64a70010-f691-4b93-a6f4-0968f5b648f8'
ORGANIZATION_CHAR = '64a7000b-f691-4b93-a6f4-0968f5b648f8'
SOFTWARE_CHAR = '64a70013-f691-4b93-a6f4-0968f5b648f8'
HARDWARE_CHAR = '64a70001-f691-4b93-a6f4-0968f5b648f8'
class IO(Enum):
"""Enum containing _IO UUIDs"""
SERVICE = '64a70012-f691-4b93-a6f4-0968f5b648f8'
BATTERY_CHAR = '64a70007-f691-4b93-a6f4-0968f5b648f8'
USER_BUTTON_CHAR = '64a7000d-f691-4b93-a6f4-0968f5b648f8'
VIBRATOR_CHAR = '64a70008-f691-4b93-a6f4-0968f5b648f8'
LED_CHAR = '64a70009-f691-4b93-a6f4-0968f5b648f8'
KEEP_ALIVE_CHAR = '64a7000f-f691-4b93-a6f4-0968f5b648f8'
class SENSOR(Enum):
"""Enum containing sensor UUIDs"""
SERVICE = '64a70011-f691-4b93-a6f4-0968f5b648f8'
TEMP_CHAR = '64a70014-f691-4b93-a6f4-0968f5b648f8'
QUATERNIONS_CHAR = '64a70002-f691-4b93-a6f4-0968f5b648f8'
# RAW_CHAR = '64a7000a-f691-4b93-a6f4-0968f5b648f8'
# MOTION_CHAR = '64a7000c-f691-4b93-a6f4-0968f5b648f8'
MAGN_CALIBRATE_CHAR = '64a70021-f691-4b93-a6f4-0968f5b648f8'
QUATERNIONS_RESET_CHAR = '64a70004-f691-4b93-a6f4-0968f5b648f8'
class PATTERN(Enum):
"""Enum for wand vibration patterns"""
REGULAR = 1
SHORT = 2
BURST = 3
LONG = 4
SHORT_LONG = 5
SHORT_SHORT = 6
BIG_PAUSE = 7
+64
View File
@@ -0,0 +1,64 @@
from bleak import discover
from .wand import Wand
class Shop(object):
"""
A scanner class to connect to wands
"""
def __init__(self, shop_loop, wand_class=Wand, debug=False):
"""
Create a new scanner
Keyword Arguments:
wand_class {class} -- Class to use when connecting to wand (default: {Wand})
debug {bool} -- Print debug messages (default: {False})
"""
self.shop_loop = shop_loop
self.wand_class = wand_class
self.debug = debug
self._name = None
self._prefix = None
self._mac = None
async def scan(self, prefix="Kano-Wand", mac=None, timeout=2.0, connect=False):
"""
Scan for devices
Keyword Arguments:
name {str} -- Name of the device to scan for (default: {None})
prefix {str} -- Prefix of name of device to scan for (default: {"Kano-Wand"})
mac {str} -- MAC Address of the device to scan for (default: {None})
timeout {float} -- Timeout before returning from scan (default: {1.0})
connect {bool} -- Connect to the wands automatically (default: {False})
Returns {Wand[]} -- Array of wand objects
"""
if self.debug:
print("Scanning for {} seconds...".format(timeout))
try:
prefix_check = not (prefix is None)
mac_check = not (mac is None)
assert prefix_check or mac_check
except AssertionError as e:
print("Either a name, prefix, or mac address must be provided to find a wand")
raise e
if prefix is not None:
self._prefix = prefix
elif mac is not None:
self._mac = mac
self.wands = []
devices = await discover(timeout= timeout)
print(devices)
if self._prefix:
devices = list(filter(lambda x : x.name.startswith(self._prefix), devices))
if self._mac:
devices = list(filter(lambda x : x.address == self.mac, devices))
print(devices)
self.wands = [self.wand_class(d.address, d.name, self.shop_loop) for d in devices]
print(self.wands)
if connect:
for wand in self.wands:
await wand.connect()
return self.wands
+422
View File
@@ -0,0 +1,422 @@
import asyncio
import numpy as np
from .constants import *
from bleak import BleakClient
class Wand(object):
"""
A wand class to interact with the Kano wand
"""
def __init__(self, device_addr, name, bot_loop, debug=True):
"""
Create a new wand
Arguments:
device {bluepy.ScanEntry} -- Device information
Keyword Arguments:
debug {bool} -- Print debug messages (default: {False})
"""
self.debug = debug
self._dev = BleakClient(device_addr)
self.name = name
self.bot_loop = bot_loop
if debug:
print("Wand: {}\n\rWand Mac: {}".format(self.name, self._dev.address))
# Notification stuff
self.connected = False
self._position_callbacks = {}
self._position_subscribed = False
self._button_callbacks = {}
self._button_subscribed = False
self._temperature_callbacks = {}
self._temperature_subscribed = False
self._battery_callbacks = {}
self._battery_subscribed = False
self._notification_thread = None
self._position_notification_handle = 41
self._button_notification_handle = 33
self._temp_notification_handle = 56
self._battery_notification_handle = 23
async def connect(self):
if self.debug:
print("Connecting to {}...".format(self.name))
connected = await self._dev.connect()
if not connected:
raise Exception("ERROR NOT CONNECTED TO THE DEVICE")
self.connected = True
await self.post_connect()
if self.debug:
print("Connected to {}".format(self.name))
async def post_connect(self):
"""
Do anything necessary after connecting
"""
pass
async def disconnect(self):
await self._dev.disconnect()
self.connected = False
await self.post_disconnect()
if self.debug:
print("Disconnected from {}".format(self.name))
async def post_disconnect(self):
"""
Do anything necessary after disconnecting
"""
pass
async def get_organization(self):
"""
Get organization of device
Returns {str} -- Organization name
"""
result = await self._dev.read_gatt_char(INFO.ORGANIZATION_CHAR.value)
return result.decode("utf-8")
async def get_software_version(self):
"""
Get software version
Returns {str} -- Version number
"""
result = await self._dev.read_gatt_char(INFO.SOFTWARE_CHAR.value)
return result.decode("utf-8")
async def get_hardware_version(self):
"""
Get hardware version
Returns {str} -- Hardware version
"""
result = await self._dev.read_gatt_char(INFO.HARDWARE_CHAR.value)
return result.decode("utf-8")
async def get_battery(self):
"""
Get battery level (currently only returns 0)
Returns {str} -- Battery level
"""
result = await self._dev.read_gatt_char(IO.BATTERY_CHAR.value)
print(f"battery is {result}")
print(f"battery is {result.decode('utf-8')}")
return result.decode("utf-8")
async def get_button(self):
"""
Get current button status
Returns {bool} -- Button pressed status
"""
data = await self._dev.read_gatt_char(IO.USER_BUTTON_CHAR.value)
return data[0] == 1
async def get_temperature(self):
"""
Get temperature
Returns {str} -- Battery level
"""
# with self._lock:
# if not hasattr(self, "_temperature_handle"):
# handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0]
# self._temperature_handle = handle.getHandle()
result = await self._dev.read_gatt_char(SENSOR.TEMP_CHAR.value)
print(f"temp is {result}")
return result
async def keep_alive(self):
"""
Keep the wand's connection active
Returns {bytes} -- Status
"""
if self.debug:
print("Keeping wand alive.")
return await self._dev.write_gatt_char(IO.KEEP_ALIVE_CHAR.value, bytes([1]), response=True)
async def vibrate(self, pattern=PATTERN.REGULAR):
"""
Vibrate wand with pattern
Keyword Arguments:
pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR})
Returns {bytes} -- Status
"""
if isinstance(pattern, PATTERN):
message = [pattern.value]
else:
message = [pattern]
if self.debug:
print("Setting Vibration to {}".format(message))
return await self._dev.write_gatt_char(IO.VIBRATOR_CHAR.value, bytes(message), response=True)
async def set_led(self, color="0x2185d0", on=True):
"""
Set the LED's color
Keyword Arguments:
color {str} -- Color hex code (default: {"0x2185d0"})
on {bool} -- Whether light is on or off (default: {True})
Returns {bytes} -- Status
"""
message = []
if on:
message.append(1)
else:
message.append(0)
color = int(color.replace("#", ""), 16)
r = (color >> 16) & 255
g = (color >> 8) & 255
b = color & 255
rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3))
message.append(rgb >> 8)
message.append(rgb & 0xff)
if self.debug:
print("Setting LED to {}".format(message))
return await self._dev.write_gatt_char(IO.LED_CHAR.value, bytes(message), response=True)
async def subscribe_position(self):
"""
Subscribe to position notifications and start thread if necessary
"""
if self.debug:
print("Subscribing to position notification")
self._position_subscribed = True
await self._dev.start_notify(SENSOR.QUATERNIONS_CHAR.value, self.handle_notification)
async def unsubscribe_position(self, continue_notifications=False):
"""
Unsubscribe to position notifications
Keyword Arguments:
continue_notifications {bool} -- Keep notification thread running (default: {False})
"""
if self.debug:
print("Unsubscribing from position notification")
self._position_subscribed = continue_notifications
await self._dev.stop_notify(SENSOR.QUATERNIONS_CHAR.value)
async def subscribe_button(self):
"""
Subscribe to button notifications and start thread if necessary
"""
if self.debug:
print("Subscribing to button notification")
self._button_subscribed = True
await self._dev.start_notify(IO.USER_BUTTON_CHAR.value, self.handle_notification)
async def unsubscribe_button(self, continue_notifications=False):
"""
Unsubscribe to button notifications
Keyword Arguments:
continue_notifications {bool} -- Keep notification thread running (default: {False})
"""
if self.debug:
print("Unsubscribing from button notification")
self._button_subscribed = continue_notifications
await self._dev.stop_notify(IO.USER_BUTTON_CHAR.value)
async def subscribe_temperature(self):
"""
Subscribe to temperature notifications and start thread if necessary
"""
if self.debug:
print("Subscribing to temperature notification")
self._temperature_subscribed = True
await self._dev.start_notify(SENSOR.TEMP_CHAR.value, self.handle_notification)
async def unsubscribe_temperature(self, continue_notifications=False):
"""
Unsubscribe to temperature notifications
Keyword Arguments:
continue_notifications {bool} -- Keep notification thread running (default: {False})
"""
if self.debug:
print("Unsubscribing from temperature notification")
self._temperature_subscribed = continue_notifications
await self._dev.stop_notify(SENSOR.TEMP_CHAR.value)
#
#
async def subscribe_battery(self):
"""
Subscribe to battery notifications and start thread if necessary
"""
if self.debug:
print("Subscribing to battery notification")
self._battery_subscribed = True
await self._dev.start_notify(IO.BATTERY_CHAR.value, self.handle_notification)
async def unsubscribe_battery(self, continue_notifications=False):
"""
Unsubscribe to battery notifications
Keyword Arguments:
continue_notifications {bool} -- Keep notification thread running (default: {False})
"""
if self.debug:
print("Unsubscribing from battery notification")
self._battery_subscribed = continue_notifications
await self._dev.stop_notify(IO.BATTERY_CHAR.value)
async def _on_position(self, data):
"""
Private function for position notification
Arguments:
data {bytes} -- Data from device
"""
# I got part of this from Kano's node module and modified it
y = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little')))
x = -1 * np.int16(np.uint16(int.from_bytes(data[2:4], byteorder='little')))
w = -1 * np.int16(np.uint16(int.from_bytes(data[4:6], byteorder='little')))
z = np.int16(np.uint16(int.from_bytes(data[6:8], byteorder='little')))
if self.debug:
pitch = "Pitch: {}".format(z).ljust(16)
roll = "Roll: {}".format(w).ljust(16)
# print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y))
await self.on_position(x, y, z, w)
for callback in self._position_callbacks.values():
await callback(x, y, z, w)
async def on_position(self, x, y, pitch, roll):
"""
Function called on position notification
Arguments:
x {int} -- X position of wand (Between -1000 and 1000)
y {int} -- Y position of wand (Between -1000 and 1000)
pitch {int} -- Pitch of wand (Between -1000 and 1000)
roll {int} -- Roll of wand (Between -1000 and 1000)
"""
pass
async def reset_position(self):
"""
Reset the quaternains of the wand
"""
if self.debug:
print("resetting the quarternion position")
return await self._dev.write_gatt_char(SENSOR.QUATERNIONS_RESET_CHAR.value, bytes([1]), response=True)
async def _on_button(self, data):
"""
Private function for button notification
Arguments:
data {bytes} -- Data from device
"""
val = data[0] == 1
if self.debug:
print("Button: {}".format(val))
await self.on_button(val)
for callback in self._button_callbacks.values():
await callback(val)
async def on_button(self, value):
"""
Function called on button notification
Arguments:
pressed {bool} -- If button is pressed
"""
pass
async def _on_temperature(self, data):
"""
Private function for temperature notification
Arguments:
data {bytes} -- Data from device
"""
val = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little')))
if self.debug:
print("Temperature: {}".format(val))
await self.on_temperature(val)
for callback in self._temperature_callbacks.values():
await callback(val)
async def on_temperature(self, value):
"""
Function called on temperature notification
Arguments:
value {int} -- Temperature of the wand
"""
pass
async def _on_battery(self, data):
"""
Private function for battery notification
Arguments:
data {bytes} -- Data from device
"""
val = data[0]
if self.debug:
print("Battery: {}".format(val))
print("subscribing to the")
await self.on_battery(val)
for callback in self._battery_callbacks.values():
await callback(val)
async def on_battery(self, value):
"""
Function called on battery notification
Arguments:
value {int} -- Battery level of the wand
"""
pass
def handle_notification(self, sender, data):
"""
Handle notifications subscribed to
Arguments:
cHandle {int} -- Handle of notification
data {bytes} -- Data from device
"""
future = None
if sender == SENSOR.QUATERNIONS_CHAR.value:
future = asyncio.run_coroutine_threadsafe(self._on_position(data), self.bot_loop)
elif sender == IO.USER_BUTTON_CHAR.value:
future = asyncio.run_coroutine_threadsafe(self._on_button(data), self.bot_loop)
elif sender == SENSOR.TEMP_CHAR.value:
future = asyncio.run_coroutine_threadsafe(self._on_temperature(data), self.bot_loop)
elif sender == IO.BATTERY_CHAR.value:
future = asyncio.run_coroutine_threadsafe(self._on_battery(data), self.bot_loop)
if future != None:
future.result()
+3
View File
@@ -0,0 +1,3 @@
# Inside of setup.cfg
[metadata]
description-file = README.md
+32
View File
@@ -0,0 +1,32 @@
from distutils.core import setup
setup(
name = 'kanowandasync', # How you named your package folder (MyLib)
packages = ['kanowandasync'], # Chose the same as "name"
version = '0.1', # Start with a small number and increase it with every change you make
license='MIT', # Chose a license from here: https://help.github.com/articles/licensing-a-repository
description =
"""
This package allows users to access the BLE capabilities of their Kano coding wand using an asynchronous python API.
API design taken from - https://github.com/GammaGames/kano_wand and then adapted and updated for asynchronous python.
New version is asynchronous and in addition is not cross playform due to its usage of pybluez instead of BLEAK for BLE access.
""", # Give a short description about your library
author = 'Lucas Oskorep', # Type in your name
author_email = 'lucas.oskorep@gmail.com', # Type in your E-Mail
url = '', # Provide either the link to your github or to your website
download_url = 'https://github.com/user/reponame/archive/v_01.tar.gz', # I explain this later on
keywords = ['smarthome', 'smartwand', 'smart home', 'smart wand', 'wand', 'kano', 'kit', 'kano wand kit', 'async', 'kano wand async'], # Keywords that define your package best
install_requires=[ # I get to this in a second
'bleak>=0.5.0',
],
classifiers=[
'Development Status :: 3 - Alpha', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package
'Intended Audience :: Developers', # Define that your audience are developers
'Topic :: Software Development :: Build Tools',
'License :: OSI Approved :: MIT License', # Again, pick a license
'Programming Language :: Python :: 3', #Specify which pyhton versions that you want to support
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
],
)