首页 > 语言 > JavaScript > 正文

Node.js pipe实现源码解析

2024-05-06 15:13:08
字体:
来源:转载
供稿:网友

从前面两篇文章,我们了解到。想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable。换句话说,每次传递数据时,都需要写如下的模板代码

readable.on('readable', (err) => { if(err) throw err writable.write(readable.read())})

为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据

readable.pipe(writable)

现在,就让我们来看看它是如何实现的吧

pipe

首先需要先调用 Readable 的 pipe() 方法

// lib/_stream_readable.jsReadable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 记录 Writable switch (state.pipesCount) {  case 0:   state.pipes = dest;   break;  case 1:   state.pipes = [state.pipes, dest];   break;  default:   state.pipes.push(dest);   break; } state.pipesCount += 1; // ...  src.once('end', endFn); dest.on('unpipe', onunpipe);  // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保证 error 事件触发时,onerror 首先被执行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose);  // ... dest.once('finish', onfinish); // ... // 触发 Writable 的 pipe 事件 dest.emit('pipe', src); // 将 Readable 改为 flow 模式 if (!state.flowing) {  debug('pipe resume');  src.resume(); } return dest;};

执行 pipe() 函数时,首先将 Writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 Readable 不是 flow 模式,就调用 resume() 将 Readable 改为 flow 模式

传递数据

Readable 从数据源获取到数据后,触发 data 事件,执行 ondata()

ondata() 相关代码:

// lib/_stream_readable.js // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况 // 详情见 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) {  debug('ondata');  increasedAwaitDrain = false;  var ret = dest.write(chunk);  if (false === ret && !increasedAwaitDrain) {   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况   if (((state.pipesCount === 1 && state.pipes === dest) ||      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)     ) &&      !cleanedUp) {    debug('false write response, pause', src._readableState.awaitDrain);    src._readableState.awaitDrain++;    increasedAwaitDrain = true;   }   // 进入 pause 模式   src.pause();  } }

在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 Writable

此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitDrain 多次增加,不能清零,Readable 卡住

当不能再向 Writable 写入数据时,Readable 会进入 pause 模式,直到所有的 drain 事件触发

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表

图片精选