---
title: "otelcol.receiver.kafka | Grafana Alloy documentation"
description: "Learn about otelcol.receiver.kafka"
---

# `otelcol.receiver.kafka`

`otelcol.receiver.kafka` accepts telemetry data from a Kafka broker and forwards it to other `otelcol.*` components.

> Note
> 
> `otelcol.receiver.kafka` is a wrapper over the upstream OpenTelemetry Collector [`kafka`](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.147.0/receiver/kafkareceiver) receiver. Bug reports or feature requests will be redirected to the upstream repository, if necessary.

You can specify multiple `otelcol.receiver.kafka` components by giving them different labels.

## Usage


```alloy
otelcol.receiver.kafka "<LABEL>" {
  brokers          = ["<BROKER_ADDR>"]
  protocol_version = "<PROTOCOL_VERSION>"

  output {
    metrics = [...]
    logs    = [...]
    traces  = [...]
  }
}
```

## Arguments

You can use the following arguments with `otelcol.receiver.kafka`:


| Name                                       | Type           | Description                                                                                                           | Default            | Required |
|--------------------------------------------|----------------|-----------------------------------------------------------------------------------------------------------------------|--------------------|----------|
| `brokers`                                  | `list(string)` | Kafka brokers to connect to.                                                                                          | `[]`               | yes      |
| `protocol_version`                         | `string`       | Kafka protocol version to use.                                                                                        | `""`               | yes      |
| `client_id`                                | `string`       | Consumer client ID to use.                                                                                            | `"otel-collector"` | no       |
| `conn_idle_timeout`                        | `duration`     | Time after which idle connections are not reused and may be closed.                                                   | `"9m"`             | no       |
| `encoding`                                 | `string`       | (Deprecated) Encoding of payload read from Kafka.                                                                     | `"otlp_proto"`     | no       |
| `group_id`                                 | `string`       | Consumer group to consume messages from.                                                                              | `"otel-collector"` | no       |
| `group_instance_id`                        | `string`       | A unique identifier for the consumer instance within a consumer group.                                                | `""`               | no       |
| `group_rebalance_strategy`                 | `string`       | The strategy used to assign partitions to consumers within a consumer group.                                          | `"range"`          | no       |
| `heartbeat_interval`                       | `duration`     | The expected time between heartbeats to the consumer coordinator when using Kafka group management.                   | `"3s"`             | no       |
| `initial_offset`                           | `string`       | Initial offset to use if no offset was previously committed.                                                          | `"latest"`         | no       |
| `max_fetch_size`                           | `int`          | The maximum number of message bytes to fetch in a request.                                                            | `1048576`          | no       |
| `max_fetch_wait`                           | `duration`     | The maximum amount of time the broker should wait for `min_fetch_size` bytes to be available before returning anyway. | `"250ms"`          | no       |
| `max_partition_fetch_size`                 | `int`          | The default number of message bytes to fetch per partition in a request.                                              | `1048576`          | no       |
| `min_fetch_size`                           | `int`          | The minimum number of message bytes to fetch in a request.                                                            | `1`                | no       |
| `rack_id`                                  | `string`       | The rack identifier for this client. Used for rack-aware replica selection when supported by the brokers.             | `""`               | no       |
| `resolve_canonical_bootstrap_servers_only` | `bool`         | Whether to resolve then reverse-lookup broker IPs during startup.                                                     | `false`            | no       |
| `session_timeout`                          | `duration`     | The request timeout for detecting client failures when using Kafka group management.                                  | `"10s"`            | no       |
| `use_leader_epoch`                         | `bool`         | Whether to use leader epoch for log truncation detection (KIP-320).                                                   | `true`             | no       |

For `max_fetch_size`, the value `0` means no limit.

The `max_partition_fetch_size` setting controls the maximum number of bytes to fetch per partition. If a single record batch is larger than this value, the broker still returns it to ensure the consumer can make progress. This setting only applies when using the franz-go client.

The `rack_id` setting enables rack-aware replica selection. When configured and brokers support a rack-aware replica selector, the client will prefer fetching from the closest replica.

The `use_leader_epoch` setting is experimental and controls whether the consumer uses leader epochs (KIP-320) for detecting log truncation. When enabled, the consumer uses the leader epoch returned by brokers to detect log truncation. Setting this to `false` clears the leader epoch from fetch offsets, disabling KIP-320. Disabling can improve compatibility with brokers that don’t fully support leader epochs (for example, Azure Event Hubs), but you lose automatic log-truncation safety.

`initial_offset` must be either `"latest"` or `"earliest"`.

The `group_rebalance_strategy` argument determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are:

- `range`: This strategy assigns partitions to consumers based on a range. It aims to distribute partitions evenly across consumers, but it can lead to an uneven distribution when the number of partitions isn't a multiple of the number of consumers. For more information, refer to [RangeAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html).
- `roundrobin`: This strategy assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions isn't a multiple of the number of consumers. For more information, refer to [RoundRobinAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html).
- `sticky`: This strategy aims to keep partition assignments the same across rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, refer to [StickyAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html).
- `cooperative-sticky`: This strategy uses incremental cooperative rebalancing to reduce partition movement during rebalances. For more information, refer to [CooperativeStickyAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html).

Using a `group_instance_id` is useful for stateful consumers or when you need to ensure that a specific consumer instance is always assigned the same set of partitions.

- If `group_instance_id` is set to a non-empty string, the consumer is treated as a static member of the group. This means that the consumer will maintain its partition assignments across restarts and rebalances, as long as it rejoins the group with the same `group_instance_id`.
- If `group_instance_id` is set to an empty string (or not set), the consumer is treated as a dynamic member. In this case, the consumer’s partition assignments may change during rebalances.
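
For example, a receiver that joins a consumer group as a static member with cooperative rebalancing might look like the following sketch. The broker address, group identifiers, and component label are placeholders:

```alloy
otelcol.receiver.kafka "static_member" {
  brokers          = ["broker-1:9092"]
  protocol_version = "2.0.0"

  // Static membership: this instance keeps its partition
  // assignments across restarts as long as it rejoins with
  // the same group_instance_id.
  group_id                 = "alloy-consumers"
  group_instance_id        = "alloy-consumer-1"
  group_rebalance_strategy = "cooperative-sticky"

  output {
    traces = [...]
  }
}
```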

## Blocks

You can use the following blocks with `otelcol.receiver.kafka`:

- [`output`](#output) (required): Configures where to forward received telemetry data.
- [`logs`](#logs): Configures how to receive logs from Kafka.
- [`metrics`](#metrics): Configures how to receive metrics from Kafka.
- [`traces`](#traces): Configures how to receive traces from Kafka.
- [`authentication`](#authentication): Configures authentication against Kafka brokers.
  - [`kerberos`](#kerberos): Configures Kerberos authentication.
  - [`plaintext`](#plaintext): (Deprecated) Configures plain text authentication.
  - [`sasl`](#sasl): Configures SASL authentication.
    - [`aws_msk`](#aws_msk): Configures extra parameters for the `AWS_MSK_IAM_OAUTHBEARER` mechanism.
- [`tls`](#tls): Configures TLS for connecting to Kafka brokers.
  - [`tpm`](#tpm): Configures loading the TLS key from a TPM device.
- [`autocommit`](#autocommit): Configures automatic offset commits.
- [`debug_metrics`](#debug_metrics): Configures the metrics this component generates to monitor its state.
- [`error_backoff`](#error_backoff): Configures how failed requests to Kafka are retried.
- [`header_extraction`](#header_extraction): Configures how to extract headers from Kafka records.
- [`message_marking`](#message_marking): Configures when Kafka messages are marked as read.
- [`metadata`](#metadata): Configures how to retrieve and store Kafka broker metadata.
  - [`retry`](#retry): Configures how failed metadata retrieval is retried.

### `output`

The `output` block is required. It configures a set of components to forward resulting telemetry data to.

The following arguments are supported:


| Name      | Type                     | Description                           | Default | Required |
|-----------|--------------------------|---------------------------------------|---------|----------|
| `logs`    | `list(otelcol.Consumer)` | List of consumers to send logs to.    | `[]`    | no       |
| `metrics` | `list(otelcol.Consumer)` | List of consumers to send metrics to. | `[]`    | no       |
| `traces`  | `list(otelcol.Consumer)` | List of consumers to send traces to.  | `[]`    | no       |

You must specify the `output` block, but all its arguments are optional. By default, telemetry data is dropped. Configure the `metrics`, `logs`, and `traces` arguments accordingly to send telemetry data to other components.

### `logs`

The `logs` block configures how to receive logs from Kafka brokers.


| Name       | Type           | Description                                                                                    | Default         | Required |
|------------|----------------|------------------------------------------------------------------------------------------------|-----------------|----------|
| `encoding` | `string`       | The encoding for logs. Refer to [Supported encodings](#supported-encodings).                   | `"otlp_proto"`  | no       |
| `topic`    | `string`       | (Deprecated: use `topics` instead) The name of the Kafka topic on which logs will be received. | `""`            | no       |
| `topics`   | `list(string)` | The names of the Kafka topics on which logs will be received.                                  | `["otlp_logs"]` | no       |

> Warning
> 
> The `topic` argument is deprecated in favor of `topics`. If `topic` is set, it takes precedence over the default value of `topics`.
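
For example, the following sketch consumes JSON-encoded logs from two hypothetical topics. The broker address, topic names, and component label are placeholders:

```alloy
otelcol.receiver.kafka "logs_only" {
  brokers          = ["broker-1:9092"]
  protocol_version = "2.0.0"

  logs {
    // Consume from multiple topics; each payload is decoded
    // as JSON and inserted as the log record body.
    topics   = ["app_logs", "audit_logs"]
    encoding = "json"
  }

  output {
    logs = [...]
  }
}
```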

### `metrics`

The `metrics` block configures how to receive metrics from Kafka brokers.


| Name       | Type           | Description                                                                                       | Default            | Required |
|------------|----------------|---------------------------------------------------------------------------------------------------|--------------------|----------|
| `encoding` | `string`       | The encoding for metrics. Refer to [Supported encodings](#supported-encodings).                   | `"otlp_proto"`     | no       |
| `topic`    | `string`       | (Deprecated: use `topics` instead) The name of the Kafka topic on which metrics will be received. | `""`               | no       |
| `topics`   | `list(string)` | The names of the Kafka topics on which metrics will be received.                                  | `["otlp_metrics"]` | no       |

> Warning
> 
> The `topic` argument is deprecated in favor of `topics`. If `topic` is set, it takes precedence over the default value of `topics`.

### `traces`

The `traces` block configures how to receive traces from Kafka brokers.


| Name       | Type           | Description                                                                                      | Default          | Required |
|------------|----------------|--------------------------------------------------------------------------------------------------|------------------|----------|
| `encoding` | `string`       | The encoding for traces. Refer to [Supported encodings](#supported-encodings).                    | `"otlp_proto"`   | no       |
| `topic`    | `string`       | (Deprecated: use `topics` instead) The name of the Kafka topic on which traces will be received. | `""`             | no       |
| `topics`   | `list(string)` | The names of the Kafka topics on which traces will be received.                                  | `["otlp_spans"]` | no       |

> Warning
> 
> The `topic` argument is deprecated in favor of `topics`. If `topic` is set, it takes precedence over the default value of `topics`.

### `authentication`

The `authentication` block holds the definition of different authentication mechanisms to use when connecting to Kafka brokers. It doesn’t support any arguments and is configured fully through inner blocks.

### `kerberos`

The `kerberos` block configures Kerberos authentication against the Kafka broker.

The following arguments are supported:


| Name                       | Type     | Description                                                     | Default | Required |
|----------------------------|----------|-----------------------------------------------------------------|---------|----------|
| `config_file`              | `string` | Path to the Kerberos configuration file, for example, `/etc/krb5.conf`. |         | no       |
| `disable_fast_negotiation` | `bool`   | Disable PA-FX-FAST negotiation.                                 | `false` | no       |
| `keytab_file`              | `string` | Path to keytab file, for example, `/etc/security/kafka.keytab`. |         | no       |
| `password`                 | `secret` | Kerberos password to authenticate with.                         |         | no       |
| `realm`                    | `string` | Kerberos realm.                                                 |         | no       |
| `service_name`             | `string` | Kerberos service name.                                          |         | no       |
| `use_keytab`               | `bool`   | Enables using a keytab instead of a password.                   |         | no       |
| `username`                 | `string` | Kerberos username to authenticate as.                           |         | yes      |

When `use_keytab` is `false`, the `password` argument is required. When `use_keytab` is `true`, the file pointed to by the `keytab_file` argument is used for authentication instead. Provide at most one of `password` or `keytab_file`.

`disable_fast_negotiation` is useful for Kerberos implementations which don’t support PA-FX-FAST (Pre-Authentication Framework - Fast) negotiation.
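
A keytab-based Kerberos setup might look like the following sketch. The realm, username, and file paths are placeholders:

```alloy
otelcol.receiver.kafka "kerberized" {
    ...
    authentication {
        kerberos {
            service_name = "kafka"
            realm        = "EXAMPLE.COM"
            username     = "alloy"

            // Authenticate with a keytab instead of a password.
            use_keytab  = true
            keytab_file = "/etc/security/kafka.keytab"
            config_file = "/etc/krb5.conf"
        }
    }
    ...
}
```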

### `plaintext`

> Caution
> 
> The `plaintext` block has been deprecated. Use `sasl` with `mechanism` set to `PLAIN` instead.

The `plaintext` block configures plain text authentication against Kafka brokers.

The following arguments are supported:


| Name       | Type     | Description                                    | Default | Required |
|------------|----------|------------------------------------------------|---------|----------|
| `password` | `secret` | Password to use for plain text authentication. |         | yes      |
| `username` | `string` | Username to use for plain text authentication. |         | yes      |

### `sasl`

The `sasl` block configures SASL authentication against Kafka brokers.

The following arguments are supported:


| Name        | Type     | Description                                              | Default | Required |
|-------------|----------|----------------------------------------------------------|---------|----------|
| `mechanism` | `string` | SASL mechanism to use when authenticating.               |         | yes      |
| `password`  | `secret` | Password to use for SASL authentication.                 |         | yes      |
| `username`  | `string` | Username to use for SASL authentication.                 |         | yes      |
| `version`   | `number` | Version of the SASL Protocol to use when authenticating. | `0`     | no       |

You can set the `mechanism` argument to one of the following strings:

- `"PLAIN"`
- `"SCRAM-SHA-256"`
- `"SCRAM-SHA-512"`
- `"AWS_MSK_IAM_OAUTHBEARER"`

When `mechanism` is set to `"AWS_MSK_IAM_OAUTHBEARER"`, the `aws_msk` child block must also be provided.

You can set the `version` argument to either `0` or `1`.
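
For example, SCRAM authentication might look like the following sketch. The username and environment variable are placeholders:

```alloy
otelcol.receiver.kafka "sasl_scram" {
    ...
    authentication {
        sasl {
            mechanism = "SCRAM-SHA-512"
            username  = "alloy"
            // Read the secret from the environment instead of
            // hardcoding it in the configuration file.
            password  = sys.env("KAFKA_PASSWORD")
        }
    }
    ...
}
```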

### `aws_msk`

The `aws_msk` block configures extra parameters for SASL authentication when using the `AWS_MSK_IAM_OAUTHBEARER` mechanism.

The following arguments are supported:


| Name     | Type     | Description                             | Default | Required |
|----------|----------|-----------------------------------------|---------|----------|
| `region` | `string` | AWS region the MSK cluster is based in. |         | yes      |

### `tls`

The `tls` block configures TLS settings used for connecting to the Kafka brokers. If the `tls` block isn’t provided, TLS won’t be used for communication.

The following arguments are supported:


| Name                           | Type           | Description                                                                                  | Default     | Required |
|--------------------------------|----------------|----------------------------------------------------------------------------------------------|-------------|----------|
| `ca_file`                      | `string`       | Path to the CA file.                                                                         |             | no       |
| `ca_pem`                       | `string`       | CA PEM-encoded text to validate the server with.                                             |             | no       |
| `cert_file`                    | `string`       | Path to the TLS certificate.                                                                 |             | no       |
| `cert_pem`                     | `string`       | Certificate PEM-encoded text for client authentication.                                      |             | no       |
| `cipher_suites`                | `list(string)` | A list of TLS cipher suites that the TLS transport can use.                                  | `[]`        | no       |
| `curve_preferences`            | `list(string)` | Set of elliptic curves to use in a handshake.                                                | `[]`        | no       |
| `include_system_ca_certs_pool` | `boolean`      | Whether to load the system certificate authorities pool alongside the certificate authority. | `false`     | no       |
| `insecure_skip_verify`         | `boolean`      | Ignores insecure server TLS certificates.                                                    |             | no       |
| `insecure`                     | `boolean`      | Disables TLS when connecting to the configured server.                                       |             | no       |
| `key_file`                     | `string`       | Path to the TLS certificate key.                                                             |             | no       |
| `key_pem`                      | `secret`       | Key PEM-encoded text for client authentication.                                              |             | no       |
| `max_version`                  | `string`       | Maximum acceptable TLS version for connections.                                              | `"TLS 1.3"` | no       |
| `min_version`                  | `string`       | Minimum acceptable TLS version for connections.                                              | `"TLS 1.2"` | no       |
| `reload_interval`              | `duration`     | The duration after which the certificate is reloaded.                                        | `"0s"`      | no       |
| `server_name`                  | `string`       | Verifies the hostname of server certificates when set.                                       |             | no       |

If the server doesn't support TLS, you must disable TLS for the connection by setting the `insecure` argument to `true`.

If you set `reload_interval` to `"0s"`, the certificate is never reloaded.

The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:

- `ca_pem` and `ca_file`
- `cert_pem` and `cert_file`
- `key_pem` and `key_file`

If `cipher_suites` is left blank, a safe default list is used. Refer to the [Go TLS documentation](https://go.dev/src/crypto/tls/cipher_suites.go) for a list of supported cipher suites.

The `curve_preferences` argument determines the set of [elliptic curves](https://go.dev/src/crypto/tls/common.go#L138) to prefer during a handshake in preference order. If not provided, a default list is used. The set of elliptic curves available are `X25519`, `P521`, `P256`, and `P384`.
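
A mutual TLS setup might look like the following sketch. The file paths are placeholders:

```alloy
otelcol.receiver.kafka "with_tls" {
    ...
    tls {
        // CA used to validate the broker's certificate.
        ca_file     = "/etc/ssl/certs/kafka-ca.pem"
        // Client certificate and key for mutual TLS.
        cert_file   = "/etc/ssl/certs/client.pem"
        key_file    = "/etc/ssl/private/client.key"
        min_version = "TLS 1.2"
    }
    ...
}
```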

### `tpm`

The `tpm` block configures retrieving the TLS `key_file` from a trusted device.

The following arguments are supported:


| Name         | Type     | Description                                                        | Default | Required |
|--------------|----------|--------------------------------------------------------------------|---------|----------|
| `auth`       | `string` | The authorization value used to authenticate the TPM device.       | `""`    | no       |
| `enabled`    | `bool`   | Load the `tls.key_file` from TPM.                                  | `false` | no       |
| `owner_auth` | `string` | The owner authorization value used to authenticate the TPM device. | `""`    | no       |
| `path`       | `string` | Path to the TPM device or Unix domain socket.                      | `""`    | no       |

You can use the [trusted platform module](https://trustedcomputinggroup.org/resource/trusted-platform-module-tpm-summary/) (TPM) configuration to load the TLS key from a TPM. Currently, only the TSS2 format is supported.

The `path` attribute is not supported on Windows.

In the following example, the private key `my-tss2-key.key` in TSS2 format is loaded from the TPM device `/dev/tpmrm0`:


```alloy
otelcol.example.component "<LABEL>" {
    ...
    tls {
        ...
        key_file = "my-tss2-key.key"
        tpm {
            enabled = true
            path = "/dev/tpmrm0"
        }
    }
}
```

### `autocommit`

The `autocommit` block configures how to automatically commit updated topic offsets back to the Kafka brokers.

The following arguments are supported:


| Name       | Type       | Description                                  | Default | Required |
|------------|------------|----------------------------------------------|---------|----------|
| `enable`   | `bool`     | Enable autocommitting updated topic offsets. | `true`  | no       |
| `interval` | `duration` | How frequently to autocommit.                | `"1s"`  | no       |

### `debug_metrics`

The `debug_metrics` block configures the metrics that this component generates to monitor its state.

The following arguments are supported:


| Name                               | Type      | Description                                          | Default | Required |
|------------------------------------|-----------|------------------------------------------------------|---------|----------|
| `disable_high_cardinality_metrics` | `boolean` | Whether to disable certain high cardinality metrics. | `true`  | no       |

`disable_high_cardinality_metrics` is the Alloy equivalent to the `telemetry.disableHighCardinalityMetrics` feature gate in the OpenTelemetry Collector. It removes attributes that could cause high cardinality metrics. For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.

> Note
> 
> If configured, `disable_high_cardinality_metrics` only applies to `otelcol.exporter.*` and `otelcol.receiver.*` components.

### `error_backoff`

The `error_backoff` block configures how failed requests to Kafka are retried.

The following arguments are supported:


| Name                   | Type       | Description                                            | Default | Required |
|------------------------|------------|--------------------------------------------------------|---------|----------|
| `enabled`              | `boolean`  | Enables retrying failed requests.                      | `false` | no       |
| `initial_interval`     | `duration` | Initial time to wait before retrying a failed request. | `"0s"`  | no       |
| `max_elapsed_time`     | `duration` | Maximum time to wait before discarding a failed batch. | `"0s"`  | no       |
| `max_interval`         | `duration` | Maximum time to wait between retries.                  | `"0s"`  | no       |
| `multiplier`           | `number`   | Factor to grow wait time before retrying.              | `0`     | no       |
| `randomization_factor` | `number`   | Factor to randomize wait time before retrying.         | `0`     | no       |

When `enabled` is `true`, failed batches are retried after a given interval. The `initial_interval` argument specifies how long to wait before the first retry attempt. If requests continue to fail, the time to wait before retrying increases by the factor specified by the `multiplier` argument, which must be greater than `1.0`. The `max_interval` argument specifies the upper bound of how long to wait between retries.

The `randomization_factor` argument is useful for adding jitter between retrying Alloy instances. If `randomization_factor` is greater than `0`, the wait time before retries is multiplied by a random factor in the range `[ I - randomization_factor * I, I + randomization_factor * I]`, where `I` is the current interval.

If a batch hasn’t been sent successfully, it’s discarded after the time specified by `max_elapsed_time` elapses. If `max_elapsed_time` is set to `"0s"`, failed requests are retried forever until they succeed.
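
For example, the following sketch retries failed requests with exponential backoff, starting at 500 milliseconds and discarding a batch after five minutes. The values are illustrative:

```alloy
otelcol.receiver.kafka "with_backoff" {
    ...
    error_backoff {
        enabled          = true
        // Wait 500ms before the first retry, then grow the
        // interval by 1.5x per attempt, capped at 30s.
        initial_interval = "500ms"
        multiplier       = 1.5
        max_interval     = "30s"
        // Discard the batch if it still fails after 5 minutes.
        max_elapsed_time = "5m"
    }
    ...
}
```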

### `header_extraction`

The `header_extraction` block configures how to extract headers from Kafka records.

The following arguments are supported:


| Name              | Type           | Description                                             | Default | Required |
|-------------------|----------------|---------------------------------------------------------|---------|----------|
| `extract_headers` | `bool`         | Enables attaching header fields to resource attributes. | `false` | no       |
| `headers`         | `list(string)` | A list of headers to extract from the Kafka record.     | `[]`    | no       |

Regular expressions aren't allowed in the `headers` argument. Only exact matching is performed.
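
For example, the following sketch attaches two hypothetical Kafka record headers as resource attributes. The header names are placeholders:

```alloy
otelcol.receiver.kafka "with_headers" {
    ...
    header_extraction {
        extract_headers = true
        // Only records with exactly these header keys are matched.
        headers         = ["tenant-id", "environment"]
    }
    ...
}
```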

### `message_marking`

The `message_marking` block configures when Kafka messages are marked as read.

The following arguments are supported:


| Name                   | Type   | Description                                                        | Default | Required |
|------------------------|--------|--------------------------------------------------------------------|---------|----------|
| `after_execution`      | `bool` | Mark messages after forwarding telemetry data to other components. | `false` | no       |
| `include_unsuccessful` | `bool` | Whether failed forwards should be marked as read.                  | `false` | no       |

By default, a Kafka message is marked as read immediately after it's retrieved from the Kafka broker. If the `after_execution` argument is `true`, messages are only marked as read after the telemetry data is forwarded to components specified in the [`output` block](#output).

When `after_execution` is true, messages are only marked as read when they’re decoded successfully and components where the data was forwarded didn’t return an error. If the `include_unsuccessful` argument is true, messages are marked as read even if decoding or forwarding failed. Setting `include_unsuccessful` has no effect if `after_execution` is `false`.

> Warning
> 
> Setting `after_execution` to `true` and `include_unsuccessful` to `false` can block the entire Kafka partition if message processing returns a permanent error, such as failing to decode.
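
For example, the following sketch marks messages as read only after forwarding, including messages that failed to decode or forward, which avoids blocking a partition on a permanently failing message:

```alloy
otelcol.receiver.kafka "marked_after" {
    ...
    message_marking {
        after_execution      = true
        // Also mark failed messages so a poison message can't
        // stall the partition.
        include_unsuccessful = true
    }
    ...
}
```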

### `metadata`

The `metadata` block configures how to retrieve and store metadata from the Kafka broker.

The following arguments are supported:


| Name               | Type       | Description                                           | Default | Required |
|--------------------|------------|-------------------------------------------------------|---------|----------|
| `full`             | `bool`     | Whether to maintain a full set of metadata.           | `true`  | no       |
| `refresh_interval` | `duration` | The frequency at which cluster metadata is refreshed. | `"10m"` | no       |

When `full` is set to `false`, the client doesn't make the initial full metadata request to the broker at startup.

Retrieving metadata may fail if the Kafka broker is starting up at the same time as the Alloy component. The `retry` child block can be provided to customize retry behavior.

### `retry`

The `retry` block configures how to retry retrieving metadata when retrieval fails.

The following arguments are supported:


| Name          | Type       | Description                                      | Default   | Required |
|---------------|------------|--------------------------------------------------|-----------|----------|
| `backoff`     | `duration` | Time to wait between retries.                    | `"250ms"` | no       |
| `max_retries` | `number`   | How many times to reattempt retrieving metadata. | `3`       | no       |
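
For example, the following sketch refreshes cluster metadata every five minutes and retries failed metadata requests up to ten times, which can help when the broker starts at the same time as Alloy. The values are illustrative:

```alloy
otelcol.receiver.kafka "patient_startup" {
    ...
    metadata {
        full             = true
        refresh_interval = "5m"

        retry {
            // Tolerate a slow-starting broker: retry metadata
            // retrieval up to 10 times, 500ms apart.
            max_retries = 10
            backoff     = "500ms"
        }
    }
    ...
}
```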

## Exported fields

`otelcol.receiver.kafka` doesn’t export any fields.

## Supported encodings

`otelcol.receiver.kafka` supports encoding extensions, as well as the following built-in encodings.

Available for all signals:

- `otlp_proto`: the payload is decoded as OTLP Protobuf
- `otlp_json`: the payload is decoded as OTLP JSON

Available only for traces:

- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.
- `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.

Available only for logs:

- `raw`: the payload’s bytes are inserted as the body of a log record.
- `text`: the payload is decoded as text and inserted as the body of a log record. By default, UTF-8 is used to decode. You can use `text_<ENCODING>`, such as `text_utf-8` or `text_shift_jis`, to customize this behavior.
- `json`: the payload is decoded as JSON and inserted as the body of a log record.
- `azure_resource_logs`: the payload is converted from Azure Resource Logs format to OTel format.

## Message header propagation

`otelcol.receiver.kafka` extracts Kafka message headers and includes them as request metadata (context). You can then use this metadata throughout the pipeline, for example to set attributes using `otelcol.processor.attributes`.

## Component health

`otelcol.receiver.kafka` is only reported as unhealthy if given an invalid configuration.

## Debug information

`otelcol.receiver.kafka` doesn’t expose any component-specific debug information.

## Example

This example forwards read telemetry data through a batch processor before finally sending it to an OTLP-capable endpoint:


```alloy
otelcol.receiver.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.otlphttp.default.input]
    logs    = [otelcol.exporter.otlphttp.default.input]
    traces  = [otelcol.exporter.otlphttp.default.input]
  }
}

otelcol.exporter.otlphttp "default" {
  client {
    endpoint = sys.env("OTLP_ENDPOINT")
  }
}
```

## Compatible components

`otelcol.receiver.kafka` can accept arguments from the following components:

- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters)

> Note
> 
> Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.
