From f804864d5064a42b0248ea1d2912498c978347e7 Mon Sep 17 00:00:00 2001 From: Mate Czagany Date: Wed, 24 Jun 2026 14:27:16 +0200 Subject: [PATCH 1/4] feat: Add rate limiter for Kafka sources --- .../connector/kafka/RateLimitOptions.java | 48 +++ .../connector/kafka/RateLimitedSource.java | 311 ++++++++++++++++ .../kafka/table/SafeKafkaDynamicSource.java | 38 +- .../table/SafeKafkaDynamicTableFactory.java | 6 + .../SafeUpsertKafkaDynamicTableFactory.java | 4 + .../connector/kafka/RateLimitOptionsTest.java | 54 +++ .../kafka/RateLimitedSourceTest.java | 340 ++++++++++++++++++ 7 files changed, 797 insertions(+), 4 deletions(-) create mode 100644 connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java create mode 100644 connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java create mode 100644 connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java create mode 100644 connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSourceTest.java diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java new file mode 100644 index 00000000..503b2a33 --- /dev/null +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.connector.kafka; + +import java.util.Optional; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; + +public class RateLimitOptions { + + public static final ConfigOption SCAN_RATE_LIMIT_RECORDS_PER_SECOND = + ConfigOptions.key("scan.rate-limit.records-per-second") + .doubleType() + .noDefaultValue() + .withDescription( + "Optional maximum number of records per second emitted by the Kafka source."); + + public static Optional scanRateLimitRecordsPerSecond(ReadableConfig tableOptions) { + Optional recordsPerSecond = + tableOptions.getOptional(SCAN_RATE_LIMIT_RECORDS_PER_SECOND); + + recordsPerSecond.ifPresent( + value -> { + if (!Double.isFinite(value) || value <= 0D) { + throw new ValidationException( + "'%s' must be a finite number greater than 0." + .formatted(SCAN_RATE_LIMIT_RECORDS_PER_SECOND.key())); + } + }); + + return recordsPerSecond; + } +} diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java new file mode 100644 index 00000000..391f3c47 --- /dev/null +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java @@ -0,0 +1,311 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.connector.kafka; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; +import org.apache.flink.util.Preconditions; + +public final class RateLimitedSource + implements Source, LineageVertexProvider { + + private final Source delegate; + private final RateLimiterStrategy rateLimiterStrategy; + + public RateLimitedSource( + Source delegate, RateLimiterStrategy rateLimiterStrategy) { + this.delegate = Preconditions.checkNotNull(delegate, "Delegate source must not be null."); + this.rateLimiterStrategy = + Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy must not be null."); + } + + @Override + public Boundedness getBoundedness() { + return delegate.getBoundedness(); + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) throws Exception { + return new RateLimitedSourceReader<>( + delegate.createReader(readerContext), + rateLimiterStrategy.createRateLimiter(readerContext.currentParallelism())); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) throws Exception { + return delegate.createEnumerator(enumContext); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, EnumChkT checkpoint) throws Exception { + return delegate.restoreEnumerator(enumContext, checkpoint); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return delegate.getSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return delegate.getEnumeratorCheckpointSerializer(); + } + + @Override + public Set declareWatermarks() { + return delegate.declareWatermarks(); + } + + @Override + public LineageVertex getLineageVertex() { + if (delegate instanceof LineageVertexProvider lineageVertexProvider) { + return lineageVertexProvider.getLineageVertex(); + } + return new EmptySourceLineageVertex(getBoundedness()); + } + + private record EmptySourceLineageVertex(Boundedness boundedness) implements SourceLineageVertex { + + @Override + public List datasets() { + return List.of(); + } + } + + private static final class RateLimitedSourceReader + implements SourceReader { + + private final SourceReader delegate; + private final RateLimiter rateLimiter; + private final CountingReaderOutput countingOutput = new CountingReaderOutput<>(); + private CompletableFuture rateLimitPermissionFuture = + CompletableFuture.completedFuture(null); + + private RateLimitedSourceReader( + SourceReader delegate, RateLimiter rateLimiter) { + this.delegate = Preconditions.checkNotNull(delegate, "Delegate reader must not be null."); + this.rateLimiter = Preconditions.checkNotNull(rateLimiter, "Rate limiter must not be null."); + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + if (!rateLimitPermissionFuture.isDone()) { + return InputStatus.NOTHING_AVAILABLE; + } + + countingOutput.reset(output); + InputStatus status = delegate.pollNext(countingOutput); + int emittedRecords = countingOutput.getEmittedRecords(); + if (emittedRecords > 0) { + rateLimitPermissionFuture = rateLimiter.acquire(emittedRecords).toCompletableFuture(); + if (status == InputStatus.MORE_AVAILABLE && !rateLimitPermissionFuture.isDone()) { + return InputStatus.NOTHING_AVAILABLE; + } + } + return status; + } + + @Override + public List snapshotState(long checkpointId) { + return delegate.snapshotState(checkpointId); + } + + @Override + public CompletableFuture isAvailable() { + if (!rateLimitPermissionFuture.isDone()) { + return rateLimitPermissionFuture; + } + return delegate.isAvailable(); + } + + @Override + public void addSplits(List splits) { + splits.forEach(rateLimiter::notifyAddingSplit); + delegate.addSplits(splits); + } + + @Override + public void notifyNoMoreSplits() { + delegate.notifyNoMoreSplits(); + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + delegate.handleSourceEvents(sourceEvent); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + delegate.notifyCheckpointComplete(checkpointId); + rateLimiter.notifyCheckpointComplete(checkpointId); + } + + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + delegate.pauseOrResumeSplits(splitsToPause, splitsToResume); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + } + + private static final class CountingReaderOutput implements ReaderOutput { + + private ReaderOutput delegate; + private final Map> splitOutputs = new HashMap<>(); + private int emittedRecords; + + private void reset(ReaderOutput delegate) { + ReaderOutput checkedDelegate = + Preconditions.checkNotNull(delegate, "Delegate output must not be null."); + if (this.delegate != checkedDelegate) { + splitOutputs.values().forEach(CountingSourceOutput::resetDelegateOutput); + } + this.delegate = checkedDelegate; + emittedRecords = 0; + } + + @Override + public void collect(T record) { + delegate.collect(record); + emittedRecords++; + } + + @Override + public void collect(T record, long timestamp) { + delegate.collect(record, timestamp); + emittedRecords++; + } + + @Override + public void emitWatermark(Watermark watermark) { + delegate.emitWatermark(watermark); + } + + @Override + public void markIdle() { + delegate.markIdle(); + } + + @Override + public void markActive() { + delegate.markActive(); + } + + @Override + public SourceOutput createOutputForSplit(String splitId) { + return splitOutputs.computeIfAbsent(splitId, id -> new CountingSourceOutput<>(id, this)); + } + + @Override + public void releaseOutputForSplit(String splitId) { + splitOutputs.remove(splitId); + delegate.releaseOutputForSplit(splitId); + } + + private SourceOutput createDelegateOutputForSplit(String splitId) { + return delegate.createOutputForSplit(splitId); + } + + private int getEmittedRecords() { + return emittedRecords; + } + } + + private static final class CountingSourceOutput implements SourceOutput { + + private final String splitId; + private final CountingReaderOutput readerOutput; + private SourceOutput delegateOutput; + + private CountingSourceOutput(String splitId, CountingReaderOutput readerOutput) { + this.splitId = Preconditions.checkNotNull(splitId, "Split ID must not be null."); + this.readerOutput = readerOutput; + } + + @Override + public void collect(T record) { + delegateOutput().collect(record); + readerOutput.emittedRecords++; + } + + @Override + public void collect(T record, long timestamp) { + delegateOutput().collect(record, timestamp); + readerOutput.emittedRecords++; + } + + @Override + public void emitWatermark(Watermark watermark) { + delegateOutput().emitWatermark(watermark); + } + + @Override + public void markIdle() { + delegateOutput().markIdle(); + } + + @Override + public void markActive() { + delegateOutput().markActive(); + } + + private SourceOutput delegateOutput() { + if (delegateOutput == null) { + delegateOutput = readerOutput.createDelegateOutputForSplit(splitId); + } + return delegateOutput; + } + + private void resetDelegateOutput() { + delegateOutput = null; + } + } +} diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java index 3ec44b50..1dd595cd 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java @@ -20,11 +20,15 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; +import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -57,6 +61,7 @@ import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandler; import com.datasqrl.flinkrunner.connector.kafka.KafkaAdminIdleAdvanceReadinessChecker; import com.datasqrl.flinkrunner.connector.kafka.KafkaRecordTimestampWatermarkStrategy; +import com.datasqrl.flinkrunner.connector.kafka.RateLimitedSource; import com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SourceWatermarkConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -199,6 +204,9 @@ public class SafeKafkaDynamicSource protected final DeserFailureHandler deserFailureHandler; + /** Maximum records per second emitted by this source. */ + protected final Optional rateLimitRecordsPerSecond; + public SafeKafkaDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -219,6 +227,7 @@ public SafeKafkaDynamicSource( String tableIdentifier, @Nullable Integer parallelism, DeserFailureHandler deserFailureHandler, + Optional rateLimitRecordsPerSecond, WatermarkEmitStrategy sourceWatermarkEmitStrategy, Optional sourceWatermarkIdleTimeout, SourceWatermarkConfig sourceWatermarkConfig) { @@ -264,6 +273,9 @@ public SafeKafkaDynamicSource( this.tableIdentifier = tableIdentifier; this.parallelism = parallelism; this.deserFailureHandler = deserFailureHandler; + this.rateLimitRecordsPerSecond = + Preconditions.checkNotNull( + rateLimitRecordsPerSecond, "Rate limit must not be null."); this.sourceWatermarkEmitStrategy = Preconditions.checkNotNull( sourceWatermarkEmitStrategy, @@ -293,7 +305,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); - final KafkaSource kafkaSource = + final Source kafkaSource = createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo); return new DataStreamScanProvider() { @@ -303,7 +315,10 @@ public DataStream produceDataStream( final WatermarkStrategy watermarkStrategy = getWatermarkStrategy(); DataStreamSource sourceStream = execEnv.fromSource( - kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier); + kafkaSource, + watermarkStrategy, + "KafkaSource-" + tableIdentifier, + producedTypeInfo); providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid); return sourceStream; } @@ -402,6 +417,7 @@ public DynamicTableSource copy() { tableIdentifier, parallelism, deserFailureHandler, + rateLimitRecordsPerSecond, sourceWatermarkEmitStrategy, sourceWatermarkIdleTimeout, sourceWatermarkConfig); @@ -450,6 +466,7 @@ public boolean equals(Object o) { && sourceWatermarkEmitStrategy == that.sourceWatermarkEmitStrategy && Objects.equals(sourceWatermarkIdleTimeout, that.sourceWatermarkIdleTimeout) && Objects.equals(sourceWatermarkConfig, that.sourceWatermarkConfig) + && Objects.equals(rateLimitRecordsPerSecond, that.rateLimitRecordsPerSecond) && Objects.equals(parallelism, that.parallelism); } @@ -480,12 +497,13 @@ public int hashCode() { sourceWatermarkEmitStrategy, sourceWatermarkIdleTimeout, sourceWatermarkConfig, + rateLimitRecordsPerSecond, parallelism); } // -------------------------------------------------------------------------------------------- - protected KafkaSource createKafkaSource( + protected Source createKafkaSource( DeserializationSchema keyDeserialization, DeserializationSchema valueDeserialization, TypeInformation producedTypeInfo) { @@ -557,7 +575,19 @@ protected KafkaSource createKafkaSource( kafkaSourceBuilder.setProperties(properties).setDeserializer(kafkaDeserializer); - return kafkaSourceBuilder.build(); + KafkaSource kafkaSource = kafkaSourceBuilder.build(); + if (rateLimitRecordsPerSecond.isEmpty()) { + return kafkaSource; + } + + return new RateLimitedSource<>( + kafkaSource, recordsPerSecondRateLimiterStrategy(rateLimitRecordsPerSecond.get())); + } + + @SuppressWarnings("unchecked") + private static RateLimiterStrategy recordsPerSecondRateLimiterStrategy( + double recordsPerSecond) { + return RateLimiterStrategy.perSecond(recordsPerSecond); } private WatermarkStrategy getWatermarkStrategy() { diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java index 2cf2b714..6d3148f6 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java @@ -71,6 +71,8 @@ import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.validateDeserFailureHandlerOptions; +import static com.datasqrl.flinkrunner.connector.kafka.RateLimitOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND; +import static com.datasqrl.flinkrunner.connector.kafka.RateLimitOptions.scanRateLimitRecordsPerSecond; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN; @@ -175,6 +177,7 @@ public Set> optionalOptions() { options.add(TRANSACTION_NAMING_STRATEGY); options.add(SCAN_DESER_FAILURE_HANDLER); options.add(SCAN_DESER_FAILURE_TOPIC); + options.add(SCAN_RATE_LIMIT_RECORDS_PER_SECOND); options.add(WATERMARK_EMIT_STRATEGY); options.add(SOURCE_IDLE_TIMEOUT); options.add(SCAN_SOURCE_WATERMARK_MIN_RECORDS); @@ -276,6 +279,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { context.getObjectIdentifier().asSummaryString(), parallelism, deserFailureHandler, + scanRateLimitRecordsPerSecond(tableOptions), tableOptions.get(WATERMARK_EMIT_STRATEGY), tableOptions.getOptional(SOURCE_IDLE_TIMEOUT), sourceWatermarkConfiguration(tableOptions)); @@ -447,6 +451,7 @@ protected SafeKafkaDynamicSource createKafkaTableSource( String tableIdentifier, Integer parallelism, DeserFailureHandler deserFailureHandler, + Optional rateLimitRecordsPerSecond, WatermarkEmitStrategy sourceWatermarkEmitStrategy, Optional sourceWatermarkIdleTimeout, SourceWatermarkConfig sourceWatermarkConfig) { @@ -470,6 +475,7 @@ protected SafeKafkaDynamicSource createKafkaTableSource( tableIdentifier, parallelism, deserFailureHandler, + rateLimitRecordsPerSecond, sourceWatermarkEmitStrategy, sourceWatermarkIdleTimeout, sourceWatermarkConfig); diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java index 7bc7ffd5..469510f4 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeUpsertKafkaDynamicTableFactory.java @@ -56,6 +56,8 @@ import java.util.stream.Stream; import static com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions.*; +import static com.datasqrl.flinkrunner.connector.kafka.RateLimitOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND; +import static com.datasqrl.flinkrunner.connector.kafka.RateLimitOptions.scanRateLimitRecordsPerSecond; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TIMEOUT; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_BROKER_CHECK_TTL; import static com.datasqrl.flinkrunner.connector.kafka.SourceWatermarkOptions.SCAN_SOURCE_WATERMARK_IDLE_ADVANCE_SAFETY_MARGIN; @@ -131,6 +133,7 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); options.add(SCAN_DESER_FAILURE_HANDLER); options.add(SCAN_DESER_FAILURE_TOPIC); + options.add(SCAN_RATE_LIMIT_RECORDS_PER_SECOND); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); options.add(SCAN_PARALLELISM); @@ -206,6 +209,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { context.getObjectIdentifier().asSummaryString(), parallelism, deserFailureHandler, + scanRateLimitRecordsPerSecond(tableOptions), tableOptions.get(WATERMARK_EMIT_STRATEGY), tableOptions.getOptional(SOURCE_IDLE_TIMEOUT), sourceWatermarkConfiguration(tableOptions)); diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java new file mode 100644 index 00000000..0ab9f789 --- /dev/null +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.connector.kafka; + +import static com.datasqrl.flinkrunner.connector.kafka.RateLimitOptions.SCAN_RATE_LIMIT_RECORDS_PER_SECOND; +import static com.datasqrl.flinkrunner.connector.kafka.RateLimitOptions.scanRateLimitRecordsPerSecond; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.ValidationException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class RateLimitOptionsTest { + + @Test + void returnsEmptyWhenRateLimitIsNotConfigured() { + assertThat(scanRateLimitRecordsPerSecond(new Configuration())).isEmpty(); + } + + @Test + void returnsConfiguredRateLimit() { + Configuration configuration = new Configuration(); + configuration.set(SCAN_RATE_LIMIT_RECORDS_PER_SECOND, 123.45D); + + assertThat(scanRateLimitRecordsPerSecond(configuration)).contains(123.45D); + } + + @ParameterizedTest + @ValueSource(doubles = {0D, -1D, Double.NaN, Double.POSITIVE_INFINITY}) + void rejectsInvalidRateLimits(double recordsPerSecond) { + Configuration configuration = new Configuration(); + configuration.set(SCAN_RATE_LIMIT_RECORDS_PER_SECOND, recordsPerSecond); + + assertThatThrownBy(() -> scanRateLimitRecordsPerSecond(configuration)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining(SCAN_RATE_LIMIT_RECORDS_PER_SECOND.key()); + } +} diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSourceTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSourceTest.java new file mode 100644 index 00000000..a69ff0b2 --- /dev/null +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSourceTest.java @@ -0,0 +1,340 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner.connector.kafka; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.junit.jupiter.api.Test; + +class RateLimitedSourceTest { + + @Test + void delegatesSourceMethodsAndCreatesRateLimitedReader() throws Exception { + FakeSourceReader delegateReader = new FakeSourceReader(); + RateLimitedSource source = + new RateLimitedSource<>(new FakeSource(delegateReader), new RecordingStrategy()); + + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(source.declareWatermarks()).hasSize(1); + assertThat(source.declareWatermarks().iterator().next()) + .isSameAs(TestWatermarkDeclaration.INSTANCE); + assertThat(source.getLineageVertex()).isSameAs(TestLineageVertex.INSTANCE); + } + + @Test + void rateLimitsRecordsEmittedThroughSplitOutputs() throws Exception { + SourceReaderContext readerContext = mock(SourceReaderContext.class); + when(readerContext.currentParallelism()).thenReturn(2); + + FakeSourceReader delegateReader = new FakeSourceReader(); + RecordingStrategy strategy = new RecordingStrategy(); + SourceReader reader = + new RateLimitedSource<>(new FakeSource(delegateReader), strategy) + .createReader(readerContext); + RecordingReaderOutput output = new RecordingReaderOutput(); + + assertThat(strategy.parallelism).isEqualTo(2); + + assertThat(reader.pollNext(output)).isEqualTo(InputStatus.NOTHING_AVAILABLE); + assertThat(output.records).containsExactly("record-1"); + assertThat(strategy.rateLimiter.acquiredEvents).containsExactly(1); + + assertThat(reader.isAvailable()).isSameAs(strategy.rateLimiter.lastAcquireFuture); + assertThat(reader.pollNext(output)).isEqualTo(InputStatus.NOTHING_AVAILABLE); + assertThat(delegateReader.polls).isEqualTo(1); + + strategy.rateLimiter.lastAcquireFuture.complete(null); + assertThat(reader.pollNext(output)).isEqualTo(InputStatus.NOTHING_AVAILABLE); + assertThat(delegateReader.polls).isEqualTo(2); + assertThat(output.records).containsExactly("record-1", "record-2"); + } + + @Test + void forwardsSplitAndCheckpointNotifications() throws Exception { + SourceReaderContext readerContext = mock(SourceReaderContext.class); + when(readerContext.currentParallelism()).thenReturn(1); + + FakeSourceReader delegateReader = new FakeSourceReader(); + RecordingStrategy strategy = new RecordingStrategy(); + SourceReader reader = + new RateLimitedSource<>(new FakeSource(delegateReader), strategy) + .createReader(readerContext); + + TestSplit split = new TestSplit("split-1"); + reader.addSplits(List.of(split)); + reader.notifyCheckpointComplete(42L); + + assertThat(strategy.rateLimiter.addedSplits).containsExactly(split); + assertThat(strategy.rateLimiter.completedCheckpoints).containsExactly(42L); + assertThat(delegateReader.splits).containsExactly(split); + assertThat(delegateReader.completedCheckpoints).containsExactly(42L); + } + + private record TestSplit(String splitId) implements SourceSplit {} + + private enum TestWatermarkDeclaration implements WatermarkDeclaration { + INSTANCE; + + @Override + public String getIdentifier() { + return "test-watermark"; + } + } + + private enum TestLineageVertex implements LineageVertex { + INSTANCE; + + @Override + public List datasets() { + return List.of(); + } + } + + private static final class FakeSource + implements Source, LineageVertexProvider { + + private final FakeSourceReader reader; + + private FakeSource(FakeSourceReader reader) { + this.reader = reader; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) { + return reader; + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + throw new UnsupportedOperationException(); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, Integer checkpoint) { + throw new UnsupportedOperationException(); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + throw new UnsupportedOperationException(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + throw new UnsupportedOperationException(); + } + + @Override + public Set declareWatermarks() { + return Set.of(TestWatermarkDeclaration.INSTANCE); + } + + @Override + public LineageVertex getLineageVertex() { + return TestLineageVertex.INSTANCE; + } + } + + private static final class FakeSourceReader implements SourceReader { + + private final List splits = new ArrayList<>(); + private final List completedCheckpoints = new ArrayList<>(); + private SourceOutput splitOutput; + private int polls; + + @Override + public void start() {} + + @Override + public InputStatus pollNext(ReaderOutput output) { + polls++; + if (splitOutput == null) { + splitOutput = output.createOutputForSplit("split-1"); + } + splitOutput.collect("record-" + polls); + return InputStatus.MORE_AVAILABLE; + } + + @Override + public List snapshotState(long checkpointId) { + return splits; + } + + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List splits) { + this.splits.addAll(splits); + } + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) {} + + @Override + public void notifyCheckpointComplete(long checkpointId) { + completedCheckpoints.add(checkpointId); + } + + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) {} + + @Override + public void close() {} + } + + private static final class RecordingStrategy implements RateLimiterStrategy { + + private final RecordingRateLimiter rateLimiter = new RecordingRateLimiter(); + private int parallelism; + + @Override + public RateLimiter createRateLimiter(int parallelism) { + this.parallelism = parallelism; + return rateLimiter; + } + } + + private static final class RecordingRateLimiter implements RateLimiter { + + private final List acquiredEvents = new ArrayList<>(); + private final List addedSplits = new ArrayList<>(); + private final List completedCheckpoints = new ArrayList<>(); + private CompletableFuture lastAcquireFuture = CompletableFuture.completedFuture(null); + + @Override + public CompletionStage acquire(int numberOfEvents) { + acquiredEvents.add(numberOfEvents); + lastAcquireFuture = new CompletableFuture<>(); + return lastAcquireFuture; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + completedCheckpoints.add(checkpointId); + } + + @Override + public void notifyAddingSplit(TestSplit split) { + addedSplits.add(split); + } + } + + private static final class RecordingReaderOutput implements ReaderOutput { + + private final List records = new ArrayList<>(); + private final Map> splitOutputs = new HashMap<>(); + + @Override + public void collect(String record) { + records.add(record); + } + + @Override + public void collect(String record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) {} + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + + @Override + public SourceOutput createOutputForSplit(String splitId) { + return splitOutputs.computeIfAbsent(splitId, ignored -> new RecordingSourceOutput(this)); + } + + @Override + public void releaseOutputForSplit(String splitId) { + splitOutputs.remove(splitId); + } + } + + private static final class RecordingSourceOutput implements SourceOutput { + + private final RecordingReaderOutput readerOutput; + + private RecordingSourceOutput(RecordingReaderOutput readerOutput) { + this.readerOutput = readerOutput; + } + + @Override + public void collect(String record) { + readerOutput.collect(record); + } + + @Override + public void collect(String record, long timestamp) { + readerOutput.collect(record, timestamp); + } + + @Override + public void emitWatermark(Watermark watermark) {} + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} From fa1a334b18a7fbbf0ba0673fb55149194ba2588a Mon Sep 17 00:00:00 2001 From: Mate Czagany Date: Fri, 26 Jun 2026 13:30:29 +0200 Subject: [PATCH 2/4] Add docs where needed --- .../connector/kafka/RateLimitedSource.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java index 391f3c47..366c6bf6 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java @@ -43,6 +43,12 @@ import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.util.Preconditions; +/** + * Rate-limits emitted records while preserving all callbacks of the wrapped source. + * + *

Flink's built-in rate-limited reader does not forward Kafka's checkpoint-complete callback, + * which Kafka uses for offset commits. + */ public final class RateLimitedSource implements Source, LineageVertexProvider { @@ -137,12 +143,15 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { return InputStatus.NOTHING_AVAILABLE; } + // pollNext() returns only InputStatus, not the emitted-record count. Count actual collect() + // calls and acquire permits after delegation. countingOutput.reset(output); InputStatus status = delegate.pollNext(countingOutput); int emittedRecords = countingOutput.getEmittedRecords(); if (emittedRecords > 0) { rateLimitPermissionFuture = rateLimiter.acquire(emittedRecords).toCompletableFuture(); if (status == InputStatus.MORE_AVAILABLE && !rateLimitPermissionFuture.isDone()) { + // Force the runtime through isAvailable() so it waits on the limiter before polling again. return InputStatus.NOTHING_AVAILABLE; } } @@ -180,6 +189,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { + // KafkaSourceReader uses checkpoint completion for offset commits; keep it delegated. delegate.notifyCheckpointComplete(checkpointId); rateLimiter.notifyCheckpointComplete(checkpointId); } @@ -196,6 +206,8 @@ public void close() throws Exception { } } + // Records can be emitted through either ReaderOutput or split-specific SourceOutput, so both + // paths need to be counted. private static final class CountingReaderOutput implements ReaderOutput { private ReaderOutput delegate; @@ -206,6 +218,8 @@ private void reset(ReaderOutput delegate) { ReaderOutput checkedDelegate = Preconditions.checkNotNull(delegate, "Delegate output must not be null."); if (this.delegate != checkedDelegate) { + // Split outputs are scoped to the current ReaderOutput. Invalidate cached delegates if + // Flink supplies a different output instance. splitOutputs.values().forEach(CountingSourceOutput::resetDelegateOutput); } this.delegate = checkedDelegate; From 6a29418ed7380b168aba1ab59b14bc33295cb21a Mon Sep 17 00:00:00 2001 From: Mate Czagany Date: Fri, 26 Jun 2026 14:19:40 +0200 Subject: [PATCH 3/4] Fix CI --- .../flinkrunner/connector/kafka/RateLimitedSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java index 366c6bf6..b38d3c4c 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitedSource.java @@ -151,7 +151,8 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { if (emittedRecords > 0) { rateLimitPermissionFuture = rateLimiter.acquire(emittedRecords).toCompletableFuture(); if (status == InputStatus.MORE_AVAILABLE && !rateLimitPermissionFuture.isDone()) { - // Force the runtime through isAvailable() so it waits on the limiter before polling again. + // Force the runtime through isAvailable() so it waits on the limiter before polling + // again. return InputStatus.NOTHING_AVAILABLE; } } From 2b71f51e0e43acbb2f469b48242ed0f0b4a91b42 Mon Sep 17 00:00:00 2001 From: Mate Czagany Date: Mon, 29 Jun 2026 16:50:54 +0200 Subject: [PATCH 4/4] update records-per-second to int type --- .../connector/kafka/RateLimitOptions.java | 13 ++++++------- .../kafka/table/SafeKafkaDynamicSource.java | 6 +++--- .../kafka/table/SafeKafkaDynamicTableFactory.java | 2 +- .../connector/kafka/RateLimitOptionsTest.java | 8 ++++---- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java index 503b2a33..c88d5320 100644 --- a/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java +++ b/connectors/kafka-safe-connector/src/main/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptions.java @@ -23,23 +23,22 @@ public class RateLimitOptions { - public static final ConfigOption SCAN_RATE_LIMIT_RECORDS_PER_SECOND = + public static final ConfigOption SCAN_RATE_LIMIT_RECORDS_PER_SECOND = ConfigOptions.key("scan.rate-limit.records-per-second") - .doubleType() + .intType() .noDefaultValue() .withDescription( "Optional maximum number of records per second emitted by the Kafka source."); - public static Optional scanRateLimitRecordsPerSecond(ReadableConfig tableOptions) { - Optional recordsPerSecond = + public static Optional scanRateLimitRecordsPerSecond(ReadableConfig tableOptions) { + Optional recordsPerSecond = tableOptions.getOptional(SCAN_RATE_LIMIT_RECORDS_PER_SECOND); recordsPerSecond.ifPresent( value -> { - if (!Double.isFinite(value) || value <= 0D) { + if (value <= 0) { throw new ValidationException( - "'%s' must be a finite number greater than 0." - .formatted(SCAN_RATE_LIMIT_RECORDS_PER_SECOND.key())); + "'%s' must be greater than 0.".formatted(SCAN_RATE_LIMIT_RECORDS_PER_SECOND.key())); } }); diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java index 1dd595cd..4d7917e6 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicSource.java @@ -205,7 +205,7 @@ public class SafeKafkaDynamicSource protected final DeserFailureHandler deserFailureHandler; /** Maximum records per second emitted by this source. */ - protected final Optional rateLimitRecordsPerSecond; + protected final Optional rateLimitRecordsPerSecond; public SafeKafkaDynamicSource( DataType physicalDataType, @@ -227,7 +227,7 @@ public SafeKafkaDynamicSource( String tableIdentifier, @Nullable Integer parallelism, DeserFailureHandler deserFailureHandler, - Optional rateLimitRecordsPerSecond, + Optional rateLimitRecordsPerSecond, WatermarkEmitStrategy sourceWatermarkEmitStrategy, Optional sourceWatermarkIdleTimeout, SourceWatermarkConfig sourceWatermarkConfig) { @@ -586,7 +586,7 @@ protected Source createKafka @SuppressWarnings("unchecked") private static RateLimiterStrategy recordsPerSecondRateLimiterStrategy( - double recordsPerSecond) { + int recordsPerSecond) { return RateLimiterStrategy.perSecond(recordsPerSecond); } diff --git a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java index 6d3148f6..e9a81f8e 100644 --- a/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java +++ b/connectors/kafka-safe-connector/src/main/java/org/apache/flink/streaming/connectors/kafka/table/SafeKafkaDynamicTableFactory.java @@ -451,7 +451,7 @@ protected SafeKafkaDynamicSource createKafkaTableSource( String tableIdentifier, Integer parallelism, DeserFailureHandler deserFailureHandler, - Optional rateLimitRecordsPerSecond, + Optional rateLimitRecordsPerSecond, WatermarkEmitStrategy sourceWatermarkEmitStrategy, Optional sourceWatermarkIdleTimeout, SourceWatermarkConfig sourceWatermarkConfig) { diff --git a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java index 0ab9f789..c3296fe7 100644 --- a/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java +++ b/connectors/kafka-safe-connector/src/test/java/com/datasqrl/flinkrunner/connector/kafka/RateLimitOptionsTest.java @@ -36,14 +36,14 @@ void returnsEmptyWhenRateLimitIsNotConfigured() { @Test void returnsConfiguredRateLimit() { Configuration configuration = new Configuration(); - configuration.set(SCAN_RATE_LIMIT_RECORDS_PER_SECOND, 123.45D); + configuration.set(SCAN_RATE_LIMIT_RECORDS_PER_SECOND, 123); - assertThat(scanRateLimitRecordsPerSecond(configuration)).contains(123.45D); + assertThat(scanRateLimitRecordsPerSecond(configuration)).contains(123); } @ParameterizedTest - @ValueSource(doubles = {0D, -1D, Double.NaN, Double.POSITIVE_INFINITY}) - void rejectsInvalidRateLimits(double recordsPerSecond) { + @ValueSource(ints = {0, -1}) + void rejectsInvalidRateLimits(int recordsPerSecond) { Configuration configuration = new Configuration(); configuration.set(SCAN_RATE_LIMIT_RECORDS_PER_SECOND, recordsPerSecond);