Skip to content

feat: Add rate limiter for Kafka sources#357

Merged
ferenc-csaky merged 4 commits into
mainfrom
feat/kafka-rate-limiter
Jun 29, 2026
Merged

feat: Add rate limiter for Kafka sources#357
ferenc-csaky merged 4 commits into
mainfrom
feat/kafka-rate-limiter

Conversation

@mateczagany

Copy link
Copy Markdown
Collaborator

Adds optional Kafka source emission rate limiting via scan.rate-limit.records-per-second.

It's being implemented by using a wrapper that preserves Kafka checkpoint/restart behavior by delegating method calls to the underlying Kafka source. The limiter throttles emitted records, not Kafka fetches. If Kafka fetches ahead, un-emitted records are not checkpointed as consumed and will be refetched after restart.

CountingSourceOutput is used because SourceReader.pollNext() only returns InputStatus, not an emitted-record count. This keeps limiting based on actual collect() calls while avoiding Flink 2.2.1’s built-in RateLimitedSourceReader, which does not delegate checkpoint-complete callbacks to the actual input source.


public class RateLimitOptions {

public static final ConfigOption<Double> SCAN_RATE_LIMIT_RECORDS_PER_SECOND =

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use Integer for the config property, i don't think it's realistic that anybody will give a fractional number, it's only double cause it will be divided by the parallelism, cause this is applied per-subtask.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, thank you!

@ferenc-csaky ferenc-csaky left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ferenc-csaky ferenc-csaky enabled auto-merge (squash) June 29, 2026 14:52
@ferenc-csaky ferenc-csaky merged commit 9ea6a14 into main Jun 29, 2026
13 checks passed
@ferenc-csaky ferenc-csaky deleted the feat/kafka-rate-limiter branch June 29, 2026 15:01
@ferenc-csaky ferenc-csaky added this to the 0.10.4 milestone Jun 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants