enqueue(chunk)
Documentation
Grafana k6
JavaScript API
k6/experimental
streams
ReadableStreamDefaultController
enqueue(chunk)
Open source
enqueue(chunk)
The enqueue() method of the ReadableStreamDefaultController interface enqueues a chunk of data into the associated stream.
Parameters
Name | Type | Description |
---|---|---|
chunk | any | The chunk of data to enqueue into the stream. |
Exceptions
Exception | Description |
---|---|
TypeError | Thrown when the source object is not a ReadableStreamDefaultController. |
Example
JavaScript
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';
// Open a csv file containing the data to be read.
// The handle is created once at module scope (top-level await); the default
// function below passes it to getNextLine to read the file chunk by chunk.
const file = await open('./data.csv');
/**
 * Streams the lines of the opened csv file through a ReadableStream and
 * logs each parsed `{ name, color }` record.
 */
export default async function () {
  // Scratch buffer and leftover text shared by start() and pull().
  let readerState;

  // Underlying source: yields one parsed record per pull() call.
  const underlyingSource = {
    // Invoked when the stream is constructed; prepares the state that
    // getNextLine needs (a 1024-byte read buffer and an empty carry-over).
    async start(controller) {
      readerState = { buffer: new Uint8Array(1024), remaining: '' };
    },

    // Invoked repeatedly while the stream wants more data.
    async pull(controller) {
      const nextLine = await getNextLine(file, readerState);
      if (nextLine !== null) {
        // First two comma-separated fields become the record.
        const fields = nextLine.split(',');
        controller.enqueue({ name: fields[0], color: fields[1] });
        return;
      }
      // No more lines: signal the end of the stream.
      controller.close();
    },
  };

  const fileLinesStream = new ReadableStream(underlyingSource);

  // Acquire a reader, which locks the stream to this consumer.
  const reader = fileLinesStream.getReader();

  try {
    // Drain the stream, logging every record until it reports done.
    let result = await reader.read();
    while (!result.done) {
      console.log(result.value);
      result = await reader.read();
    }
  } catch (error) {
    console.error('Stream reading failed: ', error);
  }
}
/**
 * Reads the next non-empty line from the file and returns it trimmed.
 *
 * The file is consumed in chunks of at most `state.buffer.length` bytes.
 * Text after the last newline seen so far is kept in `state.remaining`, so a
 * line split across two reads is completed by the following read. Blank
 * (whitespace-only) lines are skipped.
 *
 * Each byte is converted to one character via String.fromCharCode, so
 * multi-byte UTF-8 sequences are not decoded.
 *
 * @param {{ read(buffer: Uint8Array): Promise<number|null> }} file - Source
 *   whose `read` fills the buffer and resolves to the number of bytes read,
 *   or `null` at end of file.
 * @param {{ buffer: Uint8Array, remaining: string }} state - Mutable reader
 *   state; both the buffer contents and `remaining` are updated in place.
 * @returns {Promise<string|null>} The next non-empty trimmed line, or `null`
 *   when there are no more lines to read.
 */
async function getNextLine(file, state) {
  while (true) {
    const lineEndIndex = state.remaining.indexOf('\n');

    if (lineEndIndex !== -1) {
      // A complete line is buffered: split it off and keep the rest.
      const line = state.remaining.substring(0, lineEndIndex).trim();
      state.remaining = state.remaining.substring(lineEndIndex + 1);
      if (line) {
        return line;
      }
      // Blank line: look at whatever is buffered next.
      continue;
    }

    const bytesRead = await file.read(state.buffer);

    if (bytesRead === null) {
      // EOF: whatever is left is the final, newline-less line.
      const finalLine = state.remaining.trim();
      // Clear remaining so later calls report the end of input.
      state.remaining = '';
      // A whitespace-only tail is not a line; report end of input instead
      // of handing the caller an empty string.
      return finalLine.length > 0 ? finalLine : null;
    }

    // Only the first bytesRead bytes of the buffer belong to this read.
    state.remaining += String.fromCharCode.apply(
      null,
      state.buffer.subarray(0, bytesRead)
    );
  }
}
Was this page helpful?
Related resources from Grafana Labs
Additional helpful documentation, links, and articles:

Intro to Kubernetes monitoring in Grafana Cloud
Learn how Grafana offers developers and SREs a simple and quick-to-value solution for monitoring their Kubernetes infrastructure.

Optimizing Kubernetes Operations with Grafana Cloud
Learn to effectively manage Kubernetes cost auto-de-scaling by implementing strategic automation and essential guardrails, explore distinctive advantages and applications of Prometheus and OTel in Kubernetes environments, and more.

How to use Grafana Cloud to avoid common Kubernetes monitoring mistakes
Learn about the most common Kubernetes monitoring mistakes - we'll share best practices on how to set up Kubernetes monitoring the optimal way.