Important: This documentation is about an older version. It's relevant only to the release noted; many of the features and functions have been updated or replaced. Please view the current version.
enqueue(chunk)
The enqueue() method of the ReadableStreamDefaultController interface enqueues a chunk of data into the associated stream.
Parameters
Name | Type | Description |
---|---|---|
chunk | any | The chunk of data to enqueue into the stream. |
Exceptions
Exception | Description |
---|---|
TypeError | Thrown when the source object is not a ReadableStreamDefaultController. |
Example
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';
// Handle to the CSV file that holds the data to be read.
// It is assigned asynchronously: open() is awaited inside an immediately
// invoked async function, so `file` stays undefined until that resolves.
let file;
(async () => {
  file = await open('./data.csv');
})();
/**
 * Streams the lines of the opened file as `{ name, color }` objects and
 * logs each one.
 *
 * A ReadableStream is built whose pull step fetches one line through
 * getNextLine(), splits it on commas, and enqueues the first two fields.
 * The stream is closed once getNextLine() reports that no lines remain.
 */
export default async function () {
  // Per-stream line-reading state; populated by start() below.
  let readerState;

  const underlyingSource = {
    // Runs once when the stream is constructed: allocate the read buffer
    // and the carry-over text used to assemble partial lines.
    async start(controller) {
      readerState = {
        buffer: new Uint8Array(1024),
        remaining: '',
      };
    },

    // Runs whenever the stream wants more data: produce one record per
    // call, or close the stream when the file is exhausted.
    async pull(controller) {
      const nextLine = await getNextLine(file, readerState);
      if (nextLine !== null) {
        const fields = nextLine.split(',');
        controller.enqueue({ name: fields[0], color: fields[1] });
      } else {
        controller.close();
      }
    },
  };

  // Obtain a reader, which locks the stream to this consumer.
  const lineReader = new ReadableStream(underlyingSource).getReader();

  try {
    // Drain the stream, logging every record until it signals completion.
    for (
      let result = await lineReader.read();
      !result.done;
      result = await lineReader.read()
    ) {
      console.log(result.value);
    }
  } catch (err) {
    console.error('Stream reading failed: ', err);
  }
}
/**
 * Reads the next non-empty line from the file and returns it.
 *
 * The file is read in chunks of up to `state.buffer.length` bytes. Text that
 * follows the last newline seen so far is kept in `state.remaining` so that
 * lines spanning several chunks are reassembled correctly. Lines are trimmed,
 * and lines that are empty after trimming are skipped.
 *
 * Each byte is converted to one character with String.fromCharCode, so
 * multi-byte UTF-8 sequences are not decoded into their code points.
 *
 * @param {{ read: Function }} file - Object whose `read(buffer)` fills the
 *   buffer and resolves to the number of bytes read, or `null` at end of file.
 * @param {{ buffer: Uint8Array, remaining: string }} state - Mutable reader
 *   state shared across calls; both fields are updated in place.
 * @returns {Promise<string|null>} The next non-empty trimmed line, or `null`
 *   when there are no more lines to read.
 */
async function getNextLine(file, state) {
  while (true) {
    // Single scan: locate the newline once instead of includes() + indexOf().
    const lineEndIndex = state.remaining.indexOf('\n');

    if (lineEndIndex !== -1) {
      const line = state.remaining.substring(0, lineEndIndex).trim();
      state.remaining = state.remaining.substring(lineEndIndex + 1);
      if (line) {
        return line;
      }
      // Blank line: keep looking in what is left of the buffered text.
      continue;
    }

    const bytesRead = await file.read(state.buffer);
    if (bytesRead === null) {
      // EOF: whatever is buffered is the final, unterminated line.
      const finalLine = state.remaining.trim();
      // Clear remaining so the following call reports the end.
      state.remaining = '';
      // A whitespace-only tail is not a line; report end-of-data instead of
      // handing the caller an empty string.
      return finalLine === '' ? null : finalLine;
    }

    // Only the first `bytesRead` bytes of the buffer are valid for this read.
    state.remaining += String.fromCharCode.apply(
      null,
      new Uint8Array(state.buffer.slice(0, bytesRead))
    );
  }
}