Networking

This week I got two XIAO RP2040 microcontrollers to communicate through a wired connection over the UART protocol, and two Raspberry Pi Pico W microcontrollers to communicate wirelessly over Wi-Fi. These accomplishments were also a part of the group project, as I worked with David Tian on several aspects of the process (see the group documentation here).

Wired (UART)

David Tian pointed to this library for using UART communication in MicroPython. Here are two scripts I saved to a XIAO RP2040 microcontroller using the Thonny IDE. With the help of this forum I slightly edited the code. I used MicroPython 1.20.0 and made sure to select MicroPython (RP2040), not MicroPython (Pico), although I'm not sure whether it would have made a difference.

uart_test.py

from easy_comms import Easy_comms
from time import sleep

# Open UART 0 at 9600 baud and send the greeting message to the peer.
com1 = Easy_comms(uart_id=0, baud_rate=9600)
com1.start()

# Check for an incoming message, then pause for one second, forever.
while True:
    message = com1.read()

    if message is not None:
        # Strip outside the f-string: a backslash inside an f-string
        # expression is a syntax error on CPython before 3.12.
        text = message.strip('\n')
        print(f"message received: {text}")
    sleep(1)

easy_comms.py

from machine import UART, Pin
from time import time_ns

class Easy_comms:
    """Send and receive newline-terminated text messages over a UART.

    The class attributes below are the defaults used when the constructor
    is not given an explicit value.
    """

    uart_id = 0
    baud_rate = 9600
    timeout = 1000 # milliseconds

    def __init__(self, uart_id:int, baud_rate:int=None):
        """Open the UART numbered *uart_id*.

        baud_rate: optional override; a falsy value keeps the class
        default.
        """
        self.uart_id = uart_id
        if baud_rate: self.baud_rate = baud_rate

        # set the baud rate
        self.uart = UART(self.uart_id,self.baud_rate)

        # Initialise the UART serial port
        self.uart.init()

    def send(self, message:str):
        """Print *message*, then write it to the UART with a trailing newline."""
        print(f'sending message: {message}')
        message = message + '\n'
        self.uart.write(bytes(message,'utf-8'))

    def start(self):
        """Print the "ahoy" greeting and send it to the peer."""
        message = "ahoy\n"
        print(message)
        self.send(message)

    def read(self)->str:
        """Wait up to ``self.timeout`` milliseconds for one message.

        Returns the received text with leading/trailing newlines removed,
        or None when no newline arrives before the timeout expires. Data
        received without a newline before the timeout is discarded.
        """
        # time_ns() counts nanoseconds while the timeout is in milliseconds.
        deadline = time_ns() + self.timeout * 1000000
        message = ""
        # Re-read the clock on every pass so the timeout can actually expire.
        while time_ns() <= deadline:
            if self.uart.any() > 0:
                message = message + self.uart.read().decode('utf-8')
                if '\n' in message:
                    return message.strip('\n')
        return None

I also uploaded easy_comms.py to the other board, but instead of uart_test.py I uploaded the following.

uart_sender.py

from easy_comms import Easy_comms
from time import sleep

# Open UART 0 at 9600 baud; the greeting is left disabled on the sender.
com1 = Easy_comms(uart_id=0, baud_rate=9600)
#com1.start()

# Send a numbered greeting, check for a reply, pause one second, repeat.
count = 0
while True:
    # send a message
    com1.send(f'hello, {count}')

    #check for messages
    message = com1.read()

    if message is not None:
        # Strip outside the f-string: a backslash inside an f-string
        # expression is a syntax error on CPython before 3.12.
        text = message.strip('\n')
        print(f"message received: {text}")
    sleep(1)
    count += 1

easy_comms.py

from machine import UART, Pin
from time import time_ns

class Easy_comms:
    """Send and receive newline-terminated text messages over a UART.

    The class attributes below are the defaults used when the constructor
    is not given an explicit value.
    """

    uart_id = 0
    baud_rate = 9600
    timeout = 1000 # milliseconds

    def __init__(self, uart_id:int, baud_rate:int=None):
        """Open the UART numbered *uart_id*.

        baud_rate: optional override; a falsy value keeps the class
        default.
        """
        self.uart_id = uart_id
        if baud_rate: self.baud_rate = baud_rate

        # set the baud rate
        self.uart = UART(self.uart_id,self.baud_rate)

        # Initialise the UART serial port
        self.uart.init()

    def send(self, message:str):
        """Print *message*, then write it to the UART with a trailing newline."""
        print(f'sending message: {message}')
        message = message + '\n'
        self.uart.write(bytes(message,'utf-8'))

    def start(self):
        """Print the "ahoy" greeting and send it to the peer."""
        message = "ahoy\n"
        print(message)
        self.send(message)

    def read(self)->str:
        """Wait up to ``self.timeout`` milliseconds for one message.

        Returns the received text with leading/trailing newlines removed,
        or None when no newline arrives before the timeout expires. Data
        received without a newline before the timeout is discarded.
        """
        # time_ns() counts nanoseconds while the timeout is in milliseconds.
        deadline = time_ns() + self.timeout * 1000000
        message = ""
        # Re-read the clock on every pass so the timeout can actually expire.
        while time_ns() <= deadline:
            if self.uart.any() > 0:
                message = message + self.uart.read().decode('utf-8')
                if '\n' in message:
                    return message.strip('\n')
        return None

I wired the connections as follows:

RX (sender) <---> TX (receiver)
TX (sender) <---> RX (receiver)
GND <---> GND

It worked!

Click here to see a milled board for this connection on the group site.

Wireless

The SSID is visible as the network name in the hotspot settings on the iPhone.

secret.py

# Wi-Fi credentials imported by the server and client scripts.
# Replace both placeholders with the hotspot's real name and password.
ssid = "MY HOTSPOT NAME"
password = "MY HOTSPOT PASSWORD"

wifi_scan.py

import network
import time

# Initialize Wi-Fi in station mode
wlan = network.WLAN(network.STA_IF)
wlan.active(True)

# Scan for available Wi-Fi networks
print("Scanning for Wi-Fi networks...")
networks = wlan.scan()

# Print the SSIDs of the detected networks
print("Available Wi-Fi networks:")
# The first element of each scan entry is decoded as the SSID. The loop
# variable is deliberately not named `network`, which would rebind the
# imported `network` module for the rest of the script.
for entry in networks:
    ssid = entry[0].decode('utf-8')
    print(ssid)

# Deactivate Wi-Fi after scanning
wlan.active(False)

server.py (just trying to connect - written by ChatGPT)

import network
import socket
import time
from secret import ssid, password

# Bring up the station interface and start joining the hotspot.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)

# Poll once per second; stop when connected or after ten attempts.
max_wait = 10
while not (wlan.isconnected() or max_wait <= 0):
    max_wait -= 1
    print('waiting for connection...')
    time.sleep(1)

# Report the outcome: the assigned IP on success, diagnostics on failure.
if wlan.isconnected():
    print('connected')
    status = wlan.ifconfig()
    print('ip = ' + status[0])
else:
    print('network connection failed')
    print('Wi-Fi status:', wlan.status())
    print('Wi-Fi config:', wlan.ifconfig())

server.py (connected to David's hotspot)

import network
import socket
import time
from secret import ssid, password

# Bring up the station interface and start joining the hotspot.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)

# Poll once per second, printing the status code each time; stop when
# connected or after twenty attempts.
max_wait = 20
while not (wlan.isconnected() or max_wait <= 0):
    max_wait -= 1
    print('waiting for connection... Wi-Fi status:', wlan.status())
    time.sleep(1)

# Report the outcome: the assigned IP on success, diagnostics on failure.
if wlan.isconnected():
    print('connected')
    status = wlan.ifconfig()
    print('ip = ' + status[0])
else:
    print('network connection failed')
    print('Wi-Fi status:', wlan.status())
    print('Wi-Fi config:', wlan.ifconfig())

server.py (from tutorial - worked!)

# Webserver to send RGB data
# Tony Goodhew 5 July 2022
import network
import socket
import time
from machine import Pin, ADC
from secret import ssid,password
import random
# Join the Wi-Fi network whose credentials come from secret.py.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)

# Wait for connect or fail
max_wait = 10
while max_wait > 0:
    # Read the status once per pass; a negative value or a value >= 3
    # ends the wait (3 is treated as success below).
    link_status = wlan.status()
    if link_status < 0 or link_status >= 3:
        break
    max_wait -= 1
    print('waiting for connection...')
    time.sleep(1)

# Handle connection error
if wlan.status() != 3:
    raise RuntimeError('network connection failed')
else:
    print('connected')
    status = wlan.ifconfig()
    print( 'ip = ' + status[0] )

# Open socket
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]

s = socket.socket()
s.bind(addr)
s.listen(1)

print('listening on', addr)

# Listen for connections
while True:
    # Reset per connection so the cleanup below never touches an unbound
    # name or an already-closed socket when accept() itself fails.
    cl = None
    try:
        cl, client_addr = s.accept()
        print('client connected from', client_addr)
        request = cl.recv(1024)
        print(request)
        # Do not unpack request
        # We reply to any request the same way
        # Generate 3 values to send back
        r = random.randint(0,255)
        g = random.randint(0,255)
        b = random.randint(0,255)
        # Join to make a simple string with commas as separators
        rgb = str(r) + "," + str(g) + ","+str(b)

        response = rgb # This is what we send in reply

        cl.send(response)
        print("Sent:" + rgb)

    except OSError:
        print('connection closed')

    finally:
        # Close the client socket on both the success and the error path.
        if cl is not None:
            cl.close()

Error on client when I ran client.py

Waiting to connect:
('0.0.0.0', '255.255.255.0', '192.168.0.1', '8.8.8.8')
Traceback (most recent call last):
  File "<stdin>", line 27, in <module>
OSError: [Errno 113] EHOSTUNREACH

client.py

# Program to read RGB values from a local Pico Web Server
# Tony Goodhew 5th July 2022
# Connect to network
import network
import time
from secret import ssid, password
import socket

# Join the Wi-Fi network whose credentials come from secret.py.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
while not wlan.isconnected() and wlan.status() >= 0:
    print("Waiting to connect:")
    time.sleep(1)

# Stop here if the join failed; otherwise the socket connect below fails
# later with a less obvious error.
if not wlan.isconnected():
    raise RuntimeError('network connection failed')

# Should be connected and have an IP address
print(wlan.ifconfig())

# Address of Web Server; resolved once because it does not change.
addr = socket.getaddrinfo("172.20.10.6", 80)[0][-1]

while True:
    # Create a socket and send the request
    s = socket.socket() # Open socket
    try:
        s.connect(addr)
        s.send(b"GET Data") # Send request
        reply = s.recv(512) # Store reply
    finally:
        s.close()          # Close socket, even if the exchange failed
    # Print what we received
    print(reply)
    # Split the comma-separated reply into RED, GREEN and BLUE values
    r, g, b = [int(part) for part in reply.decode('utf-8').split(",")]
    print(r,g,b)       # Print RGB values
    print()
    # Set RGB LED here
    time.sleep(0.2)    # wait

No error on server side - this time it worked!