---
title: "otelcol.receiver.kafka | Grafana Agent documentation"
description: "Learn about otelcol.receiver.kafka"
---

> For a curated documentation index, see [llms.txt](/llms.txt). For the complete documentation index, see [llms-full.txt](/llms-full.txt).

# otelcol.receiver.kafka

`otelcol.receiver.kafka` accepts telemetry data from a Kafka broker and forwards it to other `otelcol.*` components.

> **NOTE**: `otelcol.receiver.kafka` is a wrapper over the upstream OpenTelemetry Collector `kafka` receiver from the `otelcol-contrib` distribution. Bug reports or feature requests will be redirected to the upstream repository, if necessary.

Multiple `otelcol.receiver.kafka` components can be specified by giving them different labels.

## Usage

Alloy ![Copy code to clipboard](/media/images/icons/icon-copy-small-2.svg) Copy

```alloy
otelcol.receiver.kafka "LABEL" {
  brokers          = ["BROKER_ADDR"]
  protocol_version = "PROTOCOL_VERSION"

  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}
```

## Arguments

The following arguments are supported:

Expand table

| Name                                       | Type            | Description                                                       | Default            | Required |
|--------------------------------------------|-----------------|-------------------------------------------------------------------|--------------------|----------|
| `brokers`                                  | `array(string)` | Kafka brokers to connect to.                                      |                    | yes      |
| `protocol_version`                         | `string`        | Kafka protocol version to use.                                    |                    | yes      |
| `topic`                                    | `string`        | Kafka topic to read from.                                         |                    | no       |
| `encoding`                                 | `string`        | Encoding of payload read from Kafka.                              | `"otlp_proto"`     | no       |
| `group_id`                                 | `string`        | Consumer group to consume messages from.                          | `"otel-collector"` | no       |
| `client_id`                                | `string`        | Consumer client ID to use.                                        | `"otel-collector"` | no       |
| `initial_offset`                           | `string`        | Initial offset to use if no offset was previously committed.      | `"latest"`         | no       |
| `resolve_canonical_bootstrap_servers_only` | `bool`          | Whether to resolve then reverse-lookup broker IPs during startup. | `"false"`          | no       |

If `topic` is not set, different topics will be used for different telemetry signals:

- Metrics will be received from an `otlp_metrics` topic.
- Traces will be received from an `otlp_spans` topic.
- Logs will be received from an `otlp_logs` topic.

If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block. For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces. If it contains only metrics, then `otelcol.receiver.kafka` should be configured to output only metrics.

The `encoding` argument determines how to decode messages read from Kafka. `encoding` must be one of the following strings:

- `"otlp_proto"`: Decode messages as OTLP protobuf.
- `"jaeger_proto"`: Decode messages as a single Jaeger protobuf span.
- `"jaeger_json"`: Decode messages as a single Jaeger JSON span.
- `"zipkin_proto"`: Decode messages as a list of Zipkin protobuf spans.
- `"zipkin_json"`: Decode messages as a list of Zipkin JSON spans.
- `"zipkin_thrift"`: Decode messages as a list of Zipkin Thrift spans.
- `"raw"`: Copy the log message bytes into the body of a log record.
- `"text"`: Decode the log message as text and insert it into the body of a log record. By default, UTF-8 is used to decode. A different encoding can be chosen by using `text_<ENCODING>`. For example, `text_utf-8` or `text_shift_jis`.
- `"json"`: Decode the JSON payload and insert it into the body of a log record.
- `"azure_resource_logs"`: The payload is converted from Azure Resource Logs format to an OTLP log.

`"otlp_proto"` must be used to read all telemetry types from Kafka; other encodings are signal-specific.

`initial_offset` must be either `"latest"` or `"earliest"`.

## Blocks

The following blocks are supported inside the definition of `otelcol.receiver.kafka`:

Expand table

| Hierarchy                              | Block                                          | Description                                                                                | Required |
|----------------------------------------|------------------------------------------------|--------------------------------------------------------------------------------------------|----------|
| authentication                         | [authentication](#authentication-block)        | Configures authentication for connecting to Kafka brokers.                                 | no       |
| authentication &gt; plaintext          | [plaintext](#plaintext-block)                  | Authenticates against Kafka brokers with plaintext.                                        | no       |
| authentication &gt; sasl               | [sasl](#sasl-block)                            | Authenticates against Kafka brokers with SASL.                                             | no       |
| authentication &gt; sasl &gt; aws\_msk | [aws\_msk](#aws_msk-block)                     | Additional SASL parameters when using AWS\_MSK\_IAM.                                       | no       |
| authentication &gt; tls                | [tls](#tls-block)                              | Configures TLS for connecting to the Kafka brokers.                                        | no       |
| authentication &gt; kerberos           | [kerberos](#kerberos-block)                    | Authenticates against Kafka brokers with Kerberos.                                         | no       |
| metadata                               | [metadata](#metadata-block)                    | Configures how to retrieve metadata from Kafka brokers.                                    | no       |
| metadata &gt; retry                    | [retry](#retry-block)                          | Configures how to retry metadata retrieval.                                                | no       |
| autocommit                             | [autocommit](#autocommit-block)                | Configures how to automatically commit updated topic offsets to back to the Kafka brokers. | no       |
| message\_marking                       | [message\_marking](#message_marking-block)     | Configures when Kafka messages are marked as read.                                         | no       |
| header\_extraction                     | [header\_extraction](#header_extraction-block) | Extract headers from Kafka records.                                                        | no       |
| debug\_metrics                         | [debug\_metrics](#debug_metrics-block)         | Configures the metrics which this component generates to monitor its state.                | no       |
| output                                 | [output](#output-block)                        | Configures where to send received telemetry data.                                          | yes      |

The `>` symbol indicates deeper levels of nesting. For example, `authentication > tls` refers to a `tls` block defined inside an `authentication` block.

### authentication block

The `authentication` block holds the definition of different authentication mechanisms to use when connecting to Kafka brokers. It doesn’t support any arguments and is configured fully through inner blocks.

### plaintext block

The `plaintext` block configures `PLAIN` authentication against Kafka brokers.

The following arguments are supported:

Expand table

| Name       | Type     | Description                                 | Default | Required |
|------------|----------|---------------------------------------------|---------|----------|
| `username` | `string` | Username to use for `PLAIN` authentication. |         | yes      |
| `password` | `secret` | Password to use for `PLAIN` authentication. |         | yes      |

### sasl block

The `sasl` block configures SASL authentication against Kafka brokers.

The following arguments are supported:

Expand table

| Name        | Type     | Description                                              | Default | Required |
|-------------|----------|----------------------------------------------------------|---------|----------|
| `username`  | `string` | Username to use for SASL authentication.                 |         | yes      |
| `password`  | `secret` | Password to use for SASL authentication.                 |         | yes      |
| `mechanism` | `string` | SASL mechanism to use when authenticating.               |         | yes      |
| `version`   | `number` | Version of the SASL Protocol to use when authenticating. | `0`     | no       |

The `mechanism` argument can be set to one of the following strings:

- `"PLAIN"`
- `"AWS_MSK_IAM"`
- `"SCRAM-SHA-256"`
- `"SCRAM-SHA-512"`

When `mechanism` is set to `"AWS_MSK_IAM"`, the [`aws_msk` child block](#aws_msk-block) must also be provided.

The `version` argument can be set to either `0` or `1`.

### aws\_msk block

The `aws_msk` block configures extra parameters for SASL authentication when using the `AWS_MSK_IAM` mechanism.

The following arguments are supported:

Expand table

| Name          | Type     | Description                                   | Default | Required |
|---------------|----------|-----------------------------------------------|---------|----------|
| `region`      | `string` | AWS region the MSK cluster is based in.       |         | yes      |
| `broker_addr` | `string` | MSK address to connect to for authentication. |         | yes      |

### tls block

The `tls` block configures TLS settings used for connecting to the Kafka brokers. If the `tls` block isn’t provided, TLS won’t be used for communication.

The following arguments are supported:

Expand table

| Name                           | Type           | Description                                                                                  | Default     | Required |
|--------------------------------|----------------|----------------------------------------------------------------------------------------------|-------------|----------|
| `ca_file`                      | `string`       | Path to the CA file.                                                                         |             | no       |
| `ca_pem`                       | `string`       | CA PEM-encoded text to validate the server with.                                             |             | no       |
| `cert_file`                    | `string`       | Path to the TLS certificate.                                                                 |             | no       |
| `cert_pem`                     | `string`       | Certificate PEM-encoded text for client authentication.                                      |             | no       |
| `insecure_skip_verify`         | `boolean`      | Ignores insecure server TLS certificates.                                                    |             | no       |
| `include_system_ca_certs_pool` | `boolean`      | Whether to load the system certificate authorities pool alongside the certificate authority. | `false`     | no       |
| `insecure`                     | `boolean`      | Disables TLS when connecting to the configured server.                                       |             | no       |
| `key_file`                     | `string`       | Path to the TLS certificate key.                                                             |             | no       |
| `key_pem`                      | `secret`       | Key PEM-encoded text for client authentication.                                              |             | no       |
| `max_version`                  | `string`       | Maximum acceptable TLS version for connections.                                              | `"TLS 1.3"` | no       |
| `min_version`                  | `string`       | Minimum acceptable TLS version for connections.                                              | `"TLS 1.2"` | no       |
| `cipher_suites`                | `list(string)` | A list of TLS cipher suites that the TLS transport can use.                                  | `[]`        | no       |
| `reload_interval`              | `duration`     | The duration after which the certificate is reloaded.                                        | `"0s"`      | no       |
| `server_name`                  | `string`       | Verifies the hostname of server certificates when set.                                       |             | no       |

If the server doesn’t support TLS, you must set the `insecure` argument to `true`.

To disable `tls` for connections to the server, set the `insecure` argument to `true`.

If `reload_interval` is set to `"0s"`, the certificate never reloaded.

The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:

- `ca_pem` and `ca_file`
- `cert_pem` and `cert_file`
- `key_pem` and `key_file`

If `cipher_suites` is left blank, a safe default list is used. See the [Go TLS documentation](https://go.dev/src/crypto/tls/cipher_suites.go) for a list of supported cipher suites.

### kerberos block

The `kerberos` block configures Kerberos authentication against the Kafka broker.

The following arguments are supported:

Expand table

| Name           | Type     | Description                                                      | Default | Required |
|----------------|----------|------------------------------------------------------------------|---------|----------|
| `service_name` | `string` | Kerberos service name.                                           |         | no       |
| `realm`        | `string` | Kerberos realm.                                                  |         | no       |
| `use_keytab`   | `string` | Enables using keytab instead of password.                        |         | no       |
| `username`     | `string` | Kerberos username to authenticate as.                            |         | yes      |
| `password`     | `secret` | Kerberos password to authenticate with.                          |         | no       |
| `config_file`  | `string` | Path to Kerberos location (for example, `/etc/krb5.conf`).       |         | no       |
| `keytab_file`  | `string` | Path to keytab file (for example, `/etc/security/kafka.keytab`). |         | no       |

When `use_keytab` is `false`, the `password` argument is required. When `use_keytab` is `true`, the file pointed to by the `keytab_file` argument is used for authentication instead. At most one of `password` or `keytab_file` must be provided.

### metadata block

The `metadata` block configures how to retrieve and store metadata from the Kafka broker.

The following arguments are supported:

Expand table

| Name                 | Type   | Description                                   | Default | Required |
|----------------------|--------|-----------------------------------------------|---------|----------|
| `include_all_topics` | `bool` | When true, maintains metadata for all topics. | `true`  | no       |

If the `include_all_topics` argument is `true`, `otelcol.receiver.kafka` maintains a full set of metadata for all topics rather than the minimal set that has been necessary so far. Including the full set of metadata is more convenient for users but can consume a substantial amount of memory if you have many topics and partitions.

Retrieving metadata may fail if the Kafka broker is starting up at the same time as the `otelcol.receiver.kafka` component. The [`retry` child block](#retry-block) can be provided to customize retry behavior.

### retry block

The `retry` block configures how to retry retrieving metadata when retrieval fails.

The following arguments are supported:

Expand table

| Name          | Type       | Description                                      | Default   | Required |
|---------------|------------|--------------------------------------------------|-----------|----------|
| `max_retries` | `number`   | How many times to reattempt retrieving metadata. | `3`       | no       |
| `backoff`     | `duration` | Time to wait between retries.                    | `"250ms"` | no       |

### autocommit block

The `autocommit` block configures how to automatically commit updated topic offsets back to the Kafka brokers.

The following arguments are supported:

Expand table

| Name       | Type       | Description                                  | Default | Required |
|------------|------------|----------------------------------------------|---------|----------|
| `enable`   | `bool`     | Enable autocommitting updated topic offsets. | `true`  | no       |
| `interval` | `duration` | How frequently to autocommit.                | `"1s"`  | no       |

### message\_marking block

The `message_marking` block configures when Kafka messages are marked as read.

The following arguments are supported:

Expand table

| Name                   | Type   | Description                                                        | Default | Required |
|------------------------|--------|--------------------------------------------------------------------|---------|----------|
| `after_execution`      | `bool` | Mark messages after forwarding telemetry data to other components. | `false` | no       |
| `include_unsuccessful` | `bool` | Whether failed forwards should be marked as read.                  | `false` | no       |

By default, a Kafka message is marked as read immediately after it is retrieved from the Kafka broker. If the `after_execution` argument is true, messages are only read after the telemetry data is forwarded to components specified in [the `output` block](#output-block).

When `after_execution` is true, messages are only marked as read when they are decoded successfully and components where the data was forwarded did not return an error. If the `include_unsuccessful` argument is true, messages are marked as read even if decoding or forwarding failed. Setting `include_unsuccessful` has no effect if `after_execution` is `false`.

> **WARNING**: Setting `after_execution` to `true` and `include_unsuccessful` to `false` can block the entire Kafka partition if message processing returns a permanent error, such as failing to decode.

### header\_extraction block

The `header_extraction` block configures how to extract headers from Kafka records.

The following arguments are supported:

Expand table

| Name              | Type           | Description                                             | Default | Required |
|-------------------|----------------|---------------------------------------------------------|---------|----------|
| `extract_headers` | `bool`         | Enables attaching header fields to resource attributes. | `false` | no       |
| `headers`         | `list(string)` | A list of headers to extract from the Kafka record.     | `[]`    | no       |

Regular expressions are not allowed in the `headers` argument. Only exact matching will be performed.

### debug\_metrics block

The `debug_metrics` block configures the metrics that this component generates to monitor its state.

The following arguments are supported:

Expand table

| Name                               | Type      | Description                                          | Default | Required |
|------------------------------------|-----------|------------------------------------------------------|---------|----------|
| `disable_high_cardinality_metrics` | `boolean` | Whether to disable certain high cardinality metrics. | `true`  | no       |

`disable_high_cardinality_metrics` is the Grafana Agent equivalent to the `telemetry.disableHighCardinalityMetrics` feature gate in the OpenTelemetry Collector. It removes attributes that could cause high cardinality metrics. For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.

### output block

The `output` block configures a set of components to forward resulting telemetry data to.

The following arguments are supported:

Expand table

| Name      | Type                     | Description                           | Default | Required |
|-----------|--------------------------|---------------------------------------|---------|----------|
| `logs`    | `list(otelcol.Consumer)` | List of consumers to send logs to.    | `[]`    | no       |
| `metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]`    | no       |
| `traces`  | `list(otelcol.Consumer)` | List of consumers to send traces to.  | `[]`    | no       |

You must specify the `output` block, but all its arguments are optional. By default, telemetry data is dropped. Configure the `metrics`, `logs`, and `traces` arguments accordingly to send telemetry data to other components.

## Exported fields

`otelcol.receiver.kafka` does not export any fields.

## Component health

`otelcol.receiver.kafka` is only reported as unhealthy if given an invalid configuration.

## Debug information

`otelcol.receiver.kafka` does not expose any component-specific debug information.

## Example

This example forwards read telemetry data through a batch processor before finally sending it to an OTLP-capable endpoint:

Alloy ![Copy code to clipboard](/media/images/icons/icon-copy-small-2.svg) Copy

```alloy
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.otlp.default.input]
    logs    = [otelcol.exporter.otlp.default.input]
    traces  = [otelcol.exporter.otlp.default.input]
  }
}

otelcol.exporter.otlp "default" {
  client {
    endpoint = env("OTLP_ENDPOINT")
  }
}
```

## Compatible components

`otelcol.receiver.kafka` can accept arguments from the following components:

- Components that export [OpenTelemetry `otelcol.Consumer`](../../compatibility/#opentelemetry-otelcolconsumer-exporters)

> Note
> 
> Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.
