Menu

This is documentation for the next version of k6. For the latest stable release, go to the latest version.

Open source

enqueue(chunk)

The enqueue() method of the ReadableStreamDefaultController interface enqueues a chunk of data into the associated stream.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| chunk | any | The chunk of data to enqueue into the stream. |

Exceptions

| Exception | Description |
| --- | --- |
| TypeError | Thrown when the source object is not a ReadableStreamDefaultController. |

Example

JavaScript
import { open } from 'k6/experimental/fs';
import { ReadableStream } from 'k6/experimental/streams';

// Handle to the CSV file that supplies the data to be read.
// It is assigned asynchronously: the immediately-invoked async
// arrow function below awaits open() and stores the result.
let file;
(async () => {
  file = await open('./data.csv');
})();

export default async function () {
  // Scratch buffer plus the text read so far but not yet split into
  // lines. Created in start() and handed to getNextLine() by pull().
  let lineReaderState;

  // Underlying source for the stream: it turns each line of the file
  // into an object with name and color properties.
  const lineSource = {
    // start runs when the readable stream is created; this is the
    // place to connect to the data source and do any setup work.
    async start(controller) {
      lineReaderState = { buffer: new Uint8Array(1024), remaining: '' };
    },

    // pull is invoked repeatedly to fetch more data for as long as the
    // stream's internal high water mark has not been reached.
    async pull(controller) {
      const line = await getNextLine(file, lineReaderState);
      if (line !== null) {
        const [name, color] = line.split(',');
        controller.enqueue({ name, color });
        return;
      }

      // No more lines: signal the end of the stream.
      controller.close();
    },
  };

  const fileLinesStream = new ReadableStream(lineSource);

  // Acquire a reader, which locks the stream to it
  const reader = fileLinesStream.getReader();

  try {
    // Drain the stream, logging every item until it reports done
    let result = await reader.read();
    while (!result.done) {
      console.log(result.value);
      result = await reader.read();
    }
  } catch (error) {
    console.error('Stream reading failed: ', error);
  }
}

// getNextLine reads the next non-empty line from the file and returns it.
//
// It reads the file in chunks into state.buffer and keeps the text that
// has not been returned yet in state.remaining, so that lines spanning
// several chunks are reassembled. Returned lines are trimmed, and lines
// that are empty after trimming are skipped.
//
// Parameters:
//   file  - object whose read(buffer) resolves to the number of bytes
//           written into buffer, or to null at end of file.
//   state - { buffer: Uint8Array, remaining: string }; mutated in place.
//
// Returns a promise resolving to the next line, or to null when there
// are no more lines to read. It never resolves to an empty string.
async function getNextLine(file, state) {
  while (true) {
    const lineEndIndex = state.remaining.indexOf('\n');

    if (lineEndIndex !== -1) {
      const line = state.remaining.substring(0, lineEndIndex).trim();

      // Drop the consumed line together with its newline.
      state.remaining = state.remaining.substring(lineEndIndex + 1);

      if (line) {
        return line;
      }

      // Blank line: keep looking in what is left of the buffered text.
      continue;
    }

    const bytesRead = await file.read(state.buffer);
    if (bytesRead === null) {
      // EOF: whatever is buffered is the last, unterminated line.
      const finalLine = state.remaining.trim();

      // Clear remaining so that subsequent calls report the end.
      state.remaining = '';

      // Trailing whitespace alone is not a line; report the end rather
      // than handing an empty string to the caller.
      return finalLine === '' ? null : finalLine;
    }

    // NOTE: every byte becomes exactly one character code, so only
    // single-byte text is decoded correctly; multi-byte UTF-8 sequences
    // are not reassembled here.
    state.remaining += String.fromCharCode.apply(
      null,
      state.buffer.subarray(0, bytesRead)
    );
  }
}