otelcol.receiver.kafka

otelcol.receiver.kafka accepts telemetry data from a Kafka broker and forwards it to other otelcol.* components.

NOTE: otelcol.receiver.kafka is a wrapper over the upstream OpenTelemetry Collector kafka receiver from the otelcol-contrib distribution. Bug reports or feature requests will be redirected to the upstream repository, if necessary.

Multiple otelcol.receiver.kafka components can be specified by giving them different labels.

Usage

river
otelcol.receiver.kafka "LABEL" {
  brokers          = ["BROKER_ADDR"]
  protocol_version = "PROTOCOL_VERSION"

  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}

Arguments

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| brokers | array(string) | Kafka brokers to connect to. | | yes |
| protocol_version | string | Kafka protocol version to use. | | yes |
| topic | string | Kafka topic to read from. | "otlp_spans" | no |
| encoding | string | Encoding of payload read from Kafka. | "otlp_proto" | no |
| group_id | string | Consumer group to consume messages from. | "otel-collector" | no |
| client_id | string | Consumer client ID to use. | "otel-collector" | no |
| initial_offset | string | Initial offset to use if no offset was previously committed. | "latest" | no |

The encoding argument determines how to decode messages read from Kafka. encoding must be one of the following strings:

  • "otlp_proto": Decode messages as OTLP protobuf.
  • "jaeger_proto": Decode messages as a single Jaeger protobuf span.
  • "jaeger_json": Decode messages as a single Jaeger JSON span.
  • "zipkin_proto": Decode messages as a list of Zipkin protobuf spans.
  • "zipkin_json": Decode messages as a list of Zipkin JSON spans.
  • "zipkin_thrift": Decode messages as a list of Zipkin Thrift spans.
  • "raw": Copy the log message bytes into the body of a log record.
  • "text": Decode the log message as text and insert it into the body of a log record. By default, UTF-8 is used to decode. A different encoding can be chosen by using text_<ENCODING>. For example, text_utf-8 or text_shift_jis.
  • "json": Decode the JSON payload and insert it into the body of a log record.

"otlp_proto" must be used to read all telemetry types from Kafka; other encodings are signal-specific.

initial_offset must be either "latest" or "earliest".
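
As a minimal sketch of how these arguments combine, the following configuration reads Jaeger protobuf spans starting from the oldest available offset. The topic name jaeger_spans, the broker address, and the otelcol.exporter.otlp.default component are assumptions made for illustration. Because "jaeger_proto" is a trace-only encoding, only the traces output is set.

```river
otelcol.receiver.kafka "jaeger" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  // Hypothetical topic name; replace it with the topic that holds your spans.
  topic    = "jaeger_spans"
  encoding = "jaeger_proto"

  // Only applies when the consumer group has no previously committed offset.
  initial_offset = "earliest"

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```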

Blocks

The following blocks are supported inside the definition of otelcol.receiver.kafka:

| Hierarchy | Block | Description | Required |
|-----------|-------|-------------|----------|
| authentication | authentication | Configures authentication for connecting to Kafka brokers. | no |
| authentication > plaintext | plaintext | Authenticates against Kafka brokers with plaintext. | no |
| authentication > sasl | sasl | Authenticates against Kafka brokers with SASL. | no |
| authentication > sasl > aws_msk | aws_msk | Additional SASL parameters when using AWS_MSK_IAM. | no |
| authentication > tls | tls | Configures TLS for connecting to the Kafka brokers. | no |
| authentication > kerberos | kerberos | Authenticates against Kafka brokers with Kerberos. | no |
| metadata | metadata | Configures how to retrieve metadata from Kafka brokers. | no |
| metadata > retry | retry | Configures how to retry metadata retrieval. | no |
| autocommit | autocommit | Configures how to automatically commit updated topic offsets back to the Kafka brokers. | no |
| message_marking | message_marking | Configures when Kafka messages are marked as read. | no |
| header_extraction | header_extraction | Extracts headers from Kafka records. | no |
| debug_metrics | debug_metrics | Configures the metrics which this component generates to monitor its state. | no |
| output | output | Configures where to send received telemetry data. | yes |

The > symbol indicates deeper levels of nesting. For example, authentication > tls refers to a tls block defined inside an authentication block.

authentication block

The authentication block holds the definition of different authentication mechanisms to use when connecting to Kafka brokers. It doesn’t support any arguments and is configured fully through inner blocks.

plaintext block

The plaintext block configures PLAIN authentication against Kafka brokers.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| username | string | Username to use for PLAIN authentication. | | yes |
| password | secret | Password to use for PLAIN authentication. | | yes |
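
The nesting of plaintext inside authentication can be sketched as follows. The username, the KAFKA_PASSWORD environment variable, and the otelcol.exporter.otlp.default component are assumptions made for illustration.

```river
otelcol.receiver.kafka "plain" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  authentication {
    plaintext {
      // Hypothetical credentials. Reading the password from the environment
      // keeps it out of the configuration file.
      username = "kafka-user"
      password = env("KAFKA_PASSWORD")
    }
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```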

sasl block

The sasl block configures SASL authentication against Kafka brokers.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| username | string | Username to use for SASL authentication. | | yes |
| password | secret | Password to use for SASL authentication. | | yes |
| mechanism | string | SASL mechanism to use when authenticating. | | yes |
| version | number | Version of the SASL Protocol to use when authenticating. | 0 | no |

The mechanism argument can be set to one of the following strings:

  • "PLAIN"
  • "AWS_MSK_IAM"
  • "SCRAM-SHA-256"
  • "SCRAM-SHA-512"

When mechanism is set to "AWS_MSK_IAM", the aws_msk child block must also be provided.

The version argument can be set to either 0 or 1.
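
A minimal sketch of SASL authentication with a SCRAM mechanism might look like the following. The username, the KAFKA_PASSWORD environment variable, the choice of version, and the otelcol.exporter.otlp.default component are assumptions made for illustration; check which mechanism and version your brokers expect.

```river
otelcol.receiver.kafka "scram" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  authentication {
    sasl {
      // Hypothetical credentials.
      username  = "kafka-user"
      password  = env("KAFKA_PASSWORD")
      mechanism = "SCRAM-SHA-512"

      // Must be 0 (the default) or 1.
      version = 1
    }
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```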

aws_msk block

The aws_msk block configures extra parameters for SASL authentication when using the AWS_MSK_IAM mechanism.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| region | string | AWS region the MSK cluster is based in. | | yes |
| broker_addr | string | MSK address to connect to for authentication. | | yes |
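
The following sketch shows where the aws_msk block sits when mechanism is "AWS_MSK_IAM". Every uppercase value is a placeholder in the same spirit as the Usage section, and the region and the otelcol.exporter.otlp.default component are assumptions made for illustration. The username and password arguments appear only because the sasl block lists them as required.

```river
otelcol.receiver.kafka "msk" {
  brokers          = ["MSK_BROKER_ADDR"]
  protocol_version = "PROTOCOL_VERSION"

  authentication {
    sasl {
      // Listed as required by the sasl block; placeholders only.
      username  = "USERNAME"
      password  = "PASSWORD"
      mechanism = "AWS_MSK_IAM"

      // Required whenever mechanism is "AWS_MSK_IAM".
      aws_msk {
        region      = "us-east-1" // assumed region
        broker_addr = "MSK_BROKER_ADDR"
      }
    }
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```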

tls block

The tls block configures TLS settings used for connecting to the Kafka brokers. If the tls block isn’t provided, TLS won’t be used for communication.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| ca_file | string | Path to the CA file. | | no |
| ca_pem | string | CA PEM-encoded text to validate the server with. | | no |
| cert_file | string | Path to the TLS certificate. | | no |
| cert_pem | string | Certificate PEM-encoded text for client authentication. | | no |
| insecure_skip_verify | boolean | Ignores insecure server TLS certificates. | | no |
| insecure | boolean | Disables TLS when connecting to the configured server. | | no |
| key_file | string | Path to the TLS certificate key. | | no |
| key_pem | secret | Key PEM-encoded text for client authentication. | | no |
| max_version | string | Maximum acceptable TLS version for connections. | "TLS 1.3" | no |
| min_version | string | Minimum acceptable TLS version for connections. | "TLS 1.2" | no |
| reload_interval | duration | The duration after which the certificate is reloaded. | "0s" | no |
| server_name | string | Verifies the hostname of server certificates when set. | | no |

To disable TLS for connections to the server, for example because the server doesn’t support TLS, set the insecure argument to true.

If reload_interval is set to "0s", the certificate is never reloaded.

The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:

  • ca_pem and ca_file
  • cert_pem and cert_file
  • key_pem and key_file
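
As an illustration, a receiver that uses file-based certificates for mutual TLS could be sketched like this. All file paths and the otelcol.exporter.otlp.default component are assumptions made for illustration. Only the *_file form of each pair is set, which respects the mutual exclusivity listed above.

```river
otelcol.receiver.kafka "tls" {
  brokers          = ["localhost:9093"]
  protocol_version = "2.0.0"

  authentication {
    tls {
      // Hypothetical paths; point these at your own CA and client key pair.
      ca_file   = "/etc/ssl/kafka/ca.pem"
      cert_file = "/etc/ssl/kafka/client.pem"
      key_file  = "/etc/ssl/kafka/client-key.pem"

      // Re-read the certificate from disk every hour instead of never.
      reload_interval = "1h"
    }
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```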

kerberos block

The kerberos block configures Kerberos authentication against the Kafka broker.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| service_name | string | Kerberos service name. | | no |
| realm | string | Kerberos realm. | | no |
| use_keytab | bool | Enables using keytab instead of password. | | no |
| username | string | Kerberos username to authenticate as. | | yes |
| password | secret | Kerberos password to authenticate with. | | no |
| config_file | string | Path to the Kerberos configuration file (for example, /etc/krb5.conf). | | no |
| keytab_file | string | Path to keytab file (for example, /etc/security/kafka.keytab). | | no |

When use_keytab is false, the password argument is required. When use_keytab is true, the file pointed to by the keytab_file argument is used for authentication instead. Provide at most one of password or keytab_file.
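
A keytab-based setup could be sketched as follows. The service name, realm, username, and the otelcol.exporter.otlp.default component are assumptions made for illustration; the file paths reuse the examples from the argument descriptions. The password argument is left out because use_keytab is true.

```river
otelcol.receiver.kafka "kerberos" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  authentication {
    kerberos {
      // Hypothetical identity values.
      service_name = "kafka"
      realm        = "EXAMPLE.COM"
      username     = "otel-collector"

      // Authenticate with the keytab file instead of a password.
      use_keytab  = true
      keytab_file = "/etc/security/kafka.keytab"
      config_file = "/etc/krb5.conf"
    }
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```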

metadata block

The metadata block configures how to retrieve and store metadata from the Kafka broker.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| include_all_topics | bool | When true, maintains metadata for all topics. | true | no |

If the include_all_topics argument is true, otelcol.receiver.kafka maintains a full set of metadata for all topics rather than the minimal set that has been necessary so far. Including the full set of metadata is more convenient for users but can consume a substantial amount of memory if you have many topics and partitions.

Retrieving metadata may fail if the Kafka broker is starting up at the same time as the otelcol.receiver.kafka component. The retry child block can be provided to customize retry behavior.

retry block

The retry block configures how to retry retrieving metadata when retrieval fails.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| max_retries | number | How many times to reattempt retrieving metadata. | 3 | no |
| backoff | duration | Time to wait between retries. | "250ms" | no |
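
For a broker that may still be starting when the component loads, the metadata and retry blocks could be combined as in this sketch. The specific retry count and backoff are illustrative values rather than recommendations, and the otelcol.exporter.otlp.default component is assumed to exist elsewhere.

```river
otelcol.receiver.kafka "patient" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  metadata {
    // Keep only the minimal metadata set to limit memory use.
    include_all_topics = false

    retry {
      // Illustrative values: retry up to 10 times, waiting 1s between attempts.
      max_retries = 10
      backoff     = "1s"
    }
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```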

autocommit block

The autocommit block configures how to automatically commit updated topic offsets back to the Kafka brokers.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| enable | bool | Enable autocommitting updated topic offsets. | true | no |
| interval | duration | How frequently to autocommit. | "1s" | no |

message_marking block

The message_marking block configures when Kafka messages are marked as read.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| after_execution | bool | Mark messages after forwarding telemetry data to other components. | false | no |
| include_unsuccessful | bool | Whether failed forwards should be marked as read. | false | no |

By default, a Kafka message is marked as read immediately after it is retrieved from the Kafka broker. If the after_execution argument is true, messages are only marked as read after the telemetry data is forwarded to components specified in the output block.

When after_execution is true, messages are only marked as read when they are decoded successfully and components where the data was forwarded did not return an error. If the include_unsuccessful argument is true, messages are marked as read even if decoding or forwarding failed. Setting include_unsuccessful has no effect if after_execution is false.

WARNING: Setting after_execution to true and include_unsuccessful to false can block the entire Kafka partition if message processing returns a permanent error, such as failing to decode.
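
One way to combine these settings is sketched below: messages are marked only after forwarding, and include_unsuccessful is enabled so that a permanent error can't block the partition, at the cost of also marking messages whose decoding or forwarding failed as read. The autocommit interval is an illustrative value, and the otelcol.exporter.otlp.default component is assumed to exist elsewhere.

```river
otelcol.receiver.kafka "marking" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  autocommit {
    enable   = true
    interval = "5s" // illustrative; the default is "1s"
  }

  message_marking {
    // Mark messages as read only after they are forwarded to the output.
    after_execution = true

    // Also mark messages that failed to decode or forward, so a permanent
    // error doesn't block the partition.
    include_unsuccessful = true
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```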

header_extraction block

The header_extraction block configures how to extract headers from Kafka records.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| extract_headers | bool | Enables attaching header fields to resource attributes. | false | no |
| headers | list(string) | A list of headers to extract from the Kafka record. | [] | no |

Regular expressions are not allowed in the headers argument. Only exact matching will be performed.
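
A short sketch of header extraction follows. The header names tenant_id and source_region are hypothetical, as is the otelcol.exporter.otlp.default component. Each entry must match a record header name exactly.

```river
otelcol.receiver.kafka "headers" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  header_extraction {
    extract_headers = true

    // Hypothetical header names; exact matches only, no regular expressions.
    headers = ["tenant_id", "source_region"]
  }

  output {
    // Assumes an otelcol.exporter.otlp "default" component is defined elsewhere.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```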

debug_metrics block

The debug_metrics block configures the metrics that this component generates to monitor its state.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| disable_high_cardinality_metrics | boolean | Whether to disable certain high cardinality metrics. | true | no |

disable_high_cardinality_metrics is the Grafana Agent equivalent to the telemetry.disableHighCardinalityMetrics feature gate in the OpenTelemetry Collector. It removes attributes that could cause high cardinality metrics. For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.

output block

The output block configures a set of components to forward resulting telemetry data to.

The following arguments are supported:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| logs | list(otelcol.Consumer) | List of consumers to send logs to. | [] | no |
| metrics | list(otelcol.Consumer) | List of consumers to send metrics to. | [] | no |
| traces | list(otelcol.Consumer) | List of consumers to send traces to. | [] | no |

You must specify the output block, but all its arguments are optional. By default, telemetry data is dropped. Configure the metrics, logs, and traces arguments accordingly to send telemetry data to other components.

Exported fields

otelcol.receiver.kafka does not export any fields.

Component health

otelcol.receiver.kafka is only reported as unhealthy if given an invalid configuration.

Debug information

otelcol.receiver.kafka does not expose any component-specific debug information.

Example

This example forwards telemetry data read from Kafka through a batch processor before sending it to an OTLP-capable endpoint:

river
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}

otelcol.exporter.otlp "default" {
  client {
    endpoint = env("OTLP_ENDPOINT")
  }
}

Compatible components

otelcol.receiver.kafka can accept arguments from the following components:

NOTE: Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.