---
title: "loki.source.kafka | Grafana Alloy documentation"
description: "Learn about loki.source.kafka"
---

# loki.source.kafka

`loki.source.kafka` reads messages from Kafka using a consumer group and forwards them to other `loki.*` components.

The component starts a new Kafka consumer group for the given arguments and fans out incoming entries to the list of receivers in `forward_to`.

Before using `loki.source.kafka`, Kafka should have at least one producer writing events to at least one topic. Follow the steps in the [Kafka Quick Start](https://kafka.apache.org/documentation/#quickstart) to get started with Kafka.

You can specify multiple `loki.source.kafka` components by giving them different labels.

## Usage


```alloy
loki.source.kafka "<LABEL>" {
  brokers    = "<BROKER_LIST>"
  topics     = "<TOPIC_LIST>"
  forward_to = <RECEIVER_LIST>
}
```

## Arguments

You can use the following arguments with `loki.source.kafka`:


| Name                     | Type                 | Description                                             | Default               | Required |
|--------------------------|----------------------|---------------------------------------------------------|-----------------------|----------|
| `brokers`                | `list(string)`       | The list of brokers to connect to Kafka.                |                       | yes      |
| `forward_to`             | `list(LogsReceiver)` | List of receivers to send log entries to.               |                       | yes      |
| `topics`                 | `list(string)`       | The list of Kafka topics to consume.                    |                       | yes      |
| `assignor`               | `string`             | The consumer group rebalancing strategy to use.         | `"range"`             | no       |
| `group_id`               | `string`             | The Kafka consumer group ID.                            | `"loki.source.kafka"` | no       |
| `labels`                 | `map(string)`        | The labels to associate with each received Kafka event. | `{}`                  | no       |
| `relabel_rules`          | `RelabelRules`       | Relabeling rules to apply on log entries.               | `{}`                  | no       |
| `use_incoming_timestamp` | `bool`               | Whether to use the timestamp received from Kafka.       | `false`               | no       |
| `version`                | `string`             | Kafka version to connect to.                            | `"2.2.1"`             | no       |

The supported `assignor` values are `"range"`, `"roundrobin"`, and `"sticky"`.

If a topic starts with `^`, it's treated as a regular expression and may match multiple topics.
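
For example, a component could consume every topic whose name starts with `quickstart-` as sketched below. The label, broker address, and receiver are placeholders:

```alloy
loki.source.kafka "regex_topics" {
  brokers    = ["localhost:9092"]
  // A topic beginning with "^" is interpreted as a regular expression,
  // so this matches quickstart-events, quickstart-logs, and so on.
  topics     = ["^quickstart-.*"]
  forward_to = [loki.write.local.receiver]
}
```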

Labels from the `labels` argument are applied to every message that the component reads.

The `relabel_rules` field can make use of the `rules` export value from a [`loki.relabel`](../loki.relabel/) component to apply one or more relabeling rules to log entries before they’re forwarded to the list of receivers in `forward_to`.

In addition to custom labels, the following internal labels prefixed with `__` are available:

- `__meta_kafka_group_id`
- `__meta_kafka_member_id`
- `__meta_kafka_message_key`
- `__meta_kafka_message_offset`
- `__meta_kafka_partition`
- `__meta_kafka_topic`

All labels starting with `__` are removed prior to forwarding log entries. To keep these labels, relabel them using a [`loki.relabel`](../loki.relabel/) component and pass its `rules` export to the `relabel_rules` argument.
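
As a sketch, a `loki.relabel` component like the following could copy two of these internal labels into regular labels before they're stripped. The component label and target label names are illustrative:

```alloy
loki.relabel "kafka_meta" {
  // forward_to can be left empty when only the rules export is used.
  forward_to = []

  rule {
    source_labels = ["__meta_kafka_topic"]
    target_label  = "topic"
  }

  rule {
    source_labels = ["__meta_kafka_partition"]
    target_label  = "partition"
  }
}
```

Passing `loki.relabel.kafka_meta.rules` to the `relabel_rules` argument then preserves the topic and partition as queryable labels.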

## Blocks

You can use the following blocks with `loki.source.kafka`:

| Block                                                        | Description                                                        | Required |
|--------------------------------------------------------------|--------------------------------------------------------------------|----------|
| [`authentication`](#authentication)                          | Authentication to use when connecting to the Kafka brokers.        | no       |
| `authentication` > [`sasl_config`](#sasl_config)             | SASL authentication settings when `type` is `"sasl"`.              | no       |
| `authentication` > `sasl_config` > [`oauth_config`](#oauth_config) | OAuth 2.0 settings when the SASL mechanism is `"OAUTHBEARER"`. | no       |
| `authentication` > [`tls_config`](#tls_config)               | TLS settings when connecting to the brokers over TLS.              | no       |

### `authentication`

The `authentication` block defines the authentication method when communicating with the Kafka event brokers.


| Name   | Type     | Description             | Default  | Required |
|--------|----------|-------------------------|----------|----------|
| `type` | `string` | Type of authentication. | `"none"` | no       |

`type` supports the values `"none"`, `"ssl"`, and `"sasl"`. If `"ssl"` is used, you must set the `tls_config` block. If `"sasl"` is used, you must set the `sasl_config` block.

### `sasl_config`

The `sasl_config` block defines the SASL authentication settings to use when communicating with the Kafka brokers.


| Name        | Type     | Description                                                                   | Default    | Required |
|-------------|----------|-------------------------------------------------------------------------------|------------|----------|
| `mechanism` | `string` | Specifies the SASL mechanism the client uses to authenticate with the broker. | `"PLAIN"`  | no       |
| `password`  | `secret` | The password to use for SASL authentication.                                  | `""`       | no       |
| `use_tls`   | `bool`   | If true, SASL authentication is executed over TLS.                            | `false`    | no       |
| `user`      | `string` | The user name to use for SASL authentication.                                 | `""`       | no       |
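
For instance, SASL `PLAIN` authentication over TLS might look like the following sketch, where the broker address and the `KAFKA_PASSWORD` environment variable are assumptions:

```alloy
loki.source.kafka "sasl_example" {
  brokers    = ["broker.example.com:9093"]
  topics     = ["quickstart-events"]
  forward_to = [loki.write.local.receiver]

  authentication {
    type = "sasl"

    sasl_config {
      mechanism = "PLAIN"
      user      = "alloy"
      // Read the password from an environment variable rather than
      // hard-coding the secret in the configuration file.
      password  = sys.env("KAFKA_PASSWORD")
      use_tls   = true
    }
  }
}
```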

### `oauth_config`

The `oauth_config` block is required when the SASL mechanism is set to `"OAUTHBEARER"`.


| Name             | Type           | Description                                                                | Default | Required |
|------------------|----------------|----------------------------------------------------------------------------|---------|----------|
| `scopes`         | `list(string)` | The scopes to set in the access token.                                     | `[]`    | yes      |
| `token_provider` | `string`       | The OAuth 2.0 provider to be used. The only supported provider is `azure`. | `""`    | yes      |
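
A sketch of an `OAUTHBEARER` configuration with the `azure` token provider follows; the scope value is a placeholder that depends on your environment:

```alloy
authentication {
  type = "sasl"

  sasl_config {
    mechanism = "OAUTHBEARER"
    use_tls   = true

    oauth_config {
      token_provider = "azure"
      // Placeholder scope: replace <EVENT_HUBS_NAMESPACE> with your namespace.
      scopes         = ["https://<EVENT_HUBS_NAMESPACE>.servicebus.windows.net/.default"]
    }
  }
}
```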

### `tls_config`

The `tls_config` block configures the TLS settings used when communicating with the Kafka brokers over TLS.

| Name                   | Type     | Description                                              | Default | Required |
|------------------------|----------|----------------------------------------------------------|---------|----------|
| `ca_pem`               | `string` | CA PEM-encoded text to validate the server with.         |         | no       |
| `ca_file`              | `string` | CA certificate to validate the server with.              |         | no       |
| `cert_pem`             | `string` | Certificate PEM-encoded text for client authentication.  |         | no       |
| `cert_file`            | `string` | Certificate file for client authentication.              |         | no       |
| `insecure_skip_verify` | `bool`   | Disables validation of the server certificate.           |         | no       |
| `key_file`             | `string` | Key file for client authentication.                      |         | no       |
| `key_pem`              | `secret` | Key PEM-encoded text for client authentication.          |         | no       |
| `min_version`          | `string` | Minimum acceptable TLS version.                          |         | no       |
| `server_name`          | `string` | ServerName extension to indicate the name of the server. |         | no       |

The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:

- `ca_pem` and `ca_file`
- `cert_pem` and `cert_file`
- `key_pem` and `key_file`

When configuring client authentication, both the client certificate (using `cert_pem` or `cert_file`) and the client key (using `key_pem` or `key_file`) must be provided.

When `min_version` isn’t provided, the minimum acceptable TLS version is inherited from Go’s default minimum version, TLS 1.2. If `min_version` is provided, it must be set to one of the following strings:

- `"TLS10"` (TLS 1.0)
- `"TLS11"` (TLS 1.1)
- `"TLS12"` (TLS 1.2)
- `"TLS13"` (TLS 1.3)

## Exported fields

`loki.source.kafka` doesn’t export any fields.

## Component health

`loki.source.kafka` is only reported as unhealthy if given an invalid configuration.

## Debug information

`loki.source.kafka` doesn’t expose additional debug info.

## Example

This example consumes Kafka events from the specified brokers and topics, then forwards them to a `loki.write` component using the incoming Kafka timestamp.


```alloy
loki.source.kafka "local" {
  brokers                = ["localhost:9092"]
  topics                 = ["quickstart-events"]
  labels                 = {component = "loki.source.kafka"}
  forward_to             = [loki.write.local.receiver]
  use_incoming_timestamp = true
  relabel_rules          = loki.relabel.kafka.rules
}

loki.relabel "kafka" {
  forward_to      = [loki.write.local.receiver]

  rule {
    source_labels = ["__meta_kafka_topic"]
    target_label  = "topic"
  }
}

loki.write "local" {
  endpoint {
    url = "loki:3100/api/v1/push"
  }
}
```

## Compatible components

`loki.source.kafka` can accept arguments from the following components:

- Components that export [Loki `LogsReceiver`](../../../compatibility/#loki-logsreceiver-exporters)

> Note
> 
> Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.
