November 09, 2025

Systematic Reed Solomon erasure code in Python

In part 1, I've visually demonstrated a version of the Reed Solomon erasure code.

Let's turn that into a little library to encode and decode chunks of data. It should be something that can be used in a large scale cloud data storage system with millions of storage devices.

API

encode_chunks(K, N, chunks)

K is the number of input chunks. 

N is the total number of chunks wanted.

chunks is a dictionary containing the data to encode. The keys has to be exactly 0 to K-1. The values should be a list of numbers, each between 0 to 255. All the chunks must be of the same length L.

The function will return a dictionary of N-K redundant chunks in the same format as chunks. The keys are from K to N-1.

decode_chunks(K, N, chunks)

K is the number of chunks.

N is the total number of chunks.

chunks is a dictionary of at least K out of N chunks. The keys has to be from 0 to N-1

The function returns a dictionary containing all missing chunks between 0 to K-1.

The Code

The code from the previous post looks slow. There are about 2*K*K "multiplications/divisions" and 3*K "additions/subtractions" for each output byte in the redundant chunks. Well, the code was written that way to match the mathematical equation of Lagrange Interpolation and to plot the curves. Let's make that faster.

Notice that we get to pick the x's for the polynomials. The x's are the keys for the chunks and result dictionaries. It is 0 to K-1 for the input chunks and K to N-1 for the redundant chunks. We loop through L slices of the chunks (one byte from each chunk) using the same x's. We extract this loop invariant part into lag_interp_weights(xs, x). This returns a vector of weights for the y's. These weights are actually the coefficients of the Lagrange polynomials.

For each redundant chunk, we compute this vector of weights once. Then we take a slice (one byte from each of the K chunks at the same offset) as the y values,  multiply each y with the corresponding weight in the vector and sum them together. In other words, the output byte for the slice is the dot product of the weight vector and a slice of the K chunks. Do that for all L slices to compute the redundant chunk. We only need to do K "multiplications" and K "additions" for each output byte.

Decoding the chunks is similar to encoding the chunks. As long as we have K of the N chunks, we use the keys for those chunks as the x's, use the same math to compute the data for any missing key between 0 to K-1

Here's the code, including a test function test_random() to check that it actually works.


import random
import time

# GF(2**8)
gf256_log = [0]*256 # zeroth entry is invalid
gf256_antilog = [0]*255 # yes, only 255 entries

def gf256_add(x, y):
    return x^y

gf256_sub = gf256_add

def gf256_slow_mul(x, y): # long multiplication
    r = 0
    while y>0:
        if y&1:
            r ^= x # add
        if x&0x80:
            x = (x<<1)^0x11b # reduce by polynomial
        else:
            x <<= 1
        y >>= 1
    return r

def gf256_init_tables():
    z = 1
    for x in range(255):
        gf256_antilog[x] = z
        gf256_log[z] = x
        z =  gf256_slow_mul(z, 3)

def gf256_div(x, y):
    assert y != 0
    if x == 0:
        return 0
    return gf256_antilog[(gf256_log[x] - gf256_log[y])]

def gf256_mul(x, y):
    if x == 0 or y == 0:
        return 0
    return gf256_antilog[(gf256_log[x] + gf256_log[y]) % 255]

gf256_init_tables()

def lag_interp_weights(xs, x):
    n = len(xs)
    w = n * [0]
    for i in range(n):
        t = 1
        for j in range(n):
            if i != j:
                t = gf256_mul(t,
                              gf256_div(gf256_sub(x, xs[j]),
                                        gf256_sub(xs[i], xs[j])))
        w[i] = t
    return w

def compute_chunks(K, xs, ts, chunks):
    assert K == len(xs)
    results = {}
    L = len(chunks[xs[0]])
    inputs = [chunks[xs[i]] for i in range(K)]
    for t in ts:
        assert not t in xs
        w = lag_interp_weights(xs, t)
        r = L * [0]
        for z in range(L):
            y = 0
            for i in range(K):
                y = gf256_add(y, gf256_mul(w[i], inputs[i][z]))
            r[z] = y
        results[t] = r
    return results

def encode_chunks(K, N, chunks):
    assert K > 0 and K < N and N <= 256
    L = len(chunks[0])
    for z in range(K):
        assert len(chunks[z]) == L
    xs = list(range(K))
    return compute_chunks(K, xs, list(range(K, N)), chunks)

def decode_chunks(K, N, chunks):
    assert K > 0 and K < N and N <= 256
    xs = []
    for x in range(N):
        if x in chunks:
            xs.append(x)
        if len(xs) >= K:
            break
    assert len(xs) == K
    ts = [x for x in range(K) if not x in chunks]
    return compute_chunks(K, xs, ts, chunks)

def recover_original_chunks(K, N, chunks):
    temp = decode_chunks(K, N, chunks)
    temp.update(chunks)
    for x in [j for j in temp.keys() if j >= K]:
        del temp[x]
    return temp

def test_random(K, N, L, rounds):
    chunks = gen_data(K, L)
    rr = random.Random(2)
    estart = time.time()
    enc = encode_chunks(K, N, chunks)
    encode_time = time.time() - estart;
    print(f"Encode time: {encode_time:f}s  input rate: {K*L/1024/1024/encode_time:.3f}MB/s")
    enc.update(chunks)
    dstart = time.time()
    for round in range(rounds):
        valid = list(range(N))
        while len(valid) > K:
            valid.pop(rr.randint(0, len(valid)-1))
        temp = {}
        for x in valid:
            temp[x] = enc[x]
        r = recover_original_chunks(K, N, temp)
        assert r == chunks, f"Recovered chunks are not equal the original chunks: {valid}"
    decode_time = time.time() - dstart;
    print(f"Decode time: {decode_time:f}s {rounds} rounds {decode_time/rounds:f}s/round  input rate: {K*L*rounds/1024/1024/decode_time:.3f}MB/s")
    print(f"random_{K}_{N}_{L} succeeded")

def gen_data(K, L):
    rr = random.Random(1)
    chunks = { x: list(rr.randbytes(L)) for x in range(K) }
    return chunks

test_random(16, 24, 65536, 1)

That's all. Just over 100 lines of Python code not counting the test functions.

No solving of systems of linear equations. No Vandermoonde matrix. No Gaussian elimination over Galios Fields to invert matrices. No dependencies. Just plain Python.

With K=16, N=24, encoding 1 MB (16 chunks of 64 KB) takes 1.7s (600 KB/s) on my M2 MacBook Air.

With K=17, N=20, encoding 17 MB (17 chunks of 1 MB) takes 11.2s (1.5 MB/s).

I have not seen an erasure code algorithm that does less work than one dot product per output byte.
Certainly, I haven't found one that is so easy to code!


Update 2:

See fast C version and fast Go lang version.


Update:

I added test_all() to test that the data can be recovered from all possible N-choose-K subsets of chunks.


def start_comb(K, v):
    for i in range(K):
        v[i] = i

def next_comb(K, N, v):
    x = K-1
    while x >= 0:
        if v[x] < N-1:
            v[x] += 1
            for j in range(x+1, K):
                v[j] = v[j-1] + 1
            if v[K-1] < N:
                return False
        x -= 1
    return True

def test_all(K, N, L):
    chunks = gen_data(K, L)
    estart = time.time()
    enc = encode_chunks(K, N, chunks)
    encode_time = time.time() - estart;
    print(f"Encode time: {encode_time:f}s  input rate: {K*L/1024/1024/encode_time:.3f}MB/s")
    enc.update(chunks)
    dstart = time.time()
    valid = K * [0]
    count = 0
    done = False
    start_comb(K, valid)
    while not done:
        temp = {}
        count += 1
        for x in valid:
            temp[x] = enc[x]
        r = recover_original_chunks(K, N, temp)
        assert r == chunks, f"Recovered chunks are not equal the original chunks: {valid}"
        done = next_comb(K, N, valid)
    decode_time = time.time() - dstart;
    print(f"Decode time: {decode_time:f}s {count} combinations {decode_time/count:f}s/combination  input rate: {K*L*count/1024/1024/decode_time:.3f}MB/s")
    print(f"Encode time plus decode time: {encode_time+decode_time:f}s")
    print(f"all_{K}_{N}_{L} {count} combinations succeeded")

test_all(17, 20, 1024) # 1140 combinations. 17KB input. 10 seconds.
test_all(10, 16, 1024) # 8008 combinations. 10KB input. 1 minute.
test_all(16, 24, 16) # 735471 combinations. 256B input. 9.5 minutes.
test_all(16, 24, 1024) # 735471 combinations. 16KB input. 5 hours.


November 03, 2025

Systematic Reed Solomon erasure code

Problem

Given K bytes (message), compute N bytes (encoded message or codeword) such that the original K bytes can be recovered from any K subset of the N bytes. Assume we are only dealing with missing data, not corrupted data.

Motivation

Why is this useful? There are many practical applications of for this. It can be used to build a data storage system that tolerates N-K storage node failures with only N/K times the space. Over unreliable messaging systems, a message can be sent in N parts and then can be recovered by the receiver if any K out of N parts are received. The new Signal protocol proposes to do that in parts of the protocol.

How?

Conceptually, we can fit a curve through K points with a unique polynomial of degree less than K. Pick another N-K points along the curve. Together, we have N points on the same curve and any K points can be used to recalculate the same curve and recover any of the missing original K points.

Let's pick the output of the polynomial at x = 1 to K to be the K bytes in the message. The encoded message is the output of the polynomial at x = 1 to N. The first K bytes of the encoded message will be identical to the message (systematic). This is a useful property. When bytes 1 to K are not missing, no work needs to be done. If any of these bytes are missing, recompute only those bytes.

One way of doing this does not require explicit calculation of the coefficients of the polynomial directly. With K points (x and y values) on the curve, use Lagrange Interpolation to calculate the value of the polynomial at any x value.

Let's turn that into working code in plain Python:

def lagrange_interpolate(xs, ys):
    def f(x):
        n = len(xs)
        y = 0.0
        for i in range(n):
            t = ys[i]
            for j in range(n):
                if i != j:
                    t *= (x - xs[j]) / (xs[i] - xs[j])
            y += t
        return y
    assert len(xs) == len(ys)
    return f

K = 4
N = 6

a = list(range(1, K+1))
b = [82, 233, 217, 241] # message

poly = lagrange_interpolate(a, b) # polynomial

c = list(range(1, N+1))
d = [poly(x) for x in c] # encoded message
assert b == d[:K] # first K bytes are identical to message

e = c.copy()
f = d.copy()
for i in [3, 1]: # drop some data
    e.pop(i)
    f.pop(i)

poly2 = lagrange_interpolate(e[:K], f[:K]) # recalculate polynomial

g = list(range(1, K+1))
h = [round(poly2(x)) for x in g] # recovered original message

assert b == h, "Recovered message is not equal the original message"

if 1: # plot the data               
    import matplotlib.pyplot as plt
    cc = [x/10 for x in range(0, 100*N)]
    dd = [poly(x) for x in cc] # for plotting                                                                                                                                                                   gg = [x/10 for x in range(0, 100*N+1)]
    hh = [poly2(x) for x in gg] # for plotting                                                                                                                                                                   fig, axs = plt.subplots(4)
    axs[0].scatter(a, b, color="blue")
    for i in range(len(a)):
        axs[0].text(a[i], b[i], b[i])
    axs[1].scatter(cc, dd, color="pink")
    axs[1].scatter(a, b, color="blue")
    axs[1].scatter(c, d, color="red")
    for i in range(len(c)):
        axs[1].text(c[i], d[i], round(d[i]))
    axs[2].scatter(cc, dd, color="pink")
    axs[2].scatter(e, f, color="green")
    for i in range(len(e)):
        axs[2].text(e[i], f[i], round(f[i]))
    axs[3].scatter(gg, hh, color="yellow")
    axs[3].scatter(g, h, color="brown")
    for i in range(len(g)):
        axs[3].text(g[i], h[i], h[i])
    axs[0].set_title("Message")
    axs[1].set_title("Encoded message")
    axs[2].set_title("Encoded message with dropped data")
    axs[3].set_title("Recovered message")
    for ax in axs:
        ax.set_ylim([-500, 1500])
        ax.set_xlim([ 0, N+1])
        if ax != axs[-1]:
            ax.get_xaxis().set_visible(False)
    plt.subplots_adjust(hspace=0.5)
    plt.show()
The pink curve was calculated using the original message (blue) and the yellow curve is recalculated from the partially missing encoded message (green).
This works except that the encoded values are unlikely to be within the range of 0 to 255 if we choose x values less than 1 or greater than K. If we choose non-integer x values between 1 and K, then the encoded values are unlikely to be integers.

Instead of computing using real numbers, we switch to computing using Galois field (2**8). Think of it as a special type of "numbers" where "addition", "subtraction", "multiplication" and "division" are well defined (differently) and there are only 256 possible "numbers". Most importantly, all encoded message values are between 0 and 255, and Lagrange Interpolation only uses these four operations! That's a Reed Solomon code for handling missing data.

It is hard to believe that it is possible to define fancy math and then it would just work. Nothing beats a working example:
import random

# GF(2**8)
log_table = [0]*256 # zeroth entry is invalid
antilog_table = [0]*255 # yes, only 255 entries

def gf256_add(x, y):
    return x^y

gf256_sub = gf256_add

def gf256_smul(x, y): # long multiplication
    r = 0
    while y>0:
        if y&1:
            r ^= x # add
        if x&0x80:
            x = (x<<1)^0x11b # reduce by polynomial                                                                       
        else:
            x <<= 1
        y >>= 1
    return r

def init_tables():
    z = 1
    for x in range(255):
        antilog_table[x] = z
        log_table[z] = x
        z =  gf256_smul(z, 3)

def gf256_div(x, y):
    assert y != 0
    if x == 0:
        return 0
    return antilog_table[(log_table[x] - log_table[y])]

def gf256_mul(x, y):
    if x == 0 or y == 0:
        return 0
    return antilog_table[(log_table[x] + log_table[y]) % 255]

init_tables()

def lagrange_interpolate(xs, ys):
    def f(x):
        n = len(xs)
        y = 0
        for i in range(n):
            t = ys[i]
            for j in range(n):
                if i != j:
                    t = gf256_mul(t,
                                  gf256_div(gf256_sub(x, xs[j]),
                                            gf256_sub(xs[i], xs[j])))
            y = gf256_add(y, t)
        return y
    assert len(xs) == len(ys)
    return f

K = 20 # message length
N = 30 # encoded message length (message + redundancy) (<=255)

random.seed(324)

# a: 1..K. Fixed.
# b: message (K bytes)
a = list(range(1, K+1))
b = [random.randint(0, 255) for x in range(K)]

# c: 1..N. Fixed.
# d: encoded message (N bytes)
poly = lagrange_interpolate(a, b)
c = list(range(1, N+1))
d = [poly(x) for x in c]
assert b == d[:K] # first K bytes are identical to message

# e, f: simulate data loss by dropping random parts of c & d                                                      
e = c.copy()
f = d.copy()
while len(e) > K:
    i = random.randint(0, len(e)-1)
    e.pop(i)
    f.pop(i)

# g: 1..K. Fixed.                                                                 
# h: recovered message (K bytes)                                                               
assert(len(e) >= K)
poly2 = lagrange_interpolate(e[:K], f[:K])
g = list(range(1, K+1))
h = [poly2(x) for x in g]

assert b == h, "Recovered message is not equal the original message"

if 1:
    import matplotlib.pyplot as plt
    fig, axs = plt.subplots(4)
    axs[0].scatter(a, b, color="blue")
    axs[1].scatter(c, d, color="red")
    axs[2].scatter(e, f, color="green")
    axs[3].scatter(g, h, color="brown")
    axs[0].set_title("Message")
    axs[1].set_title("Encoded message")
    axs[2].set_title("Encoded message with dropped data")
    axs[3].set_title("Recovered message")
    for ax in axs:
        ax.set_ylim([0, 255])
        ax.set_xlim([ 0, N+1])
        if ax != axs[-1]:
            ax.get_xaxis().set_visible(False)
    plt.subplots_adjust(hspace=0.5)
    plt.show()
If you want to know how Galois field (2**8) math works, find an Abstract Algebra textbook or read section 2 of the Rijndael AES paper.

I hope somebody finds this useful!

Update:



March 17, 2025

Put your monitor 5 feet away

If we use our computer or laptop screen for 10 hours a day for decades, our eyes are going to optimize for that distance (1 to 1.5 feet). Over time, we are going to lose our distance vision and near vision.

This is the "use it or lose it" theory of how our body works. This happens for many muscles in our body so why wouldn't it happen with our eyes?

So, I set out to find out.

A few months ago, I set my 27 inch monitor about 5 feet away but left the fonts at the usual size. At first, it was a little uncomfortable. But now my eyes seem to have adapted to it. In fact, my far vision seems to have improved (tested against a mini eye chart at 10').

I am going to move the monitor further away gradually and also intentionally read books at a close distance.

Check back in a year.

Update 11/02/2025:

It has been more than six months. I can program in Emacs using 11 pt terminal font sitting 5 feet from a 24" screen. Even the eye charts at DMV are easy to read. 

Will I get my 20/20 vision back? Check back in a few years.

YMMV.


November 01, 2024

Arduino balancing bot



I built an Arduino version of the balancing bot from old parts in my parts bin:
  • Arduino Mega2560
  • Motor Shield V1
  • 3v-to-5v level converter
  • MPU-9255
  • toy DC motors and wheels
  • 9V battery holder for Arduino
  • 4x AA battery holder for the motors

The Motor Shield V1 is really obsolete. The I2C pins are not passed through. That makes using it with the Arduino UNO impossible. So I switched to the Mega2560 that has the I2C pins duplicated at a different location. The 3.3v pin is still not passed through. That got fixed with a Dremel. The L293D chip on the motor shield is really inefficient and the voltage drop is significant.

There is a newer Motor Shield V2 without any of these problems. But I didn't want to spend $20. In addition, the new version uses I2C and I didn't feel like troubleshooting multiple devices on the I2C bus with a level converter.

The Arduino UNO R3 and to some extent the Mega2560 are obsolete too. The new Arduino UNO R4 has a far more efficient switching regulator, LED matrix display, and optional bluetooth and wifi support.

The MPU-9255 has been out of production for years.

The 3v-to-5v level converter is needed because the Arduino is a 5v device and the MPU-9255 is a 3v device. You really want to respect the voltages when it comes to I2C communications.

Connecting it all up is easy, right? Just add a few more power and ground pins on the motor shield and use wires with DuPont connectors:


Actually, no. 

DuPont connectors do not connect securely. Whenever the bot falls, some wires risk coming loose. Cheap pre-made wires with DuPont connectors also fail at an alarmingly high rate. 

Well, I found an alternative:


Wire wrapping to the rescue! Those tiny white wires are 30 AWG Kynar wire wrapping wires. You can wire wrap any rectangular post that is small enough to fit the wire wrapping tool. You can wire wrap DuPont pins! For female headers, wire wrap one side of a double male header and plug it in.

Wire wrapping is actually more secure than soldering. It is also easier to wrap or unwrap than to solder or de-solder. The Cray 1 was wire wrapped. That's good enough for my toy robot. It has gone out of fashion many decades ago though. Fortunately, you can still buy proper Kynar wire wrapping wires and a wire wrapping tool off your favorite online store. Get the tool that does modified wraps.

As for the code, I just ported my Javascript Microbit code to Arduino C.

The Motor Shield V1 can be controlled using the Adafruit Motor Shield library v1.0.1. That seems to work fine.

However, I couldn't find a MPU-9255 library that suited my needs. I ported my MPU-9255 code and even added accelerometer bias adjustments.

Tuning the PID parameters took a few hours. The PID scaling constants seem to depend on the mounting height of the sensor. The bot to stayed up for 2 hours before I shut it down.





Here's the code:

#include <Wire.h>
#include <AFMotor.h>

// Tuning Parameters
float TARGET_ANGLE = -3.1; // forward tilt in degrees
float SMOOTHING_FACTOR = 200;

float KE = 15;
float KD = KE/20;
float KI = KD/5;

const int MPU_ADDR = 0x68;
const int WHO_AM_I = 0x75;
const int REG_PWR_MGMT_1 = 0x6b;
const int REG_SIGNAL_PATH_RESET = 0x68;
const int REG_USER_CTRL = 0x6a;
const int REG_ACCEL_XOUT_H = 0x3b;
const int REG_ACCEL_YOUT_H = 0x3d;
const int REG_ACCEL_ZOUT_H = 0x3f;
const int REG_GYRO_XOUT_H = 0x43;
const int REG_GYRO_YOUT_H = 0x45;
const int REG_GYRO_ZOUT_H = 0x47;
const int REG_CONFIG = 0x1a;
const int REG_GYRO_CONFIG = 0x1b;
const int REG_ACCEL_CONFIG = 0x1c;
const int REG_ACCEL_CONFIG2 = 0x1d;
const int XG_OFFSET_H = 0x13;
const int YG_OFFSET_H = 0x15;
const int ZG_OFFSET_H = 0x17;
const int XA_OFFSET_H = 0x77;
const int YA_OFFSET_H = 0x7A;
const int ZA_OFFSET_H = 0x7D;

bool mpu_read(int reg, byte *data) {
Wire.beginTransmission(MPU_ADDR);
Wire.write(reg);
Wire.endTransmission(false);
int n = Wire.requestFrom(MPU_ADDR, 1);
if (n != 1) {
return false;
}
*data = Wire.read();
return true;
}

bool mpu_read_int16(int reg, short *data) {
Wire.beginTransmission(MPU_ADDR);
Wire.write(reg);
Wire.endTransmission(false);
int n = Wire.requestFrom(MPU_ADDR, 2);
if (n != 2) {
return false;
}
int h = Wire.read();
int l = Wire.read();
*data = (h << 8) | l;
return true;
}

void mpu_write(int reg, int data) {
Wire.beginTransmission(MPU_ADDR);
Wire.write(reg);
Wire.write(data & 0xff);
Wire.endTransmission();
}

void mpu_write_int16(int reg, int data) {
Wire.beginTransmission(MPU_ADDR);
Wire.write(reg);
Wire.write((data >> 8) & 0xff);
Wire.write(data & 0xff);
Wire.endTransmission();
}

/**
* Reset the MPU and configure the gyroscope to +- 2000 degrees/second and the accelerometer to +-16g.
* The low pass filter settings control how sensitive the sensors are to quick changes.
* In order of increasing sensitivity: 6, 5, 4, 3, 2, 1, 0, 7
* Info from MPU-9250 register map document.
* @param gyro_lpf Gyroscope low pass filter setting, eg: 0
* 7: 8kHz sampling rate, 3600Hz bandwidth, 0.17ms delay.
* 0: 8kHz sampling rate, 250Hz bandwidth, 0.97ms delay.
* 1: 1kHz sampling rate, 184Hz bandwidth, 2.9ms delay.
* 2: 1kHz sampling rate, 92Hz bandwidth, 3.9ms delay.
* 3: 1kHz sampling rate, 41Hz bandwidth, 5.9ms delay.
* 4: 1kHz sampling rate, 20Hz bandwidth, 9.9ms delay.
* 5: 1kHz sampling rate, 10Hz bandwidth, 17.85ms delay.
* 6: 1kHz sampling rate, 5Hz bandwidth, 33.48ms delay.
* @param accel_lpf Accelerometer low pass filter setting, eg: 0
* 7: 1kHz sampling rate, 420Hz 3dB bandwidth, 1.38ms delay.
* 0: 1kHz sampling rate, 218.1Hz 3dB bandwidth, 1.88ms delay.
* 1: 1kHz sampling rate, 218.1Hz 3dB bandwidth, 1.88ms delay.
* 2: 1kHz sampling rate, 99Hz 3dB bandwidth, 2.88ms delay.
* 3: 1kHz sampling rate, 44.8Hz 3dB bandwidth, 4.88ms delay.
* 4: 1kHz sampling rate, 21.2Hz 3dB bandwidth, 8.87ms delay.
* 5: 1kHz sampling rate, 10.2Hz 3dB bandwidth, 16.83ms delay.
* 6: 1kHz sampling rate, 5.05Hz 3dB bandwidth, 32.48ms delay.
*/

bool reset_mpu(int gyro_lpf = 0, int accel_lpf = 0) {
mpu_write(REG_PWR_MGMT_1, 0x80); // H_RESET, internal 20MHz clock
delay(200); // must delay after reset

mpu_write(REG_SIGNAL_PATH_RESET, 0x7); // GYRO_RST | ACCEL_RST | TEMP_RST
mpu_write(REG_USER_CTRL, 0x1); // SIG_COND_RST
mpu_write(REG_CONFIG, gyro_lpf);
mpu_write(REG_GYRO_CONFIG, 0x10); // GYRO_FS_SEL = 3, +-1000 dps, DLPF on
mpu_write(REG_ACCEL_CONFIG, 0x18); // ACCEL_FS_SEL = 3, +-16g, DLPF on
mpu_write(REG_ACCEL_CONFIG2, accel_lpf);
delay(200);
byte x = 255;
if (!mpu_read(REG_GYRO_CONFIG, &x)) {
return false;
}
if (x != 0x10) {
return false;
}
return true;
}

bool compute_biases() {
mpu_write_int16(XG_OFFSET_H, 0);
mpu_write_int16(YG_OFFSET_H, 0);
mpu_write_int16(ZG_OFFSET_H, 0);

const int N = 100;
const int MAX_VAR = 40;
float sum_x = 0, sum_y = 0, sum_z = 0;
float sum_ax = 0, sum_ay = 0, sum_az = 0;
float xs[N];
float ys[N];
float zs[N];
for (int i = 0; i < N; i++) {
short x;
short y;
short z;
if (!mpu_read_int16(REG_GYRO_XOUT_H, &x)) {
return false;
}
if (!mpu_read_int16(REG_GYRO_YOUT_H, &y)) {
return false;
}
if (!mpu_read_int16(REG_GYRO_ZOUT_H, &z)) {
return false;
}
sum_x += x;
sum_y += y;
sum_z += z;
xs[i] = x;
ys[i] = y;
zs[i] = z;
short ax;
short ay;
short az;
if (!mpu_read_int16(REG_ACCEL_XOUT_H, &ax)) {
return false;
}
if (!mpu_read_int16(REG_ACCEL_YOUT_H, &ay)) {
return false;
}
if (!mpu_read_int16(REG_ACCEL_ZOUT_H, &az)) {
return false;
}
sum_ax += ax;
sum_ay += ay;
sum_az += az;
delay(10);
}

float mean_x = sum_x / N;
float mean_y = sum_y / N;
float mean_z = sum_z / N;
float var_x = 0, var_y = 0, var_z = 0;
for (int i = 0; i < N; i++) {
float dx = xs[i] - mean_x;
var_x = var_x + dx * dx;
float dy = ys[i] - mean_y;
var_y = var_y + dy * dy;
float dz = zs[i] - mean_z;
var_z = var_z + dz * dz;
}
var_x = var_x / N;
var_y = var_y / N;
var_z = var_z / N;

float mean_ax = sum_ax / N;
float mean_ay = sum_ay / N;
float mean_az = sum_az / N;

Serial.print("Gyro: mean ");
Serial.print(mean_x);
Serial.print(", ");
Serial.print(mean_y);
Serial.print(", ");
Serial.print(mean_z);
Serial.print(" var ");
Serial.print(var_x);
Serial.print(", ");
Serial.print(var_y);
Serial.print(", ");
Serial.print(var_z);
Serial.print(" Accel: mean ");
Serial.print(mean_ax);
Serial.print(", ");
Serial.print(mean_ay);
Serial.print(", ");
Serial.println(mean_az);
set_gyro_bias(mean_x, mean_y, mean_z);
set_accel_bias(mean_ax, 0, mean_az);
return true;
}

void set_gyro_bias(int x_bias, int y_bias, int z_bias) {
mpu_write_int16(XG_OFFSET_H, -x_bias);
mpu_write_int16(YG_OFFSET_H, -y_bias);
mpu_write_int16(ZG_OFFSET_H, -z_bias);
}

bool set_accel_bias(short x_bias, short y_bias, short z_bias) {
short cur_x;
short cur_y;
short cur_z;
if (!mpu_read_int16(XA_OFFSET_H, &cur_x)) {
return false;
}
if (!mpu_read_int16(YA_OFFSET_H, &cur_y)) {
return false;
}
if (!mpu_read_int16(ZA_OFFSET_H, &cur_z)) {
return false;
}
cur_x -= x_bias;
cur_y -= y_bias;
cur_z -= z_bias;
cur_x &= ~1;
cur_y &= ~1;
cur_z &= ~1;
mpu_write_int16(XA_OFFSET_H, cur_x);
mpu_write_int16(YA_OFFSET_H, cur_y);
mpu_write_int16(ZA_OFFSET_H, cur_z);
return true;
}

//-----------------------------

AF_DCMotor *motor_left;
AF_DCMotor *motor_right;

void motor_move(float v /* -100 to 100 */) {
if (v > 0) {
motor_left->run(BACKWARD);
motor_right->run(FORWARD);
} else {
motor_left->run(FORWARD);
motor_right->run(BACKWARD);
v = -v;
}
if (v > 100) {
v = 100;
}
v = v * 0.7 + 185;
if (v < 195) {
v = 0;
}
motor_left->setSpeed(v);
motor_right->setSpeed(v);
}

void motor_coast() {
motor_left->run(RELEASE);
motor_right->run(RELEASE);
}

//-----------------------------

long last_time = 0;
float motor_out = 0;
int vertical_count = 0;
bool motor_on = false;
float est_angle = 0;
float last_err = 0;
float i_err = 0;


void setup() {
pinMode(LED_BUILTIN, OUTPUT);
for (int i = 0; i < 20; i++) {
digitalWrite(LED_BUILTIN, HIGH);
delay(100);
digitalWrite(LED_BUILTIN, LOW);
delay(100);
}

digitalWrite(SDA, HIGH);
digitalWrite(SCL, HIGH);

Wire.begin();
Wire.setClock(400000);
while (1) {
byte id;
if (mpu_read(WHO_AM_I, &id)) {
if (id == 0x73) {
if (reset_mpu(5, 5)) {
break;
}
}
}
delay(3000);
}
// add 32768/16g = 2048 to the accel axis pointing to ground
//compute_biases();

set_gyro_bias(27, -40, -43);
set_accel_bias(40, 58, 417);

motor_left = new AF_DCMotor(3);
motor_right = new AF_DCMotor(2);
motor_coast();

last_time = millis();
motor_out = 0;
vertical_count = 0;
motor_on = false;
est_angle = 0;
last_err = 0;
i_err = 0;
digitalWrite(LED_BUILTIN, HIGH);
delay(500);
}

bool read_accel_tilt_angle(float *angle) {
short accel_x;
short accel_z;

if (!mpu_read_int16(REG_ACCEL_XOUT_H, &accel_x)) {
return false;
}
if (!mpu_read_int16(REG_ACCEL_ZOUT_H, &accel_z)) {
return false;
}

*angle = atan2(accel_z, -accel_x) * RAD_TO_DEG;
return true;
}

void loop() {
long current_time = millis();
long delta_t = current_time - last_time;
if (delta_t < 5) {
return;
}
short gyro_y;
if (!mpu_read_int16(REG_GYRO_YOUT_H, &gyro_y)) {
return;
}
float accel_angle;
if (!read_accel_tilt_angle(&accel_angle)) {
return;
}
float gyro_angle_rate = float(gyro_y) * -1000.0 / 32768.0; // degrees/s
float gyro_angle_change = gyro_angle_rate * delta_t / 1000; // degrees
est_angle = ((SMOOTHING_FACTOR-1) * (est_angle + gyro_angle_change) + accel_angle) / SMOOTHING_FACTOR;
if (motor_on && (est_angle > 30 || est_angle < -30)) {
last_err = 0;
i_err = 0;
motor_coast();
motor_on = false;
}
float err = est_angle - TARGET_ANGLE;
if (motor_on) {
float d_err = (err - last_err) * 1000 / delta_t;
last_err = err;
i_err = i_err + err * delta_t;
float u = err * KE + d_err * KD + i_err * KI;
motor_out = u;
motor_move(motor_out);
} else if (err <= 5 && err >= -5) {
vertical_count = vertical_count + 1;
if (vertical_count > 100) {
vertical_count = 0;
last_err = 0;
i_err = 0;
motor_on = true;
}
} else if (err <= 5 && err >= -5) {
motor_on = true;
}
last_time = current_time;
}

October 31, 2024

Balancing bot 2024

Recently, I bought a new 3D printer. My 10+ year old printer was long gone. Getting it to print properly was hard, so hard that I stopped using it. Is the new printer any better?


Well, since my balancing bot from 2017 has fallen apart, let's rebuild it!



It has been interesting coming back to this toy after seven years. Surprisingly, all the electronics on the bot still worked. The soldered daughter board remains solid despite all the physical shocks from the bot falling down. An FS90R servo had a broken plastic gear, so that got replaced. The servos are now powered by 4x AA rechargeable batteries. The code is the same except for some tweaks to the PID parameters. The code did not age at all.

Redesigning all the parts took some time. I had certain changes in mind when redesigning the bot. I wanted the new frame to be bigger so that the parts are spread out and accessible. All parts, especially the batteries, must be securely attached.  

Printing was the easiest part. The printer just works out of the box. I could concentrate on designing instead of tinkering with the printer to get it to work. It is a reliable workhorse, just as it should be.

So, I have been making stuff, including yet another balancing bot: