说一说 JavaScript 异步迭代器

2024年 2月 27日 80.1k 0

你知道吗?除了像Promise.finally这样的 API 之外,ECMAScript 2018还为我们带来了另一种处理迭代器的方式——异步迭代器。

问题

假设现在我们正处于这样一个情景:需要使用Node.js逐行读取文件。Node有个API叫做readLine,它是一个包装器,可用于逐行从输入流中读取数据,不需要分析输入缓冲区、也不需要将文本分解为小块。

你可以像这样监听:

const fs = require('fs')
const readline = require('readline')
const reader = readline.createInterface({
  input: fs.createReadStream('./file.txt'),
  crlfDelay: Infinity
})

reader.on('line', (line) => console.log(line))

假设有这样一个简单的文件:

line 1
line 2
line 3

如果我们在创建的文件上运行代码,那么就能在控制台上逐行输出。但是,使用事件并不是编写可维护代码的最佳方法之一,因为事件是完全异步的,可能会中断代码流——因为是无序触发的,并且只能通过侦听器分配操作。

解决方案

除了事件 API之外,readline还有async iterator。现在我们可以不通过line事件中的侦听器读取,而是通过for关键字来读取。

举几个使用for循环的例子。第一个是最常见的使用计数器和条件:

for (let x = 0; x < array.length; x++) {
  // Code here
}

我们也可以使用for … in表示法读取数组索引:

const a = [1,2,3,4,5,6]
for (let index in a) {
  console.log(a[index])
}

在前一种情况下,console.log输出从1到6的数字,但是如果我们使用console.log (index),那么记录的是数组的索引,从0到5的数字。

下面,我们使用for … of表示法,直接获取数组的可枚举属性,即它的直接值:

const a = [1,2,3,4,5,6]

for (let item of a) {
  console.log(item)
}

注意,这些方法都是同步的。那么,如果我们有一系列promise,这时该如何按顺序读取呢?

假设我们还有另一个接口,它总是返回一个Promise。为了按顺序解析promise,我们需要这样做:

async function readLine (files) {
  for (const file of files) {
    const line = await readFile(file) // Imagine readFile is our cursor
    console.log(line)
  }
}

而现在,多亏异步可迭代对象(如readline)的魔力,我们可以执行以下操作:

const fs = require('fs')
const readline = require('readline')
const reader = readline.createInterface({
  input: fs.createReadStream('./xpto.txt'),
  crlfDelay: Infinity
})

async function read () {
  for await (const line of reader) {
    console.log(line)
  }
}

read()

注意,我们现在使用的是for、for await (const x of y)的新定义。

对于await和node.js

从10.x版开始,Node.js运行时原生支持for await表示法。如果你使用的是8.x或9.x版本,则需要使用--harmony_async_iteration标志启动Javascript文件。遗憾的是,Node.js的版本6和版本7不支持异步迭代器。

为了理解异步迭代器的概念,首先我们需要知道迭代器的本质。简而言之,迭代器是一个对象,公开next()函数,此函数返回另一个对象,其中{value: any, done: boolean}表示当前迭代的值,done表示序列中是否还有其他值。

遍历数组中所有项的迭代器示例如下:

const array = [1,2,3]
let index = 0

const iterator = {
  next: () => {
    if (index >= array.length) return { done: true }
    return {
      value: array[index++],
      done: false
    }
  }
}

就其本身而言,迭代器没有实际用途,那么怎么办呢?为此,我们需要iterable。iterable是一个对象,它有一个Symbol.iterator键,该键返回的函数返回迭代器:

// ... Iterator code here ...

const iterable = {
  [Symbol.iterator]: () => iterator
}

现在我们可以正常使用迭代器了,通过for (const x of iterable),我们可以一个一个地迭代array中的所有值。

在后台,所有数组和对象都有Symbol.iterator,这样就可以执行for (let x of [1,2,3])并返回我们想要的值。

我们可以看到,异步迭代器与迭代器完全相同,不同之处在于iterable拥有的是Symbol.asyncIterator,而不是Symbol.iterator,拥有的是解析为具有相同签名对象的Promise,而不是返回{value, done}的对象。

让我们把上面的迭代器变成一个异步迭代器:

const array = [1,2,3]
let index = 0

const asyncIterator = {
  next: () => {
  if (index >= array.length) return Promise.resolve({done: true})
  return Promise.resolve({value: array[index++], done: false})
  }
}

const asyncIterable = {
  [Symbol.asyncIterator]: () => asyncIterator
}

异步迭代

我们可以通过调用next()函数来手动迭代迭代器:

// ... Async iterator Code here ...

async function manual () {
  const promise = asyncIterator.next() // Promise
  await p // Object { value: 1, done: false }
  await asyncIterator.next() // Object { value: 2, done: false }
  await asyncIterator.next() // Object { value: 3, done: false }
  await asyncIterator.next() // Object { done: true }
}

为了遍历异步迭代器,我们需要使用for await,但请记住,关键字await只能在异步函数中使用,所以我们需要有类似这样的代码:

// ... Code above ...

async function iterate () {
  for await (const num of asyncIterable) console.log(num)
}

iterate() // 1, 2, 3

但是,由于Node 8.x和9.x这样的老版本不支持异步迭代器,为了在这些版本中使用异步迭代器,我们可以简单地从对象中提取next并手动遍历:

// ... Async Iterator Code here ...

async function iterate () {
  const {next} = asyncIterable[Symbol.asyncIterator]() // we take the next iterator function

  for (let {value, done} = await next(); !done; {value, done} = await next()) {
    console.log(value)
  }
}

注意,for await更干净、更简洁,因为它的行为类似于常规循环,而且,除了更易于理解之外,还可以通过done键自行检查迭代器的结束。

处理错误

如果promise在迭代器中被拒绝,会发生什么?好吧,和任何被拒绝的promise一样,通过简单的try/catch就可以来捕获错误(因为我们使用的是await):

const asyncIterator = { next: () => Promise.reject('Error') }
const asyncIterable = { [Symbol.asyncIterator]: () => asyncIterator 

async function iterate () {
  try {
    for await (const num of asyncIterable) {}
  } catch (e) {
    console.log(e.message)
  }
}

iterate()

回退

关于异步迭代器,非常有趣的一点是,它们有Symbol.iterator的回退,这意味着你也可以将它与常规迭代器一起使用,例如,有这样一个promise数组:

const promiseArray = [
  fetch('https://lsantos.dev'),
  fetch('https://lsantos.me')
]

async function iterate () {
  for await (const response of promiseArray) console.log(response.status)
}

iterate() // 200, 200

异步生成器

在大多数情况下,迭代器和异步迭代器可以创建自生成器。

生成器是允许暂停和恢复执行的函数,因此可以操作执行,然后通过next()函数获取下一个值。

异步生成器的行为类似于异步迭代器,但你必须手动实现停止机制,例如,这里我们构建一个用于git提交的随机消息生成器:

async function* gitCommitMessageGenerator () {
  const url = 'https://whatthecommit.com/index.txt'

  while (true) {
    const response = await fetch(url)
    yield await response.text() // We return the value
  }
}

注意,在任何时候都不会返回{value, done}对象,因此循环无法知道执行何时完成。这时我们可以实现这样的函数:

// Previous Code
async function getCommitMessages (times) {
  let execution = 1
  for await (const message of gitCommitMessageGenerator()) {
    console.log(message)
    if (execution++ >= times) break
  }
}

getCommitMessages(5)
// I'll explain this when I'm sober .. or revert it
// Never before had a small typo like this one caused so much damage.
// For real, this time.
// Too lazy to write descriptive message
// Ugh. Bad rebase.

用例

再来一个更有趣的示例,为一个真实用例构建异步迭代器。目前,适用于Node.js的Oracle数据库驱动程序支持resultSet API,因此可以在数据库上执行查询并返回,而数据流则可以通过getRow()方法逐个读取。

要创建resultSet,我们需要在数据库中执行查询,如下所示:

const oracle = require('oracledb')
const options = {
  user: 'example',
  password: 'example123',
  connectString: 'string'
}

async function start () {
  const connection = await oracle.getConnection(options)
  const { resultSet } = await connection.execute('query', [], { outFormat: oracle.OBJECT, resultSet: true })
  return resultSet
}

start().then(console.log)

resultSet有一个名为getRow()的方法,这个方法从数据库中返回要获取的下一行的Promise。我们可以创建一个逐行返回此resultSet的光标。下面让我们创建Cursor类:

class Cursor {
  constructor(resultSet) {
    this.resultSet = resultSet
  }

  getIterable() {
    return {
      [Symbol.asyncIterator]: () => this._buildIterator()
    }
  }

  _buildIterator() {
    return {
      next: () => this.resultSet.getRow().then((row) => ({ value: row, done: row === undefined }))
    }
  }
}

module.exports = Cursor

查看光标是否接收到它应该处理的resultSet,并将其存储在当前状态。因此,我们需要更改之前的方法,以便返回光标而不是resultSet:

const oracle = require('oracledb')
const options = {
  user: 'example',
  password: 'example123',
  connectString: 'string'
}
async function getResultSet() {
  const connection = await oracle.getConnection(options)
  const { resultSet } = await connection.execute('query', [], { outFormat: oracle.OBJECT, resultSet: true })
  return resultSet
}

async function start() {
  const resultSet = await getResultSet()
  const cursor = new Cursor(resultSet)

  for await (const row of cursor.getIterable()) {
    console.log(row)
  }
}

start()

这样,我们就可以遍历所有返回的行,不需要单独的Promise解析。

结论

异步迭代器非常强大,尤其是在Javascript等动态和异步语言中。有了它们,我们就可以将复杂的执行变成简单的代码,从而向用户隐藏复杂性,增加友好的用户体验。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论