Important: This documentation is about an older version. It's relevant only to the release noted, many of the features and functions have been updated or replaced. Please view the current version.
otelcol.exporter.kafka
otelcol.exporter.kafka accepts logs, metrics, and traces telemetry data from
other otelcol components and sends it to Kafka.
It is important to use otelcol.exporter.kafka together with otelcol.processor.batch
to make sure otelcol.exporter.kafka doesn’t slow down due to sending Kafka a huge number of small payloads.
Note
otelcol.exporter.kafkais a wrapper over the upstream OpenTelemetry Collectorkafkaexporter from theotelcol-contribdistribution. Bug reports or feature requests will be redirected to the upstream repository, if necessary.
Multiple otelcol.exporter.kafka components can be specified by giving them
different labels.
Usage
otelcol.exporter.kafka "LABEL" {
protocol_version = "PROTOCOL_VERSION"
}Arguments
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
protocol_version | string | Kafka protocol version to use. | yes | |
brokers | list(string) | Kafka brokers to connect to. | ["localhost:9092"] | no |
topic | string | Kafka topic to send to. | See below | no |
topic_from_attribute | string | A resource attribute whose value should be used as the message’s topic. | "" | no |
encoding | string | Encoding of payload read from Kafka. | "otlp_proto" | no |
client_id | string | Consumer client ID to use. The ID will be used for all produce requests. | "sarama" | no |
timeout | duration | The timeout for every attempt to send data to the backend. | "5s" | no |
resolve_canonical_bootstrap_servers_only | bool | Whether to resolve then reverse-lookup broker IPs during startup. | "false" | no |
partition_traces_by_id | bool | Whether to include the trace ID as the message key in trace messages sent to Kafka. | "false" | no |
partition_metrics_by_resource_attributes | bool | Whether to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka. | "false" | no |
If topic is not set, different topics will be used for different telemetry signals:
- Metrics will be sent to an
otlp_metricstopic. - Traces will be sent to an
otlp_spanstopic. - Logs will be sent to an
otlp_logstopic.
If topic is set, the same topic will be used for all telemetry signals - metrics, logs, and traces.
When topic_from_attribute is set, it will take precedence over topic.
The encoding argument determines how to encode messages sent to Kafka.
encoding must be one of the following strings:
- Encodings which work for traces, logs, and metrics:
"otlp_proto": Encode messages as OTLP protobuf."otlp_json": Encode messages as OTLP JSON.
- Encodings which work only for traces:
"jaeger_proto": The payload is serialized to a single Jaeger protoSpan, and keyed by TraceID."jaeger_json": The payload is serialized to a single Jaeger JSON Span usingjsonpb, and keyed by TraceID."zipkin_proto": The payload is serialized to Zipkin v2 proto Span."zipkin_json": The payload is serialized to Zipkin v2 JSON Span.
- Encodings which work only for logs:
"raw": If the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
partition_traces_by_id does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
Blocks
The following blocks are supported inside the definition of otelcol.exporter.kafka:
| Hierarchy | Block | Description | Required |
|---|---|---|---|
| authentication | authentication | Configures authentication for connecting to Kafka brokers. | no |
| authentication > plaintext | plaintext | Authenticates against Kafka brokers with plaintext. | no |
| authentication > sasl | sasl | Authenticates against Kafka brokers with SASL. | no |
| authentication > sasl > aws_msk | aws_msk | Additional SASL parameters when using AWS_MSK_IAM. | no |
| authentication > tls | tls | Configures TLS for connecting to the Kafka brokers. | no |
| authentication > kerberos | kerberos | Authenticates against Kafka brokers with Kerberos. | no |
| metadata | metadata | Configures how to retrieve metadata from Kafka brokers. | no |
| metadata > retry | retry | Configures how to retry metadata retrieval. | no |
| retry_on_failure | retry_on_failure | Configures retry mechanism for failed requests. | no |
| sending_queue | sending_queue | Configures batching of data before sending. | no |
| producer | producer | Kafka producer configuration, | no |
| debug_metrics | debug_metrics | Configures the metrics which this component generates to monitor its state. | no |
The > symbol indicates deeper levels of nesting.
For example, authentication > tls refers to a tls block defined inside an authentication block.
authentication block
The authentication block holds the definition of different authentication
mechanisms to use when connecting to Kafka brokers. It doesn’t support any
arguments and is configured fully through inner blocks.
plaintext block
The plaintext block configures plain text authentication against Kafka brokers.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
username | string | Username to use for plain text authentication. | yes | |
password | secret | Password to use for plain text authentication. | yes |
sasl block
The sasl block configures SASL authentication against Kafka brokers.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
username | string | Username to use for SASL authentication. | yes | |
password | secret | Password to use for SASL authentication. | yes | |
mechanism | string | SASL mechanism to use when authenticating. | yes | |
version | number | Version of the SASL Protocol to use when authenticating. | 0 | no |
The mechanism argument can be set to one of the following strings:
"PLAIN""AWS_MSK_IAM""SCRAM-SHA-256""SCRAM-SHA-512"
When mechanism is set to "AWS_MSK_IAM", the aws_msk child block must also be provided.
The version argument can be set to either 0 or 1.
aws_msk block
The aws_msk block configures extra parameters for SASL authentication when
using the AWS_MSK_IAM mechanism.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
region | string | AWS region the MSK cluster is based in. | yes | |
broker_addr | string | MSK address to connect to for authentication. | yes |
tls block
The tls block configures TLS settings used for connecting to the Kafka
brokers. If the tls block isn’t provided, TLS won’t be used for
communication.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
ca_file | string | Path to the CA file. | no | |
ca_pem | string | CA PEM-encoded text to validate the server with. | no | |
cert_file | string | Path to the TLS certificate. | no | |
cert_pem | string | Certificate PEM-encoded text for client authentication. | no | |
insecure_skip_verify | boolean | Ignores insecure server TLS certificates. | no | |
include_system_ca_certs_pool | boolean | Whether to load the system certificate authorities pool alongside the certificate authority. | false | no |
insecure | boolean | Disables TLS when connecting to the configured server. | no | |
key_file | string | Path to the TLS certificate key. | no | |
key_pem | secret | Key PEM-encoded text for client authentication. | no | |
max_version | string | Maximum acceptable TLS version for connections. | "TLS 1.3" | no |
min_version | string | Minimum acceptable TLS version for connections. | "TLS 1.2" | no |
cipher_suites | list(string) | A list of TLS cipher suites that the TLS transport can use. | [] | no |
reload_interval | duration | The duration after which the certificate is reloaded. | "0s" | no |
server_name | string | Verifies the hostname of server certificates when set. | no |
If the server doesn’t support TLS, you must set the insecure argument to true.
To disable tls for connections to the server, set the insecure argument to true.
If reload_interval is set to "0s", the certificate never reloaded.
The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:
ca_pemandca_filecert_pemandcert_filekey_pemandkey_file
If cipher_suites is left blank, a safe default list is used.
See the Go TLS documentation for a list of supported cipher suites.
kerberos block
The kerberos block configures Kerberos authentication against the Kafka
broker.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
service_name | string | Kerberos service name. | no | |
realm | string | Kerberos realm. | no | |
use_keytab | string | Enables using keytab instead of password. | no | |
username | string | Kerberos username to authenticate as. | yes | |
password | secret | Kerberos password to authenticate with. | no | |
config_file | string | Path to Kerberos location (for example, /etc/krb5.conf). | no | |
keytab_file | string | Path to keytab file (for example, /etc/security/kafka.keytab). | no |
When use_keytab is false, the password argument is required. When
use_keytab is true, the file pointed to by the keytab_file argument is
used for authentication instead. At most one of password or keytab_file
must be provided.
metadata block
The metadata block configures how to retrieve and store metadata from the
Kafka broker.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
include_all_topics | bool | When true, maintains metadata for all topics. | true | no |
If the include_all_topics argument is true,
a full set of metadata for all topics is maintained rather than the minimal set
that has been necessary so far. Including the full set of metadata is more
convenient for users but can consume a substantial amount of memory if you have
many topics and partitions.
Retrieving metadata may fail if the Kafka broker is starting up at the same
time as the Alloy component. The retry child block can be provided to customize retry behavior.
retry block
The retry block configures how to retry retrieving metadata when retrieval
fails.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
max_retries | number | How many times to reattempt retrieving metadata. | 3 | no |
backoff | duration | Time to wait between retries. | "250ms" | no |
retry_on_failure block
The retry_on_failure block configures how failed requests to Kafka are retried.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
enabled | boolean | Enables retrying failed requests. | true | no |
initial_interval | duration | Initial time to wait before retrying a failed request. | "5s" | no |
max_elapsed_time | duration | Maximum time to wait before discarding a failed batch. | "5m" | no |
max_interval | duration | Maximum time to wait between retries. | "30s" | no |
multiplier | number | Factor to grow wait time before retrying. | 1.5 | no |
randomization_factor | number | Factor to randomize wait time before retrying. | 0.5 | no |
When enabled is true, failed batches are retried after a given interval.
The initial_interval argument specifies how long to wait before the first retry attempt.
If requests continue to fail, the time to wait before retrying increases by the factor specified by the multiplier argument, which must be greater than 1.0.
The max_interval argument specifies the upper bound of how long to wait between retries.
The randomization_factor argument is useful for adding jitter between retrying Alloy instances.
If randomization_factor is greater than 0, the wait time before retries is multiplied by a random factor in the range [ I - randomization_factor * I, I + randomization_factor * I], where I is the current interval.
If a batch hasn’t been sent successfully, it is discarded after the time specified by max_elapsed_time elapses.
If max_elapsed_time is set to "0s", failed requests are retried forever until they succeed.
sending_queue block
The sending_queue block configures an in-memory buffer of batches before data is sent to the gRPC server.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
enabled | boolean | Enables an in-memory buffer before sending data to the client. | true | no |
num_consumers | number | Number of readers to send batches written to the queue in parallel. | 10 | no |
queue_size | number | Maximum number of unwritten batches allowed in the queue at the same time. | 1000 | no |
When enabled is true, data is first written to an in-memory buffer before sending it to the configured server.
Batches sent to the component’s input exported field are added to the buffer as long as the number of unsent batches doesn’t exceed the configured queue_size.
queue_size determines how long an endpoint outage is tolerated.
Assuming 100 requests/second, the default queue size 1000 provides about 10 seconds of outage tolerance.
To calculate the correct value for queue_size, multiply the average number of outgoing requests per second by the time in seconds that outages are tolerated. A very high value can cause Out Of Memory (OOM) kills.
The num_consumers argument controls how many readers read from the buffer and send data in parallel.
Larger values of num_consumers allow data to be sent more quickly at the expense of increased network traffic.
producer block
The producer block configures how to retry retrieving metadata when retrieval fails.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
max_message_bytes | number | The maximum permitted size of a message in bytes. | 1000000 | no |
required_acks | number | Controls when a message is regarded as transmitted. | 1 | no |
compression | string | Time to wait between retries. | "none" | no |
flush_max_messages | number | Time to wait between retries. | 0 | no |
Refer to the sarama documentation for more information on required_acks.
compression could be set to either none, gzip, snappy, lz4, or zstd.
Refer to the Sarama documentation for more information.
debug_metrics block
The debug_metrics block configures the metrics that this component generates to monitor its state.
The following arguments are supported:
| Name | Type | Description | Default | Required |
|---|---|---|---|---|
disable_high_cardinality_metrics | boolean | Whether to disable certain high cardinality metrics. | true | no |
level | string | Controls the level of detail for metrics emitted by the wrapped collector. | "detailed" | no |
disable_high_cardinality_metrics is the Grafana Alloy equivalent to the telemetry.disableHighCardinalityMetrics feature gate in the OpenTelemetry Collector.
It removes attributes that could cause high cardinality metrics.
For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.
Note
If configured,
disable_high_cardinality_metricsonly applies tootelcol.exporter.*andotelcol.receiver.*components.
level is the Alloy equivalent to the telemetry.metrics.level feature gate in the OpenTelemetry Collector.
Possible values are "none", "basic", "normal" and "detailed".
Exported fields
The following fields are exported and can be referenced by other components:
| Name | Type | Description |
|---|---|---|
input | otelcol.Consumer | A value that other components can use to send telemetry data to. |
input accepts otelcol.Consumer data for any telemetry signal (metrics, logs, or traces).
Component health
otelcol.exporter.kafka is only reported as unhealthy if given an invalid
configuration.
Debug information
otelcol.exporter.kafka does not expose any component-specific debug
information.
Example
This example forwards telemetry data through a batch processor before finally sending it to Kafka:
otelcol.receiver.otlp "default" {
http {}
grpc {}
output {
metrics = [otelcol.processor.batch.default.input]
logs = [otelcol.processor.batch.default.input]
traces = [otelcol.processor.batch.default.input]
}
}
otelcol.processor.batch "default" {
output {
metrics = [otelcol.exporter.kafka.default.input]
logs = [otelcol.exporter.kafka.default.input]
traces = [otelcol.exporter.kafka.default.input]
}
}
otelcol.exporter.kafka "default" {
brokers = ["localhost:9092"]
protocol_version = "2.0.0"
}Compatible components
otelcol.exporter.kafka has exports that can be consumed by the following components:
- Components that consume OpenTelemetry
otelcol.Consumer
Note
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.



