数据库

Node.js 内核的幕后英雄 -子线程

时间:2010-12-5 17:23:32  作者:IT科技   来源:系统运维  查看:  评论:0
内容摘要:Node.js 为人所知的是单线程应用,也为人所知的是底层其实利用了多线程。单线程会使得代码实现上变得容易好理解,但是带来好处的同时,也往往会存在一些限制,这些限制导致在

Node.js 为人所知的内核是单线程应用,也为人所知的后英是底层其实利用了多线程。单线程会使得代码实现上变得容易好理解,雄线但是内核带来好处的同时,也往往会存在一些限制,后英这些限制导致在 Node.js 内核中,雄线不得不引入其他子线程,内核最终形成多线程。后英本文介绍 Node.js 中的雄线这些幕后英雄。

1 Libuv 线程池

Node.js 中,内核Libuv 线程池是后英最为人所知的子线程。Node.js 的雄线整体架构是单线程 + 事件驱动,其本质是内核一个生产者消费者的模型。

Node.js 不断通过事件驱动模块订阅 fd 的后英事件,然后等待 fd 对应事件的雄线就绪,接着执行对应回调,通过事件驱动,Node.js 甚至实现了定时器的功能。亿华云但是事件驱动模块不是万能的,至少目前不是,也就是说事件驱动模块还无法支持所有的操作,那么这些操作就需要用额外的线程去完成。这是引入 Libuv 线程池的背景。具体来说,Libuv 用来处理文件 IO、DNS 和 CPU 密集型的任务。

从上面图中可以看到,这些任务是直接提交给线程池处理的,等线程池把任务处理后,再通知主线程执行回调。所以底层虽然是多线程的,但是 Node.js 中,所有上层的代码都是在单线程(主线程)中执行的。网站模板下面以异步读文件为例看看大概的流程。

可以看到主线程提交任务给线程池后,就可以继续执行其他操作而不需要等待子线程完成,子线程完成任务后会以异步的方式通知主线程。

2 Watchdog

看门狗是计算机中的一个术语,大概就是定时做一些事情的一个程序,比如启动一个定时器定时检测系统是否运行正常,如果系统运行正常,则在定时器超时前重置定时器,如果系统挂了,则看门狗就会发出告警。在 Node.js 中也用到了看门狗。前面提到 Node.js 是单线程的,那么如何实现保证在某段时间内执行完一段代码呢?一旦我们把执行控制权交给某段代码,那么它什么时候结束就取决于这段代码的内容了,甚至如果它里面有个死循环,那么我们的系统也就无法做其他事情了。Node.js 提供了 API 处理这种情况。

Node.js 底层正是通过 watchdog 实现了这种能力,下面来看一下具体实现,下面是源码下载核心代码。

if (timeout != -1) {

Watchdog wd(env->isolate(), timeout, &timed_out);

result = run();

}

run 函数实现了 js 代码的执行,在 run 之前定义了一个watchdog。我们来看看它的实现。

class Watchdog {

public:

explicit Watchdog(v8::Isolate* isolate,

uint64_t ms,

bool* timed_out = nullptr);

~Watchdog();

v8::Isolate* isolate() { return isolate_; }

private:

static void Run(void* arg);

static void Timer(uv_timer_t* timer);

v8::Isolate* isolate_; // 主线程 isolate

uv_thread_t thread_;// 子线程标识

uv_loop_t loop_; // 事件循环核心结构体

uv_async_t async_; // 子线程和主线程通信结构体

uv_timer_t timer_; // 注册于子线程的定时器

bool* timed_out_; // 标记是否超时了

};

从上面定义中可以看到,watchdog 会新建一个子线程然后在子线程里跑一个新的事件循环(Node.js 中很多这种用法),接着看具体的实现。

Watchdog::Watchdog(v8::Isolate* isolate, uint64_t ms, bool* timed_out)

: isolate_(isolate), timed_out_(timed_out) {

int rc;

// 初始化新的事件循环结构体

rc = uv_loop_init(&loop_);

// 初始化线程间通信结构体,并设置回调

rc = uv_async_init(&loop_, &async_, [](uv_async_t* signal) {

Watchdog* w = ContainerOf(&Watchdog::async_, signal);

uv_stop(&w->loop_);

});

// 在子线程的事件循环中初始化和启动定时器

rc = uv_timer_init(&loop_, &timer_);

rc = uv_timer_start(&timer_, &Watchdog::Timer, ms, 0);

// 创建子线程,并在子线程中执行 Run 函数

rc = uv_thread_create(&thread_, &Watchdog::Run, this);

}

接着看一下 Run。

void Watchdog::Run(void* arg) {

Watchdog* wd = static_cast(arg);

uv_run(&wd->loop_, UV_RUN_DEFAULT);

uv_close(reinterpret_cast(&wd->timer_), nullptr);

}

在 Run 中进入事件循环。以此同时,主线程中也在执行 js 代码。随着时间的流逝,有两种情况,第一,js 代码在定时器超时前执行完,回头看看定义 watchdog 的代码是包裹在一个 if 里的,run 执行完后就会离开 if 作用域,从而析构 watchdog。我们看看析构函数的逻辑。

Watchdog::~Watchdog() {

// 通知子线程退出

uv_async_send(&async_);

// 等待子线程退出

uv_thread_join(&thread_);

uv_close(reinterpret_cast(&async_), nullptr);

// UV_RUN_DEFAULT so that libuv has a chance to clean up.

uv_run(&loop_, UV_RUN_DEFAULT);

CheckedUvLoopClose(&loop_);

}

析构函数里通过 uv_async_send 通知子线程退出,看一下回调。

uv_async_init(&loop_, &async_, [](uv_async_t* signal) {

Watchdog* w = ContainerOf(&Watchdog::async_, signal);

uv_stop(&w->loop_);

});

回调里执行 uv_stop 结束子线程中事件循环的运行,最终退出子线程。第二种情况就是 js 代码过久,然后子线程定时器超时,来看一下超时回调。

void Watchdog::Timer(uv_timer_t* timer) {

Watchdog* w = ContainerOf(&Watchdog::timer_, timer);

*w->timed_out_ = true;

w->isolate()->TerminateExecution();

uv_stop(&w->loop_);

}

回调里通过 isolate 的 TerminateExecution 函数终止 js 代码的执行,然后继续执行主线程后面的代码,而子线程也完成了自己的使命。

3 Inspector

调试 js 时,我们通常会打很多断点,当代码执行到断点时,整个线程就会停住,我们就可以查看当前的栈和信息。如果这时候我们想关闭调试功能,可能会发现无法关闭,因为整个线程都停在断点处了,虽然我们可以直接关闭调试客户端,但是真实情况可能比较复杂,比如多人打开了调试器,我们却只能关闭自己的。另外,如果我们的代码陷入了死循环,那么连打开调试功能的机会都没有了。这是单线程导致的问题,所以 Node.js 中的调试功能是以独立的线程实现的。在 Node.js 中我们可以通过很多种方式打开调试功能。比如在启动 Node.js 时通过加入命令行参数,在代码里通过 inspector 模块的 open 函数,通过给 Node.js 进程发送 SIGUSR1 信号(这种方式可以在主线程死循环的情况依然可以生效)。这些方式对应的实现都是一样的。

uv_thread_create(&thread_, InspectorIo::ThreadMain, this);

Node.js 会创建一个新的线程运行调试相关的代码。

void InspectorIo::ThreadMain() {

uv_loop_t loop;

loop.data = nullptr;

int err = uv_loop_init(&loop);

// 获取调试的 host 和端口

std::string host;

int port;

{

ExclusiveAccess::Scoped host_port(host_port_);

host = host_port->host();

port = host_port->port();

}

// 创建服务器,往事件循环注册 fd

InspectorSocketServer server(std::move(delegate),

&loop,

std::move(host),

port,

inspect_publish_uid_);

server.Start();

// 启动事件循环

uv_run(&loop, UV_RUN_DEFAULT);

CheckedUvLoopClose(&loop);

}

具体就不多介绍了,主要是开了子线程避免了主线程阻塞时所带来的限制。另外多提一下就是通过信号打开调试器的实现。哪怕进程陷入了死循环,也是会处理收到的信号的,因为操作系统在进程调度时,选中某个进程后,在执行前会先处理信号。所以如果 Node.js 进程如果正在陷入死循环,通过信号机制,我们依然有机会执行一些代码。接下来看看这时候 Node.js 都执行了什么代码。首先 Node.js 在初始化时先创建了一个线程。

pthread_create(&thread, &attr, StartIoThreadMain, nullptr);

inline void* StartIoThreadMain(void* unused) {

for (;;) {

uv_sem_wait(&start_io_thread_semaphore);

Mutex::ScopedLock lock(start_io_thread_async_mutex);

Agent* agent = static_cast(start_io_thread_async.data);

if (agent != nullptr)

agent->RequestIoThreadStart();

}

}

这个线程被创建后通过 uv_sem_wait 把自己阻塞,等待信号量 start_io_thread_semaphore 的唤醒,然后通过 agent->RequestIoThreadStart() 启动调试子线程。那么谁来唤醒 StartIoThreadMain 对应的线程呢?是信号 SIGUSR1 的处理函数。

RegisterSignalHandler(SIGUSR1, StartIoThreadWakeup);

static void StartIoThreadWakeup(int signo, siginfo_t* info, void* ucontext) {

uv_sem_post(&start_io_thread_semaphore);

}

但是事情没有那么简单,因为 RequestIoThreadStart 是在子线程里执行的,而 V8 的 isolate 不是线程安全的。

void Agent::RequestIoThreadStart() {

// We need to attempt to interrupt V8 flow (in case Node is running

// continuous JS code) and to wake up libuv thread (in case Node is waiting

// for IO events)

uv_async_send(&start_io_thread_async);

parent_env_->RequestInterrupt([this](Environment*) {

StartIoThread();

});

}

RequestIoThreadStart 往主线程的任务队列里插入一个任务,然后在主线程里执行任务回调去打开调试器,至于为什么需要同时调 uv_async_send 和 RequestInterrupt 注释里已经写得很清楚,Node.js 可能正阻塞于 Libuv 的 Poll IO 阶段,也可以正陷入 JS 的代码执行中甚至死循环中。这两个调用分别解决这两个问题,从而“唤醒”主线程继续执行任务。同样,start_io_thread_async 的回调也是调用了 StartIoThread 启动调试器。StartIoThread 中加了判断,如果已经开启了则直接返回,避免两个回调执行重复的代码。

4 Trace

Node.js 的 Trace 功能中也使用了子线程。先看一下 Trace Agent 的构造函数。

Agent::Agent() : tracing_controller_(new TracingController()) {

CHECK_EQ(uv_loop_init(&tracing_loop_), 0);

// 注册当注册 writer 时的回调

CHECK_EQ(uv_async_init(&tracing_loop_,

&initialize_writer_async_,

[](uv_async_t* async) {

Agent* agent = ContainerOf(&Agent::initialize_writer_async_, async);

agent->InitializeWritersOnThread();

}), 0);

}

接着看 Agent 启动的代码,Trace Agent 启动的时候就会创建一个新的子线程。

void Agent::Start() {

if (started_)

return;

// 创建 controller 和 buffer 后续用到

NodeTraceBuffer* trace_buffer_ = new NodeTraceBuffer(NodeTraceBuffer::kBufferChunks, this, &tracing_loop_);

tracing_controller_->Initialize(trace_buffer_);

// 创建子线程运行新的事件循环

CHECK_EQ(0, uv_thread_create(&thread_, [](void* arg) {

Agent* agent = static_cast(arg);

uv_run(&agent->tracing_loop_, UV_RUN_DEFAULT);

}, this));

started_ = true;

}

逻辑和之前介绍的类似,都是开启一个新的线程运行一个事件循环,然后主线程和子线程进行通信。看 NodeTraceBuffer 的初始化

NodeTraceBuffer::NodeTraceBuffer(size_t max_chunks,

Agent* agent, uv_loop_t* tracing_loop)

: tracing_loop_(tracing_loop),

buffer1_(max_chunks, 0, agent),

buffer2_(max_chunks, 1, agent) {

current_buf_.store(&buffer1_);

flush_signal_.data = this;

// 初始化线程间通信的结构体,NonBlockingFlushSignalCb 在子线程中执行

int err = uv_async_init(tracing_loop_, &flush_signal_,

NonBlockingFlushSignalCb);

exit_signal_.data = this;

err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);

NodeTraceBuffer 往 loop 中注册了一个 async 结构体。注册完任务后,子线程就进入了事件循环。这时候有两种情况会改变事件循环的状态。第一个是注册 writer。writer 是消费者,当有 trace 事件发生时,agent 会调用 writer 进行数据的消费,比如写入文件。我们看一下注册 writer 的逻辑。

AgentWriterHandle Agent::AddClient(

const std::set& categories,

std::unique_ptr writer,

enum UseDefaultCategoryMode mode) {

// 省略了一些逻辑

AsyncTraceWriter* raw = writer.get();

writers_[id] = std::move(writer);

// 记录待初始化的 writer,通知子线程处理

{

Mutex::ScopedLock lock(initialize_writer_mutex_);

to_be_initialized_.insert(raw);

uv_async_send(&initialize_writer_async_);

while (to_be_initialized_.count(raw) > 0)

initialize_writer_condvar_.Wait(lock);

}

return AgentWriterHandle(this, id);

前面看到 initialize_writer_async_ 的回调是

uv_async_init(&tracing_loop_,

&initialize_writer_async_,

[](uv_async_t* async) {

Agent* agent = ContainerOf(&Agent::initialize_writer_async_, async);

agent->InitializeWritersOnThread();

}), 0)

接着看 InitializeWritersOnThread。

void Agent::InitializeWritersOnThread() {

Mutex::ScopedLock lock(initialize_writer_mutex_);

while (!to_be_initialized_.empty()) {

AsyncTraceWriter* head = *to_be_initialized_.begin();

head->InitializeOnThread(&tracing_loop_);

to_be_initialized_.erase(head);

}

initialize_writer_condvar_.Broadcast(lock);

}

InitializeWritersOnThread 就是遍历刚才记录的 writer 并执行它的 InitializeOnThread。以 NodeTraceWriter 为例。

void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {

tracing_loop_ = loop;

flush_signal_.data = this;

int err = uv_async_init(tracing_loop_, &flush_signal_,

[](uv_async_t* signal) {

NodeTraceWriter* trace_writer =

ContainerOf(&NodeTraceWriter::flush_signal_, signal);

trace_writer->FlushPrivate();

});

CHECK_EQ(err, 0);

exit_signal_.data = this;

err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);

}

writer 会往子线程的事件循环里注册任务。等到有数据消费的时候主线程会通知子线程。现在有了消费者,那么生产者是谁?生产者散布在 Node.js 源码的多个地方。下面以文件模块的打开文件为例。

FS_SYNC_TRACE_BEGIN(open);

int result = SyncCall(env, args[4], &req_wrap_sync, "open",

uv_fs_open, *path, flags, mode);

FS_SYNC_TRACE_END(open);

FS_SYNC_TRACE_BEGIN 和 FS_SYNC_TRACE_END 是 Trace 模块定义的宏,非常繁琐。最终会调用 controller->AddTraceEvent 生产 trace 数据,这是改变子线程事件循环状态的第二种情况。Node.js 的 controller 为 TracingController 对象,继承 v8::platform::tracing::TracingController。看一下这个对象的 AddTraceEvent 函数。

uint64_t TracingController::AddTraceEvent(...) {

return AddTraceEventWithTimestamp(...);

}

uint64_t TracingController::AddTraceEventWithTimestamp() {

TraceObject* trace_object = trace_buffer_->AddTraceEvent(...);

}

最终调用了 buffer 的 AddTraceEvent。buffer 就是 Trace Agent 初始化时设置的 NodeTraceBuffer。

TraceObject* NodeTraceBuffer::AddTraceEvent(uint64_t* handle) {

// 数据太多则 flush

if (!TryLoadAvailableBuffer()) {

*handle = 0;

return nullptr;

}

// 记录数据

return current_buf_.load()->AddTraceEvent(handle);

}

NodeTraceBuffer::AddTraceEvent 会不断积攒数据。达到某个值后会通知子线程进行 flush。我们看一下 TryLoadAvailableBuffer。

bool NodeTraceBuffer::TryLoadAvailableBuffer() {

InternalTraceBuffer* prev_buf = current_buf_.load();

if (prev_buf->IsFull()) {

uv_async_send(&flush_signal_);

}

return true;

}

通过 uv_async_send 通知 flush_signal_ ,然后在子线程里执行回调 NonBlockingFlushSignalCb。

void NodeTraceBuffer::NonBlockingFlushSignalCb(uv_async_t* signal) {

NodeTraceBuffer* buffer = static_cast(signal->data);

if (buffer->buffer1_.IsFull() && !buffer->buffer1_.IsFlushing()) {

buffer->buffer1_.Flush(false);

}

if (buffer->buffer2_.IsFull() && !buffer->buffer2_.IsFlushing()) {

buffer->buffer2_.Flush(false);

}

}

buffer->buffer1_ 和 buffer->buffer2_ 是 InternalTraceBuffer 对象。

void InternalTraceBuffer::Flush(bool blocking) {

{

Mutex::ScopedLock scoped_lock(mutex_);

if (total_chunks_ > 0) {

flushing_ = true;

for (size_t i = 0; i < total_chunks_; ++i) {

auto& chunk = chunks_[i];

for (size_t j = 0; j < chunk->size(); ++j) {

TraceObject* trace_event = chunk->GetEventAt(j);

if (trace_event->name()) {

agent_->AppendTraceEvent(trace_event);

}

}

}

total_chunks_ = 0;

flushing_ = false;

}

}

agent_->Flush(blocking);

}

InternalTraceBuffer 会把积攒的数据写到 agent 并 flush。

void Agent::AppendTraceEvent(TraceObject* trace_event) {

for (const auto& id_writer : writers_)

id_writer.second->AppendTraceEvent(trace_event);

}

Agent 也只是个中间人,它会调用每个 writer 进行数据的消费。以 NodeTraceWriter 为例,NodeTraceWriter 会先把数据积攒起来。

void Agent::Flush(bool blocking) {

for (const auto& id_writer : writers_)

id_writer.second->Flush(blocking);}

// 通知子线程一起 flush 到文件

void NodeTraceWriter::Flush(bool blocking) {

int err = uv_async_send(&flush_signal_);

}

flush_signal_ 回调是 NodeTraceWriter 的 FlushPrivate。

void NodeTraceWriter::FlushPrivate() {

WriteToFile(std::move(str), highest_request_id);

}

最终写入文件,我们也就拿到了 Trace 对应的数据。

5 Platform

Node.js 在初始化时会创建一个 WorkerThreadsTaskRunner。

WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {

Mutex platform_workers_mutex;

ConditionVariable platform_workers_ready;

Mutex::ScopedLock lock(platform_workers_mutex);

int pending_platform_workers = thread_pool_size;

delayed_task_scheduler_ = std::make_unique(

&pending_worker_tasks_);

// threads_ 是线程队列

threads_.push_back(delayed_task_scheduler_->Start());

for (int i = 0; i < thread_pool_size; i++) {

PlatformWorkerData* worker_data = new PlatformWorkerData{

&pending_worker_tasks_, &platform_workers_mutex,

&platform_workers_ready, &pending_platform_workers, i

};

std::unique_ptrt { new uv_thread_t() };

// 创建线程

if (uv_thread_create(t.get(), PlatformWorkerThread,

worker_data) != 0) {

break;

}

threads_.push_back(std::move(t));

}

}

首先会根据 thread_pool_size 创建多个子线程。还创建了一个延迟任务的调度器 DelayedTaskScheduler,这个对象里也创建了一个线程。

std::unique_ptrStart() {

auto start_thread = [](void* data) {

// 处理任务

static_cast(data)->Run();

};

std::unique_ptrt { new uv_thread_t() };

uv_sem_init(&ready_, 0);

CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));

uv_sem_wait(&ready_);

uv_sem_destroy(&ready_);

return t;

}

Node.js 初始化后就创建了一波线程,然后 V8 会使用这些线程,我们通过 Post 和 PostDelayedTask 可以看到。

6 总结

大致完成了 Node.js 中幕后线程的分析,单线程的 Node.js 正是因为这些幕后的子线程变得越来越强大,另外我们也可以通过 Addon 的方式开启新的子线程,以此做更多的事情,当然也可以使用 worker_thread 模块。

copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap