November 18, 2025

Fast Reed Solomon erasure code in C

C version

Clearly, the Python Reed Solomon erasure code library in the previous post isn't meant to be fast. 

Now, let's build a fast C version.

  • Convert the code to C line by line.
  • Inline calls to the gf256_add() and gf256_sub() functions. They are just XORs.
  • Optimize the multiplication in compute_chunks() into a table lookup (see the sketch after this list). One output byte now requires only K table lookups and K XORs.
  • Unroll the critical inner loop in compute_chunks() 8 ways to do 64-bit loads and stores. This unrolling results in a 2x speedup.
  • Rearrange the three loops in compute_chunks() to read the input chunks only once.
  • The two inner loops can be swapped. If the number of output chunks is small (like 3), it is slightly faster to loop over the output chunks as the innermost loop.
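
The table-lookup optimization is the heart of the speedup and is easiest to see in isolation. Here is a minimal sketch of the idea in Python (MUL_TABLE and scale() are illustrative names; the C listing below does the same thing with gf256_mul_table and row pointers):

def gf256_slow_mul(x, y): # long multiplication in GF(2**8), from the previous post
    r = 0
    while y > 0:
        if y & 1:
            r ^= x # add
        x = ((x << 1) ^ 0x11b) if x & 0x80 else (x << 1) # reduce by polynomial
        y >>= 1
    return r

# 64KB table: row w holds the product w*c for every byte value c.
MUL_TABLE = [[gf256_slow_mul(w, c) for c in range(256)] for w in range(256)]

def scale(w, chunk):
    row = MUL_TABLE[w]               # fetch the row once per chunk
    return [row[y] for y in chunk]   # one table index per output byte, no multiply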


// cc -O3
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/time.h>

typedef uint8_t byte;
typedef byte *bytearray;
typedef uint64_t b8;

// GF(2**8)
byte gf256_log[256]; // zeroth entry is invalid
byte gf256_antilog[255]; // yes, only 255 entries
byte *gf256_mul_table = NULL; // [256*256] row*column multiplication table

byte gf256_slow_mul(byte x, byte y) { // long multiplication
  byte r = 0;
  while (y > 0) {
    if (y&1)
      r ^= x; // add
    if (x&0x80)
      x = (x<<1)^0x1b; // reduce by polynomial 0x11b (bit 8 falls off when truncating to a byte)
    else
      x <<= 1;
    y >>= 1;
  }
  return r;
}

void gf256_init_tables(void) {
  byte z = 1;
  for (byte x = 0; x < 255; x++) {
    assert(z != 0);
    gf256_antilog[x] = z;
    gf256_log[z] = x;
    z =  gf256_slow_mul(z, 3);
  }
  gf256_mul_table = malloc(256*256);
  for (int r = 0; r < 256; r++) {
    for (int c = 0; c < 256; c++)
      gf256_mul_table[(r<<8)+c] = gf256_slow_mul(r, c);
  }
}

byte gf256_div(byte x, byte y) {
  assert(y != 0);
  if (x == 0)
    return 0;
  return gf256_antilog[((int) gf256_log[x] - gf256_log[y] + 255) % 255];
}

byte gf256_mul(byte x, byte y) {
  return gf256_mul_table[(x<<8)+y];
}

void lag_interp_weights(const int K, const byte *xs, const byte x, byte **w) {
  // assume len(xs) == K
  // assume len(w) == K
  for (int i = 0; i < K; i++) {
    int t = 1;
    for (int j = 0; j < K; j++) {
      if (i != j) 
        t = gf256_mul(t, gf256_div((x ^ xs[j]), xs[i] ^ xs[j]));
    }
    w[i] = &gf256_mul_table[t<<8]; // row pointer: w[i][y] == gf256_mul(t, y)
  }
}

void compute_chunks(const int K, const byte *xs, const int L, const bytearray *chunks, const int T, const byte *ts, const bytearray *results) {
  // assume len(xs) == K
  // assume len(chunks) == K
  // assume len(chunks[i]) == L for i = 0 to K-1
  // assume len(ts) == T
  // assume len(results) == T
  // assume len(results[i]) == L for i = 0 to T-1
  // assume not x in xs for x in ts
  assert(L % 8 == 0);
  bytearray inputs[K];
  byte *weights[T][K];
  for (int i = 0; i < K; i++)
    inputs[i] = chunks[xs[i]];
  for (int j = 0; j < T; j++)
    lag_interp_weights(K, xs, ts[j], weights[j]);
  {
    bytearray inp = inputs[0];
    for (int z = 0; z < L; z += 8) {
      const b8 dd = * (b8*) &inp[z];
      const byte d1 = dd;
      const byte d2 = dd >> 8;
      const byte d3 = dd >> 16;
      const byte d4 = dd >> 24;
      const byte d5 = dd >> 32;
      const byte d6 = dd >> 40;
      const byte d7 = dd >> 48;
      const byte d8 = dd >> 56;
      for (int j = 0; j < T; j++) {
        const byte *w = weights[j][0];
        * (b8*) &results[j][z] = (b8) w[d1] | (b8) w[d2] << 8 | (b8) w[d3] << 16 | (b8) w[d4] << 24 | (b8) w[d5] << 32 | (b8) w[d6] << 40 | (b8) w[d7] << 48 | (b8) w[d8] << 56;
      }
    }
  }
  for (int i = 1; i < K; i++) {
    bytearray inp = inputs[i];
    for (int z = 0; z < L; z += 8) {
      const b8 dd = * (b8*) &inp[z];
      const byte d1 = dd;
      const byte d2 = dd >> 8;
      const byte d3 = dd >> 16;
      const byte d4 = dd >> 24;
      const byte d5 = dd >> 32;
      const byte d6 = dd >> 40;
      const byte d7 = dd >> 48;
      const byte d8 = dd >> 56;
      for (int j = 0; j < T; j++) {
        const byte *w = weights[j][i];
        * (b8*) &results[j][z] ^= (b8) w[d1] | (b8) w[d2] << 8 | (b8) w[d3] << 16 | (b8) w[d4] << 24 | (b8) w[d5] << 32 | (b8) w[d6] << 40 | (b8) w[d7] << 48 | (b8) w[d8] << 56;
      }
    }
  }
}

double now() {
  struct timeval tv;
  gettimeofday(&tv, 0);
  return (double) tv.tv_sec + tv.tv_usec / 1000000.0;
}

void *alloc_buf(size_t size) {
  return malloc(size);
}

void dealloc_buf(void *p, size_t size) {
  free(p); // size is unused; the signature mirrors alloc_buf
}

void encode_chunks(const int K, const int N, const int L, bytearray *chunks) {
  // assume len(chunks) == N.
  // assume len(chunks[x]) == L for x = 0 to K-1
  // assume chunk[x] == NULL, for x = K to N-1
  // computed chunks are stored in chunks[x] for x = K to N-1
  assert(K > 0 && K < N && N <= 256);
  byte xs[K];
  for (int i = 0; i < K; i++)
    xs[i] = i;
  const int T = N-K;
  byte ts[T];
  bytearray results[T];
  for (int t = 0; t < T; t++) {
    const int x = K+t;
    ts[t] = x;
    results[t] = alloc_buf(L);
    chunks[x] = results[t];
  }
  compute_chunks(K, xs, L, chunks, T, ts, results);
}

void decode_chunks(const int K, const int N, const int L, bytearray *chunks) {
  // assume len(chunks) == N.
  // assume at least K non-NULL chunks[x] for x = 0 to N-1
  // assume len(chunks[x]) == L for x = 0 to N-1 if chunks[x] is not NULL
  // computed chunks are stored in chunks[x] for x = 0 to K-1 if chunks[x] is NULL
  assert(K > 0 && K < N && N <= 256);
  byte xs[K];
  int j = 0;
  for (int x = 0; x < N && j < K; x++) {
    if (chunks[x] != NULL)
      xs[j++] = x;
  }
  assert(j == K);
  byte ts[N-K];
  bytearray results[N-K];
  int T = 0;
  for (int x = 0; x < K; x++) {
    if (chunks[x] == NULL) {
      ts[T] = x;
      results[T] = alloc_buf(L);
      chunks[x] = results[T];
      T += 1;
    }
  }
  compute_chunks(K, xs, L, chunks, T, ts, results);
}

// --- tests below ---
byte *temp = NULL;
const int TEMP_SIZE = 256*1024*1024;
void setup_temp() {
  temp = malloc(TEMP_SIZE);
}

void clear_cache() { // touch a 256MB buffer to flush CPU caches before timing
  for (int i = 0; i < TEMP_SIZE; i++) {
    temp[i] = i;
  }
  printf("Cache cleared\n");
}

void gen_data(const int K, const int N, const int L, bytearray *p) {
  srand(1);
  for (int i = 0; i < N; i++)
    p[i] = NULL;
  for (int i = 0; i < K; i++) {
    bytearray d = p[i] = alloc_buf(L);
    for (int j = 0; j < L; j++)
      d[j] = rand();
  }
}

int chunks_equal(const int K, const int L, const bytearray *chunks1, const bytearray *chunks2) {
  for (int x = 0; x < K; x++) {
    if (chunks1[x] != chunks2[x] &&
        memcmp(chunks1[x], chunks2[x], L) != 0) {
        printf("Chunks %d are not equal", x);
        return 0;
    }
  }
  return 1;
}

void test_random(const int K, const int N, const int L, const int encode_rounds, const int decode_rounds) {
  bytearray **test_chunks = malloc(encode_rounds*sizeof(bytearray*));
  for (int r = 0; r < encode_rounds; r++) {
    test_chunks[r] = malloc(N*sizeof(bytearray));
    gen_data(K, N, L, test_chunks[r]);
  }
  clear_cache();
  double estart = now();
  for (int r = 0; r < encode_rounds; r++) {
    encode_chunks(K, N, L, test_chunks[r]);
  }
  double encode_time = now() - estart;
  printf("Encode time: %fs %d rounds %fs/round  input rate: %.3fMB/s\n", encode_time, encode_rounds, encode_time / encode_rounds, (double) K*L*encode_rounds/1024/1024/encode_time);
  clear_cache();
  double dstart = now();
  srand(2);
  for (int round = 0; round < decode_rounds; round++) {
    bytearray *chunks = test_chunks[round % encode_rounds];
    bytearray temp[N];
    for (int i = 0; i < N; i++)
      temp[i] = NULL;
    for (int i = 0; i < K; i++) {
      int x = 0;
      do {
        x = rand() % N;
      } while (temp[x] != NULL);
      temp[x] = chunks[x];
    }
    decode_chunks(K, N, L, temp);
    assert(chunks_equal(K, L, chunks, temp));
    for (int i = 0; i < K; i++) {
      if (temp[i] != NULL && temp[i] != chunks[i])
        dealloc_buf(temp[i], L);
    }
  }
  double decode_time = now() - dstart;
  printf("Decode time: %fs %d rounds %fs/round  input rate: %.3fMB/s\n", decode_time, decode_rounds, decode_time / decode_rounds, (double) K*L*decode_rounds/1024/1024/decode_time);
  printf("random_%d_%d_%d succeeded\n", K, N, L);
  for (int r = 0; r < encode_rounds; r++) {
    for (int i = 0; i < N; i++)
      dealloc_buf(test_chunks[r][i], L);
    dealloc_buf(test_chunks[r], N*sizeof(bytearray));
  }
  dealloc_buf(test_chunks, encode_rounds*sizeof(bytearray*));
}

void start_comb(const int K, int *v) {
  for (int i = 0; i < K; i++)
    v[i] = i;
}

int next_comb(const int K, const int N, int *v) {
  int x = K-1;
  while (x >= 0) {
    if (v[x] < N-1) {
      v[x] += 1;
      for (int j = x+1; j < K; j++)
        v[j] = v[j-1] + 1;
      if (v[K-1] < N)
        return 0;
    }
    x -= 1;
  }
  return 1;
}

void test_all(const int K, const int N, const int L) {
  bytearray chunks[N];
  gen_data(K, N, L, chunks);
  clear_cache();
  double estart = now();
  encode_chunks(K, N, L, chunks);
  double encode_time = now() - estart;
  printf("Encode time: %fs  input rate: %.3fMB/s\n", encode_time, (double) K*L/1024/1024/encode_time);
  clear_cache();
  double dstart = now();
  int valid[K];
  int count = 0;
  int done = 0;
  start_comb(K, valid);
  while (!done) {
    bytearray temp[N];
    for (int i = 0; i < N; i++)
      temp[i] = NULL;
    for (int i = 0; i < K; i++) {
      temp[valid[i]] = chunks[valid[i]];
    }
    count += 1;
    decode_chunks(K, N, L, temp);
    assert(chunks_equal(K, L, chunks, temp));
    done = next_comb(K, N, valid);
    for (int i = 0; i < K; i++) {
      if (temp[i] != NULL && temp[i] != chunks[i])
        dealloc_buf(temp[i], L);
    }
  }
  double decode_time = now() - dstart;
  printf("Decode time: %fs %d combinations %fs/combination  input rate: %.3fMB/s\n", decode_time, count, decode_time / count, (double) K*L*count/1024/1024/decode_time);
  printf("Encode time plus decode time: %f\n", encode_time + decode_time);
  printf("all_%d_%d_%d %d combinations succeeded\n", K, N, L, count);
  for (int i = 0; i < N; i++)
    dealloc_buf(chunks[i], L);
}

int main() {
  gf256_init_tables();
  setup_temp();
  
  test_random(16, 24, 65536, 100, 100);
  test_random(16, 24, 1024*1024, 100, 10);
  test_random(17, 20, 64*1024*1024, 4, 1);
  test_random(17, 20, 1024*1024, 100, 100);
  test_random(17, 20, 200*1024, 100, 100);

  test_all(17, 20, 1024);
  test_all(10, 16, 1024);
  test_all(16, 24, 16);
  test_all(16, 24, 1024);

  return 0;
}



Performance


Python version, M2 MacBook Air:

K=16, N=24, L=64KB (1MB) encoding time 1.7s. Input rate: 0.58 MB/s.
K=16, N=24, L=1MB (16MB) encoding time 28s. Input rate: 0.57 MB/s.
K=17, N=20, L=64MB (1088MB) encoding time 722s. Input rate: 1.5 MB/s.
K=17, N=20, L=1MB (17MB) encoding time 11.2s. Input rate: 1.5 MB/s.
K=17, N=20, L=200KB (3.3MB) encoding time 2.1s. Input rate: 1.58 MB/s.

test_all(17, 20, 1024) # 1140 combinations. 17KB input. 10s.
test_all(10, 16, 1024) # 8008 combinations. 10KB input. 1min.
test_all(16, 24, 16) # 735471 combinations. 256B input. 9.5min.
test_all(16, 24, 1024) # 735471 combinations. 16KB input. 5hours.


C version, M2 MacBook Air:

K=16, N=24, L=64KB (1MB) encoding time 0.0016s. Input rate: 625 MB/s.
K=16, N=24, L=1MB (16MB) encoding time 0.03s. Input rate: 510 MB/s.
K=17, N=20, L=64MB (1088MB) encoding time 0.66s. Input rate: 1650 MB/s.
K=17, N=20, L=1MB (17MB) encoding time 0.0097s. Input rate: 1750 MB/s.
K=17, N=20, L=200KB (3.3MB) encoding time 0.0018s. Input rate: 1800 MB/s.

test_all(17, 20, 1024) # 1140 combinations. 17KB input. 0.01s.
test_all(10, 16, 1024) # 8008 combinations. 10KB input. 0.058s.
test_all(16, 24, 16) # 735471 combinations. 256B input. 1.47s.
test_all(16, 24, 1024) # 735471 combinations. 16KB input. 12.3s.

C version, 2018 Intel i7 Mac Mini:

K=16, N=24, L=64KB (1MB) encoding time 0.0038s. Input rate: 260 MB/s.
K=16, N=24, L=1MB (16MB) encoding time 0.063s. Input rate: 255 MB/s.
K=17, N=20, L=64MB (1088MB) encoding time 1.4s. Input rate: 770 MB/s.
K=17, N=20, L=1MB (17MB) encoding time 0.021s. Input rate: 790 MB/s.
K=17, N=20, L=200KB (3.3MB) encoding time 0.0039s. Input rate: 840 MB/s.

test_all(17, 20, 1024) # 1140 combinations. 17KB input. 0.023s.
test_all(10, 16, 1024) # 8008 combinations. 10KB input. 0.12s.
test_all(16, 24, 16) # 735471 combinations. 256B input. 3.3s.
test_all(16, 24, 1024) # 735471 combinations. 16KB input. 23.9s.


Is this fast enough for production use?

Yes! A 10 gigabit network port delivers at most about 1.25 GB/s, and a single CPU core encodes K=17, N=20 at up to 1.8 GB/s, so one core can keep up with input data arriving at line rate. This is sufficient for building a cloud-based data storage system.

For comparison, Backblaze open-sourced their Reed Solomon library 10 years ago. Their code is in Java and their benchmark machines are Xeon E5-1620 v2, clocked at 3.70GHz. The chart on their GitHub page shows their performance for K=17, N=20 with 200KB chunks.

Our equivalent test runs at 1800 MB/s on an M2 MacBook Air.

So, with around 200 lines of plain C code (excluding tests), we have implemented a fast Reed Solomon erasure code library!

Conclusion


None of this is novel.

Reed Solomon codes are usually presented in other forms, and usually for error detection and error correction. The resulting math is complicated and the decoding routines are complex. When used for error correction, such a code can handle up to floor((N-K)/2) errors.

Here, we are only using Reed Solomon coding to handle missing data (erasures). We can use a separate checksum per chunk for error detection and treat corrupted chunks as missing data. It is simple and cheap to add a per-chunk checksum; a sketch follows. Used this way, the code can handle any N-K missing or corrupted chunks. It achieves the same results but skips the complexity.
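
For instance, a minimal sketch in Python, using the dict-based API from the November 09 post (add_checksum and strip_checksum are illustrative names, and CRC-32 stands in for whatever checksum the system already uses):

import zlib

def add_checksum(chunk): # chunk: list of ints 0..255
    data = bytes(chunk)
    return data + zlib.crc32(data).to_bytes(4, "big")

def strip_checksum(stored): # returns None for a corrupted chunk
    data, crc = stored[:-4], int.from_bytes(stored[-4:], "big")
    if zlib.crc32(data) != crc:
        return None # treat a corrupted chunk as missing
    return list(data)

A chunk that fails its checksum is simply left out of the dictionary passed to decode_chunks(), so corruption consumes the same N-K budget as a missing chunk.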


Simple. Maintainable. Fast.



November 09, 2025

Systematic Reed Solomon erasure code in Python

In part 1, I visually demonstrated a version of the Reed Solomon erasure code.

Let's turn that into a little library to encode and decode chunks of data. It should be something that can be used in a large scale cloud data storage system with millions of storage devices.

API

encode_chunks(K, N, chunks)

K is the number of input chunks. 

N is the total number of chunks wanted.

chunks is a dictionary containing the data to encode. The keys have to be exactly 0 to K-1. Each value is a list of numbers, each between 0 and 255. All the chunks must be of the same length L.

The function will return a dictionary of N-K redundant chunks in the same format as chunks. The keys are from K to N-1.

decode_chunks(K, N, chunks)

K is the number of chunks.

N is the total number of chunks.

chunks is a dictionary of at least K out of N chunks. The keys have to be between 0 and N-1.

The function returns a dictionary containing all missing chunks with keys between 0 and K-1.
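
To make the API concrete, here is a round trip with K=2 and N=4, assuming the encode_chunks() and decode_chunks() definitions from the listing below:

K, N = 2, 4
chunks = {0: [1, 2, 3], 1: [4, 5, 6]}      # two data chunks of length L=3

redundant = encode_chunks(K, N, chunks)    # returns {2: [...], 3: [...]}

# Simulate losing chunk 0: any K of the N chunks are enough.
survivors = {1: chunks[1], 2: redundant[2], 3: redundant[3]}
recovered = decode_chunks(K, N, survivors) # returns {0: [1, 2, 3]}
assert recovered[0] == chunks[0]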

The Code

The code from the previous post looks slow. There are about 2*K*K "multiplications/divisions" and 3*K "additions/subtractions" for each output byte in the redundant chunks. Well, the code was written that way to match the mathematical equation of Lagrange Interpolation and to plot the curves. Let's make that faster.

Notice that we get to pick the x's for the polynomials. The x's are the keys for the chunks and result dictionaries: 0 to K-1 for the input chunks and K to N-1 for the redundant chunks. We loop through L slices of the chunks (one byte from each chunk) using the same x's. We extract this loop-invariant part into lag_interp_weights(xs, x), which returns a vector of weights for the y's. These weights are the values of the Lagrange basis polynomials at x.

For each redundant chunk, we compute this vector of weights once. Then we take a slice (one byte from each of the K chunks at the same offset) as the y values, multiply each y by the corresponding weight in the vector, and sum the products. In other words, the output byte for the slice is the dot product of the weight vector and a slice of the K chunks. Do that for all L slices to compute the redundant chunk. We only need to do K "multiplications" and K "additions" for each output byte.

Decoding the chunks is similar to encoding the chunks. As long as we have K of the N chunks, we use the keys of those chunks as the x's and use the same math to compute the data for any missing key between 0 and K-1.

Here's the code, including a test function test_random() to check that it actually works.


import random
import time

# GF(2**8)
gf256_log = [0]*256 # zeroth entry is invalid
gf256_antilog = [0]*255 # yes, only 255 entries

def gf256_add(x, y):
    return x^y

gf256_sub = gf256_add

def gf256_slow_mul(x, y): # long multiplication
    r = 0
    while y>0:
        if y&1:
            r ^= x # add
        if x&0x80:
            x = (x<<1)^0x11b # reduce by polynomial
        else:
            x <<= 1
        y >>= 1
    return r

def gf256_init_tables():
    z = 1
    for x in range(255):
        gf256_antilog[x] = z
        gf256_log[z] = x
        z =  gf256_slow_mul(z, 3)

def gf256_div(x, y):
    assert y != 0
    if x == 0:
        return 0
    return gf256_antilog[(gf256_log[x] - gf256_log[y])] # a negative index wraps around, same as mod 255

def gf256_mul(x, y):
    if x == 0 or y == 0:
        return 0
    return gf256_antilog[(gf256_log[x] + gf256_log[y]) % 255]

gf256_init_tables()

def lag_interp_weights(xs, x):
    n = len(xs)
    w = n * [0]
    for i in range(n):
        t = 1
        for j in range(n):
            if i != j:
                t = gf256_mul(t,
                              gf256_div(gf256_sub(x, xs[j]),
                                        gf256_sub(xs[i], xs[j])))
        w[i] = t
    return w

def compute_chunks(K, xs, ts, chunks):
    assert K == len(xs)
    results = {}
    L = len(chunks[xs[0]])
    inputs = [chunks[xs[i]] for i in range(K)]
    for t in ts:
        assert not t in xs
        w = lag_interp_weights(xs, t)
        r = L * [0]
        for z in range(L):
            y = 0
            for i in range(K):
                y = gf256_add(y, gf256_mul(w[i], inputs[i][z]))
            r[z] = y
        results[t] = r
    return results

def encode_chunks(K, N, chunks):
    assert K > 0 and K < N and N <= 256
    L = len(chunks[0])
    for z in range(K):
        assert len(chunks[z]) == L
    xs = list(range(K))
    return compute_chunks(K, xs, list(range(K, N)), chunks)

def decode_chunks(K, N, chunks):
    assert K > 0 and K < N and N <= 256
    xs = []
    for x in range(N):
        if x in chunks:
            xs.append(x)
        if len(xs) >= K:
            break
    assert len(xs) == K
    ts = [x for x in range(K) if not x in chunks]
    return compute_chunks(K, xs, ts, chunks)

def recover_original_chunks(K, N, chunks):
    temp = decode_chunks(K, N, chunks)
    temp.update(chunks)
    for x in [j for j in temp.keys() if j >= K]:
        del temp[x]
    return temp

def test_random(K, N, L, rounds):
    chunks = gen_data(K, L)
    rr = random.Random(2)
    estart = time.time()
    enc = encode_chunks(K, N, chunks)
    encode_time = time.time() - estart
    print(f"Encode time: {encode_time:f}s  input rate: {K*L/1024/1024/encode_time:.3f}MB/s")
    enc.update(chunks)
    dstart = time.time()
    for round in range(rounds):
        valid = list(range(N))
        while len(valid) > K:
            valid.pop(rr.randint(0, len(valid)-1))
        temp = {}
        for x in valid:
            temp[x] = enc[x]
        r = recover_original_chunks(K, N, temp)
        assert r == chunks, f"Recovered chunks are not equal the original chunks: {valid}"
    decode_time = time.time() - dstart
    print(f"Decode time: {decode_time:f}s {rounds} rounds {decode_time/rounds:f}s/round  input rate: {K*L*rounds/1024/1024/decode_time:.3f}MB/s")
    print(f"random_{K}_{N}_{L} succeeded")

def gen_data(K, L):
    rr = random.Random(1)
    chunks = { x: list(rr.randbytes(L)) for x in range(K) }
    return chunks

test_random(16, 24, 65536, 1)

That's all. Just over 100 lines of Python code not counting the test functions.

No solving of systems of linear equations. No Vandermonde matrix. No Gaussian elimination over Galois fields to invert matrices. No dependencies. Just plain Python.

With K=16, N=24, encoding 1 MB (16 chunks of 64 KB) takes 1.7s (600 KB/s) on my M2 MacBook Air.

With K=17, N=20, encoding 17 MB (17 chunks of 1 MB) takes 11.2s (1.5 MB/s).

I have not seen an erasure code algorithm that does less work than one dot product per output byte. Certainly, I haven't found one that is so easy to code!


Update:

I added test_all() to test that the data can be recovered from all possible N-choose-K subsets of chunks.


def start_comb(K, v):
    for i in range(K):
        v[i] = i

def next_comb(K, N, v):
    x = K-1
    while x >= 0:
        if v[x] < N-1:
            v[x] += 1
            for j in range(x+1, K):
                v[j] = v[j-1] + 1
            if v[K-1] < N:
                return False
        x -= 1
    return True

def test_all(K, N, L):
    chunks = gen_data(K, L)
    estart = time.time()
    enc = encode_chunks(K, N, chunks)
    encode_time = time.time() - estart
    print(f"Encode time: {encode_time:f}s  input rate: {K*L/1024/1024/encode_time:.3f}MB/s")
    enc.update(chunks)
    dstart = time.time()
    valid = K * [0]
    count = 0
    done = False
    start_comb(K, valid)
    while not done:
        temp = {}
        count += 1
        for x in valid:
            temp[x] = enc[x]
        r = recover_original_chunks(K, N, temp)
        assert r == chunks, f"Recovered chunks are not equal the original chunks: {valid}"
        done = next_comb(K, N, valid)
    decode_time = time.time() - dstart
    print(f"Decode time: {decode_time:f}s {count} combinations {decode_time/count:f}s/combination  input rate: {K*L*count/1024/1024/decode_time:.3f}MB/s")
    print(f"Encode time plus decode time: {encode_time+decode_time:f}s")
    print(f"all_{K}_{N}_{L} {count} combinations succeeded")

test_all(17, 20, 1024) # 1140 combinations. 17KB input. 10 seconds.
test_all(10, 16, 1024) # 8008 combinations. 10KB input. 1 minute.
test_all(16, 24, 16) # 735471 combinations. 256B input. 9.5 minutes.
test_all(16, 24, 1024) # 735471 combinations. 16KB input. 5 hours.


November 03, 2025

Systematic Reed Solomon erasure code

Problem

Given K bytes (message), compute N bytes (encoded message or codeword) such that the original K bytes can be recovered from any K subset of the N bytes. Assume we are only dealing with missing data, not corrupted data.

Motivation

Why is this useful? There are many practical applications for this. It can be used to build a data storage system that tolerates N-K storage node failures with only N/K times the space. Over unreliable messaging systems, a message can be sent in N parts and recovered by the receiver if any K out of N parts are received. The new Signal protocol proposes to do this in parts of the protocol.

How?

Conceptually, we can fit a curve through K points with a unique polynomial of degree less than K. Pick another N-K points along the curve. Together, we have N points on the same curve and any K points can be used to recalculate the same curve and recover any of the missing original K points.

Let's pick the output of the polynomial at x = 1 to K to be the K bytes in the message. The encoded message is the output of the polynomial at x = 1 to N. The first K bytes of the encoded message will be identical to the message (systematic). This is a useful property. When bytes 1 to K are not missing, no work needs to be done. If any of these bytes are missing, recompute only those bytes.

One way of doing this does not require calculating the coefficients of the polynomial explicitly. With K points (x and y values) on the curve, use Lagrange Interpolation to calculate the value of the polynomial at any x value.

Let's turn that into working code in plain Python:

def lagrange_interpolate(xs, ys):
    def f(x):
        n = len(xs)
        y = 0.0
        for i in range(n):
            t = ys[i]
            for j in range(n):
                if i != j:
                    t *= (x - xs[j]) / (xs[i] - xs[j])
            y += t
        return y
    assert len(xs) == len(ys)
    return f

K = 4
N = 6

a = list(range(1, K+1))
b = [82, 233, 217, 241] # message

poly = lagrange_interpolate(a, b) # polynomial

c = list(range(1, N+1))
d = [poly(x) for x in c] # encoded message
assert b == d[:K] # first K bytes are identical to message

e = c.copy()
f = d.copy()
for i in [3, 1]: # drop some data
    e.pop(i)
    f.pop(i)

poly2 = lagrange_interpolate(e[:K], f[:K]) # recalculate polynomial

g = list(range(1, K+1))
h = [round(poly2(x)) for x in g] # recovered original message

assert b == h, "Recovered message is not equal the original message"

if 1: # plot the data               
    import matplotlib.pyplot as plt
    cc = [x/10 for x in range(0, 100*N)]
    dd = [poly(x) for x in cc] # for plotting
    gg = [x/10 for x in range(0, 100*N+1)]
    hh = [poly2(x) for x in gg] # for plotting
    fig, axs = plt.subplots(4)
    axs[0].scatter(a, b, color="blue")
    for i in range(len(a)):
        axs[0].text(a[i], b[i], b[i])
    axs[1].scatter(cc, dd, color="pink")
    axs[1].scatter(a, b, color="blue")
    axs[1].scatter(c, d, color="red")
    for i in range(len(c)):
        axs[1].text(c[i], d[i], round(d[i]))
    axs[2].scatter(cc, dd, color="pink")
    axs[2].scatter(e, f, color="green")
    for i in range(len(e)):
        axs[2].text(e[i], f[i], round(f[i]))
    axs[3].scatter(gg, hh, color="yellow")
    axs[3].scatter(g, h, color="brown")
    for i in range(len(g)):
        axs[3].text(g[i], h[i], h[i])
    axs[0].set_title("Message")
    axs[1].set_title("Encoded message")
    axs[2].set_title("Encoded message with dropped data")
    axs[3].set_title("Recovered message")
    for ax in axs:
        ax.set_ylim([-500, 1500])
        ax.set_xlim([ 0, N+1])
        if ax != axs[-1]:
            ax.get_xaxis().set_visible(False)
    plt.subplots_adjust(hspace=0.5)
    plt.show()

The pink curve was calculated using the original message (blue) and the yellow curve is recalculated from the partially missing encoded message (green).
This works except that the encoded values are unlikely to be within the range of 0 to 255 if we choose x values less than 1 or greater than K. If we choose non-integer x values between 1 and K, then the encoded values are unlikely to be integers.

Instead of computing using real numbers, we switch to computing using Galois field (2**8). Think of it as a special type of "numbers" where "addition", "subtraction", "multiplication" and "division" are well defined (differently) and there are only 256 possible "numbers". Most importantly, all encoded message values are between 0 and 255, and Lagrange Interpolation only uses these four operations! That's a Reed Solomon code for handling missing data.

It is hard to believe that it is possible to define fancy math like this and have it just work. Nothing beats a working example:

import random

# GF(2**8)
log_table = [0]*256 # zeroth entry is invalid
antilog_table = [0]*255 # yes, only 255 entries

def gf256_add(x, y):
    return x^y

gf256_sub = gf256_add

def gf256_smul(x, y): # long multiplication
    r = 0
    while y>0:
        if y&1:
            r ^= x # add
        if x&0x80:
            x = (x<<1)^0x11b # reduce by polynomial                                                                       
        else:
            x <<= 1
        y >>= 1
    return r

def init_tables():
    z = 1
    for x in range(255):
        antilog_table[x] = z
        log_table[z] = x
        z =  gf256_smul(z, 3)

def gf256_div(x, y):
    assert y != 0
    if x == 0:
        return 0
    return antilog_table[(log_table[x] - log_table[y])]

def gf256_mul(x, y):
    if x == 0 or y == 0:
        return 0
    return antilog_table[(log_table[x] + log_table[y]) % 255]

init_tables()

def lagrange_interpolate(xs, ys):
    def f(x):
        n = len(xs)
        y = 0
        for i in range(n):
            t = ys[i]
            for j in range(n):
                if i != j:
                    t = gf256_mul(t,
                                  gf256_div(gf256_sub(x, xs[j]),
                                            gf256_sub(xs[i], xs[j])))
            y = gf256_add(y, t)
        return y
    assert len(xs) == len(ys)
    return f

K = 20 # message length
N = 30 # encoded message length (message + redundancy) (<=255)

random.seed(324)

# a: 1..K. Fixed.
# b: message (K bytes)
a = list(range(1, K+1))
b = [random.randint(0, 255) for x in range(K)]

# c: 1..N. Fixed.
# d: encoded message (N bytes)
poly = lagrange_interpolate(a, b)
c = list(range(1, N+1))
d = [poly(x) for x in c]
assert b == d[:K] # first K bytes are identical to message

# e, f: simulate data loss by dropping random parts of c & d                                                      
e = c.copy()
f = d.copy()
while len(e) > K:
    i = random.randint(0, len(e)-1)
    e.pop(i)
    f.pop(i)

# g: 1..K. Fixed.                                                                 
# h: recovered message (K bytes)                                                               
assert(len(e) >= K)
poly2 = lagrange_interpolate(e[:K], f[:K])
g = list(range(1, K+1))
h = [poly2(x) for x in g]

assert b == h, "Recovered message is not equal the original message"

if 1:
    import matplotlib.pyplot as plt
    fig, axs = plt.subplots(4)
    axs[0].scatter(a, b, color="blue")
    axs[1].scatter(c, d, color="red")
    axs[2].scatter(e, f, color="green")
    axs[3].scatter(g, h, color="brown")
    axs[0].set_title("Message")
    axs[1].set_title("Encoded message")
    axs[2].set_title("Encoded message with dropped data")
    axs[3].set_title("Recovered message")
    for ax in axs:
        ax.set_ylim([0, 255])
        ax.set_xlim([ 0, N+1])
        if ax != axs[-1]:
            ax.get_xaxis().set_visible(False)
    plt.subplots_adjust(hspace=0.5)
    plt.show()

If you want to know how Galois field (2**8) math works, find an Abstract Algebra textbook or read section 2 of the Rijndael AES paper.

I hope somebody finds this useful!


March 17, 2025

Put your monitor 5 feet away

If we use our computer or laptop screen for 10 hours a day for decades, our eyes are going to optimize for that distance (1 to 1.5 feet). Over time, we are going to lose our distance vision and near vision.

This is the "use it or lose it" theory of how our body works. This happens for many muscles in our body so why wouldn't it happen with our eyes?

So, I set out to find out.

A few months ago, I set my 27 inch monitor about 5 feet away but left the fonts at the usual size. At first, it was a little uncomfortable. But now my eyes seem to have adapted to it. In fact, my far vision seems to have improved (tested against a mini eye chart at 10').

I am going to move the monitor further away gradually and also intentionally read books at a close distance.

Check back in a year.

Update 11/02/2025:

It has been more than six months. I can program in Emacs using an 11 pt terminal font sitting 5 feet from a 24" screen. Even the eye charts at the DMV are easy to read.

Will I get my 20/20 vision back? Check back in a few years.

YMMV.