
Commit af5b282

kafka: added kafka source, updated kafka dest
Signed-off-by: Hofi <[email protected]>
1 parent 5514976 commit af5b282

File tree

11 files changed: +331, -79 lines changed


_data/link_aliases.yml

Lines changed: 6 additions & 0 deletions
@@ -117,6 +117,12 @@ adm-src-program:
 adm-dest-program:
   aliases: [ "program() destination" ]
 
+adm-src-kafka:
+  aliases: [ "kafka() source" ]
+
+adm-dest-kafkac:
+  aliases: [ "kafka() destination" ]
+
 adm-src-mqtt:
   aliases: [ "mqtt() source" ]

_data/navigation.yml

Lines changed: 6 additions & 1 deletion
@@ -165,6 +165,11 @@ admin-guide-nav:
       - title: "Jellyfin"
         url: /admin-guide/060_Sources/035_Jellyfin/README
         subnav:
+      - title: "kafka"
+        url: /admin-guide/060_Sources/038_Kafka/README
+        subnav:
+          - title: "Options of the kafka() source"
+            url: /admin-guide/060_Sources/038_Kafka/001_Kafka_options
       - title: "kubernetes"
         url: /admin-guide/060_Sources/040_Kubernetes/README
         subnav:
@@ -1159,7 +1164,7 @@ dev-guide-nav:
         subnav:
           - title: "file() Destination Driver"
             url: /dev-guide/chapter_4/section_2/macos-testing-status/affile/file-destination-driver
-          - title: "file() Source Driver (DEPRECATED)"
+          - title: "file() Source Driver"
             url: /dev-guide/chapter_4/section_2/macos-testing-status/affile/file-source-driver
           - title: "pipe() Destination Driver"
             url: /dev-guide/chapter_4/section_2/macos-testing-status/affile/pipe-destination-driver
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
## bootstrap-servers()

| Type:      | string |
| Default:   | N/A    |
| Mandatory: | yes    |

*Description:* Specifies the hostname or IP address of the Kafka server. When specifying an IP address, IPv4 (for example, 192.168.0.1) or IPv6 (for example, \[::1\]) can be used as well. Use a colon (**:**) after the address to specify the port number of the server. When specifying multiple addresses, use a comma to separate the addresses, for example:

``` config
bootstrap-servers(
    "127.0.0.1:2525,remote-server-hostname:6464"
)
```
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
## config()

| Type:    | key-value pairs |
| Default: | N/A             |

*Description:* You can use this option to set the properties of the kafka {{ include.kafka_type }}.

The {{ site.product.short_name }} kafka {{ include.type }} supports all properties of the official Kafka {{ include.kafka_type }}. For details, see the librdkafka documentation.

The syntax of the config() option is the following:

``` config
config(
    "key1" => "value1"
    "key2" => "value2"
)
```

**NOTE:** The following kafka {{ include.kafka_type }} config options are protected and cannot be overridden in the `config()` list: {{ include.protected_options }}
{: .notice--info}
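
To make this more concrete, here is a hedged sketch of a `config()` block passing authentication-related librdkafka properties. The property names (`security.protocol`, `sasl.mechanisms`, `sasl.username`, `sasl.password`) are standard librdkafka settings; the values are placeholders:

``` config
config(
    # placeholder credentials for a SASL/SSL-protected broker
    "security.protocol" => "sasl_ssl"
    "sasl.mechanisms" => "PLAIN"
    "sasl.username" => "example-user"
    "sasl.password" => "example-password"
)
```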
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
## disable-bookmarks()

| Type:    | boolean |
| Default: | no      |

*Description:* This option prevents {{ site.product.short_name }} from storing a bookmark (such as the position or offset) of the last processed message in its persist file.

**NOTE:** This does not prevent the use of an already present bookmark entry. To ignore existing bookmark entries, specify `ignore-saved-bookmarks(yes)` as well.
{: .notice--info}
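
For illustration, a minimal sketch of a source that neither stores new bookmarks nor honors previously saved ones (the broker address and topic name are placeholders):

``` config
source s_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("example-topic" => "-1")
        disable-bookmarks(yes)       # do not store new bookmarks
        ignore-saved-bookmarks(yes)  # skip bookmarks saved earlier
    );
};
```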
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
## kafka-logging()

| Accepted values: | disabled \| trace \| kafka |
| Default:         | disabled                   |

*Description:* This option allows you to control how internal Kafka logs appear in the {{ site.product.short_name }} logs.

- disabled: Disables internal Kafka log messages in the {{ site.product.short_name }} logs.
- trace: Logs all internal Kafka messages at the `trace` level of {{ site.product.short_name }}.
- kafka: Logs internal Kafka messages using log levels mapped to those of {{ site.product.short_name }}.

**NOTE:** The internal Kafka logging level itself can be configured using the config() Kafka options. For details, refer to the librdkafka documentation.
{: .notice--info}
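
As a hedged sketch, `kafka-logging(kafka)` can be combined with librdkafka's standard `debug` property to surface detailed client logs at mapped log levels. The broker address, topic name, and chosen debug contexts below are placeholders:

``` config
source s_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("example-topic" => "-1")
        kafka-logging(kafka)                  # map Kafka log levels to syslog-ng levels
        config("debug" => "broker,topic,msg") # librdkafka debug contexts to log
    );
};
```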
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
- One **main worker** that fetches messages from the Kafka broker and stores them in an internal queue.
- A second worker that processes the queued messages and forwards them to the configured destination.

Although the source can operate using a single worker, this configuration typically results in a significant performance penalty compared to the default multi-worker setup.

Increasing the number of workers beyond two may further improve throughput, especially when the main worker can fetch messages at high speed. In such cases, you may also need to fine-tune related options such as separated-worker-queues(), log-fetch-limit(), log-fetch-delay(), log-fetch-retry-delay(), and log-fetch-queue-full-delay(), as in the sketch below.
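
A minimal sketch of such a tuned multi-worker setup, assuming placeholder broker address, topic name, and values:

``` config
source s_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("example-topic" => "-1")
        workers(4)                    # 1 main worker + 3 processor workers
        separated-worker-queues(yes)  # give each processor worker its own queue
        log-fetch-limit(30000)        # total queued messages shared by the queues
    );
};
```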
Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
---
title: "Options of the kafka() source"
id: adm-src-kafka-opt
description: >-
  This section describes the options of the kafka() source in {{ site.product.short_name }}.
---

The kafka() source of {{ site.product.short_name }} can directly consume log messages from the Apache Kafka message bus. The source has the following options.

## Required options

To use the kafka() source, the following two options are required: bootstrap-servers() and topic(). Both must appear at the beginning of your {{ site.product.short_name }} configuration.
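
For illustration, a minimal kafka() source using only the two required options (the broker address and topic name are placeholders):

``` config
source s_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("example-topic" => "-1")  # all partitions of example-topic
    );
};
```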
{% include doc/admin-guide/options/bootstrap-servers.md %}

{% include doc/admin-guide/options/config-kafka.md kafka_type='consumer' type='source' protected_options='`bootstrap.servers` `metadata.broker.list` `enable.auto.offset.store` `auto.offset.reset` `enable.auto.commit` `auto.commit.enable`' %}

{% include doc/admin-guide/options/disable-bookmarks.md %}
See Bookmarking in the kafka() source for more details.

{% include doc/admin-guide/options/hook.md %}

{% include doc/admin-guide/options/ignore-saved-bookmarks.md %} (depending on the setting of the read-old-records() option).\
See Bookmarking in the kafka() source for more details.

{% include doc/admin-guide/options/kafka-logging.md %}
## log-fetch-limit()

| Type:    | integer |
| Default: | 10000   |

*Description:* Specifies the maximum number of messages the main worker will consume and queue from the Kafka broker. This effectively determines the size of the internally used Kafka message queue. If the limit is reached, the kafka() source stops fetching messages from the broker, logs the situation, and waits the amount of time specified by log-fetch-queue-full-delay() before attempting to fetch new data again.

**NOTE:** If more than 2 workers are configured and separated-worker-queues() is set to `yes`, all processor workers share this total queue size.
For example, with `workers(3)` and `log-fetch-limit(100000)`, the 2 processor workers (remember, the first of the configured 3 is always the main worker) each receive their own queue, and neither queue will grow beyond 50,000 messages.
{: .notice--info}

**NOTE:** It is worth aligning this option with the Kafka config options `queued.min.messages` and `queued.max.messages.kbytes`. For details, refer to the librdkafka documentation.
{: .notice--info}
## log-fetch-delay()

| Type:    | integer in microseconds |
| Default: | 1000 (1 millisecond)    |

*Description:* Specifies the time the main worker will wait between attempts to fetch new data.

## log-fetch-retry-delay()

| Type:    | integer in microseconds |
| Default: | 10000 (10 milliseconds) |

*Description:* Specifies the time the main worker will wait before attempting to fetch new data again when the broker signals that no more data is available.
## log-fetch-queue-full-delay()

| Type:    | integer in milliseconds |
| Default: | 1000                    |

*Description:* When the main worker reaches the queued message limit defined by log-fetch-limit(), the kafka() source temporarily stops retrieving messages from the broker. It then waits for the duration specified by `log-fetch-queue-full-delay()` before attempting to fetch additional messages.
{% include doc/admin-guide/options/persist-name.md %}

## poll-timeout()

| Type:    | integer in milliseconds |
| Default: | 10000                   |

*Description:* Specifies the maximum amount of time {{ site.product.short_name }} waits during a Kafka broker poll request for new messages to become available.

{% include doc/admin-guide/options/read-old-records.md %}\
See Bookmarking in the kafka() source for more details.
## separated-worker-queues()

| Type:    | yes \| no |
| Default: | no        |

*Description:* When the value of workers() is greater than 2 (meaning multiple processor threads are used to handle queued messages), and `separated-worker-queues()` is set to `yes`, the main worker of the kafka() source distributes the consumed messages into separate queues, one for each processor worker.

**NOTE:** This approach can improve performance, especially in high-throughput scenarios, but may also lead to significantly increased memory usage.
{: .notice--info}
## strategy-hint()

| Accepted values: | assign, subscribe |
| Default:         | assign            |

*Description:* This option provides a hint about which Kafka consumer strategy the kafka() source should use when the topic() list contains topic/partition definitions that could be handled either way.

The section Why is it worth using dual consumer strategies? describes the differences between the two.

For details about how the resulting topic names, partitions, and Kafka assign/subscribe strategies are determined in different scenarios, see Basic strategy usage cross-reference of the different topic configuration cases.
## time-reopen()

| Type:    | integer in seconds |
| Default: | 60                 |

*Description:* The time {{ site.product.short_name }} waits between attempts to recover from errors that require re-initialization of the full Kafka connection and its internally used data structures.
## topic()

| Type:      | key-value pairs |
| Default:   | N/A             |
| Mandatory: | yes             |

*Description:* A list of pairs consisting of Kafka topic name(s) and partition number(s) from which messages are consumed, for example:

``` config
topic(
    "^topic-name-[13]$" => "-1"
    "topic-name-2" => "1"
    "topic-name-4" => "-1"
    "topic-name-5" => "0,1,4"
)
```
Valid topic names have the following limitations:

- The topic name must either contain only characters matching the pattern `[-._a-zA-Z0-9]`, or it can be a regular expression.
  For example: `^topic-name-[13]$` (which expands to `topic-name-1` and `topic-name-3`).
- The length of the topic name must be between 1 and 249 characters.

The partition number must be:

- either a single partition number or a comma-separated list of partition numbers
- a positive integer, or `-1`, which means all partitions of the topic

For details about how the resulting topic names, partitions, and Kafka assign/subscribe strategies are determined in different scenarios, see Basic strategy usage cross-reference of the different topic configuration cases and Why is it worth using dual consumer strategies?
## workers()

| Type:    | integer |
| Default: | 2       |

*Description:* The number of workers the `kafka()` source uses to consume and process messages from the Kafka broker. By default, it uses two:

{% include doc/admin-guide/options/kafka-source-workers.md %}

![]({{ site.baseurl }}/assets/images/caution.png) **CAUTION:**
Only kafka() sources with `workers()` set to less than 3 can guarantee ordered message forwarding.
{: .notice--warning}

**NOTE:** Kafka clients have their own threadpool, entirely independent from any {{ site.product.short_name }} settings. The `workers()` option has no effect on this threadpool.
{: .notice--info}
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
---
title: 'kafka(): Consuming messages from Apache Kafka using the librdkafka client'
short_title: kafka
id: adm-src-kafka
description: >-
  Starting with version 4.11, {{ site.product.name }} can directly fetch log messages from the Apache Kafka message bus.
---

The kafka() source can fetch messages from explicitly named or wildcard-matching Kafka topics, and from a single partition, explicitly listed partitions, or all partitions of the selected topic(s). It can use two different strategies, `assign` or `subscribe`, to start consuming messages from the selected partition(s). The strategy is determined automatically based on the topic() option definitions and the strategy-hint() option.\
The basic rule is the following:

`subscribe` is used if the topic name contains characters that are not allowed in standard Kafka topic names (in which case the topic name is treated as a regular expression), if the partition number is `-1`, or if the value of strategy-hint() is `subscribe` (except when multiple partition numbers are provided for the same topic name, which raises an error).

`assign` (the default) is used if the topic name contains only valid Kafka topic characters (for example, no regexp-related characters) and only positive partition numbers are specified.
## Basic strategy usage cross-reference of the different topic configuration cases

| topic(...) in config                                   | topic name(s)              | part. number(s) | strategy-hint() | resulting strategy |
|--------------------------------------------------------|----------------------------|-----------------|-----------------|--------------------|
| topic( "topic-name-1" => "1" )                         | topic-name-1               | 1               | assign          | assign             |
| topic( "topic-name-1" => "1" )                         | topic-name-1               | 1               | subscribe       | subscribe          |
| topic( "topic-name-1" => "1,2" )                       | topic-name-1               | 1-2             | assign          | assign             |
| topic( "topic-name-1" => "1,2" )                       | topic-name-1               | 1-2             | subscribe       | N/A (error)        |
| topic( "topic-name-1" => "1" "topic-name-1" => "2" )   | topic-name-1               | 1-2             | assign          | assign             |
| topic( "topic-name-1" => "1" "topic-name-1" => "2" )   | topic-name-1               | 1-2             | subscribe       | N/A (error)        |
| topic( "topic-name-1" => "1" "topic-name-3" => "2" )   | topic-name-1, topic-name-3 | 1, 2            | assign          | assign             |
| topic( "topic-name-1" => "1" "topic-name-3" => "2" )   | topic-name-1, topic-name-3 | 1, 2            | subscribe       | subscribe          |
| topic( "topic-name-1" => "-1" )                        | topic-name-1               | all             | assign          | subscribe          |
| topic( "topic-name-1" => "-1" )                        | topic-name-1               | all             | subscribe       | subscribe          |
| topic( "topic-name-1" => "1" "topic-name-3" => "-1" )  | topic-name-1, topic-name-3 | 1, all          | assign          | subscribe          |
| topic( "topic-name-1" => "1" "topic-name-3" => "-1" )  | topic-name-1, topic-name-3 | 1, all          | subscribe       | subscribe          |
| topic( "topic-name-3" => "1" "topic-name-3" => "-1" )  | topic-name-3               | 1, all          | assign          | subscribe          |
| topic( "topic-name-3" => "1" "topic-name-3" => "-1" )  | topic-name-3               | 1, all          | subscribe       | subscribe          |
| topic( "^topic-name-[13]$" => "2" )                    | topic-name-1, topic-name-3 | 2, 2            | assign          | subscribe          |
| topic( "^topic-name-[13]$" => "2" )                    | topic-name-1, topic-name-3 | 2, 2            | subscribe       | subscribe          |
| topic( "^topic-name-[13]$" => "-1" )                   | topic-name-1, topic-name-3 | all, all        | assign          | subscribe          |
| topic( "^topic-name-[13]$" => "-1" )                   | topic-name-1, topic-name-3 | all, all        | subscribe       | subscribe          |
## Why is it worth using dual consumer strategies?

Using both consumer strategies, `assign` and `subscribe`, provides the flexibility to adapt to a wide range of Kafka setups and practical use cases, instead of forcing a single approach that may not fit all scenarios.

- `assign` is ideal when full control and predictability are required.
  - You can explicitly target a known set of topics and partitions.
  - It guarantees ordering semantics more reliably in single-partition or controlled multi-partition scenarios.
  - It works well in environments where the topic layout is static and predefined.

- `subscribe` is valuable when flexibility matters more than strict control.
  - It supports regular expressions, making it suitable when topic names follow patterns or when topics may appear dynamically.
  - It automatically handles partition assignments inside a consumer group, reducing configuration overhead.
  - It integrates better with scaling scenarios or when consumers should share workload automatically.
  - The possible drawbacks of unordered and/or repeated messages are acceptable.

By supporting both approaches, {{ site.product.short_name }} can be used effectively in a variety of Kafka consumption models, from tightly controlled, partition-specific pipelines to dynamic and scalable consumer setups that evolve with the broker configuration.
## Bookmarking in the kafka() source

By default, {{ site.product.short_name }} stores the offset of the last read message of each topic it consumes in its own persist file. This can be disabled using the disable-bookmarks() option. Automatic offset restoration takes effect at startup or reload, based on the saved offset value and the ignore-saved-bookmarks() and read-old-records() settings. If ignore-saved-bookmarks() is set to `yes`, the saved offset is not used. Instead, if read-old-records() is set to `yes`, fetching starts from the oldest available message, otherwise it starts from the newest one.
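
For illustration, a hedged sketch of a source that discards the saved offset on each start and begins with the newest messages (the broker address and topic name are placeholders):

``` config
source s_kafka {
    kafka(
        bootstrap-servers("127.0.0.1:9092")
        topic("example-topic" => "-1")
        ignore-saved-bookmarks(yes)  # do not resume from the saved offset
        read-old-records(no)         # start from the newest message instead
    );
};
```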
## Multiple workers in the kafka() source

The kafka() source can fetch and process messages from the Kafka broker using multiple workers(); by default, it uses two:

{% include doc/admin-guide/options/kafka-source-workers.md %}
