Table of Contents
Kể từ phiên bản v10.5.0, Nodejs đã có thêm module worker_threads. Vậy chính xác module mới này là gì và tại sao lại cần thêm nó vào. Trong bài blog này chúng ta sẽ đề cập đến những lý do mà xử lý đồng thời (concurrency) được implement trong Javascript và Nodejs, những vấn đề nó gặp phải, giải pháp hiện tại và tương lai của xử lý song song (parallel) với worker threads.
1. Đơn luồng
Javascript được biết đến là một ngôn ngữ lập trình đơn luồng chạy trên browser. Đơn luồng nghĩa là những tập lệnh, những dòng code chúng ta viết ra chỉ được thực thi tuần tự tại một thời điểm của cùng process.
Javascript khởi đầu là một ngôn ngữ chỉ hữu ích cho việc thêm các tương tác cho trang web, validate form,… Những xử lý này không cần phải thêm vào multithreading cho phức tạp.
Ryan Dahl, người tạo ra Nodejs, đã nhìn thấy sự hạn chế này là một cơ hội. Anh ấy đã muốn triển khai một server-side platform dựa trên asynchronous I/O, điều đó có nghĩa không cần đến thread. Concurrency là một vấn đề khó để giải quyết. Nhiều thread cùng truy cập đến cùng vùng nhớ có thể gây nên hiện tượng race condition , hiện tượng này rất khó để reproduce và fix.
2. Nodejs có phải là đơn luồng ?
Điều này vừa đúng vừa không đúng. Thật sự thì, chúng ta có thể chạy song song, nhưng ta không tạo thread và không đồng bộ hóa chúng. Máy ảo và OS sẽ chạy I/O song song và khi trở lại Javascript code, phần Javascript code sẽ chỉ chạy đơn luồng.
Nói cách khác, tất cả mọi thứ chạy song song, ngoại trừ Javascript code.
Điều này rất tuyệt nếu tất cả những gì ta làm là asynchronous I/O. Những dòng code chỉ gồm các phần nhỏ của synchronous block – được thực thi nhanh và truyền data đến các file và stream. Javscript code chạy rất nhanh nên sẽ không block việc thực thi của các phần Javascript khác. Thời gian sẽ được dành cho việc chờ đợi I/O event hơn là Javscript code được thực thi. Ví dụ sau đây sẽ giúp ta hiểu sơ qua:
db.findOne('SELECT ... LIMIT 1', function(err, result) { if (err) return console.error(err) console.log(result) }) console.log('Running query') setTimeout(function() { console.log('Hey there') }, 1000)
Có thể lệnh query đến database sẽ tốn hơn một phút nhưng dòng chữ “Running query” sẽ xuất hiện ngay sau khi tạo lệnh query. Và ta sẽ thấy dòng chữ “Hey there” sau một giây sau khi tạo lệnh query dù query có đang chạy hay không. Nodejs app chỉ gọi function mà không block việc thực thi của các phần code khác. Nó sẽ được thông báo thông qua function callback khi lệnh query chạy xong và ta sẽ thu được kết quả.
3. Các các vụ làm nặng CPU
Điều gì sẽ xảy ra nếu ta thực thi các tác vụ đồng bộ nặng? Ví dụ như các tính toán phức tạp trên tập data lớn. Khi đó ta có thể có một synchronous block code mà khi chạy tốn rất nhiều thời gian và nó sẽ block các phần code còn lại. Giả định rằng code tính toán phức tạp mất 10 giây. Nếu ta chạy trên web server điều đó có nghĩa rằng tất cả các request khác sẽ bị block ít nhất 10 giây bởi tính toán phức tạp đó. Điều này thực sự không tốt chút nào.
Javascript và Nodejs thường không được dùng cho các tác vụ nặng CPU. Bởi vì Javascript chạy đơn luồng nó sẽ làm đứng UI trên browser và làm hàng đợi (queue) tất cả các I/O event trong Nodejs.
Trở lại với ví dụ trước đó. Giả sử ta có một lệnh query return hàng ngàn kết quả và ta cần phải giải mã các giá trị bằng Javascript code:
db.findAll('SELECT ...', function(err, results) { if (err) return console.error(err) // Heavy computation and many results for (const encrypted of results) { const plainText = decrypt(encrypted) console.log(plainText) } })
Ta sẽ có được kết quả trong biến results trong hàm callback một khi query xong. Rồi sau đó, sẽ không có Javascript code nào được thực thi cho đến khi hàm callback này xử lý xong. Như đã nói trước đó, các dòng Javascript thường nhỏ và đủ nhanh, tuy nhiên trong trường hợp này, chúng ta có quá nhiều kết quả và cần phải tính toán phức tạp, nặng nề trên các kết quả đó. Điều này có thể mất vài giây, và trong thời gian này việc thực thi các phần Javascript khác sẽ bị queue, điều đó có nghĩa ta sẽ block tất cả các user trong thời gian này nếu ta đang chạy một server trên cùng một app.
4. Tại sao các phiên bản trước v10.5.0 không thêm thread ?
Nhiều người nghĩ rằng cần phải thêm module mới trong Nodejs core để cho phép ta tạo và đồng bộ các thread. Tuy nhiên, không có cách nào để giải quyết tốt trường hợp này trong một server-side platform đã trưởng thành như Nodejs.
Nếu thêm thread vào, điều đó sẽ thay đổi bản chất của ngôn ngữ. Ta không thể chỉ thêm thread thành một tập class hay function có sẵn. Ta cần thay đổi ngôn ngữ. Những ngôn ngữ mà hỗ trợ multithreading thường có từ khóa như là “synchronized” để cho phép thread cộng tác, đồng bộ với nhau. Ví dụ như trong Java một vài kiểu số không không phải là atomic, điều đó có nghĩa là nếu ta không đồng bộ hóa các truy cập thì có thể xảy ra hiện tượng hai hay nhiều thread cùng thay đổi giá trị của biến đó và kết quả có thể dẫn đến các giá trị không hợp lệ.
5. Cách giải quyết đơn giản
Nodejs sẽ không đưa block code tiếp theo vào event queue cho đến khi block trước đó được thực thi xong. Vậy nên điều đơn giản ta có thể làm là là chia nhỏ code thành các phần synchronous code nhỏ hơn và dùng hàm setImmediate(callback) để nói với Nodejs rằng ta đã xong và mày có thể thực thi các phần đang chờ ở trong queue.
Điều này có thể giúp tiếp tục các vòng lặp tiếp theo hay gọi là tick của event loop. Cùng xem cách ta cấu trúc lại các dòng code để tận dụng lợi thế này. Giả sử ta có một array lớn mà ta muốn xử lý các item trong array mà việc xử lý này sẽ làm nặng CPU:
const arr = [/*large array*/] for (const item of arr) { // do heavy stuff for each item on the array } // code that runs after the whole array is executed
Như đã nói trước đó nếu ta xử lý toàn bộ array thì sẽ tốn quá nhiều thời gian để xử lý và sẽ block các phần Javascript còn lại. Vì thế hãy chia nhỏ chúng thành các phần nhỏ hơn và sử dụng hàm setImmediate(callback):
const crypto = require('crypto') const arr = new Array(200).fill('something') function processChunk() { if (arr.length === 0) { // code that runs after the whole array is executed } else { console.log('processing chunk'); // pick 10 items and remove them from the array const subarr = arr.splice(0, 10) for (const item of subarr) { // do heavy stuff for each item on the array doHeavyStuff(item) } // Put the function back in the queue setImmediate(processChunk) } } processChunk() function doHeavyStuff(item) { crypto.createHmac('sha256', 'secret').update(new Array(10000).fill(item).join('.')).digest('hex') } // This is just for confirming that we can continue // doing things let interval = setInterval(() => { console.log('tick!') if (arr.length === 0) clearInterval(interval) }, 0)
Bây giờ ta sẽ chỉ xử lý mỗi lần 10 item và gọi setImmediate(callback) vì vậy nếu có việc gì khác mà chương trình cần phải làm, nó sẽ được thực hiện chen giữa các khối 10 item.
Ta có thể thấy rằng code trở nên phức tạp hơn. Và nhiều khi thuật toán phức tạp hơn thì thật khó để biết nên gọi setImmediate() ở đâu là tốt. Bên cạnh đó, code sẽ trở thành asynchronous và nếu ta phụ thuộc vào các thư viện bên thứ ba, khi đó ta sẽ khó có thể và gần như không thể chia nhỏ code thành các phần nhỏ hơn.
6. Cách giải quyết thứ hai: dùng background process
setImmeditate() ổn trong vài trường hợp đơn giản, tuy nhiên nó không phải là cách giải quyết lý tưởng. Bên cạnh đó, ta không có thread và ta cũng không muốn thay đổi ngôn ngữ, liệu ta có thể xử lý song song mà không dùng thread không ? Câu trả lời là có, ta cần dùng một thứ gọi là background processing: một cách chạy một task với input, mà có thể sử dụng bất cứ lượng CPU nào và thời gian nó cần, và sau đó trả lại kết quả cho main app. Ví dụ như là:
// Runs `script.js` in a new environment without sharing memory. const service = createService('script.js') // We send an input and receive an output service.compute(data, function(err, result) { // result available here })
Thực tế rằng ta đã có thể thực hiện background processing trong Nodejs. Ta có thể fork process và sử dụng message passing. Main process có thể giao tiếp với child process bằng cách gửi và nhận các sự kiện. Không một vùng nhớ nào được chia sẻ.Tất cả sự trao đổi data đều được “clone” nghĩa là thay đổi nó từ một phía sẽ không thay đổi nó ở các phía khác. Giống như HTTP response, một khi gửi nó, phía còn lại chỉ là bản copy của nó. Nếu ta không chia sẻ vùng nhớ, sẽ không có race condition và ta cũng không cần thread. Vấn đề đã được giải quyết!
Tuy nhiên chờ đã. Đây là một giải pháp, tuy nhiên cũng không phải là giải pháp lý tưởng. Fork một process sẽ tốt rất nhiều tài nguyên và chậm chạp. Điều này có nghĩa là chạy thêm một máy ảo mới từ đầu sử dụng rất nhiều memory bởi vì các process không chia sẻ memory. Ta có thể tái sử dụng cùng process đã fork không? Đương nhiên rồi, nhưng gửi tác vụ nặng khác nhau sẽ được thực hiện đồng bộ (synchronous) trong process đã fork có hai vấn đề sau:
- Main app sẽ không bị block, nhưng process đã fork sẽ chỉ có thể xử lý một tác vụ một lúc. Nếu có hai tác vụ, giả sử tác vụ đầu tiên tốn 10 giây và tác vụ thứ hai tốn 1s (theo thứ tự), sẽ không tốt nếu phải đợi 10 giây để thực thi tác vụ thứ hai. Bởi vì ta đang fork các process, ta muốn tận dụng scheduling của OS và tất cả các lõi (core) của máy tính. Một cách tương tự là ta có thể vừa nghe nhạc và lướt web cùng 1 lúc bằng cách fork hai process và thực thi tất cả các task song song.
- Bên cạnh đó, nếu một task bị crash, các task còn lại trong cùng process sẽ không được xử lý.
Để giải quyết các vấn đề trên ta cần fork rất nhiều process, nhưng ta cần giới hạn số lượng process được fork vì sẽ tốn rất nhiều bộ nhớ. Giống như các kết nối database, ta cần một dãy các process sẵn sằng để sử dụng (pool of process), chạy một tác vụ trong một thời điểm và tái sử dụng process sau khi task hoàn thành. Nhìn sơ qua thì thấy nó phức tạp để implement, nhưng may thay đã có sẵn worker-farm để giúp chúng ta:
// main app const workerFarm = require('worker-farm') const service = workerFarm(require.resolve('./script')) service('hello', function (err, output) { console.log(output) }) // script.js // This will run in forked processes module.exports = (input, callback) => { callback(null, input + ' ' + world) }
7. Vấn đề có được giải quyết ?
Vậy thì, vấn đề đã được giải quyết chưa? Vâng, đã được giải quyết, nhưng ta vẫn sử dụng rất nhiều bộ nhớ hơn thay vì multithreading. Thread vẫn rất là gọn nhẹ so với forked process. Và đây là lý do Worker Thread được sinh ra.
Worker Thread có các context cô lập. Chúng trao đổi thông tin với main process bằng cách sử dụng message passing, vì vậy ta sẽ tránh được vấn đề race condition mà thread có. Tuy nhiên chúng vẫn cùng nằm trong một process, do đó chúng sẽ tốn ít bộ nhớ hơn nhiều.
Ok, ta giờ đây có thể chia sẻ vùng nhớ với Worker Thread bằng cách truyền đối tượng SharedArrayBuffer. Nên nhớ rằng chỉ nên sử dụng chúng khi bạn cần xử lý các tác vụ nặng yêu cầu rất nhiều data. Điều này sẽ cho phép ta tránh các bước tuần tự hóa dữ liệu.
8. Sử dụng worker thread
Bây giờ nếu bạn đang sử dụng Nodejs phiên bản v10.5.0 trở lên bạn có thể bắt đầu sử dụng Worker Thread. Tuy nhiên Worker Thread vẫn đang trong quá trình thử nghiệm và có thể thay đổi trong tương lai. Thực tế thì, nó không có sẵn theo mặc định, ta cần phải bật nó lên bằng cách sử dụng flag experimental-worker khi chạy Nodejs.
Ngoài ra, hãy nhớ rằng việc tạo ra một Worker (giống như thread ở các ngôn ngữ khác) mặc dù nó nhẹ hơn nhiều so với việc fork một process, cũng có thể sử dụng quá nhiều tài nguyên phụ thuộc theo như cầu của bạn. Trong trường hợp đó, theo như docs gợi ý rằng ta nên tạo một dãy worker (pool of workers). Bạn nên tham khảo các cách triển khai pool chuẩn hoặc sử dụng các gói NPM thay vì tự tạo cách triển khai pool của riêng mình.
Hãy xem qua ví dụ sau. Đầu tiên, ta hãy implement index.js nơi ta sẽ tạo một Worker Thread và gửi cho nó vài data.
// index.js // run with node --experimental-worker index.js on Node.js 10.x const { Worker } = require('worker_threads') function runService(workerData) { return new Promise((resolve, reject) => { const worker = new Worker('./service.js', { workerData }); worker.on('message', resolve); worker.on('error', reject); worker.on('exit', (code) => { if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`)); }) }) } async function run() { const result = await runService('world') console.log(result); } run().catch(err => console.error(err))
Theo như ta thấy rất dễ dàng để truyền tham số tên file và data mà ta muốn Worker xử lý. Hãy nhớ rằng data truyền qua sẽ được clone và không chia sẻ vùng nhớ. Sau đó, ta sẽ đợi Worker Thread gửi message đến bằng cách lắng nghe sự kiện “message”.
Bây giờ, ta cần phải implement file service.js
const { workerData, parentPort } = require('worker_threads') // You can do any heavy stuff here, in a synchronous way // without blocking the "main thread" parentPort.postMessage({ hello: workerData })
Ở đây ta cần 2 thứ: workerData mà main app gửi tới, và một cách để gửi trả lại thông tin cho main app. Điều này được thực hiện với parentPort mà biến này có một phương thức tên là postMessage nơi ta sẽ truyền kết quả của xử lý.
Đây chỉ là một ví dụ đơn giản, nhưng ta có thể tạo những thứ phức tạp hơn. Ví dụ, ta có thể gửi nhiều message từ Worker Thread mà cho biết trạng thái thực thi nếu ta muốn cung cấp feedback. Hoặc nếu ta muốn gửi một phần kết quả. Ví dụ như giả sử ta xử lý hàng ngàn hình ảnh, có thể ta muốn gửi một message khi mỗi hình ảnh đã được xử lý mà không phải đợi xử lý xong hết tất cả.
Để chạy ví dụ trên, nhớ rằng phải dùng flag experimental-worker nếu đang ở Nodejs v10.5.0 trở lên:
$ node --experimental-worker index.js
Xem thêm về worker thread ở trang chủ để biết thêm chi tiết.
9. Kết luận
Worker Thread là một module hứa hẹn để xử lý các tác vụ nặng CPU trong Nodejs app. Nó giống như thread nhưng lại không chia sẻ vùng nhớ và do đó sẽ không gây nên race condition. Mặc dù module này vẫn đang trong giai đoạn thử nghiệm những việc biết sơ qua nó cũng là một điều tốt. Hiện tại ta có thể vẫn sử dụng giải pháp background process và trong tương lai sau khi Worker Thread được hoàn thiện và trưởng thành, ta vẫn sẽ có thể dễ dàng chuyên đổi.