Caution

Grafana Alloy is the new name for our distribution of the OTel collector. Grafana Agent has been deprecated and is in Long-Term Support (LTS) through October 31, 2025. Grafana Agent will reach an End-of-Life (EOL) on November 1, 2025. Read more about why we recommend migrating to Grafana Alloy.

loki.source.kafka

loki.source.kafka reads messages from Kafka using a consumer group and forwards them to other loki.* components.

The component starts a new Kafka consumer group for the given arguments and fans out incoming entries to the list of receivers in forward_to.

Before using loki.source.kafka, Kafka should have at least one producer writing events to at least one topic. Follow the steps in the Kafka Quick Start to get started with Kafka.

Multiple loki.source.kafka components can be specified by giving them different labels.
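As a minimal sketch of running several instances side by side, the following declares two components with different labels. The broker address, topic names, group ids, and the loki.write.default component are illustrative assumptions, not values from this page. Giving each instance its own group_id is a cautious choice here so that the two consumers do not share a consumer group.

```river
// Hypothetical example: two independent Kafka consumers, told apart by
// their labels "orders" and "audit".
loki.source.kafka "orders" {
  brokers    = ["localhost:9092"]           // assumed broker address
  topics     = ["orders-events"]            // assumed topic name
  group_id   = "agent-orders"               // assumed group id
  forward_to = [loki.write.default.receiver] // assumes loki.write "default" is defined elsewhere
}

loki.source.kafka "audit" {
  brokers    = ["localhost:9092"]
  topics     = ["audit-events"]
  group_id   = "agent-audit"
  forward_to = [loki.write.default.receiver]
}
```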

Usage

river
loki.source.kafka "LABEL" {
  brokers    = BROKER_LIST
  topics     = TOPIC_LIST
  forward_to = RECEIVER_LIST
}

Arguments

loki.source.kafka supports the following arguments:

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| brokers | list(string) | The list of brokers to connect to Kafka. | | yes |
| topics | list(string) | The list of Kafka topics to consume. | | yes |
| group_id | string | The Kafka consumer group id. | "loki.source.kafka" | no |
| assignor | string | The consumer group rebalancing strategy to use. | "range" | no |
| version | string | Kafka version to connect to. | "2.2.1" | no |
| use_incoming_timestamp | bool | Whether or not to use the timestamp received from Kafka. | false | no |
| labels | map(string) | The labels to associate with each received Kafka event. | {} | no |
| forward_to | list(LogsReceiver) | List of receivers to send log entries to. | | yes |
| relabel_rules | RelabelRules | Relabeling rules to apply on log entries. | {} | no |

assignor must be one of "range", "roundrobin", or "sticky".
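The optional arguments can be combined as in the following sketch, which overrides the consumer group id and the rebalancing strategy. The label, broker address, topic, group id, label values, and the loki.write.default component are assumptions chosen for illustration.

```river
// Hypothetical example: override the default group id and assignor.
loki.source.kafka "tuned" {
  brokers    = ["localhost:9092"]            // assumed broker address
  topics     = ["quickstart-events"]
  group_id   = "agent-quickstart"            // replaces the default "loki.source.kafka"
  assignor   = "roundrobin"                  // one of "range", "roundrobin", "sticky"
  labels     = {source = "kafka"}            // static label added to every entry
  forward_to = [loki.write.default.receiver] // assumes loki.write "default" is defined elsewhere
}
```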

Labels from the labels argument are applied to every message that the component reads.

The relabel_rules field can make use of the rules export value from a loki.relabel component to apply one or more relabeling rules to log entries before they’re forwarded to the list of receivers in forward_to.

In addition to custom labels, the following internal labels prefixed with __ are available:

  • __meta_kafka_message_key
  • __meta_kafka_message_offset
  • __meta_kafka_topic
  • __meta_kafka_partition
  • __meta_kafka_member_id
  • __meta_kafka_group_id

All labels starting with __ are removed prior to forwarding log entries. To keep these labels, relabel them using a loki.relabel component and pass its rules export to the relabel_rules argument.
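One way to keep several internal labels at once is a labelmap rule, sketched below under the assumption that loki.relabel accepts the usual Prometheus-style action, regex, and replacement arguments. The component labels and the loki.write.default component are hypothetical. The regex is deliberately limited to the topic and partition labels, because values such as the message offset change on every message and would likely produce very high label cardinality in Loki.

```river
// Hypothetical example: copy selected __meta_kafka_* labels to kafka_* labels.
loki.relabel "kafka_meta" {
  // This instance is only used for its rules export, so nothing is forwarded.
  forward_to = []

  rule {
    action      = "labelmap"
    regex       = "__meta_kafka_(topic|partition)"
    replacement = "kafka_$1" // yields kafka_topic and kafka_partition
  }
}

loki.source.kafka "with_meta" {
  brokers       = ["localhost:9092"]            // assumed broker address
  topics        = ["quickstart-events"]
  relabel_rules = loki.relabel.kafka_meta.rules
  forward_to    = [loki.write.default.receiver] // assumes loki.write "default" is defined elsewhere
}
```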

Blocks

The following blocks are supported inside the definition of loki.source.kafka:

| Hierarchy | Name | Description | Required |
|-----------|------|-------------|----------|
| authentication | authentication | Optional authentication configuration with Kafka brokers. | no |
| authentication > tls_config | tls_config | TLS settings, used when the authentication type is "ssl". | no |
| authentication > sasl_config | sasl_config | SASL settings, used when the authentication type is "sasl". | no |
| authentication > sasl_config > tls_config | tls_config | TLS settings for SASL authentication over TLS. | no |
| authentication > sasl_config > oauth_config | oauth_config | OAuth settings, used when the SASL mechanism is OAUTHBEARER. | no |

authentication block

The authentication block defines the authentication method when communicating with the Kafka event brokers.

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| type | string | Type of authentication. | "none" | no |

type supports the values "none", "ssl", and "sasl". If "ssl" is used, you must set the tls_config block. If "sasl" is used, you must set the sasl_config block.

tls_config block

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| ca_pem | string | CA PEM-encoded text to validate the server with. | | no |
| ca_file | string | CA certificate to validate the server with. | | no |
| cert_pem | string | Certificate PEM-encoded text for client authentication. | | no |
| cert_file | string | Certificate file for client authentication. | | no |
| insecure_skip_verify | bool | Disables validation of the server certificate. | | no |
| key_file | string | Key file for client authentication. | | no |
| key_pem | secret | Key PEM-encoded text for client authentication. | | no |
| min_version | string | Minimum acceptable TLS version. | | no |
| server_name | string | ServerName extension to indicate the name of the server. | | no |

The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:

  • ca_pem and ca_file
  • cert_pem and cert_file
  • key_pem and key_file

When configuring client authentication, both the client certificate (using cert_pem or cert_file) and the client key (using key_pem or key_file) must be provided.

When min_version is not provided, the minimum acceptable TLS version is inherited from Go’s default minimum version, TLS 1.2. If min_version is provided, it must be set to one of the following strings:

  • "TLS10" (TLS 1.0)
  • "TLS11" (TLS 1.1)
  • "TLS12" (TLS 1.2)
  • "TLS13" (TLS 1.3)
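Putting the authentication and tls_config blocks together, a TLS-authenticated consumer might look like the following sketch. The broker address, file paths, and the loki.write.default component are assumptions for illustration. Only the file-based variants are set, since each *_pem argument is mutually exclusive with its *_file counterpart.

```river
// Hypothetical example: mutual TLS against the Kafka brokers.
loki.source.kafka "secure" {
  brokers    = ["kafka.example.internal:9093"] // assumed TLS listener
  topics     = ["quickstart-events"]
  forward_to = [loki.write.default.receiver]   // assumes loki.write "default" is defined elsewhere

  authentication {
    type = "ssl" // requires the tls_config block below

    tls_config {
      ca_file     = "/etc/agent/kafka/ca.pem"     // assumed path
      cert_file   = "/etc/agent/kafka/client.pem" // client certificate and key
      key_file    = "/etc/agent/kafka/client.key" // must be provided together
      min_version = "TLS12"
    }
  }
}
```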

sasl_config block

The sasl_config block defines the SASL mechanism and credentials used to authenticate with the Kafka brokers.

| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| mechanism | string | Specifies the SASL mechanism the client uses to authenticate with the broker. | "PLAIN" | no |
| user | string | The user name to use for SASL authentication. | "" | no |
| password | secret | The password to use for SASL authentication. | "" | no |
| use_tls | bool | If true, SASL authentication is executed over TLS. | false | no |
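A SASL configuration could be sketched as follows. The broker address, user name, CA path, the KAFKA_SASL_PASSWORD environment variable, and the loki.write.default component are all assumptions. Reading the password with env() is one way to keep the secret out of the configuration file.

```river
// Hypothetical example: SASL/PLAIN authentication over TLS.
loki.source.kafka "sasl" {
  brokers    = ["kafka.example.internal:9094"] // assumed SASL listener
  topics     = ["quickstart-events"]
  forward_to = [loki.write.default.receiver]   // assumes loki.write "default" is defined elsewhere

  authentication {
    type = "sasl" // requires the sasl_config block below

    sasl_config {
      mechanism = "PLAIN"
      user      = "agent"                      // assumed user name
      password  = env("KAFKA_SASL_PASSWORD")   // assumed environment variable
      use_tls   = true

      tls_config {
        ca_file = "/etc/agent/kafka/ca.pem"    // assumed path
      }
    }
  }
}
```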

oauth_config block

The oauth_config block is required when the SASL mechanism is set to "OAUTHBEARER".

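| Name | Type | Description | Default | Required |
|------|------|-------------|---------|----------|
| token_provider | string | The OAuth provider to be used. The only supported provider is azure. | "" | yes |
| scopes | list(string) | The scopes to set in the access token. | [] | yes |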
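For the OAUTHBEARER case, the nesting might look like the sketch below. The broker address, the scope value, and the loki.write.default component are placeholders rather than documented values; check the correct scope with your identity provider. Enabling use_tls is an assumption about the target brokers, not a documented requirement.

```river
// Hypothetical example: SASL/OAUTHBEARER with the azure token provider.
loki.source.kafka "oauth" {
  brokers    = ["kafka.example.internal:9093"] // assumed broker address
  topics     = ["quickstart-events"]
  forward_to = [loki.write.default.receiver]   // assumes loki.write "default" is defined elsewhere

  authentication {
    type = "sasl"

    sasl_config {
      mechanism = "OAUTHBEARER" // makes oauth_config required
      use_tls   = true          // assumption about the brokers

      oauth_config {
        token_provider = "azure"                          // the only supported provider
        scopes         = ["https://example.com/.default"] // placeholder scope
      }
    }
  }
}
```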

Exported fields

loki.source.kafka does not export any fields.

Component health

loki.source.kafka is only reported as unhealthy if given an invalid configuration.

Debug information

loki.source.kafka does not expose additional debug info.

Example

This example consumes Kafka events from the specified brokers and topics, copies the internal __meta_kafka_topic label to a topic label, and forwards the entries to a loki.write component using the Kafka timestamp.

river
loki.source.kafka "local" {
  brokers                = ["localhost:9092"]
  topics                 = ["quickstart-events"]
  labels                 = {component = "loki.source.kafka"}
  forward_to             = [loki.relabel.kafka.receiver]
  use_incoming_timestamp = true
  relabel_rules          = loki.relabel.kafka.rules
}

loki.relabel "kafka" {
  forward_to      = [loki.write.local.receiver]

  rule {
    source_labels = ["__meta_kafka_topic"]
    target_label  = "topic"
  }
}

loki.write "local" {
  endpoint {
    // The URL needs a scheme; Loki's push endpoint is /loki/api/v1/push.
    url = "http://loki:3100/loki/api/v1/push"
  }
}

Compatible components

loki.source.kafka can accept arguments from the following components:

  • Components that export Loki LogsReceiver

Note

Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.