为之漫笔

尝鲜异步迭代器和生成器

为之漫笔 · 2017-06-01翻译 · 820阅读 原文链接 betsey审校中

Chrome、Edge和Safari支持流式抓取,类似这样:

async function getResponseSize(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  let total = 0;

  while (true) {
    const {done, value} = await reader.read();
    if (done) return total;
    total += value.length;
  }
}

因为使用了异步函数(万一你不熟悉,可以参考这个教程),这段代码非常容易理解,但它仍然略显笨拙。

幸运的是,异步迭代器即将问世,能让这段代码更简洁:

async function getResponseSize(url) {
  const response = await fetch(url);
  let total = 0;

  for await (const chunk of response.body) {
    total += chunk.length;
  }
  return total;
}

Chrome Canary开始支持异步迭代器,但要使用以下选项启动它:--js-flags=--harmony-async-iteration。本文接下来就讲一步异步迭代器的原理,以及怎么利用它实现流的迭代……

异步迭代器

异步迭代器与常规迭代器的工作过程非常相似,但使用了promise:

async function example() {
  // Regular iterator:
  const iterator = createNumberIterator();
  iterator.next(); // Object {value: 1, done: false}
  iterator.next(); // Object {value: 2, done: false}
  iterator.next(); // Object {value: 3, done: false}
  iterator.next(); // Object {value: undefined, done: true}

  // Async iterator:
  const asyncIterator = createAsyncNumberIterator();
  const p = asyncIterator.next(); // Promise
  await p;                    // Object {value: 1, done: false}
  await asyncIterator.next(); // Object {value: 2, done: false}
  await asyncIterator.next(); // Object {value: 3, done: false}
  await asyncIterator.next(); // Object {value: undefined, done: true}
}

这两种迭代器都有.return()方法,用于让迭代器提早结束并做一些清理工作。

迭代器与循环

我们平时很少直接用迭代器,而是使用循环,但循环背后使用的则是迭代器对象。

async function example() {
  // Regular iterator:
  for (const item of thing) {
    // …
  }

  // Async iterator:
  for await (const item of asyncThing) {
    // …
  }
}

这个for-of循环通过调用thing[Symbol.iterator]取得其迭代器。而for-wait循环则通过调用asyncThing[Symbol.asyncIterator]获得其迭代器,如果没有,则退而求其次,取得asyncThing[Symbol.iterator]

for-wait会在asyncIterator.next()解决之后给出一个值。因为需要等待promise返回 ,那么在迭代的同时就可以做其他事。在当前迭代完成之前,是不会调用asyncIterator.next()的。这意味着顺序始终不会乱,而迭代循环也不会重叠。

for-await能够退取Symbol.iterator是非常棒的。这意味着可以同数组之类的常规可迭代对象一起使用:

async function example() {
  const arrayOfFetchPromises = [
    fetch('1.txt'),
    fetch('2.txt'),
    fetch('3.txt')
  ];

  // Regular iterator:
  for (const item of arrayOfFetchPromises) {
    console.log(item); // Logs a promise
  }

  // Async iterator:
  for await (const item of arrayOfFetchPromises) {
    console.log(item); // Logs a response
  }
}

在这种情况下, for-await从数组中取出每一项目并等待它解决。即使第二个响应尚未就绪,也会得到第一个响应,但顺序一直保持不变。

异步生成器:创建自己的异步迭代器

就跟生成吕创建迭代工厂一样,也可以使用异步生成器创建异步迭代器工厂。

异步生成器是异步函数和生成器合休。假设要创建一个返回随机数值的迭代器,但随机数值来自一个Web服务:

// Note the * after "function"
async function* asyncRandomNumbers() {
  // This is a web service that returns a random number
  const url = 'https://www.random.org/decimal-fractions/?num=1&dec=10&col=1&format=plain&rnd=new';

  while (true) {
    const response = await fetch(url);
    const text = await response.text();
    yield Number(text);
  }
}

这个迭代器没有自然的结尾,它只是不断取数值。好在可以使用break停止:

async function example() {
  for await (const number of asyncRandomNumbers()) {
    console.log(number);
    if (number > 0.95) break;
  }
}

在线演示

与常规生成器一样,yield返回值;与常规生成器不同,可以awaitpromise。

for循环相似,可以根据情况break。这样循环就会调用iterator.return(),结果就像生成器的当前(或下一个)yield之后有一个return语句一样。

通过Web服务取得随机数值是有点傻,所以我们看一些更接近实际的例子……

迭代流

正像本文开始时说的,很快你就可以这样写代码了:

async function example() {
  const response = await fetch(url);

  for await (const chunk of response.body) {
    // …
  }
}

但这种写法还没有规范。那么我们就自己来写一个可以迭代流的异步生成器!我们的目标:

  • 锁住流,迭代期间不允许其他代码操作。
  • 输出流的值。
  • 完成后解锁。

解锁很重要。如果开发者中断了循环,我们希望他们可以从中断的地方继续。这样:

async function* streamAsyncIterator(stream) {
  // Get a lock on the stream
  const reader = stream.getReader();

  try {
    while (true) {
      // Read from the stream
      const {done, value} = await reader.read();
      // Exit if we're done
      if (done) return;
      // Else yield the chunk
      yield value;
    }
  }
  finally {
    reader.releaseLock();
  }
}

finally子句在这里非常重要。如果用户中断循环,会导致异步生成器在当前(或下一个)输出点返回。此时,我们仍然想释放读取器的锁,而finallyreturn之后唯一可以执行的代码。

就这样!现在你就可以这么写了:

async function example() {
  const response = await fetch(url);

  for await (const chunk of streamAsyncIterator(response.body)) {
    // …
  }
}

在线演示

释放锁意味着在循环之后仍然可以控制流。假设我们想找到HTML规范中第J的字节位置……

async function example() {
  const find = 'J';
  const findCode = find.codePointAt(0);
  const response = await fetch('https://html.spec.whatwg.org');
  let bytes = 0;

  for await (const chunk of streamAsyncIterator(response.body)) {
    const index = chunk.indexOf(findCode);

    if (index != -1) {
      bytes += index;
      console.log(`Found ${find} at byte ${bytes}.`);
      break;
    }

    bytes += chunk.length;
  }

  response.body.cancel();
}

在线演示

这里在找到匹配的字符时中断了循环。因为streamAsyncIterator释放了对流的锁,因此我们可以取消后续操作,节省流量。

注意我们没有把streamAsyncIterator赋值给ReadableStream.prototype[Symbol.asyncIterator]。这样是可以让我们直接迭代流,但也会搞乱不是我们的对象。如果流变成异步迭代器,那么要是规范的行为与我们实现的不一致,就会导致奇怪的bug。

更短的实现

不一定非要使用异步生成器创建异步可迭代对象,你可以自己创建迭代器对象。Domenic Denicola就是这么做的。以下是他的实现:

function streamAsyncIterator(stream) {
  // Get a lock on the stream:
  const reader = stream.getReader();

  return {
    next() {
      // Stream reads already resolve with {done, value}, so
      // we can just call read:
      return reader.read();
    },
    return() {
      // Release the lock if the iterator terminates.
      return reader.releaseLock();
    },
    // for-await calls this on whatever it's passed, so
    // iterators tend to return themselves.
    [Symbol.asyncIterator]() {
      return this;
    }
  };
}

以上代码在Chrome Canary中(以--js-flags=--harmony-async-iteration选项启动)可以跑起来如果你想在线上环境中使用,Babel可以编译。

相关文章