otelcol.receiver.kafka
otelcol.receiver.kafka accepts telemetry data from a Kafka broker and forwards it to other otelcol.* components.
Note
otelcol.receiver.kafka is a wrapper over the upstream OpenTelemetry Collector kafka receiver. Bug reports or feature requests will be redirected to the upstream repository, if necessary.
You can specify multiple otelcol.receiver.kafka components by giving them different labels.
Usage
otelcol.receiver.kafka "<LABEL>" {
brokers = ["<BROKER_ADDR>"]
protocol_version = "<PROTOCOL_VERSION>"
output {
metrics = [...]
logs = [...]
traces = [...]
}
}Arguments
You can use the following arguments with otelcol.receiver.kafka:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| brokers | array(string) | Kafka brokers to connect to. | | yes |
| protocol_version | string | Kafka protocol version to use. | | yes |
| client_id | string | Consumer client ID to use. | "otel-collector" | no |
| default_fetch_size | int | The default number of message bytes to fetch in a request. | 1048576 | no |
| encoding | string | (Deprecated) Encoding of payload read from Kafka. | "otlp_proto" | no |
| group_id | string | Consumer group to consume messages from. | "otel-collector" | no |
| group_instance_id | string | A unique identifier for the consumer instance within a consumer group. | "" | no |
| group_rebalance_strategy | string | The strategy used to assign partitions to consumers within a consumer group. | "range" | no |
| heartbeat_interval | duration | The expected time between heartbeats to the consumer coordinator when using Kafka group management. | "3s" | no |
| initial_offset | string | Initial offset to use if no offset was previously committed. | "latest" | no |
| max_fetch_size | int | The maximum number of message bytes to fetch in a request. | 0 | no |
| max_fetch_wait | duration | The maximum amount of time the broker waits for min_fetch_size bytes to be available before returning anyway. | "250ms" | no |
| min_fetch_size | int | The minimum number of message bytes to fetch in a request. | 1 | no |
| resolve_canonical_bootstrap_servers_only | bool | Whether to resolve then reverse-lookup broker IPs during startup. | false | no |
| session_timeout | duration | The request timeout for detecting client failures when using Kafka group management. | "10s" | no |
| topic | string | (Deprecated) Kafka topic to read from. | See below | no |
Warning
The topic and encoding arguments are deprecated in favor of the logs, metrics, and traces blocks.
For max_fetch_size, the value 0 means no limit.
initial_offset must be either "latest" or "earliest".
The group_rebalance_strategy argument determines how Kafka distributes topic partitions among the consumers in the group during rebalances.
Supported strategies are:
- range: This strategy assigns partitions to consumers based on a range. It aims to distribute partitions evenly across consumers, but it can lead to uneven distribution if the number of partitions is not a multiple of the number of consumers. For more information, refer to the Kafka RangeAssignor documentation.
- roundrobin: This strategy assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions is not a multiple of the number of consumers. For more information, refer to the Kafka RoundRobinAssignor documentation.
- sticky: This strategy aims to maintain the same partition assignments during rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, refer to the Kafka StickyAssignor documentation.
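For example, a minimal sketch that opts a consumer group into sticky assignment. The broker address, group ID, and downstream component are placeholders:

```alloy
otelcol.receiver.kafka "sticky_example" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
  group_id         = "otel-collector"

  // Keep partition assignments stable across rebalances where possible.
  group_rebalance_strategy = "sticky"

  output {
    traces = [otelcol.processor.batch.default.input]
  }
}
```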
Using a group_instance_id is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.
- If group_instance_id is set to a non-empty string, the consumer is treated as a static member of the group. This means that the consumer maintains its partition assignments across restarts and rebalances, as long as it rejoins the group with the same group_instance_id.
- If group_instance_id is set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer's partition assignments may change during rebalances.
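The following sketch pins a static member identity so the consumer keeps its partitions across restarts. The instance ID value is illustrative, and each replica must use a distinct value:

```alloy
otelcol.receiver.kafka "static_member" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
  group_id         = "otel-collector"

  // Static membership: rejoining with the same ID preserves partition assignments.
  group_instance_id = "alloy-replica-0"

  output {
    logs = [otelcol.processor.batch.default.input]
  }
}
```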
Blocks
You can use the following blocks with otelcol.receiver.kafka:
| Block | Description | Required |
|---|---|---|
| output | Configures where to send received telemetry data. | yes |
| authentication | Configures authentication for connecting to Kafka brokers. | no |
| authentication > kerberos | Authenticates against Kafka brokers with Kerberos. | no |
| authentication > plaintext | (Deprecated) Authenticates against Kafka brokers with plaintext. | no |
| authentication > sasl | Authenticates against Kafka brokers with SASL. | no |
| authentication > sasl > aws_msk | Additional SASL parameters when using AWS_MSK_IAM_OAUTHBEARER. | no |
| authentication > tls | (Deprecated) Configures TLS for connecting to the Kafka brokers. | no |
| authentication > tls > tpm | Configures TPM settings for the TLS key_file. | no |
| autocommit | Configures how to automatically commit updated topic offsets back to the Kafka brokers. | no |
| debug_metrics | Configures the metrics which this component generates to monitor its state. | no |
| logs | Configures how to receive logs from Kafka brokers. | no |
| error_backoff | Configures how to handle errors when receiving messages from Kafka. | no |
| header_extraction | Extracts headers from Kafka records. | no |
| message_marking | Configures when Kafka messages are marked as read. | no |
| metadata | Configures how to retrieve metadata from Kafka brokers. | no |
| metadata > retry | Configures how to retry metadata retrieval. | no |
| metrics | Configures how to receive metrics from Kafka brokers. | no |
| traces | Configures how to receive traces from Kafka brokers. | no |
| tls | Configures TLS for connecting to the Kafka brokers. | no |
| tls > tpm | Configures TPM settings for the TLS key_file. | no |
The > symbol indicates deeper levels of nesting.
For example, authentication > tls refers to a tls block defined inside an authentication block.
output
The output block configures a set of components to forward resulting telemetry data to.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| logs | list(otelcol.Consumer) | List of consumers to send logs to. | [] | no |
| metrics | list(otelcol.Consumer) | List of consumers to send metrics to. | [] | no |
| traces | list(otelcol.Consumer) | List of consumers to send traces to. | [] | no |
You must specify the output block, but all its arguments are optional.
By default, telemetry data is dropped.
Configure the metrics, logs, and traces arguments accordingly to send telemetry data to other components.
logs
The logs block configures how to receive logs from Kafka brokers.
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| encoding | string | The encoding for logs. Refer to Supported encodings. | "otlp_proto" | no |
| topic | string | The name of the Kafka topic on which logs are received. | "otlp_logs" | no |
metrics
The metrics block configures how to receive metrics from Kafka brokers.
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| encoding | string | The encoding for metrics. Refer to Supported encodings. | "otlp_proto" | no |
| topic | string | The name of the Kafka topic on which metrics are received. | "otlp_metrics" | no |
traces
The traces block configures how to receive traces from Kafka brokers.
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| encoding | string | The encoding for traces. Refer to Supported encodings. | "otlp_proto" | no |
| topic | string | The name of the Kafka topic on which traces are received. | "otlp_spans" | no |
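For example, a sketch that reads each signal from its own topic with non-default encodings. The topic names are illustrative:

```alloy
otelcol.receiver.kafka "custom_topics" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  logs {
    topic    = "telemetry-logs"
    encoding = "json"
  }

  metrics {
    topic = "telemetry-metrics"
  }

  traces {
    topic    = "telemetry-spans"
    encoding = "zipkin_json"
  }

  output {
    logs    = [otelcol.processor.batch.default.input]
    metrics = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}
```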
authentication
The authentication block holds the definition of different authentication mechanisms to use when connecting to Kafka brokers.
It doesn’t support any arguments and is configured fully through inner blocks.
kerberos
The kerberos block configures Kerberos authentication against the Kafka broker.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| config_file | string | Path to Kerberos location, for example, /etc/krb5.conf. | | no |
| disable_fast_negotiation | bool | Disable PA-FX-FAST negotiation. | false | no |
| keytab_file | string | Path to keytab file, for example, /etc/security/kafka.keytab. | | no |
| password | secret | Kerberos password to authenticate with. | | no |
| realm | string | Kerberos realm. | | no |
| service_name | string | Kerberos service name. | | no |
| use_keytab | string | Enables using keytab instead of password. | | no |
| username | string | Kerberos username to authenticate as. | | yes |
When use_keytab is false, the password argument is required.
When use_keytab is true, the file pointed to by the keytab_file argument is used for authentication instead.
At most one of password or keytab_file can be provided.
disable_fast_negotiation is useful for Kerberos implementations which don’t support PA-FX-FAST (Pre-Authentication Framework - Fast) negotiation.
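A sketch of keytab-based Kerberos authentication. The realm, principal, and file paths are illustrative:

```alloy
otelcol.receiver.kafka "kerberos_example" {
  brokers          = ["broker.example.com:9092"]
  protocol_version = "2.0.0"

  authentication {
    kerberos {
      service_name = "kafka"
      realm        = "EXAMPLE.COM"
      username     = "alloy"
      use_keytab   = true // authenticate with the keytab file instead of a password
      keytab_file  = "/etc/security/kafka.keytab"
      config_file  = "/etc/krb5.conf"
    }
  }

  output {
    logs = [otelcol.processor.batch.default.input]
  }
}
```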
plaintext
Caution
The plaintext block has been deprecated. Use sasl with mechanism set to PLAIN instead.
The plaintext block configures plain text authentication against Kafka brokers.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| password | secret | Password to use for plain text authentication. | | yes |
| username | string | Username to use for plain text authentication. | | yes |
sasl
The sasl block configures SASL authentication against Kafka brokers.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| mechanism | string | SASL mechanism to use when authenticating. | | yes |
| password | secret | Password to use for SASL authentication. | | yes |
| username | string | Username to use for SASL authentication. | | yes |
| version | number | Version of the SASL Protocol to use when authenticating. | 0 | no |
You can set the mechanism argument to one of the following strings:
"PLAIN""SCRAM-SHA-256""SCRAM-SHA-512""AWS_MSK_IAM_OAUTHBEARER"
When mechanism is set to "AWS_MSK_IAM_OAUTHBEARER", the aws_msk child block must also be provided.
You can set the version argument to either 0 or 1.
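For example, a sketch of SCRAM authentication with the password read from an environment variable. The broker address and credentials are placeholders:

```alloy
otelcol.receiver.kafka "sasl_example" {
  brokers          = ["broker.example.com:9092"]
  protocol_version = "2.0.0"

  authentication {
    sasl {
      mechanism = "SCRAM-SHA-512"
      username  = "alloy"
      password  = sys.env("KAFKA_PASSWORD")
    }
  }

  output {
    metrics = [otelcol.processor.batch.default.input]
  }
}
```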
aws_msk
The aws_msk block configures extra parameters for SASL authentication when using the AWS_MSK_IAM_OAUTHBEARER mechanism.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| region | string | AWS region the MSK cluster is based in. | | yes |
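A sketch of IAM-based authentication against an MSK cluster. The broker address and region are illustrative, and the empty username and password are assumptions: they satisfy the required sasl arguments but are presumed unused by IAM authentication:

```alloy
otelcol.receiver.kafka "msk_example" {
  brokers          = ["b-1.example.kafka.us-east-1.amazonaws.com:9098"]
  protocol_version = "2.0.0"

  authentication {
    sasl {
      mechanism = "AWS_MSK_IAM_OAUTHBEARER"
      username  = "" // assumed unused with IAM authentication
      password  = ""

      aws_msk {
        region = "us-east-1"
      }
    }
  }

  output {
    traces = [otelcol.processor.batch.default.input]
  }
}
```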
tls
The tls block configures TLS settings used for connecting to the Kafka brokers.
If the tls block isn’t provided, TLS won’t be used for communication.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| ca_file | string | Path to the CA file. | | no |
| ca_pem | string | CA PEM-encoded text to validate the server with. | | no |
| cert_file | string | Path to the TLS certificate. | | no |
| cert_pem | string | Certificate PEM-encoded text for client authentication. | | no |
| cipher_suites | list(string) | A list of TLS cipher suites that the TLS transport can use. | [] | no |
| curve_preferences | list(string) | Set of elliptic curves to use in a handshake. | [] | no |
| include_system_ca_certs_pool | boolean | Whether to load the system certificate authorities pool alongside the certificate authority. | false | no |
| insecure_skip_verify | boolean | Ignores insecure server TLS certificates. | | no |
| insecure | boolean | Disables TLS when connecting to the configured server. | | no |
| key_file | string | Path to the TLS certificate key. | | no |
| key_pem | secret | Key PEM-encoded text for client authentication. | | no |
| max_version | string | Maximum acceptable TLS version for connections. | "TLS 1.3" | no |
| min_version | string | Minimum acceptable TLS version for connections. | "TLS 1.2" | no |
| reload_interval | duration | The duration after which the certificate is reloaded. | "0s" | no |
| server_name | string | Verifies the hostname of server certificates when set. | | no |
If the server doesn't support TLS, you must set the insecure argument to true to disable TLS for the connection.
If you set reload_interval to "0s", the certificate is never reloaded.
The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:
- ca_pem and ca_file
- cert_pem and cert_file
- key_pem and key_file
If cipher_suites is left blank, a safe default list is used.
Refer to the Go TLS documentation for a list of supported cipher suites.
The curve_preferences argument determines the set of elliptic curves to prefer during a handshake in preference order.
If not provided, a default list is used.
The available elliptic curves are X25519, P521, P256, and P384.
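For example, a sketch of mutual TLS against a broker, with the file paths and server name as placeholders:

```alloy
otelcol.receiver.kafka "tls_example" {
  brokers          = ["broker.example.com:9093"]
  protocol_version = "2.0.0"

  tls {
    ca_file     = "/etc/alloy/certs/ca.crt"
    cert_file   = "/etc/alloy/certs/client.crt"
    key_file    = "/etc/alloy/certs/client.key"
    min_version = "TLS 1.2"
    server_name = "broker.example.com"
  }

  output {
    traces = [otelcol.processor.batch.default.input]
  }
}
```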
tpm
The tpm block configures retrieving the TLS key_file from a trusted device.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| auth | string | The authorization value used to authenticate the TPM device. | "" | no |
| enabled | bool | Load the tls.key_file from TPM. | false | no |
| owner_auth | string | The owner authorization value used to authenticate the TPM device. | "" | no |
| path | string | Path to the TPM device or Unix domain socket. | "" | no |
You can use the Trusted Platform Module (TPM) configuration to load the TLS key from a TPM. Currently, only the TSS2 format is supported.
The path attribute is not supported on Windows.
Example
```alloy
otelcol.example.component "<LABEL>" {
  ...

  tls {
    ...
    key_file = "my-tss2-key.key"

    tpm {
      enabled = true
      path    = "/dev/tpmrm0"
    }
  }
}
```
In the above example, the private key my-tss2-key.key in TSS2 format is loaded from the TPM device /dev/tpmrm0.
autocommit
The autocommit block configures how to automatically commit updated topic offsets back to the Kafka brokers.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| enable | bool | Enable autocommitting updated topic offsets. | true | no |
| interval | duration | How frequently to autocommit. | "1s" | no |
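For example, a sketch that commits offsets less frequently than the default to reduce broker traffic. The interval is illustrative:

```alloy
otelcol.receiver.kafka "slow_commit" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  autocommit {
    enable   = true
    interval = "5s" // commit offsets every 5 seconds instead of every second
  }

  output {
    logs = [otelcol.processor.batch.default.input]
  }
}
```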
debug_metrics
The debug_metrics block configures the metrics that this component generates to monitor its state.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| disable_high_cardinality_metrics | boolean | Whether to disable certain high cardinality metrics. | true | no |
disable_high_cardinality_metrics is the Alloy equivalent to the telemetry.disableHighCardinalityMetrics feature gate in the OpenTelemetry Collector.
It removes attributes that could cause high cardinality metrics.
For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.
Note
If configured, disable_high_cardinality_metrics only applies to otelcol.exporter.* and otelcol.receiver.* components.
error_backoff
The error_backoff block configures how failed requests to Kafka are retried.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| enabled | boolean | Enables retrying failed requests. | false | no |
| initial_interval | duration | Initial time to wait before retrying a failed request. | "0s" | no |
| max_elapsed_time | duration | Maximum time to wait before discarding a failed batch. | "0s" | no |
| max_interval | duration | Maximum time to wait between retries. | "0s" | no |
| multiplier | number | Factor to grow wait time before retrying. | 0 | no |
| randomization_factor | number | Factor to randomize wait time before retrying. | 0 | no |
When enabled is true, failed batches are retried after a given interval.
The initial_interval argument specifies how long to wait before the first retry attempt.
If requests continue to fail, the time to wait before retrying increases by the factor specified by the multiplier argument, which must be greater than 1.0.
The max_interval argument specifies the upper bound of how long to wait between retries.
The randomization_factor argument is useful for adding jitter between retrying Alloy instances.
If randomization_factor is greater than 0, the wait time before retries is multiplied by a random factor in the range [I - randomization_factor * I, I + randomization_factor * I], where I is the current interval.
If a batch hasn’t been sent successfully, it’s discarded after the time specified by max_elapsed_time elapses.
If max_elapsed_time is set to "0s", failed requests are retried forever until they succeed.
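For example, a sketch of exponential backoff with jitter. With these illustrative values, waits grow roughly 1s, 2s, 4s, and so on, each randomized by ±50%, capped at 30s, and the batch is discarded after 5 minutes:

```alloy
otelcol.receiver.kafka "with_backoff" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  error_backoff {
    enabled              = true
    initial_interval     = "1s"
    multiplier           = 2.0
    randomization_factor = 0.5
    max_interval         = "30s"
    max_elapsed_time     = "5m"
  }

  output {
    logs = [otelcol.processor.batch.default.input]
  }
}
```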
header_extraction
The header_extraction block configures how to extract headers from Kafka records.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| extract_headers | bool | Enables attaching header fields to resource attributes. | false | no |
| headers | list(string) | A list of headers to extract from the Kafka record. | [] | no |
Regular expressions aren’t allowed in the headers argument.
Only exact matching is performed.
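For example, a sketch that copies two headers onto resource attributes. The header names are illustrative and must match exactly:

```alloy
otelcol.receiver.kafka "with_headers" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  header_extraction {
    extract_headers = true
    headers         = ["tenant", "environment"] // exact header names, no regular expressions
  }

  output {
    logs = [otelcol.processor.batch.default.input]
  }
}
```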
message_marking
The message_marking block configures when Kafka messages are marked as read.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| after_execution | bool | Mark messages after forwarding telemetry data to other components. | false | no |
| include_unsuccessful | bool | Whether failed forwards should be marked as read. | false | no |
By default, a Kafka message is marked as read immediately after it’s retrieved from the Kafka broker.
If the after_execution argument is true, messages are only read after the telemetry data is forwarded to components specified in the output block.
When after_execution is true, messages are only marked as read when they’re decoded successfully and components where the data was forwarded didn’t return an error.
If the include_unsuccessful argument is true, messages are marked as read even if decoding or forwarding failed.
Setting include_unsuccessful has no effect if after_execution is false.
Warning
Setting after_execution to true and include_unsuccessful to false can block the entire Kafka partition if message processing returns a permanent error, such as failing to decode.
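For example, a sketch that marks messages only after forwarding, while still marking failed messages so a permanent error can't stall the partition:

```alloy
otelcol.receiver.kafka "safe_marking" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  message_marking {
    after_execution      = true
    include_unsuccessful = true // also mark failed messages to avoid blocking the partition
  }

  output {
    traces = [otelcol.processor.batch.default.input]
  }
}
```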
metadata
The metadata block configures how to retrieve and store metadata from the Kafka broker.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| full | bool | Whether to maintain a full set of metadata. | true | no |
| refresh_interval | duration | The frequency at which cluster metadata is refreshed. | "10m" | no |
When full is set to false, the client doesn't make the initial request to the broker at startup.
Retrieving metadata may fail if the Kafka broker is starting up at the same time as the Alloy component.
The retry child block can be provided to customize retry behavior.
retry
The retry block configures how to retry retrieving metadata when retrieval fails.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
| backoff | duration | Time to wait between retries. | "250ms" | no |
| max_retries | number | How many times to reattempt retrieving metadata. | 3 | no |
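For example, a sketch that tolerates a slow-starting broker by retrying the initial metadata request more patiently. The values are illustrative:

```alloy
otelcol.receiver.kafka "patient_start" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  metadata {
    full             = true
    refresh_interval = "10m"

    retry {
      max_retries = 10   // default is 3
      backoff     = "5s" // default is "250ms"
    }
  }

  output {
    metrics = [otelcol.processor.batch.default.input]
  }
}
```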
Exported fields
otelcol.receiver.kafka doesn’t export any fields.
Supported encodings
otelcol.receiver.kafka supports encoding extensions, as well as the following built-in encodings.
Available for all signals:
- otlp_proto: the payload is decoded as OTLP Protobuf.
- otlp_json: the payload is decoded as OTLP JSON.
Available only for traces:
- jaeger_proto: the payload is deserialized to a single Jaeger proto Span.
- jaeger_json: the payload is deserialized to a single Jaeger JSON Span using jsonpb.
- zipkin_proto: the payload is deserialized into a list of Zipkin proto spans.
- zipkin_json: the payload is deserialized into a list of Zipkin V2 JSON spans.
- zipkin_thrift: the payload is deserialized into a list of Zipkin Thrift spans.
Available only for logs:
- raw: the payload's bytes are inserted as the body of a log record.
- text: the payload is decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use text_<ENCODING>, like text_utf-8 or text_shift_jis, to customize this behavior.
- json: the payload is decoded as JSON and inserted as the body of a log record.
- azure_resource_logs: the payload is converted from Azure Resource Logs format to OTel format.
Message header propagation
otelcol.receiver.kafka extracts Kafka message headers and includes them as request metadata (context).
This metadata can then be used throughout the pipeline, for example to set attributes using otelcol.processor.attributes.
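For example, a sketch that copies a propagated header into a span attribute with otelcol.processor.attributes. The x-tenant metadata key is illustrative and depends on the headers your producers set:

```alloy
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    traces = [otelcol.processor.attributes.from_headers.input]
  }
}

otelcol.processor.attributes "from_headers" {
  action {
    key          = "tenant"
    from_context = "x-tenant" // illustrative metadata key set from a Kafka header
    action       = "insert"
  }

  output {
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```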
Component health
otelcol.receiver.kafka is only reported as unhealthy if given an invalid configuration.
Debug information
otelcol.receiver.kafka doesn’t expose any component-specific debug information.
Example
This example forwards received telemetry data through a batch processor before finally sending it to an OTLP-capable endpoint:
otelcol.receiver.kafka "default" {
brokers = ["localhost:9092"]
protocol_version = "2.0.0"
output {
metrics = [otelcol.processor.batch.default.input]
logs = [otelcol.processor.batch.default.input]
traces = [otelcol.processor.batch.default.input]
}
}
otelcol.processor.batch "default" {
output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}
otelcol.exporter.otlp "default" {
client {
endpoint = sys.env("OTLP_ENDPOINT")
}
}Compatible components
otelcol.receiver.kafka can accept arguments from the following components:
- Components that export OpenTelemetry otelcol.Consumer
Note
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.