77 lines
2.3 KiB
Python
77 lines
2.3 KiB
Python
from KanoWandAsync import Shop, Wand
|
|
from KanoWandAsync.constants import *
|
|
import asyncio
|
|
import pandas as pd
|
|
from uuid import uuid4
|
|
from pathlib import Path
|
|
import os
|
|
global spell
|
|
def save_dataframe(data):
|
|
df = pd.DataFrame(data)
|
|
df.columns = ["x", "y", "pitch", "roll"]
|
|
path = Path(f"./data/{spell}/{uuid4()}.csv")
|
|
if not path.parent.exists():
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
df.to_csv(str(path), index=False)
|
|
|
|
|
|
class DataCollectWand(Wand):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.colors = ["#a333c8", "2185d0", "0x21ba45", "#fbbd08", "#f2711c", "#db2828"]
|
|
self.position_id = None
|
|
self.data = []
|
|
|
|
async def post_connect(self):
|
|
print("Connected to {}".format(self.name))
|
|
await self.subscribe_button()
|
|
|
|
async def on_button(self, pressed):
|
|
if pressed:
|
|
self.data = []
|
|
print("Button pressed")
|
|
await self.vibrate(PATTERN.SHORT)
|
|
await self.reset_position()
|
|
await self.subscribe_position()
|
|
else:
|
|
save_dataframe(self.data)
|
|
await self.unsubscribe_position()
|
|
|
|
async def on_position(self, x, y, pitch, roll):
|
|
# print(x, y, pitch, roll)
|
|
self.data.append([x, y, pitch, roll])
|
|
|
|
def get_prompt_text(spells):
|
|
strings = [f"{spells[spells == spell].index[0]} - {spell} " for spell in spells]
|
|
return "Enter number to select spell- \n" + "\n".join(strings)
|
|
|
|
async def main():
|
|
spells = pd.read_csv("spells.csv")["spells"]
|
|
shop = Shop(asyncio.get_running_loop(), wand_class=DataCollectWand, debug=True)
|
|
wands = []
|
|
x = input(get_prompt_text(spells))
|
|
|
|
while x != "quit":
|
|
try:
|
|
global spell
|
|
spell = spells[int(x)]
|
|
print(f"Currently chosen spell - {spell}")
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
try:
|
|
while len(wands) == 0:
|
|
print("Scanning...")
|
|
wands = await shop.scan(connect=True)
|
|
for wand in wands:
|
|
await wand.vibrate(PATTERN.BURST)
|
|
await asyncio.sleep(60)
|
|
except Exception as e:
|
|
print(e)
|
|
for wand in wands:
|
|
wand.disconnect()
|
|
x = input(get_prompt_text(spells))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |