李志華

使用 Async/Await 的函数式编程基础

李志華 · 2017-05-05翻译 · 965阅读 原文链接

Async/await 可以很容易的将异步行为(asynchronous behavior)与命令式结构(imperative construct)集成起来,如 for 循环,if 语句和 try/catch 块。不幸的是,对于 forEach, map, reducefilter 这样的函数式结构来说,并非那么容易。使用这些带有异步函数的结构,会导致其行为非常令人困惑。在本文中,我将向您展示一些的异步函数的常见问题,包括 JavaScript 内置的函数数组方法以及如何解决这些问题。

注意:以下代码仅在 Node v7.6.0 上测试通过。此外,下面的代码仅仅是想法实验和教学示例。我不建议在生产环境中使用它。

动机和 forEach

在同步领域,forEach() 按照顺序对数组中每个元素执行一个函数,例如,以下脚本必定能保证打印出 0-9:

function print(n) {
  console.log(n);
}

function test() {
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(print);
}

test();

不幸的是,异步函数中这一切变得更加微妙。以下脚本将以相反的顺序打印 0-9!

async function print(n) {
  // 打印 0 之前等待 1 秒,打印 1 之前等待 0.9 秒,等等。
  await new Promise(resolve => setTimeout(() => resolve(), 1000 - n * 100));
  // 通常打印 9, 8, 7, 6, 5, 4, 3, 2, 1, 0,但顺序并不严格保证
  console.log(n);
}

async function test() {
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(print);
}

test();

即使这两个函数都是 async 函数,Node.js 不会等待前一个 print() 调用完成,然后执行下一个 print() !或许你需要尝试添加一个 await

async function test() {
  // SyntaxError: Unexpected identifier
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(n => { await print(n); });
}

答案是否定的,我不会让你这么做的,这是一个 SyntaxError(语法错误),因为 await 必须总是async 函数中。这时候,您可以放弃使用非标准的 Promise.series() 函数。但是,如果你还记得 async 函数只是返回 promise 的函数,你可以使用 promise 链和 .reduce() 来获得一个按顺序执行的 forEach()

async function print(n) {
  await new Promise(resolve => setTimeout(() => resolve(), 1000 - n * 100));
  console.log(n);
}

async function test() {
  // 这就是神奇的地方。每个 `print()` 调用都返回一个 promise,
  // 因此调用 `then()` 按顺序将它们链在一起,并且按照顺序打印出 0-9。
  await [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].
    reduce((promise, n) => promise.then(() => print(n)), Promise.resolve());
}

test();

你还可以将此功能包装在一个便捷的 forEachAsync() 函数中:

async function print(n) {
  await new Promise(resolve => setTimeout(() => resolve(), 1000 - n * 100));
  console.log(n);
}

Array.prototype.forEachAsync = function(fn) {
  return this.reduce((promise, n) => promise.then(() => fn(n)), Promise.resolve());
};

async function test() {
  await [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].forEachAsync(print);
}

test();

链式调用 map()filter()

JavaScript 的函数式数组结构的一大优点是链式。假设你有一个 id 数组,并且你想取出与这些 id 相应的文档,过滤出已经在另一个数据库中的文档,并保存所有这些文档。你可以在没有任何函数式基元(functional primitives)的情况下做到这些,但将涉及大量的中间值。

const { MongoClient } = require('mongodb');

async function copy(ids, db1, db2) {
  // 从 db1 中找出所有 docs
  const fromDb1 = await db1.collection('Test').find({ _id: { $in: ids } }).sort({ _id: 1 }).toArray();
  // 从 db2 中找出所有 docs
  const fromDb2 = await db2.collection('Test').find({ _id: { $in: ids } }).sort({ _id: 1 }).toArray();

  // 找出 db1 中有,但 db2 中没有的所有 docs
  const toInsert = [];
  for (const doc of fromDb1) {
    if (!fromDb2.find(_doc => _doc._id === doc._id)) {
      toInsert.push(doc);
      console.log('Insert', doc);
    }
  }
  // 把这些 docs 插入到 db2
  await db2.collection('Test').insertMany(toInsert);
}

async function test() {
  const db1 = await MongoClient.connect('mongodb://localhost:27017/db1');
  const db2 = await MongoClient.connect('mongodb://localhost:27017/db2');
  await db1.dropDatabase();
  await db2.dropDatabase();

  const docs = [
    { _id: 1 },
    { _id: 2 },
    { _id: 3 },
    { _id: 4 }
  ];

  await db1.collection('Test').insertMany(docs);
  // 只插入 docs 中 _id 等于 2 和 4 到 db2
  await db2.collection('Test').insertMany(docs.filter(doc => doc._id % 2 === 0));

  await copy(docs.map(doc => doc._id), db1, db2);
}

test();

函数式将会使这一切变得清晰明了 - 你只需 ids.map().filter().forEach(),但是每个 map(), filter(),和 each() 都需要处理异步函数。我们已经有了 forEachAsync(),类似的还需要实现 mapAsync()filterAsync()

Array.prototype.mapAsync = function(fn) {
  return Promise.all(this.map(fn));
};

Array.prototype.filterAsync = function(fn) {
  return this.mapAsync(fn).then(_arr => this.filter((v, i) => !!_arr[i]));
};

然而,现在我们产生了链式调用的问题。如何连接 mapAsync()filterAsync()?你可以使用 .then() 但是不会那么优雅。相反,您可以创建一个代表 Promise 的 AsyncArray 类,最终将返回一个数组,并附加 mapAsync, filterAsyncforEachAsync 作为类方法:

class AsyncArray {
  constructor(promise) {
    this.$promise = promise || Promise.resolve();
  }

  then(resolve, reject) {
    return new AsyncArray(this.$promise.then(resolve, reject));
  }

  catch(reject) {
    return this.then(null, reject);
  }

  mapAsync(fn) {
    return this.then(arr => Promise.all(arr.map(fn)));
  }

  filterAsync(fn) {
    return new AsyncArray(Promise.all([this, this.mapAsync(fn)]).then(([arr, _arr]) => arr.filter((v, i) => !!_arr[i])));
  }

  forEachAsync(fn) {
    return this.then(arr => arr.reduce((promise, n) => promise.then(() => fn(n)), Promise.resolve()));
  }
}

使用这个 AsyncArray 类,你能够使用 mapAsync(), filterAsync()forEachAsync() 链式调用,因为这些工具方法(helper method)仍然会返回一个 AsyncArray。以下是之前的 MongoDB 示例:

async function copy(ids, db1, db2) {
  new AsyncArray(Promise.resolve(ids)).
    mapAsync(function(_id) {
      return db1.collection('Test').findOne({ _id });
    }).
    filterAsync(async function(doc) {
      const _doc = await db2.collection('Test').findOne({ _id: doc._id });
      return !_doc;
    }).
    forEachAsync(async function(doc) {
      console.log('Insert', doc);
      await db2.collection('Test').insertOne(doc);
    }).
    catch(error => console.error(error));
}

async function test() {
  const db1 = await MongoClient.connect('mongodb://localhost:27017/db1');
  const db2 = await MongoClient.connect('mongodb://localhost:27017/db2');
  await db1.dropDatabase();
  await db2.dropDatabase();

  const docs = [
    { _id: 1 },
    { _id: 2 },
    { _id: 3 },
    { _id: 4 }
  ];

  await db1.collection('Test').insertMany(docs);
  // 只插入 docs 中 _id 等于 2 和 4 到 db2
  await db2.collection('Test').insertMany(docs.filter(doc => doc._id % 2 === 0));

  await copy(docs.map(doc => doc._id), db1, db2);
}

test();

reduce() 包装

现在,你已经有了 mapAsync(), filterAsync()forEachAsync(),为何不将它们贯穿起来,然后实现为 reduceAsync()

 reduceAsync(fn, initial) {
    return Promise.resolve(initial).then(cur => {
      return this.forEachAsync(async function(v, i) {
        cur = await fn(cur, v, i);
      }).then(() => cur);
    });
  }

这里是如何使用 reduceAsync()

async function test() {
  const db = await MongoClient.connect('mongodb://localhost:27017/test');
  await db.dropDatabase();

  const docs = [
    { _id: 1, name: 'Axl' },
    { _id: 2, name: 'Slash' },
    { _id: 3, name: 'Duff' },
    { _id: 4, name: 'Izzy' },
    { _id: 5, name: 'Adler' }
  ];

  await db.collection('People').insertMany(docs);

  const ids = docs.map(doc => doc._id);

  const nameToId = await new AsyncArray(Promise.resolve(ids)).
    reduceAsync(async function (cur, _id) {
      const doc = await db.collection('People').findOne({ _id });
      cur[doc.name] = doc._id;
      return cur;
    }, {});
  console.log(nameToId);
}

test();

总的来说,使用异步的 map(), filter(), reduce(), and forEach() 函数是可行的,但需要自定义函数和轻巧复杂的 promise 链。我相信有人会出来一个 library,可以使 promise 数组简单易行,我期待看到这一切。函数式编程原型(functional programming primitive)使同步数组操作清晰优雅,并通过链式消除大量无用的中间值。通过添加工具函数(helper),能够操作数组中 resolve 的 promise,展示了各种令人激动的可能性。

Async/await 是强大的,但是如果由于受制于 LTS 支持,而使用 Node.js 4.x 或 6.x(特别是由于 Node.js 8 延期发布),您仍然可以使用类似 ES6 generator 和 co 的函数式编程模式。如果您希望更深入 co,包括自己如何从头开始编写 co 的替代品,请查看我的电子书,The 80/20 Guide to ES2015 Generators

相关文章