Important: This documentation is for an older version. It applies only to the release noted; many of the features and functions have since been updated or replaced. Please view the current version.
otelcol.receiver.kafka
otelcol.receiver.kafka accepts telemetry data from a Kafka broker and forwards it to other otelcol.* components.
Note
otelcol.receiver.kafka is a wrapper over the upstream OpenTelemetry Collector kafka receiver from the otelcol-contrib distribution. Bug reports or feature requests will be redirected to the upstream repository, if necessary.
You can specify multiple otelcol.receiver.kafka components by giving them different labels.
Usage
```alloy
otelcol.receiver.kafka "<LABEL>" {
  brokers          = ["<BROKER_ADDR>"]
  protocol_version = "<PROTOCOL_VERSION>"

  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}
```
Arguments
You can use the following arguments with otelcol.receiver.kafka:
For max_fetch_size, the value 0 means no limit.
If topic isn’t set, different topics will be used for different telemetry signals:
- Metrics will be received from an otlp_metrics topic.
- Traces will be received from an otlp_spans topic.
- Logs will be received from an otlp_logs topic.
If topic is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block.
For example, if topic is set to "my_telemetry", then the "my_telemetry" topic can only contain either metrics, logs, or traces.
If it contains only metrics, then otelcol.receiver.kafka should be configured to output only metrics.
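For example, a receiver reading a metrics-only topic could be configured as follows. This is a sketch: the topic name is taken from the example above, and the downstream otelcol.processor.batch.default.input consumer is a placeholder.

```alloy
otelcol.receiver.kafka "metrics_only" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
  topic            = "my_telemetry" // this topic holds only metrics

  output {
    // Only the signal type stored in the topic is configured.
    metrics = [otelcol.processor.batch.default.input]
  }
}
```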
The encoding argument determines how to decode messages read from Kafka.
encoding supports encoding extensions.
It tries to load an encoding extension and falls back to internal encodings if no extension was loaded.
Available internal encodings:
- "azure_resource_logs": The payload is converted from Azure Resource Logs format to an OTLP log.
- "jaeger_json": Decode messages as a single Jaeger JSON span.
- "jaeger_proto": Decode messages as a single Jaeger protobuf span.
- "json": Decode the JSON payload and insert it into the body of a log record.
- "otlp_json": Decode messages as OTLP JSON.
- "otlp_proto": Decode messages as OTLP protobuf.
- "raw": Copy the log message bytes into the body of a log record.
- "text": Decode the log message as text and insert it into the body of a log record. By default, UTF-8 is used to decode. A different encoding can be chosen by using text_<ENCODING>. For example, text_utf-8 or text_shift_jis.
- "zipkin_json": Decode messages as a list of Zipkin JSON spans.
- "zipkin_proto": Decode messages as a list of Zipkin protobuf spans.
- "zipkin_thrift": Decode messages as a list of Zipkin Thrift spans.
"otlp_proto" must be used to read all telemetry types from Kafka.
Other encodings are signal-specific.
initial_offset must be either "latest" or "earliest".
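Putting these together, a logs-only receiver that decodes plain text and starts from the oldest available offset might look like the following sketch. The topic name and downstream consumer are placeholders.

```alloy
otelcol.receiver.kafka "text_logs" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
  topic            = "app_logs"   // placeholder topic name
  encoding         = "text_utf-8" // signal-specific: produces log records
  initial_offset   = "earliest"   // "latest" or "earliest"

  output {
    logs = [otelcol.processor.batch.default.input]
  }
}
```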
Blocks
You can use the following blocks with otelcol.receiver.kafka:
The > symbol indicates deeper levels of nesting.
For example, authentication > tls refers to a tls block defined inside an authentication block.
output
Required
The output block configures a set of components to forward resulting telemetry data to.
The following arguments are supported:
You must specify the output block, but all its arguments are optional.
By default, telemetry data is dropped.
Configure the metrics, logs, and traces arguments accordingly to send telemetry data to other components.
authentication
The authentication block holds the definition of different authentication mechanisms to use when connecting to Kafka brokers.
It doesn’t support any arguments and is configured fully through inner blocks.
kerberos
The kerberos block configures Kerberos authentication against the Kafka broker.
The following arguments are supported:
When use_keytab is false, the password argument is required.
When use_keytab is true, the file pointed to by the keytab_file argument is used for authentication instead.
At most one of password or keytab_file must be provided.
disable_fast_negotiation is useful for Kerberos implementations which don’t support PA-FX-FAST (Pre-Authentication Framework - Fast) negotiation.
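A keytab-based configuration might look like the following sketch. The realm, principal, and file paths are hypothetical, and the argument names should be verified against the reference table for your Alloy version.

```alloy
authentication {
  kerberos {
    service_name = "kafka"
    realm        = "EXAMPLE.COM"             // hypothetical realm
    username     = "alloy"                   // hypothetical principal
    use_keytab   = true
    keytab_file  = "/etc/alloy/kafka.keytab" // used instead of password
    config_file  = "/etc/krb5.conf"
  }
}
```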
plaintext
The plaintext block configures plain text authentication against Kafka brokers.
The following arguments are supported:
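For example (the credentials are placeholders; reading the password from an environment variable avoids hard-coding secrets):

```alloy
authentication {
  plaintext {
    username = "alloy"                   // placeholder username
    password = sys.env("KAFKA_PASSWORD") // avoid hard-coding secrets
  }
}
```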
sasl
The sasl block configures SASL authentication against Kafka brokers.
The following arguments are supported:
You can set the mechanism argument to one of the following strings:
- "PLAIN"
- "AWS_MSK_IAM"
- "SCRAM-SHA-256"
- "SCRAM-SHA-512"
- "AWS_MSK_IAM_OAUTHBEARER"
When mechanism is set to "AWS_MSK_IAM", the aws_msk child block must also be provided.
You can set the version argument to either 0 or 1.
aws_msk
The aws_msk block configures extra parameters for SASL authentication when using the AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanisms.
The following arguments are supported:
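For example, an AWS_MSK_IAM configuration might look like the following sketch. The region and broker address are placeholders, and the aws_msk argument names are assumptions to be verified against the reference table.

```alloy
authentication {
  sasl {
    mechanism = "AWS_MSK_IAM"
    version   = 1

    aws_msk {
      region      = "us-east-1"                              // placeholder region
      broker_addr = "b-1.cluster.example.amazonaws.com:9098" // placeholder address
    }
  }
}
```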
tls
The tls block configures TLS settings used for connecting to the Kafka brokers.
If the tls block isn’t provided, TLS won’t be used for communication.
The following arguments are supported:
To disable TLS for connections to the server, set the insecure argument to true. You must set insecure to true if the server doesn't support TLS.
If you set reload_interval to "0s", the certificate is never reloaded.
The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:
- ca_pem and ca_file
- cert_pem and cert_file
- key_pem and key_file
If cipher_suites is left blank, a safe default list is used.
Refer to the Go TLS documentation for a list of supported cipher suites.
The curve_preferences argument determines the set of elliptic curves to prefer during a handshake in preference order.
If not provided, a default list is used.
The set of elliptic curves available are X25519, P521, P256, and P384.
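For example, a mutual-TLS sketch using file-based credentials. The paths are placeholders; remember that the *_file and *_pem variants of each argument are mutually exclusive.

```alloy
tls {
  ca_file         = "/etc/alloy/kafka-ca.pem"    // or ca_pem, not both
  cert_file       = "/etc/alloy/client-cert.pem"
  key_file        = "/etc/alloy/client-key.pem"
  reload_interval = "5m"                         // "0s" disables reloading
}
```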
autocommit
The autocommit block configures how to automatically commit updated topic offsets back to the Kafka brokers.
The following arguments are supported:
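For example, a sketch that commits updated offsets every second. The enable and interval argument names follow the upstream receiver and should be verified against the reference table.

```alloy
autocommit {
  enable   = true
  interval = "1s" // how often to commit updated offsets
}
```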
debug_metrics
The debug_metrics block configures the metrics that this component generates to monitor its state.
The following arguments are supported:
disable_high_cardinality_metrics is the Alloy equivalent to the telemetry.disableHighCardinalityMetrics feature gate in the OpenTelemetry Collector.
It removes attributes that could cause high cardinality metrics.
For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.
Note
If configured, disable_high_cardinality_metrics only applies to otelcol.exporter.* and otelcol.receiver.* components.
error_backoff
The error_backoff block configures how failed requests to Kafka are retried.
The following arguments are supported:
When enabled is true, failed batches are retried after a given interval.
The initial_interval argument specifies how long to wait before the first retry attempt.
If requests continue to fail, the time to wait before retrying increases by the factor specified by the multiplier argument, which must be greater than 1.0.
The max_interval argument specifies the upper bound of how long to wait between retries.
The randomization_factor argument is useful for adding jitter between retrying Alloy instances.
If randomization_factor is greater than 0, the wait time before retries is multiplied by a random factor in the range [ I - randomization_factor * I, I + randomization_factor * I], where I is the current interval.
If a batch hasn’t been sent successfully, it’s discarded after the time specified by max_elapsed_time elapses.
If max_elapsed_time is set to "0s", failed requests are retried forever until they succeed.
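For example, a sketch with exponential backoff capped at 30 seconds that discards a batch after 5 minutes of failures:

```alloy
error_backoff {
  enabled              = true
  initial_interval     = "1s"  // wait before the first retry
  multiplier           = 2.0   // must be greater than 1.0
  randomization_factor = 0.5   // adds jitter between retrying instances
  max_interval         = "30s" // upper bound between retries
  max_elapsed_time     = "5m"  // "0s" would retry forever
}
```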
header_extraction
The header_extraction block configures how to extract headers from Kafka records.
The following arguments are supported:
Regular expressions aren’t allowed in the headers argument.
Only exact matching will be performed.
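For example, a sketch that extracts two hypothetical headers. The extract_headers argument name follows the upstream receiver and should be verified against the reference table.

```alloy
header_extraction {
  extract_headers = true
  headers         = ["tenant", "environment"] // matched exactly, no regular expressions
}
```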
message_marking
The message_marking block configures when Kafka messages are marked as read.
The following arguments are supported:
By default, a Kafka message is marked as read immediately after it’s retrieved from the Kafka broker.
If the after_execution argument is true, messages are only read after the telemetry data is forwarded to components specified in the output block.
When after_execution is true, messages are only marked as read when they’re decoded successfully and components where the data was forwarded didn’t return an error.
If the include_unsuccessful argument is true, messages are marked as read even if decoding or forwarding failed.
Setting include_unsuccessful has no effect if after_execution is false.
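For example, a sketch that marks messages only after forwarding, including failed ones so that a permanent error can't stall the partition:

```alloy
message_marking {
  after_execution      = true // mark only after data is forwarded
  include_unsuccessful = true // also mark messages that failed to decode or forward
}
```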
Warning
Setting after_execution to true and include_unsuccessful to false can block the entire Kafka partition if message processing returns a permanent error, such as failing to decode.
metadata
The metadata block configures how to retrieve and store metadata from the Kafka broker.
The following arguments are supported:
If the include_all_topics argument is true, a full set of metadata for all topics is maintained rather than the minimal set that has been necessary so far.
Including the full set of metadata is more convenient for users but can consume a substantial amount of memory if you have many topics and partitions.
Retrieving metadata may fail if the Kafka broker is starting up at the same time as the Alloy component.
The retry child block can be provided to customize retry behavior.
retry
The retry block configures how to retry retrieving metadata when retrieval fails.
The following arguments are supported:
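For example, a sketch that keeps the metadata set minimal and retries retrieval while the broker starts up. The retry argument names are assumptions based on the upstream receiver; verify them against the reference table.

```alloy
metadata {
  include_all_topics = false // keep only the metadata needed so far

  retry {
    max_retries = 10   // placeholder values
    backoff     = "5s"
  }
}
```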
Exported fields
otelcol.receiver.kafka doesn’t export any fields.
Component health
otelcol.receiver.kafka is only reported as unhealthy if given an invalid configuration.
Debug information
otelcol.receiver.kafka doesn’t expose any component-specific debug information.
Example
This example forwards read telemetry data through a batch processor before finally sending it to an OTLP-capable endpoint:
```alloy
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}

otelcol.exporter.otlp "default" {
  client {
    endpoint = sys.env("OTLP_ENDPOINT")
  }
}
```
Compatible components
otelcol.receiver.kafka can accept arguments from the following components:
- Components that export OpenTelemetry otelcol.Consumer
Note
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.



