[SPARK-55131][SS] Change the default merge operator delimiter for RocksDB to empty string to concat without delimiter #54083

Closed
HeartSaVioR wants to merge 13 commits into apache:master from HeartSaVioR:SPARK-55131

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR proposes to change the default delimiter for the merge operator of RocksDB to an empty string, so that a merge operation concatenates the two values directly without inserting any delimiter character.

Changing the delimiter isn't compatible with existing checkpoints, so this change is coupled with a SQLConf and the known offset log metadata trick, so that it applies only to new streaming queries.

  • New SQL config: spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion
  • Default: 2 ('' as delimiter)
  • Default for existing checkpoints: 1 (',' as delimiter)
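
The defaults listed above can be read as a small resolution rule. The following is a minimal sketch of that rule, not the code in this PR: the object name, the `resolve` function, and its parameters are hypothetical, and the assumption that a version already recorded in the checkpoint takes precedence over the session config is inferred from the description of the offset log metadata trick.

```scala
// Hypothetical model of how the effective merge operator version could be chosen.
// None of these names come from Spark; they only illustrate the defaults listed above.
object MergeOperatorVersionResolution {
  val DefaultForNewQueries = 2          // '' as delimiter
  val DefaultForExistingCheckpoints = 1 // ',' as delimiter

  /**
   * @param checkpointExists whether the query restarts from an existing checkpoint
   * @param recordedVersion  version found in the checkpoint metadata, if any (assumption)
   * @param sessionConf      value of the SQL config in the current session, if set
   */
  def resolve(
      checkpointExists: Boolean,
      recordedVersion: Option[Int],
      sessionConf: Option[Int]): Int = {
    if (checkpointExists) {
      // Assumption: a recorded value wins; a checkpoint without one predates the config.
      recordedVersion.getOrElse(DefaultForExistingCheckpoints)
    } else {
      // New query: use the session config, falling back to the new default.
      sessionConf.getOrElse(DefaultForNewQueries)
    }
  }
}
```

Under this sketch, a query restarted from a checkpoint that never recorded a version keeps the ',' delimiter even if the session config says 2, which is the compatibility property the description asks for.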

Why are the changes needed?

We found that, with the current delimiter, there is no way to distinguish two cases: 1) put against a non-existent value, then merge, and 2) merge against a non-existent value, then merge. There has been an implicit expectation that operators call merge only when they know the operation is against an existing key. This effectively requires a GET operation, which can have a significant performance impact depending on the logic.

Changing the delimiter to an empty string (none) eliminates the difference between the two cases, allowing operators to perform a blind merge without checking the existence of the key.

Does this PR introduce any user-facing change?

No, the change is internal and there is no user-facing change.

How was this patch tested?

Added UTs.

Was this patch authored or co-authored using generative AI tooling?

Co-authored by claude-4.5-sonnet

@github-actions
github-actions bot commented Feb 1, 2026

JIRA Issue Information

=== Bug SPARK-55131 ===
Summary: The default delimiter of StringAppendOperator (merge operator for RocksDB) conflicts when merge is used with non-existence value
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion")
.internal()
.doc("Set the RocksDB merge operator version. This will be stored in the checkpoint when " +
"starting a streaming query. The checkpoint will use this merge operator version in the " +

Contributor

nit: for the entire

rocksDbOptions.setAvoidFlushDuringShutdown(true)
rocksDbOptions.setMergeOperator(new StringAppendOperator())
// Set merge operator based on version for backward compatibility
// Version 1: comma delimiter ",", Version 2: empty string ""

Contributor

Should we note that this is not documented but is supported in RocksDB?

case v => throw new IllegalArgumentException(
s"Invalid merge operator version: $v. Supported versions are 1 and 2")
}
rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))

Contributor

This is a little different from the default constructor we were using earlier. Will this be equivalent?

Contributor Author

It should be ',' vs "," and should have the same effect. There is a test confirming this: merge("key1", "a") -> merge("key1", "b") -> get("key1") yields "a,b" with version 1 and "ab" with version 2.
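
As a minimal sketch of the behaviour that test checks, the snippet below models string-append merge semantics with an in-memory map instead of RocksDB. `MergeDelimiterSketch`, `InMemoryStore`, and `mergeTwice` are hypothetical names used only for illustration; the version-to-delimiter mapping follows the code comment in this PR, and the rule that a merge against an absent key stores the operand as is reflects how a string-append merge operator is generally understood to behave.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a store using a string-append merge operator.
object MergeDelimiterSketch {
  // Version 1: comma delimiter ",", Version 2: empty string "".
  def delimiterFor(version: Int): String = version match {
    case 1 => ","
    case 2 => ""
    case v => throw new IllegalArgumentException(
      s"Invalid merge operator version: $v. Supported versions are 1 and 2")
  }

  final class InMemoryStore(delimiter: String) {
    private val data = mutable.Map.empty[String, String]

    def put(key: String, value: String): Unit = data(key) = value

    // Append to the existing value with the delimiter, or store the operand as is
    // when the key is absent.
    def merge(key: String, value: String): Unit = {
      val merged = data.get(key) match {
        case Some(existing) => existing + delimiter + value
        case None => value
      }
      data(key) = merged
    }

    def get(key: String): Option[String] = data.get(key)
  }

  // merge("key1", "a") -> merge("key1", "b") -> get("key1")
  def mergeTwice(version: Int): Option[String] = {
    val store = new InMemoryStore(delimiterFor(version))
    store.merge("key1", "a")
    store.merge("key1", "b")
    store.get("key1")
  }
}
```

Calling `MergeDelimiterSketch.mergeTwice(1)` gives `Some("a,b")` and `mergeTwice(2)` gives `Some("ab")`, matching the expectations quoted in the comment above.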

Contributor

No, I mean earlier we had this

 rocksDbOptions.setMergeOperator(new StringAppendOperator())

but now we have this

 rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))

Would this be equivalent for the existing format?

Contributor Author

I haven't checked the JNI code of RocksDB, but I think they are equivalent: we did not change the expected results for version 1 in the tests, and the tests still pass.

// Version 2: Uses empty string "" as delimiter (no delimiter, direct concatenation)
//
// Note: this is also defined in `SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`.
// These two places should be updated together.

Contributor

Why do we need this in two places?

Contributor Author

We need the SQLConf to do the offset log trick. At the same time, it is easier for us to have the same entry here so that the conf is automatically taken into account for RocksDBConf. We did the same earlier with SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION and FORMAT_VERSION.


// include the number of delimiters used for merge
resultSize += numValues - 1
resultSize += (numValues - 1) * delimiterSize

Contributor

For an empty delimiter, resultSize would be 0 then?

Contributor Author

It's just that we do not add any size for delimiters. The resultSize calculated in the logic above remains the same.
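
To make the arithmetic concrete, here is a minimal sketch of the size calculation under the assumption that the "logic above" sums the sizes of the individual values. `MergedValueSize` and its `resultSize` function are hypothetical names; only the `(numValues - 1) * delimiterSize` term comes from the diff.

```scala
// Hypothetical helper mirroring the size arithmetic discussed in this thread.
object MergedValueSize {
  /**
   * @param valueSizes    size in bytes of each merged value (assumed to be summed
   *                      by the logic that precedes the delimiter adjustment)
   * @param delimiterSize size in bytes of the merge delimiter (1 for ",", 0 for "")
   */
  def resultSize(valueSizes: Seq[Int], delimiterSize: Int): Int = {
    val numValues = valueSizes.length
    if (numValues == 0) {
      0
    } else {
      // include the number of delimiters used for merge
      valueSizes.sum + (numValues - 1) * delimiterSize
    }
  }
}
```

For three values of 4, 6 and 2 bytes, this gives 14 with a one-byte delimiter and 12 with an empty delimiter: only the delimiter contribution drops to zero, not the total.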

assert(valueRowToData(iterator0.next()) === 1)
assert(!iterator0.hasNext)

merge(store, "a", 0, 2)

Contributor

Can we also add a test for some combination of put and merge followed by get, to verify the result for both merge delimiter versions?

Contributor Author

I'll add this tomorrow. This is the only review comment I'm yet to address.

Contributor Author

959b0d6

Addressed.


testMergeWithOperatorVersions(
"validate rocksdb values iterator correctness - put then merge") { _ =>

Contributor

nit: extra newline?

Contributor

@anishshri-db left a comment

lgtm pending nits and green CI

@HeartSaVioR
Contributor Author

The CI result of the commit f138772 was green.

https://github.com/apache/spark/runs/62718228501

The only commit after that one just removed the extra newline, which should be a no-op.

88b369c

@HeartSaVioR
Contributor Author

Thanks! Merging to master.

Yicong-Huang pushed a commit to Yicong-Huang/spark that referenced this pull request Feb 8, 2026
…ksDB to empty string to concat without delimiter

Closes apache#54083 from HeartSaVioR/SPARK-55131.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
rpnkv pushed a commit to rpnkv/spark that referenced this pull request Feb 18, 2026
…ksDB to empty string to concat without delimiter
