15. Wildcard Week

This week was wildcard week, which means we were meant to explore a topic we hadn't touched in previous weeks. I decided to explore one of the suggested topics, "tinyML", and create an AI for the pong game I built in a previous week for my mini game console.

TensorFlow and TensorFlow Lite

TensorFlow is a library used to create neural nets for a variety of applications like object detection, classification, etc. TensorFlow Lite is a project under its umbrella for running models on less powerful devices like microcontrollers. I decided I would create a small model, train it, and then convert it into a TensorFlow Lite model so it could fit and run on the ESP32 microcontroller I'm using. The model would be an AI pong player that a user could play against. Ideally, various levels of difficulty could be created to let the player face progressively harder opponents.
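
For reference, converting a trained Keras model to TensorFlow Lite is normally just a few lines using TensorFlow's stock converter; the snippet below is a placeholder-sized sketch of that path (as you'll see at the end of this page, getting the converted model to actually run on the board turned out to be the hard part):

            import tensorflow as tf

            # A stand-in model with the same shape as the pong model (6 inputs, 2 outputs)
            model = tf.keras.Sequential([
                tf.keras.layers.Dense(16, activation='relu', input_shape=(6,)),
                tf.keras.layers.Dense(2, activation='softmax'),
            ])

            # Standard Keras -> TensorFlow Lite conversion (stock TensorFlow API)
            converter = tf.lite.TFLiteConverter.from_keras_model(model)
            tflite_bytes = converter.convert()
            with open("model.tflite", "wb") as f:
                f.write(tflite_bytes)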

Reinforcement learning

The first question was how I would train the model. In short, I decided on a method known as reinforcement learning, following in the footsteps of Google, which used this technique to train many game AIs (including one for pong).

So How Does It Work?

You can think of reinforcement learning as similar to training a dog (disclaimer: I don't actually think this is exactly how dogs are trained). When the dog does something good, you give it a treat. When the dog does something bad, you give it a punishment (I feel bad even writing that). Over time and many experiences the dog converges on what the "correct" behavior is and begins to do the things that lead to treats and avoid the behavior that leads to punishment. In reinforcement learning this is driven by a "reward function" that the developer defines. A good reward function will get the AI agent to converge on the desired behavior.

In a game of pong, each point can be seen as an "episode". Every X episodes you train your model on the results of those episodes. Ideally it learns from prior mistakes and, over many training epochs, learns to play pong.

What is the Pong AI model's reward function?

I decided to go with the "if it ain't broke" philosophy and use a reward function similar to Google's. When the AI wins a point, it is rewarded +1 for that "step" and a decayed reward for all prior steps/movements that led to winning that point. This means that if it took 25 movements or commands for the AI to win the point, the 25th would get +1 and the prior 24 would get progressively smaller (but positive) rewards. Intuitively this makes sense: the step right before the winning one is probably more directly connected to winning the point than the one 1000 steps before. The same applies when losing a point; when a point is lost, a decayed negative reward is given to all steps leading up to the final one. This leads to an interesting reward output in some cases: it is "better" (or less bad) to lose a point quickly than to lose a very long-running point. This is still something I'm grappling with. I'd like to explore different reward functions to see whether this makes the AI pong paddle converge more slowly than it could (or read about why it makes sense).
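
To make the decay concrete, here is a tiny illustrative sketch (the 0.95 discount factor matches the gamma used in the training code further down; the helper itself is just for illustration):

            import numpy as np

            def decayed_rewards(final_reward, num_steps, gamma=0.95):
                """Spread a +/-1 point result backwards over the steps that led to it."""
                rewards = np.zeros(num_steps)
                running = final_reward
                for t in reversed(range(num_steps)):
                    rewards[t] = running
                    running *= gamma  # each earlier step gets a smaller share of the credit/blame
                return rewards

            # The winning step gets +1, the one before it ~0.95, then ~0.90, and so on
            print(decayed_rewards(+1.0, 5))   # -> approximately [0.81, 0.86, 0.90, 0.95, 1.0]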

What are the Pong AI model's inputs?

In Google's experiments with these games, all of the screen's pixels were used as the input. This is nice because it allows the AI to learn completely on its own without any hand-distilled information. Unfortunately, using all of the screen's pixels means longer training times (because there is more data to look at), and I didn't have the time or setup to run such a training round. Instead, I distilled the information down to the important pieces of game state: the ball's velocity and position and the players' paddle positions.
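
Concretely, the model's input ends up being a six-element vector along these lines (the sample values and labels below are just for illustration; the exact construction is in the training loop further down):

            def normalize_x(pos, screen_width=160):
                return pos / screen_width

            def normalize_y(pos, screen_height=80):
                return pos / screen_height

            # Made-up values for one frame; the ordering matches the training loop below
            ball_x, ball_y, ball_dx, ball_dy = 80, 40, 2, -1
            opponent_y, ai_paddle_y = 30, 55

            model_input = [
                ball_dx,                    # ball x velocity (raw)
                ball_dy,                    # ball y velocity (raw)
                normalize_x(ball_x),        # ball x position, scaled by the screen width
                normalize_y(opponent_y),    # the on-device opponent's paddle y, scaled
                normalize_y(ball_y),        # ball y position, scaled by the screen height
                normalize_y(ai_paddle_y),   # the model's own paddle y, scaled
            ]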

Who/What would the AI play against?

Well, I could resign myself to playing thousands and thousands of pong games in a row and die a sad and lonely man with a mediocre pong AI, OR I could have it play another AI. Initially I considered having it play itself to get twice the game data at once (I believe this is what Google did), but after briefly thinking about it I realized it would be a bit more code than I wanted to write. I ended up writing a very simple AI that probabilistically moves in the direction of the ball X% of the time (X being 75% in this case).
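
The real opponent lives in the Arduino code on the ESP32, but the policy is simple enough to sketch in Python (what it does the other 25% of the time is an implementation detail; the sketch below assumes a random move and uses hypothetical names):

            import random

            def probabilistic_opponent(ball_y, paddle_y, follow_prob=0.75):
                """Chase the ball follow_prob of the time; otherwise move randomly."""
                if random.random() < follow_prob:
                    return -5 if ball_y < paddle_y else 5   # step toward the ball
                return random.choice([-5, 5])               # occasional random step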

How would the model play?

As no human would be playing, I turned off the delay on each game step so the games would run sped up and wouldn't need to be watched. The probabilistic AI ran on the ESP32 microcontroller and sent the relevant data over serial to my computer. The data was then fed to the model, which output 2 numbers that can be thought of as its desire to move the paddle "up" or "down". The action is then sampled based on this weighted probability. This gives some variability to the model's decision making and bakes in uncertainty; if the action with the higher probability were always chosen, the model might never learn that the other action leads to a better outcome. After a certain number of points (episodes), the model is trained on those prior points, and this is repeated many times. To check whether the model was actually learning, I monitored the reward output after each batch; if the model is learning, its aggregate reward should increase. I also monitored point length out of curiosity.
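
Concretely, the action selection is a weighted coin flip over the two outputs rather than a hard argmax; a toy illustration (the real version is in the move() method further down):

            import numpy as np

            probs = np.array([0.3, 0.7])              # softmax outputs for the two actions
            action = np.random.binomial(1, probs[1])  # picks 1 with probability 0.7, else 0
            # argmax would always pick action 1 here; sampling still tries action 0 about 30% of the time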

Code breakdown

Python trainer

First I import all of the required libraries:


          import os
          os.environ["AUTOGRAPH_VERBOSITY"] = "0"
          import serial
          import time
          import numpy as np
          import gym
           
          from keras.models import Sequential
          from keras.layers import Dense, Activation, Flatten
          from keras import initializers
          from keras.activations import sigmoid, softmax
          from keras.optimizers import Adam
          from keras import regularizers
          import tensorflow as tf
          import json

          import random
          from collections import deque
          import datetime
          from keras.models import load_model
          from matplotlib import pyplot as plt
          from everywhereml.code_generators.tensorflow import convert_model

        

Then I set some constants and create a few helper functions to normalize the data for training:

          
            MIN_REWARD = -1
            MAX_REWARD = 1
            paddle_width = 4
            paddle_height = 20
            ball_radius = 4
            fig, axs = plt.subplots(2)
            plt.ion()
            plt.show()

            def normalize_y(pos):
                screen_height = 80
                return pos / screen_height

            def normalize_x(pos):
                screen_width = 160
                return pos / screen_width
          
        

Next come my AIPong classes. First I created one called AIPongHeuristic. This was used just to check that serial communication was working properly between the ESP and my computer and to ensure the paddle movements being sent were being interpreted correctly by my Arduino code.

          
            class AIPongHeuristic():
                def __init__(self):
                    self.rid = 0
                    self.training = False

                def move(self, state):
                    ball_y = state[4]
                    self.rid = state[0]
                    curr_paddle_y = state[5]
                    return  0 if ball_y - curr_paddle_y < 0 else 1
          
        

Next is the AIPong class, which handles both training and producing moves with the model:

          
            class AIPong():
              
                def __init__(self, state_size, action_space=2, model_name="AIPong", cp_model=None, training=True):
                
                    self.state_size = state_size
                    self.action_space = action_space
                    self.memory = deque(maxlen=2000)
                    self.inventory = []
                    self.model_name = model_name
                    
                    self.gamma = 0.95
                    self.epsilon = 1.0
                    self.epsilon_final = 0.01
                    self.epsilon_decay = 0.99  # previously 0.995
                    self.cp_model = cp_model
                    self.model = self.model_builder()
                    self.checkpoint_path = "checkpoints/cp-{epoch:04d}.weights.h5"
                    self.epoch = 1
                    self.epochs = []
                    self.epoch_rewards = []
                    self.game_lens = []
                    self.training = training
                    
                  
                def model_builder(self):
              
                    #model = tf.keras.models.Sequential()
                    model = tf.keras.models.Sequential([
                      tf.keras.layers.Dense(units=16, activation='relu', input_dim=self.state_size, kernel_initializer=initializers.RandomUniform(minval=-1.0, maxval=1.0)),
                      tf.keras.layers.Dense(16, activation='relu', kernel_initializer=initializers.RandomUniform(minval=-1.0, maxval=1.0)),
                      tf.keras.layers.Dropout(0.2),
                      tf.keras.layers.Dense(units=self.action_space, activity_regularizer=regularizers.L2(1e-5), activation='softmax', kernel_initializer=initializers.RandomUniform(minval=-1.0, maxval=1.0))
                    ])
                    
                    model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))
                    if self.cp_model:
                        model.load_weights(self.cp_model)
                    return model

                def move(self, state):
                    actions = self.model.predict(np.array([state,]))[0]
                    #return np.argmax(actions)
                    sigmoid_actions = actions
                    #sigmoid_actions = sigmoid(actions)
                    print(actions)
                    try:
                        return np.random.binomial(1, sigmoid_actions[1] / sum(sigmoid_actions))
                    except:
                        print("argmax", actions)
                        return np.argmax(sigmoid_actions)


                def _check_collision(self, state):
                    return state[1] < state[8] + paddle_width and state[8] < state[1] + ball_radius and state[2] < (state[6] + paddle_height) and state[6] < state[2] + ball_radius
                
                def reward(self, action, state):
                    reward = 0
                    # if self._check_collision(state):
                    #     reward += 1
                    #     print(state)
                    #     print("collision!")
                    if state[7] == 1:
                        reward += 1.0
                    elif state[7] == -1:
                        reward -= 1.0

                    return reward


                def discount_rewards(self, batch):
                    """ take 1D float array of rewards and compute discounted reward """
                    r = [item[2] for item in batch]
                    r = np.array(r)
                    discounted_r = np.zeros_like(r)
                    running_add = 0
                    for t in reversed(range(0, r.size)):
                        if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
                        running_add = running_add * self.gamma + r[t]
                        discounted_r[t] = running_add

                    discounted_r -= np.mean(discounted_r)
                    discounted_r /= np.std(discounted_r)
                    adjusted_batch = []
                    for new_r, item in zip(discounted_r.tolist(), batch):
                        state, action, _, next_state, done = item
                        new_item = (state, action, new_r, next_state, done)
                        adjusted_batch.append(new_item)
                    return adjusted_batch
                        
                    
                def batch_train(self, batch_size):
                    # file_name = "experiences/replay.{epoch:04d}.csv"
                    tot_reward = 0
                    batch_len = 0
                    # iterate over the last batch_size episodes in memory
                    for i in range(len(self.memory) - batch_size, len(self.memory)):
                        batch = self.memory[i]
                        batch_len += len(batch)

                        discounted_batch = self.discount_rewards(batch)

                        for state, action, reward, next_state, done in discounted_batch:
                            # with open(file_name.format(epoch=self.epoch), "a") as f:
                            #     f.write()
                            # if not done:
                            #     reward = reward + self.gamma * np.amax(self.model.predict(np.array([next_state]), verbose=0)[0], )
                            tot_reward += reward
                            target = self.model.predict(np.array([state,]))
                            #print(target)
                            target[0][action] = reward

                            self.model.fit(np.array([state,]), target, epochs=1, verbose=0)


                    if self.epsilon > self.epsilon_final:
                      self.epsilon *= self.epsilon_decay

                    self.save_model()

                    self.epochs.append(self.epoch)
                    self.epoch_rewards.append(tot_reward)
                    self.game_lens.append(batch_len/batch_size)
                    axs[0].plot(self.epochs, self.epoch_rewards)
                    axs[1].plot(self.epochs, self.game_lens)
                    plt.draw()
                    plt.pause(0.1)
                    tot_reward = 0
                    
                    self.epoch += 1

                def save_model(self):
                    self.model.save_weights(self.checkpoint_path.format(epoch=self.epoch))

                def save(self):
                    # converter = tf.lite.TFLiteConverter.from_keras_model(self.model)
                    # tflite_model = converter.convert()

                    tflite_model = convert_model(self.model,np.array([[0,0,0,0,0,0]]), np.array([[1,3]]))
                    #tflite_model = tf.saved_model.save(self.model, "model")
                    with open('model.tflite', 'wb') as f:
                      f.write(bytes(tflite_model, "utf-8"))
                    # porter = tf_porter(self.model, np.array([[0,0,0,0,0,0]]), np.array([1,3]))
                    # cpp_code = porter.to_cpp(instance_name='modelo_seno', arena_size=4096)
                    # with open("model.cpp", "w") as f:
                    #     f.write(cpp_code)
                    # print("done")
          
        

There is a lot going on in there. Excuse its ugliness.

Next comes the big loop that actually receives serial data from the microcontroller, processes it, and responds.


          ai_pong = AIPong(6, training=False,cp_model="checkpoints/backup/cp-0009.weights.h5")
          # ai_pong.save()
          
          # ai_pong = AIPongHeuristic()
          ser = serial.Serial('/dev/cu.usbmodem145101', 115200, timeout=2)  # open serial port
          mems = []
          last = ()
          batch_size = 5
          last_update = 0
          last_time = time.time()
          tot_reward = 0
          epoch = 1
          while 1:
              line = ser.readline().decode("utf-8").rstrip()
              if time.time() - last_time > 1:
                  ser.write(b"10000000000,0\n")
                  last_time = time.time()
                  continue
              if "debug:" in line:
                  #print(line)
                  continue
              if "s:" not in line:
                  continue
              try:
                  header, data = line.split(":")
              except:
                  continue
              data_list = data.split(",")
              state = [int(d) for d in data_list]
              rid = state[0]
              saved_state = state[1:]
              #state[3], [4] are dx,dy
              saved_state = [state[3], state[4], normalize_x(state[1]), normalize_y(state[5]), normalize_y(state[2]), normalize_y(state[6])]
              #curr_paddle_y = ai_pong.move(state)
              action = ai_pong.move(saved_state)

              if ai_pong.training:
                  if last:
                      reward = ai_pong.reward(last[0], state)
                      last = last + (reward, saved_state, state[7] != 0)
                      mems.append(last)

                  if state[7] != 0:
                      print("result", state)
                      ai_pong.memory.append(mems)
                      mems = []
                      if len(ai_pong.memory) - last_update >= batch_size:
                          ai_pong.batch_train(batch_size)
                          last_update = len(ai_pong.memory)

              if action == 0:
                  curr_paddle_y = -5
              elif action == 1:
                  curr_paddle_y = 5
              
              resp = (str(rid) + "," + str(curr_paddle_y) + "\n").encode("utf-8")
              last = (saved_state, action)
              ser.write(resp)
        

Once again, a lot of messy code. Let's go through it. First, the serial connection is opened on the microcontroller's serial port and the AIPong class is initialized. The batch size is also set (which dictates how often training occurs).


          ai_pong = AIPong(6, training=False,cp_model="checkpoints/backup/cp-0009.weights.h5")
          ser = serial.Serial('/dev/cu.usbmodem145101', 115200, timeout=2)  # open serial port
          mems = []
          last = ()
          batch_size = 5
          last_update = 0
          last_time = time.time()
          tot_reward = 0
          epoch = 1
        

At the top of the loop there is some hacky code to surface debug statements and skip malformed lines. This helps find issues and ensure the correct data is being sent.

          
            line = ser.readline().decode("utf-8").rstrip()
            if time.time() - last_time > 1:
                ser.write(b"10000000000,0\n")
                last_time = time.time()
                continue
            if "debug:" in line:
                #print(line)
                continue
            if "s:" not in line:
                continue
            try:
                header, data = line.split(":")
            except:
                continue
          
        

Next we "deserialize" the data sent by the microcontroller, which is just a comma-separated list containing a request id, the ball's position and velocity, the paddle positions, and the game state (i.e. whether the point just ended and who won it). Some of this data is also normalized for training.

          
            data_list = data.split(",")
            state = [int(d) for d in data_list]
            rid = state[0]
            saved_state = state[1:]
            #state[3], [4] are dx,dy
            saved_state = [state[3], state[4], normalize_x(state[1]), normalize_y(state[5]), normalize_y(state[2]), normalize_y(state[6])]
          
        

Finally, if the paddle is actually being trained, each piece of state and its corresponding action are recorded so they can be trained on once the batch of episodes for this epoch is complete (5 game points for now). The movement chosen by the model is then sent back to the microcontroller so the pong game can continue.

          
            if ai_pong.training:
              if last:
                  reward = ai_pong.reward(last[0], state)
                  last = last + (reward, saved_state, state[7] != 0)
                  mems.append(last)

              if state[7] != 0:
                  print("result", state)
                  ai_pong.memory.append(mems)
                  mems = []
                  if len(ai_pong.memory) - last_update >= batch_size:
                      ai_pong.batch_train(batch_size)
                      last_update = len(ai_pong.memory)

          if action == 0:
              curr_paddle_y = -5
          elif action == 1:
              curr_paddle_y = 5
          
          resp = (str(rid) + "," + str(curr_paddle_y) + "\n").encode("utf-8")
          last = (saved_state, action)
          ser.write(resp)
          
        

Arduino code

For the Arduino code I will only focus on the functions relevant to training and communication. The two functions updateAIState and getAIState send the current game data over the serial port and receive the AI's response. They are mostly just serializing and deserializing data. This is more or less the extent of the code added for the AI training.
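
Before the code, here is roughly what one request/response round trip looks like on the wire (the values are made up; the field order is whatever updateAIState prints below):

            # ESP32 -> computer: "s:" + rid, ball x, ball y, ball dx, ball dy, paddle y, other paddle y, game state, other paddle x
            request = "s:42,150,37,2,-1,30,55,0,4\n"
            # computer -> ESP32: the same rid echoed back, then the paddle movement chosen by the model (+5 or -5)
            response = "42,5\n"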

          
            void updateAIState(){
              Ball *ball = (Ball*)gameObjects[BALL_IDX];
              Scoreboard *scoreboard = (Scoreboard*)gameObjects[SCOREBOARD_IDX];
              Paddle *you = myPaddle;
              Paddle *other = opponentPaddle;
              currRid+=1;
              // send one line of game state: request id, ball position and velocity,
              // both paddle y positions, the game state flag, and the AI paddle's x
              Serial.printf("s:%d,%d,%d,%d,%d,%d,%d,%d,%d\n", currRid, ball->x, ball->y, ball->dx, ball->dy, you->y, other->y, gameState, other->x);
            }

            void getAIState(){
              String resp = "";
              int rid = 0;
              int commaIdx = 0;
              int timeElapsed = millis();
              // keep reading lines until we get a response tagged with the latest request id
              while (rid < currRid){
                resp = Serial.readStringUntil('\n');
                commaIdx = resp.indexOf(',');
                rid = resp.substring(0, commaIdx).toInt();
              }

              // the rest of the line is the paddle movement chosen by the model
              int newY = resp.substring(commaIdx+1).toInt();
              Paddle *other = opponentPaddle;
              opponentPaddle->move(0, newY);
            }
          
        

Next I had to take my model and convert it to a C array that can be compiled into the ESP32 code. Details on how that is done can be found here. My model files and C array files are listed below.
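
For what it's worth, turning the .tflite file into a C array can also be done with a few lines of Python if the usual xxd route isn't handy (the file and variable names below are just examples):

            # Turn model.tflite into a C array that can be compiled into the Arduino sketch
            with open("model.tflite", "rb") as f:
                data = f.read()

            with open("model_data.h", "w") as f:
                f.write("const unsigned char model_tflite[] = {\n")
                for i in range(0, len(data), 12):
                    hex_bytes = ", ".join(f"0x{b:02x}" for b in data[i:i + 12])
                    f.write(f"  {hex_bytes},\n")
                f.write("};\n")
                f.write(f"const unsigned int model_tflite_len = {len(data)};\n")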

After that, the model is run using a TensorFlow Lite Micro wrapper library that takes the C array as an input. The support for this is currently very wishy-washy. More or less, it is created like this:


          Eloquent::TF::Sequential tf;
        

And initialized like this:


          tf.setNumInputs(6);
          tf.setNumOutputs(1);

          while (!tf.begin(tfData).isOk()) {
            Serial.println(tf.exception.toString());
          }
        

I have not been able to get this working successfully yet, though.

Did It Work?

Yes, the system did work and the model did gradually get better, but only to a point. After a certain number of training iterations the AI's performance actually plummeted and got significantly worse. Monitoring the layers of the model, it seemed to suffer from saturation: the outputs for up and down would trend toward 0 and 1, which means one path would become almost unused. Even after losing point after point it wasn't able to correct for this. I didn't have a ton of time to investigate, so I decided to stop training sooner than I would've liked and settle. Overall the results were pretty positive, and the trained AI model won about twice as many points as the probabilistic model.
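
One simple way to watch for this kind of collapse is to log the softmax outputs over training and track how often one action gets essentially all of the probability mass; a hypothetical helper (not part of the training script above) might look like this:

            import numpy as np

            def saturation_fraction(action_probs, threshold=0.99):
                """Fraction of decisions where one action got nearly all the probability mass."""
                probs = np.asarray(action_probs)       # shape: (num_steps, 2), softmax outputs
                return float(np.mean(probs.max(axis=1) > threshold))

            # If this creeps toward 1.0 during training, the policy has effectively collapsed
            # onto a single action and the sampled exploration no longer does anything.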

RL paddle beats probabilistic paddle!

Getting the model on the danged ESP-32

I thought I was home free and that all I'd have to do was convert the model to TensorFlow Lite with TensorFlow's conversion utility and have myself a micro model. This was not the case: I found out that support for running TensorFlow models on microcontrollers isn't what you'd like it to be. Although I was able to get my micro model compiled, it didn't run properly on the microcontroller. I didn't leave myself enough time to debug, so this is still an open problem I need to solve.

Model files