Creating Lazy: Async Iterators

In typical Tim fashion, I brought in the new year by working on a new npm package, this one dealing with a subject that I am really excited about in JavaScript: async generators.

After working in Elixir and Haskell over the break, I’ve come to love the idea of expressing data as infinite lists of things. I’m sure this is old news to literally everyone other than me, but jeez, lazy iterators are the bee’s knees.

lazy what?

Before we get into it all, let’s make sure that we’re all speaking the same language. I know words might not mean what I think they mean so let’s clear up any confusion that might be lingering.

When I talk about iteration, I am talking about walking over some thing and doing something each time you take a step. In synchronous, regular JavaScript, iterating might look like this:

const list = [1, 2, 3];

for (let i = 0; i < list.length; i++) {
  console.log(list[i]);
}

We iterate over the list, logging the value at each index or iteration.

I wager that all programming can be thought of as iterating over lists and doing something for each iteration. In fact, there are languages devoted to nothing but LISt Processing.

We can wrap these iterations and their effect behind different functions with names that mean something to people

const above1 = list.filter(n => n > 1);
// [2, 3]
const doubledAbove1 = above1.map(n => n * 2);
// [4, 6]

We can even compose those iterations, if we abstract them enough

const map = fn => list => list.map(fn);
const filter = fn => list => list.filter(fn);
const compose = (b, a) => value => b(a(value));

const above1AndDoubled = compose(
  map(n => n * 2),
  filter(n => n > 1)
);

above1AndDoubled(list); // [4, 6]

This works and it makes everyone happy and we can move along with our day.

But something’s lingering underneath the surface. Something’s festering, only to be revealed when your list gets big. Can you see it?

When we compose (map, filter), we have to build the entire filtered array before we get a chance to ask map to do its thing. Put another way, for each array that we pass to that composed function, it will build 2 more arrays: one from filter, and then another from map.
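
To make that ordering visible, here is a small sketch that records when each callback runs in the plain array version. The `calls` array is only there for this illustration; it is not part of anything above.

```javascript
// Record the order in which the callbacks run.
const calls = [];
const list = [1, 2, 3];

const result = list
  .filter(n => {
    calls.push(`filter ${n}`);
    return n > 1;
  })
  .map(n => {
    calls.push(`map ${n}`);
    return n * 2;
  });

// filter visits every element (and builds its array) before map sees anything
console.log(calls);
// [ 'filter 1', 'filter 2', 'filter 3', 'map 2', 'map 3' ]
console.log(result); // [ 4, 6 ]
```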

What we need instead is a way for us to say "I want you to pass items to map as soon as they pass filter". We want a way to let both functions be ignorant of the other but still be able to pass values to each other whenever.

Iterators

We can achieve this by using the iterator protocol, abstracting away the idea of an array while gaining the benefit of what transducers offer for free

/**
 * Using any
 */
const map = function*(fn, iter) {
  for (let v of iter) {
    yield fn(v);
  }
};

const filter = function*(pred, iter) {
  for (let v of iter) {
    if (pred(v)) {
      yield v;
    }
  }
};

const list = [1, 2, 3];

const iterator = map(
  n => {
    console.log("mapping %s", n);

    return n * 2;
  },
  filter(n => {
    console.log("filtering %s", n);

    return n > 1;
  }, list)
);

for (let v of iterator) {
  console.log(v);
}

// filtering 1
// filtering 2
// mapping 2
// 4
// filtering 3
// mapping 3
// 6
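
One way to see the laziness more directly is to pull from the iterator by hand with next() instead of using for-of. This is a minimal sketch that reuses the map and filter from above; the `log` array is a name made up for this example.

```javascript
const map = function*(fn, iter) {
  for (const v of iter) {
    yield fn(v);
  }
};

const filter = function*(pred, iter) {
  for (const v of iter) {
    if (pred(v)) {
      yield v;
    }
  }
};

const log = [];

const iterator = map(
  n => {
    log.push(`mapping ${n}`);
    return n * 2;
  },
  filter(n => {
    log.push(`filtering ${n}`);
    return n > 1;
  }, [1, 2, 3])
);

// Creating the pipeline runs nothing yet
const callsBeforeNext = log.length;
console.log(callsBeforeNext); // 0

// Asking for one value does only the work needed to produce it
const first = iterator.next();
console.log(first); // { value: 4, done: false }
console.log(log); // [ 'filtering 1', 'filtering 2', 'mapping 2' ]
```

The 3 in the list has not been looked at yet, and it won't be until someone asks for the next value.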

We call the value we are iterating over iter to express that it can be any type, as long as it implements the iterable protocol. This means we can pass a Map, a Set, or any other built-in iterable to those functions and they will work exactly the same.
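
As a quick sketch of that claim, here are the same two generator functions fed a Set, a Map, and a string. The variable names (`fromSet`, `fromMap`, `fromString`) are just for this example.

```javascript
const map = function*(fn, iter) {
  for (const v of iter) {
    yield fn(v);
  }
};

const filter = function*(pred, iter) {
  for (const v of iter) {
    if (pred(v)) {
      yield v;
    }
  }
};

// A Set iterates over its values
const fromSet = [...map(n => n * 2, filter(n => n > 1, new Set([1, 2, 3])))];
console.log(fromSet); // [ 4, 6 ]

// A Map iterates over [key, value] pairs
const fromMap = [
  ...map(([key, value]) => `${key}=${value}`, new Map([["a", 1], ["b", 2]]))
];
console.log(fromMap); // [ 'a=1', 'b=2' ]

// Even a string works, one character at a time
const fromString = [...filter(ch => ch !== "l", "hello")];
console.log(fromString); // [ 'h', 'e', 'o' ]
```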

One of the benefits of thinking in iterators instead of arrays or maps is that we can iterate over either without knowing its type. That means we can write functions that manipulate those iterations without being tied to a specific class/type, working instead on any algebraic type or interface.

This abstraction away from the specific data type/class and towards the algebraic type might be something that we should pay attention to.

but my function deals with promises!

I know, like, how in the world can we do anything with this idea if it has to be synchronous? If we have to yield, how can we deal with promises?

Async Iterators! at our service!

As a quick refresher, async/await looks like this

// Given some Promise
const prom = Promise.resolve({});

// Calling then
prom.then(console.log); // {}

// Is the same as
(async () => {
  // saying await inside of
  // an async function
  const val = await prom;
  console.log(val); // {}
})();

And using the for-await-of syntax, we can simply say the following

// given some _iterable_ of
// promises
const asyncList = [Promise.resolve(1), Promise.resolve(2)];

(async () => {
  // we can iterate over it
  // and _await_ the value
  for await (let v of asyncList) {
    console.log(v); // 1, 2
  }
})();

And, to make it so that our worker can map and filter async values, we can update map and filter to be async generator functions

const map = async function*(fn, iter) {
  for await (let v of iter) {
    yield fn(v);
  }
};

const filter = async function*(pred, iter) {
  for await (let v of iter) {
    if (pred(v)) {
      yield v;
    }
  }
};

const iterator = map(
  n => n * 2,
  filter(n => n > 1, new Set([Promise.resolve(1), Promise.resolve(2)]))
);

(async () => {
  for await (let v of iterator) {
    console.log(v); // 4
  }
})();
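
The same async map also copes with a mapping function that itself returns a promise, because yield inside an async generator awaits the value before handing it on. A minimal sketch, assuming a made-up `delayDouble` function that stands in for some async work, and a small `collect` helper that is not part of the code above:

```javascript
const map = async function*(fn, iter) {
  for await (const v of iter) {
    // yield in an async generator awaits fn(v) if it is a promise
    yield fn(v);
  }
};

// Hypothetical async mapper: resolves with n * 2 after a short delay
const delayDouble = n =>
  new Promise(resolve => setTimeout(() => resolve(n * 2), 10));

// Helper for this example: drain an async iterable into an array
const collect = async iter => {
  const out = [];
  for await (const v of iter) {
    out.push(v);
  }
  return out;
};

const doubled = collect(map(delayDouble, [1, 2, 3]));
doubled.then(console.log); // [ 2, 4, 6 ]
```

Note that filter is different: a promise is always truthy, so an async predicate would need `await pred(v)` inside the if for this to work.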

but…how?

Using all of that, let’s write some more helpers in order to show us the power that we just unleashed

// Create an iterator that
// creates values from start to end
// and allows you to transform each one
const range = async function*(start, end = Infinity, mapper = a => a) {
  let i = start;
  while (i <= end) {
    yield mapper(i++);
  }
};

// Take some number of items
// from the iterator
const take = async function*(num, iter) {
  let remaining = num;
  for await (let v of iter) {
    if (!remaining) {
      break;
    }

    yield v;
    remaining--;
  }
};

Given those two above, we can write some code that will crash our system and some code that will make us happy:

Note: This will not stop. Do not copy/paste unless you can kill the process

const infiniteRange = range(1)

for await (let v of infiniteRange) {
    console.log(v) //  CRASH!
}

Because we didn’t give it an end, it will continually keep logging numbers, increasing by one each time.

Let’s instead take the first 5

const first5 = take(5, infiniteRange)

for await (let v of first5) {
    console.log(v) // 1, 2, 3, 4, 5
}

Even though the infiniteRange value is, well, infinite, that doesn’t mean that our code has to hold the entire thing in its memory. Instead, we can generate the next value whenever we need to.
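
To check how much of the infinite range actually gets generated, we can count calls to the mapper. This is a sketch using the range and take from above; the `generated` counter and the `main` wrapper are additions for this example.

```javascript
const range = async function*(start, end = Infinity, mapper = a => a) {
  let i = start;
  while (i <= end) {
    yield mapper(i++);
  }
};

const take = async function*(num, iter) {
  let remaining = num;
  for await (const v of iter) {
    if (!remaining) {
      break;
    }

    yield v;
    remaining--;
  }
};

// Count how many values the range is asked to produce
let generated = 0;

const main = async () => {
  const counted = range(1, Infinity, n => {
    generated++;
    return n;
  });

  const seen = [];
  for await (const v of take(5, counted)) {
    seen.push(v);
  }

  return { seen, generated };
};

const result = main();
result.then(console.log);
// { seen: [ 1, 2, 3, 4, 5 ], generated: 6 }
```

The count is 6 rather than 5 because take, as written, asks the source for one more value before it notices that it has nothing left to take. Either way, it is a long way from infinity.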

We can even skip some values from the infinite range and compose it with take to get some values from the middle of an infinite range

const range = async function*(start, end = Infinity, mapper = a => a) {
    let i = start;
    while (i <= end) {
        yield mapper(i++);
    }
}

const take = async function*(num, iter) {
    let remaining = num;
    for await (let v of iter) {
        if (!remaining) {
            break;
        }

        yield v;
        remaining--;
    }
}
const skip = async function* (num, iter) {
    let remaining = num;
    for await (let v of iter) {
        if (remaining) {
            remaining--;
            continue;
        }

        yield v;
    }
}

const infiniteRange = range(1)
const the20s = take(5, skip(20, infiniteRange))

for await (let v of the20s) {
    console.log(v) // 21, 22, 23, 24, 25
}
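
If the nesting gets hard to read, one option is to curry the helpers and chain them, much like the compose example near the top. This is only a sketch: the `pipe` and `collect` helpers and the curried signatures are assumptions for this example, not something defined earlier.

```javascript
const range = async function*(start, end = Infinity) {
  let i = start;
  while (i <= end) {
    yield i++;
  }
};

// Curried variants: configure first, pass the iterable later
const take = num =>
  async function*(iter) {
    let remaining = num;
    for await (const v of iter) {
      if (!remaining) {
        break;
      }
      yield v;
      remaining--;
    }
  };

const skip = num =>
  async function*(iter) {
    let remaining = num;
    for await (const v of iter) {
      if (remaining) {
        remaining--;
        continue;
      }
      yield v;
    }
  };

// Hypothetical helper: feed an iterable through each step, left to right
const pipe = (...fns) => iter => fns.reduce((acc, fn) => fn(acc), iter);

// Helper for this example: drain an async iterable into an array
const collect = async iter => {
  const out = [];
  for await (const v of iter) {
    out.push(v);
  }
  return out;
};

const the20s = pipe(skip(20), take(5));

const result = collect(the20s(range(1)));
result.then(console.log); // [ 21, 22, 23, 24, 25 ]
```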

lazy, lazy, lazy

While the code looks synchronous, and it appears as if we have some infinite list in our worker’s memory, in fact we are only creating or generating values whenever we specifically need them, in an asynchronous and lazy way.

This opens up an entire world of possibilities. We are able to reason about things that haven’t happened yet without worrying about what shape their container will be in.

As an example, let’s suppose that we have some event emitter, like a websocket, and we want to iterate over its values in a specific part of the code:

const ws = new Websocket('ws://...')

const socketIter = async function* (socket) {
    // Look at all these `let` statements!
    // SO MUCH STATE!
    //
    // BUT! It's all hidden behind a function
    // and no one else can touch it so we are
    // free to do _whatever_ we want to it
    let resolve
    let prom = new Promise(r => resolve = r)
    let connected = true
    socket.on('message', message => {
        resolve(message) // send message to iterator
        prom = new Promise(r => resolve = r) // haxors
    })

    socket.on('disconnect', () => connected = false)

    while(connected) {
        yield prom // this will wait for the resolve to be called
    }
}

const socket = socketIter(ws)

for await (let message of socket) {
    // do something with the message!
}
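
The version above keeps things short, but it has two rough edges: a message that arrives before the previous one has been consumed is dropped, and the loop never wakes up after a disconnect because it is still waiting on a promise that nobody resolves. Below is one possible sketch of a queue-based variant. It assumes only that the socket has an `on(event, handler)` method, and `createFakeSocket` is a made-up stand-in so the example can run without a real connection.

```javascript
const socketIter = socket => {
  const queue = []; // messages that arrived before anyone asked for them
  let wake = null; // resolver for the consumer that is currently waiting
  let connected = true;

  const notify = () => {
    if (wake) {
      wake();
      wake = null;
    }
  };

  // Listeners are attached right away, before the first value is requested
  socket.on("message", message => {
    queue.push(message);
    notify();
  });

  socket.on("disconnect", () => {
    connected = false;
    notify();
  });

  return (async function*() {
    while (true) {
      // Hand out everything that is already buffered
      while (queue.length) {
        yield queue.shift();
      }

      if (!connected) {
        return;
      }

      // Sleep until the next message or the disconnect
      await new Promise(resolve => (wake = resolve));
    }
  })();
};

// Hypothetical stand-in for a socket, just enough for this example
const createFakeSocket = () => {
  const handlers = {};
  return {
    on(event, handler) {
      (handlers[event] = handlers[event] || []).push(handler);
    },
    emit(event, payload) {
      (handlers[event] || []).forEach(handler => handler(payload));
    }
  };
};

const main = async () => {
  const fake = createFakeSocket();
  const messages = socketIter(fake);

  fake.emit("message", "a");
  fake.emit("message", "b"); // arrives before "a" is consumed, still kept
  setTimeout(() => {
    fake.emit("message", "c");
    fake.emit("disconnect");
  }, 10);

  const received = [];
  for await (const message of messages) {
    received.push(message);
  }
  return received;
};

const result = main();
result.then(console.log); // [ 'a', 'b', 'c' ]
```

The trade-off is that the queue can grow without bound if the consumer is slower than the socket, so a real version would probably want some limit on it.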

Fin

We have only scratched the surface of what async iterators and for-await-of can do. If you want to mess around without writing the map and filter functions yourself, take a look at lazy, a utility belt for handling and creating async iterators. If you have any corrections that you feel must be made, create a PR on the repo that hosts this codebase!