Fab Academy 2024 Website

  • Home
  • About
  • WEEK1 Project Management / Principles and Practices
  • WEEK2 Computer Aided Design
  • WEEK3 Computer-controlled cutting
  • WEEK4 Electronics production
  • WEEK5 3D scanning and printing
  • WEEK6 Embedded programming
  • WEEK7 Computer-controlled machining
  • WEEK8 Electronics design
  • WEEK9 Output devices
  • WEEK10 Mechanical design, Machine design
  • WEEK11 Input devices
  • WEEK12 Molding and casting
  • WEEK13 Networking and communications
  • WEEK14 Interface and application programming
  • WEEK15 Wildcard week
  • WEEK16 System integration
  • WEEK17 Applications and implications, project development
  • WEEK18 Invention, intellectual property, and income
  • Final
目次
  • WEEK14 Interface and application programming
    • Group Assignment
    • Individual Assignment
      • ChatGPT Prompt
      • ChatGPT Prompt
      • Execution Result
      • UI for my final project
        • Code

WEEK14 Interface and application programming

  1. Fab Academy 2024 Website
  2. WEEK14 Interface and application programming
hito |


WEEK14 Interface and application programming

Group Assignment

compare as many tool options as possible

Interface and application programming

Individual Assignment

write an application that interfaces a user with an input &/or output device that you made

For this assignment, we will continue to work on the content related to the Final Project. The MCU of the final project was changed from ESC32C3 to nRF52840.

nRF52840 datasheet
alt text
alt text
alt text

ChatGPT Prompt

Please modify the code from Assignment 13, which sends accelerometer and gyroscope data from the MPU-6050 via Bluetooth from the ESC32C3, to work on the nRF52840.

#include <bluefruit.h>
#include <Wire.h>
#include <LSM6DS3.h>

LSM6DS3 IMU(I2C_MODE, 0x6A);

BLEUart bleuart;  // BLE UART Service

void setup() {
  // Initialization code remains as it is
  Wire.begin();
  if (IMU.begin() != 0) {
    // Remain in infinite loop if IMU initialization fails
    while (true); // Infinite loop to halt operation
  }

  IMU.settings.gyroRange = 2000;   // Gyroscope range to 2000 dps
  IMU.settings.accelRange = 4;     // Accelerometer range to ±4G

  // Setup BLE
  Bluefruit.begin();
  Bluefruit.setTxPower(4);    // Set maximum transmission power
  Bluefruit.setName("LSM6DS3 Sensor");

  // Start BLE UART service
  bleuart.begin();

  // Setup and start advertising
  Bluefruit.Advertising.addFlags(BLE_GAP_ADV_FLAGS_LE_ONLY_GENERAL_DISC_MODE);
  Bluefruit.Advertising.addTxPower();
  Bluefruit.Advertising.addService(bleuart);  // Advertise UART service
  Bluefruit.Advertising.addName();
  Bluefruit.Advertising.restartOnDisconnect(true);
  Bluefruit.Advertising.start(0);  // 0 = Advertise indefinitely
}

void loop() {
  if (Bluefruit.connected()) {
    // Read from the sensor
    float gyroX = IMU.readFloatGyroX();
    float gyroY = IMU.readFloatGyroY();
    float gyroZ = IMU.readFloatGyroZ();
    float accelX = IMU.readFloatAccelX();
    float accelY = IMU.readFloatAccelY();
    float accelZ = IMU.readFloatAccelZ();

    // Prepare accelerometer data string
    char bufferAccel[64];
    snprintf(bufferAccel, sizeof(bufferAccel), "A,%.2f,%.2f,%.2f",
             accelX, accelY, accelZ);

    // Send accelerometer data via BLE UART
    bleuart.write(bufferAccel);

    // Wait briefly to prevent data concatenation
    delay(200);

    // Prepare gyroscope data string
    char bufferGyro[64];
    snprintf(bufferGyro, sizeof(bufferGyro), "G,%.2f,%.2f,%.2f",
             gyroX, gyroY, gyroZ);

    // Send gyroscope data via BLE UART
    bleuart.write(bufferGyro);

    // Delay before the next transmission
    delay(1000);
  }
}
ChatGPT Prompt

Please write a Python code to display data obtained from running the above code on a Windows PC via Bluetooth. Use the Kivy library for the GUI. Display a menu bar at the top of the screen, and the program will terminate through a Quit menu that appears when clicked.

from kivy.app import App
from kivy.uix.dropdown import DropDown
from kivy.uix.button import Button
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.label import Label
from kivy.uix.scrollview import ScrollView
from kivy.core.window import Window
from kivy.clock import Clock
import asyncio
from bleak import BleakClient
import threading

# Set window size
Window.size = (720, 600)

# Custom dropdown menu class
class CustomDropDown(DropDown):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.auto_width = False
        self.width = Window.width
        self.max_height = 200

    def add_widget(self, widget):
        # Handle button widgets specifically
        if isinstance(widget, Button):
            widget.halign = 'center'
            widget.valign = 'middle'
            widget.text_size = (None, None)
            widget.bind(size=self.update_button_padding, texture_size=self.update_button_padding)
            widget.bind(text=self.update_button_padding)
        super().add_widget(widget)

    def update_button_padding(self, button, *args):
        # Update button text padding
        button.text_size = (button.width, None)
        button.texture_update()
        button.padding_x = (button.width - button.texture_size[0]) / 2

# Main application class
class GetAccGyrodata_app(App):
    def build(self):
        # Create main layout
        main_layout = BoxLayout(orientation='vertical')

        # Create main button
        self.mainbutton = Button(
            text='MENU', size_hint=(1, None),
            height=75
        )
        main_layout.add_widget(self.mainbutton)

        # Create custom dropdown menu
        self.dropdown = CustomDropDown()

        # Create quit button
        btn = Button(text='QUIT', size_hint_y=None, height=40)
        btn.bind(on_release=self.stop)
        self.dropdown.add_widget(btn)

        # Bind events to open dropdown menu
        self.mainbutton.bind(on_release=self.open_dropdown)
        self.dropdown.bind(on_select=lambda instance, x: setattr(self.mainbutton, 'text', x))

        # Create header label for column names
        self.header_label = Label(text='AccX       AccY       AccZ       GyX       GyY       GyZ', size_hint=(1, None), bold=True)
        self.header_label.halign = 'left'
        self.header_label.valign = 'top'
        self.header_label.padding = (10, 10)  # Add 10 pixel padding on the left

        # Create label to display data
        self.data_label = Label(text='', size_hint=(1, None))
        self.data_label.halign = 'left'
        self.data_label.valign = 'top'
        self.data_label.padding = (10, 0)  # Add 10 pixel padding on the left

        # Create scroll view to contain the data label
        self.scroll_view = ScrollView(size_hint=(1, 1))
        self.scroll_view.add_widget(self.data_label)

        main_layout.add_widget(self.header_label)
        main_layout.add_widget(self.scroll_view)

        return main_layout

    def open_dropdown(self, widget):
        # Open dropdown menu
        self.dropdown.open(widget)

    def stop(self, *args):
        # Stop the application
        self.running = False
        App.get_running_app().stop()

    def on_start(self):
        # Start BLE connection thread on application start
        self.running = True
        self.acc_buffer = None
        threading.Thread(target=self.bleak_thread, daemon=True).start()

    def bleak_thread(self):
        # Run BLE connection and data reception in asyncio runtime
        asyncio.run(self.connect_and_receive_data())

    async def connect_and_receive_data(self):
        # BLE device address and service UUIDs
        DEVICE_ADDRESS = "DE:7C:F5:14:DF:65"
        UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
        UART_RX_CHARACTERISTIC_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"

        found_device = False

        async with BleakClient(DEVICE_ADDRESS) as client:
            try:
                connected = await client.is_connected()
                if not connected:
                    print("Unable to connect to the device!")
                    await asyncio.sleep(1)
                    return

                print(f"Connected to device with address {DEVICE_ADDRESS}!")
                found_device = True

                # Data reception handler
                def handle_notification(char_uuid, data):
                    if self.running:
                        self.buffer_data(data.decode().strip())

                await client.start_notify(UART_RX_CHARACTERISTIC_UUID, handle_notification)

                print("Receiving data...")
                while self.running:
                    await asyncio.sleep(1)

                await client.stop_notify(UART_RX_CHARACTERISTIC_UUID)
                print("Disconnected from device.")

            except Exception as e:
                print(f"Error: {str(e)}")
                await asyncio.sleep(1)

        if not found_device:
            print(f"Device with address {DEVICE_ADDRESS} was not found.")

    def buffer_data(self, data):
        try:
            print(f"Received data: {data}")  # Debugging output
            if data.startswith("A"):
                self.acc_buffer = data[2:]  # Remove "A,"
            elif data.startswith("G") and self.acc_buffer:
                gyro_data = data[2:]  # Remove "G,"
                complete_data = f"{self.acc_buffer},{gyro_data}"  # Separate by comma
                Clock.schedule_once(lambda dt: self.update_data_label(self.format_data(complete_data)), 0)
                self.acc_buffer = None
        except Exception as e:
            print(f"Data parsing error: {str(e)}")

    def format_data(self, data):
        # Format data with fixed width
        parts = data.split(',')
        print(f"Formatted parts: {parts}")  # Debugging output
        # Fill with empty strings if less than 6 parts
        while len(parts) < 6:
            parts.append('')
        formatted_data = "{:<10} {:<10} {:<10} {:<10} {:<10} {:<10}".format(*parts)
        print(f"Formatted data: {formatted_data}")  # Debugging output
        return formatted_data

    def update_data_label(self, data):
        # Add received data to label
        print(f"Updating label with data: {data}")  # Debugging output
        self.data_label.text = data + '\n' + self.data_label.text
        self.data_label.texture_update()
        self.data_label.size = self.data_label.texture_size
        if self.data_label.height > self.scroll_view.height:
            self.scroll_view.scroll_y = 1

if __name__ == "__main__":
    # Run the application
    GetAccGyrodata_app().run()

Changed the receiving sensor data display from two lines to one line. Improved readability by converting commas to spaces.

Ideally, I would like to present the UI for the Final Project, but since the training data has not been obtained yet, I displayed the data in the environment where Python with Kivy & Bleak libraries, which are decided to be used in the Final Project, is installed.

I was concerned about displaying the GUI in Python, but this assignment has shown that receiving and displaying data via BLE is not an issue.

However, the compatibility between the graphing library Matplotlib and Kivy seems to be lacking at this stage. Specifically, there were issues with handling resize events when displaying Matplotlib graphs. In the future, it should be possible to create a 3D graph of the swing trajectory.

The next step is to investigate solutions for the integration issues between Matplotlib and Kivy and, if needed, consider alternative graphing libraries. Once the training data is obtained, I will proceed with the development of the UI for the final project.

Developing a GUI application using Kivy was a bit time-consuming, and it was difficult to identify errors. I learned the importance of patiently debugging.

Execution Result

alt text


UI for my final project

Each time the racket is swung, the swing type, speed, and the number of shadow swings for each swing are displayed.

alt text alt text

Code

In this final project, I develop for the PC environment using Python with Kivy and Bleak installed. In recent times, with the frequent use of ChatGPT, I have learned that when creating code, starting by sending prompts to create small sample programs according to the purpose can make the process smoother.

from kivy.app import App
from kivy.uix.dropdown import DropDown
from kivy.uix.button import Button
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.label import Label
from kivy.uix.image import Image
from kivy.core.window import Window
from kivy.clock import Clock, mainthread
import asyncio
from bleak import BleakClient, BleakError
import threading

Window.size = (720, 600)

class CustomDropDown(DropDown):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.auto_width = False
        self.width = Window.width
        self.max_height = 200

    def add_widget(self, widget):
        if isinstance(widget, Button):
            widget.halign = 'center'
            widget.valign = 'middle'
            widget.text_size = (None, None)
            widget.bind(size=self.update_button_padding, texture_size=self.update_button_padding)
            widget.bind(text=self.update_button_padding)
        super().add_widget(widget)

    def update_button_padding(self, button, *args):
        button.text_size = (button.width, None)
        button.texture_update()
        button.padding_x = (button.width - button.texture_size[0]) / 2

class GetAccGyrodata_app(App):
    def build(self):
        self.counts = {1: 0, 2: 0, 3: 0, 4: 0}
        self.data_buffer = {"S": None, "V": None}
        self.is_sleeping = False
        self.client = None
        self.image_paths = {
            1: "/mnt/data/1.jpg",
            2: "/mnt/data/2.jpg",
            3: "/mnt/data/3.jpg",
            4: "/mnt/data/4.jpg"
        }

        main_layout = FloatLayout()

        self.mainbutton = Button(
            text='MENU', size_hint=(1, None),
            height=75, pos_hint={'top': 1}
        )
        main_layout.add_widget(self.mainbutton)

        self.dropdown = CustomDropDown()

        sleep_btn = Button(text='SLEEP', size_hint_y=None, height=40)
        sleep_btn.bind(on_release=self.sleep)
        self.dropdown.add_widget(sleep_btn)

        quit_btn = Button(text='QUIT', size_hint_y=None, height=40)
        quit_btn.bind(on_release=self.stop)
        self.dropdown.add_widget(quit_btn)

        self.mainbutton.bind(on_release=self.open_dropdown)
        self.dropdown.bind(on_select=lambda instance, x: setattr(self.mainbutton, 'text', x))

        self.status_label = Label(text='Connecting to device...', size_hint=(1, None), font_size=48, pos_hint={'top': 0.95})
        self.smash_label = Label(text='Waiting for data...', size_hint=(1, None), font_size=48, pos_hint={'top': 0.8})
        self.speed_label = Label(text='', size_hint=(1, None), font_size=48, pos_hint={'top': 0.65})
        self.reps_label = Label(text='', size_hint=(1, None), font_size=48, pos_hint={'top': 0.5})
        self.image = Image(size_hint=(1, None), height=200, pos_hint={'top': 0.35})

        main_layout.add_widget(self.status_label)
        main_layout.add_widget(self.smash_label)
        main_layout.add_widget(self.speed_label)
        main_layout.add_widget(self.reps_label)
        main_layout.add_widget(self.image)

        return main_layout

    def open_dropdown(self, widget):
        self.dropdown.open(widget)

    def sleep(self, instance):
        print("Entering sleep mode...")
        self.is_sleeping = True
        self.update_status_label("Sleeping...")
        # Disconnect the BLE client if connected
        if self.client and self.client.is_connected:
            asyncio.run_coroutine_threadsafe(self.client.disconnect(), asyncio.get_event_loop())

    def wakeup(self):
        print("Waking up from sleep mode...")
        self.is_sleeping = False
        self.update_status_label("Reconnecting...")
        asyncio.run_coroutine_threadsafe(self.reconnect(), asyncio.get_event_loop())

    def stop(self, *args):
        self.running = False
        if self.client:
            asyncio.run_coroutine_threadsafe(self.client.disconnect(), asyncio.get_event_loop())
        App.get_running_app().stop()

    def on_start(self):
        self.running = True
        threading.Thread(target=self.bleak_thread, daemon=True).start()

    def bleak_thread(self):
        asyncio.run(self.connect_and_receive_data())

    async def connect_and_receive_data(self):
        DEVICE_ADDRESS = "E6:B0:FA:69:60:0B"
        UART_RX_CHARACTERISTIC_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"

        while self.running:
            if self.is_sleeping:
                await asyncio.sleep(1)
                continue

            try:
                self.client = BleakClient(DEVICE_ADDRESS, disconnected_callback=self.on_disconnected)
                await self.client.connect()
                if not self.client.is_connected:
                    print("Unable to connect to the device!")
                    self.update_status_label("Disconnected from device")
                    await asyncio.sleep(5)
                    continue

                print(f"Connected to device with address {DEVICE_ADDRESS}!")
                self.update_status_label("Connected to device")

                def handle_notification(char_uuid, data):
                    if self.running and not self.is_sleeping:
                        decoded_data = data.decode()
                        print(f"Received data: {decoded_data}")  # Print received data to terminal
                        self.process_received_data(decoded_data)

                await self.client.start_notify(UART_RX_CHARACTERISTIC_UUID, handle_notification)

                print("Receiving data...")
                while self.running and not self.is_sleeping:
                    await asyncio.sleep(1)

                await self.client.stop_notify(UART_RX_CHARACTERISTIC_UUID)
                print("Disconnected from device.")
                self.update_status_label("Disconnected from device")

            except BleakError as e:
                print(f"Error: {str(e)}")
                self.update_status_label("Disconnected from device")
                await asyncio.sleep(5)

    def on_disconnected(self, client):
        if self.is_sleeping:
            return
        print("Device was disconnected, callback activated.")
        self.update_status_label("Disconnected from device")
        if self.running and not self.is_sleeping:
            asyncio.run_coroutine_threadsafe(self.reconnect(), asyncio.get_event_loop())

    async def reconnect(self):
        while self.running and not self.is_sleeping:
            try:
                print("Attempting to reconnect...")
                self.client = BleakClient("E6:B0:FA:69:60:0B", disconnected_callback=self.on_disconnected)
                await self.client.connect()
                if self.client.is_connected:
                    print("Reconnected successfully!")
                    self.update_status_label("Reconnected to device")

                    def handle_notification(char_uuid, data):
                        if self.running and not self.is_sleeping:
                            decoded_data = data.decode()
                            print(f"Received data: {decoded_data}")  # Print received data to terminal
                            self.process_received_data(decoded_data)

                    await self.client.start_notify("6E400003-B5A3-F393-E0A9-E50E24DCCA9E", handle_notification)
                    return
            except BleakError as e:
                print(f"Reconnect error: {str(e)}")
            await asyncio.sleep(5)

    @mainthread
    def update_status_label(self, status):
        self.status_label.text = status

    def process_received_data(self, data):
        print(f"Processing data: {data}")  # Debugging statement to print data to terminal
        if "S:" in data:
            number = int(data.split("S:")[1].split("-")[0])
            self.data_buffer["S"] = number
            self.update_image(number)  # Update the image whenever "S:" data is received
        elif "V:" in data:
            speed = data.split("V:")[1].split("-")[0]
            self.data_buffer["V"] = speed

        if self.data_buffer["S"] is not None and self.data_buffer["V"] is not None:
            self.update_data_label()

    @mainthread
    def update_image(self, number):
        if number in self.image_paths:
            self.image.source = self.image_paths[number]
            self.image.reload()

    @mainthread
    def update_data_label(self):
        number = self.data_buffer["S"]
        speed = self.data_buffer["V"]

        if number in self.counts:
            self.counts[number] += 1

        smash_type = ['Smash', 'Net', 'Drive', 'Lift'][number - 1]
        smash_text = smash_type if smash_type else "unmeasurable"
        speed_text = f"Speed: {speed}km/h" if speed else "unmeasurable"
        reps_text = f"Reps: {self.counts[number]}"

        self.smash_label.text = smash_text
        self.speed_label.text = speed_text
        self.reps_label.text = reps_text

        # Clear the status label once data is received
        self.status_label.text = ""

        # Clear the buffer
        self.data_buffer = {"S": None, "V": None}

if __name__ == "__main__":
    GetAccGyrodata_app().run()

alt text alt text alt text alt text

Copyright 2024 Hajime Ito - Creative Commons Attribution Non Commercial

検索
キーワードを入力して検索を始めましょう