enqueue(chunk)
Documentation
Grafana k6
JavaScript API
k6/experimental
streams
ReadableStreamDefaultController
enqueue(chunk)
Open source
enqueue(chunk)
The enqueue()
method of the ReadableStreamDefaultController interface enqueues a chunk of data into the associated stream.
Parameters
Name | Type | Description |
---|---|---|
chunk | any | The chunk of data to enqueue into the stream. |
Exceptions
Exception | Description |
---|---|
TypeError | Thrown when the source object is not a ReadableStreamDefaultController. |
Example
JavaScript
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';
// Open a csv file containing the data to be read
const file = await open('./data.csv');
export default async function () {
let lineReaderState;
// Define a ReadableStream that reads lines from the file
// and parses them into objects with name and color properties.
const fileLinesStream = new ReadableStream({
// The start function is called when the readable stream is
// created. In here, you can connect to the data source
// and perform administrative tasks.
async start(controller) {
lineReaderState = {
buffer: new Uint8Array(1024),
remaining: '',
};
},
// The pull function is called repeatedly to get data, while the
// internal high water mark is not reached.
async pull(controller) {
const line = await getNextLine(file, lineReaderState);
if (line === null) {
controller.close();
return;
}
const [name, color] = line.split(',');
controller.enqueue({ name, color });
},
});
// Obtain and lock a reader to the stream
const reader = fileLinesStream.getReader();
try {
// Read and process each item from the stream
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
console.log(value);
}
} catch (error) {
console.error('Stream reading failed: ', error);
}
}
// getNextLine reads the next line from the file and returns it.
//
// It reads the file in chunks and buffers the remaining data
// to handle partial lines. It returns null when there are no
// more lines to read.
async function getNextLine(file, state) {
while (true) {
if (state.remaining.includes('\n')) {
const lineEndIndex = state.remaining.indexOf('\n');
const line = state.remaining.substring(0, lineEndIndex).trim();
state.remaining = state.remaining.substring(lineEndIndex + 1);
if (line) {
return line;
}
} else {
const bytesRead = await file.read(state.buffer);
if (bytesRead === null) {
// EOF
if (state.remaining) {
const finalLine = state.remaining.trim();
// Clear remaining to signal the end
state.remaining = '';
// Return the last non-empty line
return finalLine;
}
// Indicate that there are no more lines to read
return null;
}
state.remaining += String.fromCharCode.apply(
null,
new Uint8Array(state.buffer.slice(0, bytesRead))
);
}
}
}
Was this page helpful?
Related documentation
Related resources from Grafana Labs
Additional helpful documentation, links, and articles:
60 min

Performance testing and observability in Grafana Cloud
In this webinar, learn how Grafana Cloud k6 offers you the best developer experience for performance testing.
60 min

User-centered observability: load testing, real user monitoring, and synthetics
Learn how to use load testing, synthetic monitoring, and real user monitoring (RUM) to understand end users' experience of your apps. Watch on demand.