Ading data collection script for spell capturing
This commit is contained in:
@@ -0,0 +1,422 @@
|
||||
import asyncio
|
||||
import numpy as np
|
||||
|
||||
from .constants import *
|
||||
from bleak import BleakClient
|
||||
|
||||
|
||||
class Wand(object):
|
||||
"""
|
||||
A wand class to interact with the Kano wand
|
||||
"""
|
||||
|
||||
def __init__(self, device_addr, name, bot_loop, debug=True):
|
||||
"""
|
||||
Create a new wand
|
||||
|
||||
Arguments:
|
||||
device {bluepy.ScanEntry} -- Device information
|
||||
|
||||
Keyword Arguments:
|
||||
debug {bool} -- Print debug messages (default: {False})
|
||||
"""
|
||||
self.debug = debug
|
||||
self._dev = BleakClient(device_addr)
|
||||
self.name = name
|
||||
self.bot_loop = bot_loop
|
||||
|
||||
if debug:
|
||||
print("Wand: {}\n\rWand Mac: {}".format(self.name, self._dev.address))
|
||||
|
||||
# Notification stuff
|
||||
self.connected = False
|
||||
self._position_callbacks = {}
|
||||
self._position_subscribed = False
|
||||
self._button_callbacks = {}
|
||||
self._button_subscribed = False
|
||||
self._temperature_callbacks = {}
|
||||
self._temperature_subscribed = False
|
||||
self._battery_callbacks = {}
|
||||
self._battery_subscribed = False
|
||||
self._notification_thread = None
|
||||
self._position_notification_handle = 41
|
||||
self._button_notification_handle = 33
|
||||
self._temp_notification_handle = 56
|
||||
self._battery_notification_handle = 23
|
||||
|
||||
async def connect(self):
|
||||
if self.debug:
|
||||
print("Connecting to {}...".format(self.name))
|
||||
|
||||
connected = await self._dev.connect()
|
||||
if not connected:
|
||||
raise Exception("ERROR NOT CONNECTED TO THE DEVICE")
|
||||
|
||||
self.connected = True
|
||||
await self.post_connect()
|
||||
|
||||
if self.debug:
|
||||
print("Connected to {}".format(self.name))
|
||||
|
||||
async def post_connect(self):
|
||||
"""
|
||||
Do anything necessary after connecting
|
||||
"""
|
||||
pass
|
||||
|
||||
async def disconnect(self):
|
||||
|
||||
await self._dev.disconnect()
|
||||
self.connected = False
|
||||
await self.post_disconnect()
|
||||
|
||||
if self.debug:
|
||||
print("Disconnected from {}".format(self.name))
|
||||
|
||||
async def post_disconnect(self):
|
||||
"""
|
||||
Do anything necessary after disconnecting
|
||||
"""
|
||||
pass
|
||||
|
||||
async def get_organization(self):
|
||||
"""
|
||||
Get organization of device
|
||||
Returns {str} -- Organization name
|
||||
"""
|
||||
result = await self._dev.read_gatt_char(INFO.ORGANIZATION_CHAR.value)
|
||||
return result.decode("utf-8")
|
||||
|
||||
async def get_software_version(self):
|
||||
"""
|
||||
Get software version
|
||||
Returns {str} -- Version number
|
||||
"""
|
||||
result = await self._dev.read_gatt_char(INFO.SOFTWARE_CHAR.value)
|
||||
return result.decode("utf-8")
|
||||
|
||||
async def get_hardware_version(self):
|
||||
|
||||
"""
|
||||
Get hardware version
|
||||
Returns {str} -- Hardware version
|
||||
"""
|
||||
result = await self._dev.read_gatt_char(INFO.HARDWARE_CHAR.value)
|
||||
return result.decode("utf-8")
|
||||
|
||||
async def get_battery(self):
|
||||
"""
|
||||
Get battery level (currently only returns 0)
|
||||
Returns {str} -- Battery level
|
||||
"""
|
||||
result = await self._dev.read_gatt_char(IO.BATTERY_CHAR.value)
|
||||
print(f"battery is {result}")
|
||||
print(f"battery is {result.decode('utf-8')}")
|
||||
return result.decode("utf-8")
|
||||
|
||||
async def get_button(self):
|
||||
"""
|
||||
Get current button status
|
||||
Returns {bool} -- Button pressed status
|
||||
"""
|
||||
data = await self._dev.read_gatt_char(IO.USER_BUTTON_CHAR.value)
|
||||
return data[0] == 1
|
||||
|
||||
async def get_temperature(self):
|
||||
"""
|
||||
Get temperature
|
||||
|
||||
Returns {str} -- Battery level
|
||||
"""
|
||||
# with self._lock:
|
||||
# if not hasattr(self, "_temperature_handle"):
|
||||
# handle = self._sensor_service.getCharacteristics(SENSOR.TEMP_CHAR.value)[0]
|
||||
# self._temperature_handle = handle.getHandle()
|
||||
result = await self._dev.read_gatt_char(SENSOR.TEMP_CHAR.value)
|
||||
print(f"temp is {result}")
|
||||
return result
|
||||
|
||||
async def keep_alive(self):
|
||||
"""
|
||||
Keep the wand's connection active
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
if self.debug:
|
||||
print("Keeping wand alive.")
|
||||
return await self._dev.write_gatt_char(IO.KEEP_ALIVE_CHAR.value, bytes([1]), response=True)
|
||||
|
||||
async def vibrate(self, pattern=PATTERN.REGULAR):
|
||||
"""
|
||||
Vibrate wand with pattern
|
||||
|
||||
Keyword Arguments:
|
||||
pattern {kano_wand.PATTERN} -- Vibration pattern (default: {PATTERN.REGULAR})
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
if isinstance(pattern, PATTERN):
|
||||
message = [pattern.value]
|
||||
else:
|
||||
message = [pattern]
|
||||
if self.debug:
|
||||
print("Setting Vibration to {}".format(message))
|
||||
return await self._dev.write_gatt_char(IO.VIBRATOR_CHAR.value, bytes(message), response=True)
|
||||
|
||||
async def set_led(self, color="0x2185d0", on=True):
|
||||
"""
|
||||
Set the LED's color
|
||||
|
||||
Keyword Arguments:
|
||||
color {str} -- Color hex code (default: {"0x2185d0"})
|
||||
on {bool} -- Whether light is on or off (default: {True})
|
||||
|
||||
Returns {bytes} -- Status
|
||||
"""
|
||||
message = []
|
||||
if on:
|
||||
message.append(1)
|
||||
else:
|
||||
message.append(0)
|
||||
|
||||
color = int(color.replace("#", ""), 16)
|
||||
r = (color >> 16) & 255
|
||||
g = (color >> 8) & 255
|
||||
b = color & 255
|
||||
rgb = (((r & 248) << 8) + ((g & 252) << 3) + ((b & 248) >> 3))
|
||||
message.append(rgb >> 8)
|
||||
message.append(rgb & 0xff)
|
||||
|
||||
if self.debug:
|
||||
print("Setting LED to {}".format(message))
|
||||
return await self._dev.write_gatt_char(IO.LED_CHAR.value, bytes(message), response=True)
|
||||
|
||||
async def subscribe_position(self):
|
||||
"""
|
||||
Subscribe to position notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to position notification")
|
||||
self._position_subscribed = True
|
||||
await self._dev.start_notify(SENSOR.QUATERNIONS_CHAR.value, self.handle_notification)
|
||||
|
||||
async def unsubscribe_position(self, continue_notifications=False):
|
||||
"""
|
||||
Unsubscribe to position notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from position notification")
|
||||
|
||||
self._position_subscribed = continue_notifications
|
||||
await self._dev.stop_notify(SENSOR.QUATERNIONS_CHAR.value)
|
||||
|
||||
async def subscribe_button(self):
|
||||
"""
|
||||
Subscribe to button notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to button notification")
|
||||
|
||||
self._button_subscribed = True
|
||||
await self._dev.start_notify(IO.USER_BUTTON_CHAR.value, self.handle_notification)
|
||||
|
||||
async def unsubscribe_button(self, continue_notifications=False):
|
||||
"""
|
||||
Unsubscribe to button notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from button notification")
|
||||
self._button_subscribed = continue_notifications
|
||||
await self._dev.stop_notify(IO.USER_BUTTON_CHAR.value)
|
||||
|
||||
async def subscribe_temperature(self):
|
||||
"""
|
||||
Subscribe to temperature notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to temperature notification")
|
||||
|
||||
self._temperature_subscribed = True
|
||||
await self._dev.start_notify(SENSOR.TEMP_CHAR.value, self.handle_notification)
|
||||
|
||||
async def unsubscribe_temperature(self, continue_notifications=False):
|
||||
"""
|
||||
Unsubscribe to temperature notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from temperature notification")
|
||||
|
||||
self._temperature_subscribed = continue_notifications
|
||||
await self._dev.stop_notify(SENSOR.TEMP_CHAR.value)
|
||||
|
||||
#
|
||||
#
|
||||
async def subscribe_battery(self):
|
||||
"""
|
||||
Subscribe to battery notifications and start thread if necessary
|
||||
"""
|
||||
if self.debug:
|
||||
print("Subscribing to battery notification")
|
||||
|
||||
self._battery_subscribed = True
|
||||
await self._dev.start_notify(IO.BATTERY_CHAR.value, self.handle_notification)
|
||||
|
||||
async def unsubscribe_battery(self, continue_notifications=False):
|
||||
"""
|
||||
Unsubscribe to battery notifications
|
||||
|
||||
Keyword Arguments:
|
||||
continue_notifications {bool} -- Keep notification thread running (default: {False})
|
||||
"""
|
||||
if self.debug:
|
||||
print("Unsubscribing from battery notification")
|
||||
|
||||
self._battery_subscribed = continue_notifications
|
||||
await self._dev.stop_notify(IO.BATTERY_CHAR.value)
|
||||
|
||||
async def _on_position(self, data):
|
||||
"""
|
||||
Private function for position notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
# I got part of this from Kano's node module and modified it
|
||||
y = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little')))
|
||||
x = -1 * np.int16(np.uint16(int.from_bytes(data[2:4], byteorder='little')))
|
||||
w = -1 * np.int16(np.uint16(int.from_bytes(data[4:6], byteorder='little')))
|
||||
z = np.int16(np.uint16(int.from_bytes(data[6:8], byteorder='little')))
|
||||
|
||||
if self.debug:
|
||||
pitch = "Pitch: {}".format(z).ljust(16)
|
||||
roll = "Roll: {}".format(w).ljust(16)
|
||||
# print("{}{}(x, y): ({}, {})".format(pitch, roll, x, y))
|
||||
|
||||
await self.on_position(x, y, z, w)
|
||||
for callback in self._position_callbacks.values():
|
||||
await callback(x, y, z, w)
|
||||
|
||||
async def on_position(self, x, y, pitch, roll):
|
||||
"""
|
||||
Function called on position notification
|
||||
|
||||
Arguments:
|
||||
x {int} -- X position of wand (Between -1000 and 1000)
|
||||
y {int} -- Y position of wand (Between -1000 and 1000)
|
||||
pitch {int} -- Pitch of wand (Between -1000 and 1000)
|
||||
roll {int} -- Roll of wand (Between -1000 and 1000)
|
||||
"""
|
||||
pass
|
||||
|
||||
async def reset_position(self):
|
||||
"""
|
||||
Reset the quaternains of the wand
|
||||
"""
|
||||
if self.debug:
|
||||
print("resetting the quarternion position")
|
||||
return await self._dev.write_gatt_char(SENSOR.QUATERNIONS_RESET_CHAR.value, bytes([1]), response=True)
|
||||
|
||||
async def _on_button(self, data):
|
||||
"""
|
||||
Private function for button notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = data[0] == 1
|
||||
|
||||
if self.debug:
|
||||
print("Button: {}".format(val))
|
||||
|
||||
await self.on_button(val)
|
||||
for callback in self._button_callbacks.values():
|
||||
await callback(val)
|
||||
|
||||
async def on_button(self, value):
|
||||
"""
|
||||
Function called on button notification
|
||||
|
||||
Arguments:
|
||||
pressed {bool} -- If button is pressed
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _on_temperature(self, data):
|
||||
"""
|
||||
Private function for temperature notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = np.int16(np.uint16(int.from_bytes(data[0:2], byteorder='little')))
|
||||
|
||||
if self.debug:
|
||||
print("Temperature: {}".format(val))
|
||||
|
||||
await self.on_temperature(val)
|
||||
for callback in self._temperature_callbacks.values():
|
||||
await callback(val)
|
||||
|
||||
async def on_temperature(self, value):
|
||||
"""
|
||||
Function called on temperature notification
|
||||
|
||||
Arguments:
|
||||
value {int} -- Temperature of the wand
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _on_battery(self, data):
|
||||
"""
|
||||
Private function for battery notification
|
||||
|
||||
Arguments:
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
val = data[0]
|
||||
|
||||
if self.debug:
|
||||
print("Battery: {}".format(val))
|
||||
print("subscribing to the")
|
||||
|
||||
await self.on_battery(val)
|
||||
for callback in self._battery_callbacks.values():
|
||||
await callback(val)
|
||||
|
||||
async def on_battery(self, value):
|
||||
"""
|
||||
Function called on battery notification
|
||||
|
||||
Arguments:
|
||||
value {int} -- Battery level of the wand
|
||||
"""
|
||||
pass
|
||||
|
||||
def handle_notification(self, sender, data):
|
||||
"""
|
||||
Handle notifications subscribed to
|
||||
|
||||
Arguments:
|
||||
cHandle {int} -- Handle of notification
|
||||
data {bytes} -- Data from device
|
||||
"""
|
||||
future = None
|
||||
if sender == SENSOR.QUATERNIONS_CHAR.value:
|
||||
future = asyncio.run_coroutine_threadsafe(self._on_position(data), self.bot_loop)
|
||||
elif sender == IO.USER_BUTTON_CHAR.value:
|
||||
future = asyncio.run_coroutine_threadsafe(self._on_button(data), self.bot_loop)
|
||||
elif sender == SENSOR.TEMP_CHAR.value:
|
||||
future = asyncio.run_coroutine_threadsafe(self._on_temperature(data), self.bot_loop)
|
||||
elif sender == IO.BATTERY_CHAR.value:
|
||||
future = asyncio.run_coroutine_threadsafe(self._on_battery(data), self.bot_loop)
|
||||
if future != None:
|
||||
future.result()
|
||||
Reference in New Issue
Block a user