"""Example script: scan for a wand, connect to it, and react to its events."""

import asyncio

from kanowandasync import Shop, Wand
from kanowandasync.constants import *  # noqa: F401,F403 -- supplies PATTERN


class MyWand(Wand):
    """Custom wand that changes LED color on each button press.

    Every press consumes one entry from ``self.colors``; once the list is
    empty the wand disconnects itself.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``Wand`` and set up the color queue."""
        super().__init__(*args, **kwargs)
        # Colors are popped from the end of this list, one per button press.
        # NOTE(review): the entries mix "#rrggbb", "rrggbb" and "0xrrggbb"
        # notations -- presumably set_led accepts all three; confirm.
        self.colors = ["#a333c8", "2185d0", "0x21ba45", "#fbbd08", "#f2711c", "#db2828"]
        # Not read anywhere in this script; kept as in the original.
        self.position_id = None

    async def post_connect(self):
        """Announce the connection and subscribe to the wand's notifications."""
        print(f"Connected to {self.name}")
        await self.subscribe_button()
        await self.subscribe_position()
        await self.get_organization()
        await self.subscribe_battery()
        await self.subscribe_temperature()

    async def on_button(self, pressed):
        """Button callback: on press, vibrate, show the next color, reset position.

        Disconnects the wand once the last color has been used.
        """
        if not pressed:
            return

        await self.vibrate(PATTERN.BURST)
        # Show the next color (taken from the end of the list) and reset the
        # wand's position.
        await self.set_led(self.colors.pop())
        await self.reset_position()

        # Disconnect if we run out of colors
        if not self.colors:
            await self.disconnect()

    async def on_position(self, x, y, pitch, roll):
        """Position callback: print the reported values."""
        print(x, y, pitch, roll)

    async def on_battery(self, value):
        """Battery callback: print the reported value."""
        print("Battery:", value)

    async def on_temperature(self, value):
        """Temperature callback: print the reported value."""
        print("Temperature", value)


async def main():
    """Scan until at least one wand connects, vibrate each, then wait 20 seconds.

    If the run is interrupted, every connected wand is disconnected before
    the interruption propagates.
    """
    # Create a new wand scanner
    shop = Shop(asyncio.get_running_loop(), wand_class=MyWand, debug=True)
    wands = []
    try:
        # Keep scanning (and automatically connecting) until a wand is found
        while not wands:
            print("Scanning...")
            wands = await shop.scan(connect=True)

        # For each wand (only tested with one): vibrate to confirm connection
        for wand in wands:
            await wand.vibrate(PATTERN.BURST)

        await asyncio.sleep(20)
    except (KeyboardInterrupt, asyncio.CancelledError) as exc:
        # Under asyncio.run(), Ctrl-C normally reaches this coroutine as a
        # CancelledError rather than a KeyboardInterrupt, so handle both.
        # disconnect() is a coroutine and must be awaited to actually run.
        for wand in wands:
            await wand.disconnect()
        if isinstance(exc, asyncio.CancelledError):
            # Let the cancellation propagate so asyncio.run() can finish
            # its own interrupt handling.
            raise


if __name__ == "__main__":
    asyncio.run(main())