WEEK14 Interface and application programming
WEEK14 Interface and application programming
Group Assignment
compare as many tool options as possible
Interface and application programming
Individual Assignment
write an application that interfaces a user with an input &/or output device that you made
For this assignment, we will continue to work on the content related to the Final Project. The MCU of the final project was changed from ESC32C3 to nRF52840.
ChatGPT Prompt
Please modify the code from Assignment 13, which sends accelerometer and gyroscope data from the MPU-6050 via Bluetooth from the ESC32C3, to work on the nRF52840.
#include <bluefruit.h>
#include <Wire.h>
#include <LSM6DS3.h>
LSM6DS3 IMU(I2C_MODE, 0x6A);
BLEUart bleuart; // BLE UART Service
void setup() {
// Initialization code remains as it is
Wire.begin();
if (IMU.begin() != 0) {
// Remain in infinite loop if IMU initialization fails
while (true); // Infinite loop to halt operation
}
IMU.settings.gyroRange = 2000; // Gyroscope range to 2000 dps
IMU.settings.accelRange = 4; // Accelerometer range to ±4G
// Setup BLE
Bluefruit.begin();
Bluefruit.setTxPower(4); // Set maximum transmission power
Bluefruit.setName("LSM6DS3 Sensor");
// Start BLE UART service
bleuart.begin();
// Setup and start advertising
Bluefruit.Advertising.addFlags(BLE_GAP_ADV_FLAGS_LE_ONLY_GENERAL_DISC_MODE);
Bluefruit.Advertising.addTxPower();
Bluefruit.Advertising.addService(bleuart); // Advertise UART service
Bluefruit.Advertising.addName();
Bluefruit.Advertising.restartOnDisconnect(true);
Bluefruit.Advertising.start(0); // 0 = Advertise indefinitely
}
void loop() {
if (Bluefruit.connected()) {
// Read from the sensor
float gyroX = IMU.readFloatGyroX();
float gyroY = IMU.readFloatGyroY();
float gyroZ = IMU.readFloatGyroZ();
float accelX = IMU.readFloatAccelX();
float accelY = IMU.readFloatAccelY();
float accelZ = IMU.readFloatAccelZ();
// Prepare accelerometer data string
char bufferAccel[64];
snprintf(bufferAccel, sizeof(bufferAccel), "A,%.2f,%.2f,%.2f",
accelX, accelY, accelZ);
// Send accelerometer data via BLE UART
bleuart.write(bufferAccel);
// Wait briefly to prevent data concatenation
delay(200);
// Prepare gyroscope data string
char bufferGyro[64];
snprintf(bufferGyro, sizeof(bufferGyro), "G,%.2f,%.2f,%.2f",
gyroX, gyroY, gyroZ);
// Send gyroscope data via BLE UART
bleuart.write(bufferGyro);
// Delay before the next transmission
delay(1000);
}
}
ChatGPT Prompt
Please write a Python code to display data obtained from running the above code on a Windows PC via Bluetooth. Use the Kivy library for the GUI. Display a menu bar at the top of the screen, and the program will terminate through a Quit menu that appears when clicked.
from kivy.app import App
from kivy.uix.dropdown import DropDown
from kivy.uix.button import Button
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.label import Label
from kivy.uix.scrollview import ScrollView
from kivy.core.window import Window
from kivy.clock import Clock
import asyncio
from bleak import BleakClient
import threading
# Set window size
Window.size = (720, 600)
# Custom dropdown menu class
class CustomDropDown(DropDown):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.auto_width = False
self.width = Window.width
self.max_height = 200
def add_widget(self, widget):
# Handle button widgets specifically
if isinstance(widget, Button):
widget.halign = 'center'
widget.valign = 'middle'
widget.text_size = (None, None)
widget.bind(size=self.update_button_padding, texture_size=self.update_button_padding)
widget.bind(text=self.update_button_padding)
super().add_widget(widget)
def update_button_padding(self, button, *args):
# Update button text padding
button.text_size = (button.width, None)
button.texture_update()
button.padding_x = (button.width - button.texture_size[0]) / 2
# Main application class
class GetAccGyrodata_app(App):
def build(self):
# Create main layout
main_layout = BoxLayout(orientation='vertical')
# Create main button
self.mainbutton = Button(
text='MENU', size_hint=(1, None),
height=75
)
main_layout.add_widget(self.mainbutton)
# Create custom dropdown menu
self.dropdown = CustomDropDown()
# Create quit button
btn = Button(text='QUIT', size_hint_y=None, height=40)
btn.bind(on_release=self.stop)
self.dropdown.add_widget(btn)
# Bind events to open dropdown menu
self.mainbutton.bind(on_release=self.open_dropdown)
self.dropdown.bind(on_select=lambda instance, x: setattr(self.mainbutton, 'text', x))
# Create header label for column names
self.header_label = Label(text='AccX AccY AccZ GyX GyY GyZ', size_hint=(1, None), bold=True)
self.header_label.halign = 'left'
self.header_label.valign = 'top'
self.header_label.padding = (10, 10) # Add 10 pixel padding on the left
# Create label to display data
self.data_label = Label(text='', size_hint=(1, None))
self.data_label.halign = 'left'
self.data_label.valign = 'top'
self.data_label.padding = (10, 0) # Add 10 pixel padding on the left
# Create scroll view to contain the data label
self.scroll_view = ScrollView(size_hint=(1, 1))
self.scroll_view.add_widget(self.data_label)
main_layout.add_widget(self.header_label)
main_layout.add_widget(self.scroll_view)
return main_layout
def open_dropdown(self, widget):
# Open dropdown menu
self.dropdown.open(widget)
def stop(self, *args):
# Stop the application
self.running = False
App.get_running_app().stop()
def on_start(self):
# Start BLE connection thread on application start
self.running = True
self.acc_buffer = None
threading.Thread(target=self.bleak_thread, daemon=True).start()
def bleak_thread(self):
# Run BLE connection and data reception in asyncio runtime
asyncio.run(self.connect_and_receive_data())
async def connect_and_receive_data(self):
# BLE device address and service UUIDs
DEVICE_ADDRESS = "DE:7C:F5:14:DF:65"
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHARACTERISTIC_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
found_device = False
async with BleakClient(DEVICE_ADDRESS) as client:
try:
connected = await client.is_connected()
if not connected:
print("Unable to connect to the device!")
await asyncio.sleep(1)
return
print(f"Connected to device with address {DEVICE_ADDRESS}!")
found_device = True
# Data reception handler
def handle_notification(char_uuid, data):
if self.running:
self.buffer_data(data.decode().strip())
await client.start_notify(UART_RX_CHARACTERISTIC_UUID, handle_notification)
print("Receiving data...")
while self.running:
await asyncio.sleep(1)
await client.stop_notify(UART_RX_CHARACTERISTIC_UUID)
print("Disconnected from device.")
except Exception as e:
print(f"Error: {str(e)}")
await asyncio.sleep(1)
if not found_device:
print(f"Device with address {DEVICE_ADDRESS} was not found.")
def buffer_data(self, data):
try:
print(f"Received data: {data}") # Debugging output
if data.startswith("A"):
self.acc_buffer = data[2:] # Remove "A,"
elif data.startswith("G") and self.acc_buffer:
gyro_data = data[2:] # Remove "G,"
complete_data = f"{self.acc_buffer},{gyro_data}" # Separate by comma
Clock.schedule_once(lambda dt: self.update_data_label(self.format_data(complete_data)), 0)
self.acc_buffer = None
except Exception as e:
print(f"Data parsing error: {str(e)}")
def format_data(self, data):
# Format data with fixed width
parts = data.split(',')
print(f"Formatted parts: {parts}") # Debugging output
# Fill with empty strings if less than 6 parts
while len(parts) < 6:
parts.append('')
formatted_data = "{:<10} {:<10} {:<10} {:<10} {:<10} {:<10}".format(*parts)
print(f"Formatted data: {formatted_data}") # Debugging output
return formatted_data
def update_data_label(self, data):
# Add received data to label
print(f"Updating label with data: {data}") # Debugging output
self.data_label.text = data + '\n' + self.data_label.text
self.data_label.texture_update()
self.data_label.size = self.data_label.texture_size
if self.data_label.height > self.scroll_view.height:
self.scroll_view.scroll_y = 1
if __name__ == "__main__":
# Run the application
GetAccGyrodata_app().run()
Changed the receiving sensor data display from two lines to one line. Improved readability by converting commas to spaces.
Ideally, I would like to present the UI for the Final Project, but since the training data has not been obtained yet, I displayed the data in the environment where Python with Kivy & Bleak libraries, which are decided to be used in the Final Project, is installed.
I was concerned about displaying the GUI in Python, but this assignment has shown that receiving and displaying data via BLE is not an issue.
However, the compatibility between the graphing library Matplotlib and Kivy seems to be lacking at this stage. Specifically, there were issues with handling resize events when displaying Matplotlib graphs. In the future, it should be possible to create a 3D graph of the swing trajectory.
The next step is to investigate solutions for the integration issues between Matplotlib and Kivy and, if needed, consider alternative graphing libraries. Once the training data is obtained, I will proceed with the development of the UI for the final project.
Developing a GUI application using Kivy was a bit time-consuming, and it was difficult to identify errors. I learned the importance of patiently debugging.
Execution Result
UI for my final project
Each time the racket is swung, the swing type, speed, and the number of shadow swings for each swing are displayed.
Code
In this final project, I develop for the PC environment using Python with Kivy and Bleak installed. In recent times, with the frequent use of ChatGPT, I have learned that when creating code, starting by sending prompts to create small sample programs according to the purpose can make the process smoother.
from kivy.app import App
from kivy.uix.dropdown import DropDown
from kivy.uix.button import Button
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.label import Label
from kivy.uix.image import Image
from kivy.core.window import Window
from kivy.clock import Clock, mainthread
import asyncio
from bleak import BleakClient, BleakError
import threading
Window.size = (720, 600)
class CustomDropDown(DropDown):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.auto_width = False
self.width = Window.width
self.max_height = 200
def add_widget(self, widget):
if isinstance(widget, Button):
widget.halign = 'center'
widget.valign = 'middle'
widget.text_size = (None, None)
widget.bind(size=self.update_button_padding, texture_size=self.update_button_padding)
widget.bind(text=self.update_button_padding)
super().add_widget(widget)
def update_button_padding(self, button, *args):
button.text_size = (button.width, None)
button.texture_update()
button.padding_x = (button.width - button.texture_size[0]) / 2
class GetAccGyrodata_app(App):
def build(self):
self.counts = {1: 0, 2: 0, 3: 0, 4: 0}
self.data_buffer = {"S": None, "V": None}
self.is_sleeping = False
self.client = None
self.image_paths = {
1: "/mnt/data/1.jpg",
2: "/mnt/data/2.jpg",
3: "/mnt/data/3.jpg",
4: "/mnt/data/4.jpg"
}
main_layout = FloatLayout()
self.mainbutton = Button(
text='MENU', size_hint=(1, None),
height=75, pos_hint={'top': 1}
)
main_layout.add_widget(self.mainbutton)
self.dropdown = CustomDropDown()
sleep_btn = Button(text='SLEEP', size_hint_y=None, height=40)
sleep_btn.bind(on_release=self.sleep)
self.dropdown.add_widget(sleep_btn)
quit_btn = Button(text='QUIT', size_hint_y=None, height=40)
quit_btn.bind(on_release=self.stop)
self.dropdown.add_widget(quit_btn)
self.mainbutton.bind(on_release=self.open_dropdown)
self.dropdown.bind(on_select=lambda instance, x: setattr(self.mainbutton, 'text', x))
self.status_label = Label(text='Connecting to device...', size_hint=(1, None), font_size=48, pos_hint={'top': 0.95})
self.smash_label = Label(text='Waiting for data...', size_hint=(1, None), font_size=48, pos_hint={'top': 0.8})
self.speed_label = Label(text='', size_hint=(1, None), font_size=48, pos_hint={'top': 0.65})
self.reps_label = Label(text='', size_hint=(1, None), font_size=48, pos_hint={'top': 0.5})
self.image = Image(size_hint=(1, None), height=200, pos_hint={'top': 0.35})
main_layout.add_widget(self.status_label)
main_layout.add_widget(self.smash_label)
main_layout.add_widget(self.speed_label)
main_layout.add_widget(self.reps_label)
main_layout.add_widget(self.image)
return main_layout
def open_dropdown(self, widget):
self.dropdown.open(widget)
def sleep(self, instance):
print("Entering sleep mode...")
self.is_sleeping = True
self.update_status_label("Sleeping...")
# Disconnect the BLE client if connected
if self.client and self.client.is_connected:
asyncio.run_coroutine_threadsafe(self.client.disconnect(), asyncio.get_event_loop())
def wakeup(self):
print("Waking up from sleep mode...")
self.is_sleeping = False
self.update_status_label("Reconnecting...")
asyncio.run_coroutine_threadsafe(self.reconnect(), asyncio.get_event_loop())
def stop(self, *args):
self.running = False
if self.client:
asyncio.run_coroutine_threadsafe(self.client.disconnect(), asyncio.get_event_loop())
App.get_running_app().stop()
def on_start(self):
self.running = True
threading.Thread(target=self.bleak_thread, daemon=True).start()
def bleak_thread(self):
asyncio.run(self.connect_and_receive_data())
async def connect_and_receive_data(self):
DEVICE_ADDRESS = "E6:B0:FA:69:60:0B"
UART_RX_CHARACTERISTIC_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
while self.running:
if self.is_sleeping:
await asyncio.sleep(1)
continue
try:
self.client = BleakClient(DEVICE_ADDRESS, disconnected_callback=self.on_disconnected)
await self.client.connect()
if not self.client.is_connected:
print("Unable to connect to the device!")
self.update_status_label("Disconnected from device")
await asyncio.sleep(5)
continue
print(f"Connected to device with address {DEVICE_ADDRESS}!")
self.update_status_label("Connected to device")
def handle_notification(char_uuid, data):
if self.running and not self.is_sleeping:
decoded_data = data.decode()
print(f"Received data: {decoded_data}") # Print received data to terminal
self.process_received_data(decoded_data)
await self.client.start_notify(UART_RX_CHARACTERISTIC_UUID, handle_notification)
print("Receiving data...")
while self.running and not self.is_sleeping:
await asyncio.sleep(1)
await self.client.stop_notify(UART_RX_CHARACTERISTIC_UUID)
print("Disconnected from device.")
self.update_status_label("Disconnected from device")
except BleakError as e:
print(f"Error: {str(e)}")
self.update_status_label("Disconnected from device")
await asyncio.sleep(5)
def on_disconnected(self, client):
if self.is_sleeping:
return
print("Device was disconnected, callback activated.")
self.update_status_label("Disconnected from device")
if self.running and not self.is_sleeping:
asyncio.run_coroutine_threadsafe(self.reconnect(), asyncio.get_event_loop())
async def reconnect(self):
while self.running and not self.is_sleeping:
try:
print("Attempting to reconnect...")
self.client = BleakClient("E6:B0:FA:69:60:0B", disconnected_callback=self.on_disconnected)
await self.client.connect()
if self.client.is_connected:
print("Reconnected successfully!")
self.update_status_label("Reconnected to device")
def handle_notification(char_uuid, data):
if self.running and not self.is_sleeping:
decoded_data = data.decode()
print(f"Received data: {decoded_data}") # Print received data to terminal
self.process_received_data(decoded_data)
await self.client.start_notify("6E400003-B5A3-F393-E0A9-E50E24DCCA9E", handle_notification)
return
except BleakError as e:
print(f"Reconnect error: {str(e)}")
await asyncio.sleep(5)
@mainthread
def update_status_label(self, status):
self.status_label.text = status
def process_received_data(self, data):
print(f"Processing data: {data}") # Debugging statement to print data to terminal
if "S:" in data:
number = int(data.split("S:")[1].split("-")[0])
self.data_buffer["S"] = number
self.update_image(number) # Update the image whenever "S:" data is received
elif "V:" in data:
speed = data.split("V:")[1].split("-")[0]
self.data_buffer["V"] = speed
if self.data_buffer["S"] is not None and self.data_buffer["V"] is not None:
self.update_data_label()
@mainthread
def update_image(self, number):
if number in self.image_paths:
self.image.source = self.image_paths[number]
self.image.reload()
@mainthread
def update_data_label(self):
number = self.data_buffer["S"]
speed = self.data_buffer["V"]
if number in self.counts:
self.counts[number] += 1
smash_type = ['Smash', 'Net', 'Drive', 'Lift'][number - 1]
smash_text = smash_type if smash_type else "unmeasurable"
speed_text = f"Speed: {speed}km/h" if speed else "unmeasurable"
reps_text = f"Reps: {self.counts[number]}"
self.smash_label.text = smash_text
self.speed_label.text = speed_text
self.reps_label.text = reps_text
# Clear the status label once data is received
self.status_label.text = ""
# Clear the buffer
self.data_buffer = {"S": None, "V": None}
if __name__ == "__main__":
GetAccGyrodata_app().run()