[SPARK-55131][SS] Change the default merge operator delimiter for RocksDB to empty string to concat without delimiter#54083
[SPARK-55131][SS] Change the default merge operator delimiter for RocksDB to empty string to concat without delimiter#54083HeartSaVioR wants to merge 13 commits intoapache:masterfrom
Conversation
JIRA Issue Information=== Bug SPARK-55131 === This comment was automatically generated by GitHub Actions |
bd195fe to
aab929f
Compare
| buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion") | ||
| .internal() | ||
| .doc("Set the RocksDB merge operator version. This will be stored in the checkpoint when " + | ||
| "starting a streaming query. The checkpoint will use this merge operator version in the " + |
| rocksDbOptions.setAvoidFlushDuringShutdown(true) | ||
| rocksDbOptions.setMergeOperator(new StringAppendOperator()) | ||
| // Set merge operator based on version for backward compatibility | ||
| // Version 1: comma delimiter ",", Version 2: empty string "" |
There was a problem hiding this comment.
Should we note that this not documented but supported in RocksDB ?
| case v => throw new IllegalArgumentException( | ||
| s"Invalid merge operator version: $v. Supported versions are 1 and 2") | ||
| } | ||
| rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter)) |
There was a problem hiding this comment.
This is little different than using default constructor we were using earlier ? Will this be equivalent ?
There was a problem hiding this comment.
It should be ',' vs "," and should be the same effect. There is a test confirming like merge("key1", "a") -> merge("key1", "b") -> get("key") with version 1 = "a,b" while with version 2 = "ab".
There was a problem hiding this comment.
No I mean earlier we had this
rocksDbOptions.setMergeOperator(new StringAppendOperator())
but now we have this
rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))
Would this be equivalent for the existing format ?
There was a problem hiding this comment.
I haven't checked with JNI code of RocksDB but I think they are equivalent - I think we did not change the expected result for version 1 in the tests and the tests still pass.
| // Version 2: Uses empty string "" as delimiter (no delimiter, direct concatenation) | ||
| // | ||
| // Note: this is also defined in `SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`. | ||
| // These two places should be updated together. |
There was a problem hiding this comment.
Why do we need this in 2 places ?
There was a problem hiding this comment.
We need SQLConf to do the offset log trick. But in the meanwhile, it is easier for us to have the same entry here to automatically take the conf into account for RocksDBConf. We did this earlier - SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION & FORMAT_VERSION.
|
|
||
| // include the number of delimiters used for merge | ||
| resultSize += numValues - 1 | ||
| resultSize += (numValues - 1) * delimiterSize |
There was a problem hiding this comment.
For empty delimiter, resultSize would be 0 then ?
There was a problem hiding this comment.
It's just that we do not add the size for delimiters. resultSize being calculated in above logic retains the same.
| assert(valueRowToData(iterator0.next()) === 1) | ||
| assert(!iterator0.hasNext) | ||
|
|
||
| merge(store, "a", 0, 2) |
There was a problem hiding this comment.
Can we also add a test for some combination of put and merge followed by get as needed to verify the result for both merge delimiter versions ?
There was a problem hiding this comment.
I'll add this tomorrow. This is the only review comment I'm yet to address.
There was a problem hiding this comment.
Addressed.
959b0d6 to
f138772
Compare
|
|
||
| testMergeWithOperatorVersions( | ||
| "validate rocksdb values iterator correctness - put then merge") { _ => | ||
|
|
anishshri-db
left a comment
There was a problem hiding this comment.
lgtm pending nits and green CI
|
The CI result of the commit f138772 was green. https://github.com/apache/spark/runs/62718228501 The new commit after the commit only removed the new line which should be no-op. |
|
Thanks! Merging to master. |
…ksDB to empty string to concat without delimiter
### What changes were proposed in this pull request?
This PR proposes to change the default delimiter for the merge operator of RocksDB to an empty string, so that merge operation does not add a delimiter and concat two without any character.
Changing the delimiter isn't compatible with existing checkpoints, so this change is coupled with SQLConf, with known offset log metadata trick, to apply the change only for new streaming queries.
* New SQL config: `spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion`
* Default: 2 ('' as delimiter)
* Default for existing checkpoints: 1 (',' as delimiter)
### Why are the changes needed?
We found out there is no way to distinguish two cases of 1) put against non-existence value then merge and 2) merge against non-existence value then merge, from the current delimiter. There has been an "implication" that operators do ensure they call merge only when they know the operation is against existing key. This effectively requires GET operation which can be an outstanding performance impact depending on the logic.
Making delimiter to an empty string (none) would eliminate the difference between the two cases, allowing operators to perform blind merge without checking the existence of the key.
### Does this PR introduce _any_ user-facing change?
No, the change is internal and there is no user-facing change.
### How was this patch tested?
Added UTs.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored by claude-4.5-sonnet
Closes apache#54083 from HeartSaVioR/SPARK-55131.
Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ksDB to empty string to concat without delimiter
### What changes were proposed in this pull request?
This PR proposes to change the default delimiter for the merge operator of RocksDB to an empty string, so that merge operation does not add a delimiter and concat two without any character.
Changing the delimiter isn't compatible with existing checkpoints, so this change is coupled with SQLConf, with known offset log metadata trick, to apply the change only for new streaming queries.
* New SQL config: `spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion`
* Default: 2 ('' as delimiter)
* Default for existing checkpoints: 1 (',' as delimiter)
### Why are the changes needed?
We found out there is no way to distinguish two cases of 1) put against non-existence value then merge and 2) merge against non-existence value then merge, from the current delimiter. There has been an "implication" that operators do ensure they call merge only when they know the operation is against existing key. This effectively requires GET operation which can be an outstanding performance impact depending on the logic.
Making delimiter to an empty string (none) would eliminate the difference between the two cases, allowing operators to perform blind merge without checking the existence of the key.
### Does this PR introduce _any_ user-facing change?
No, the change is internal and there is no user-facing change.
### How was this patch tested?
Added UTs.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored by claude-4.5-sonnet
Closes apache#54083 from HeartSaVioR/SPARK-55131.
Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This PR proposes to change the default delimiter for the merge operator of RocksDB to an empty string, so that merge operation does not add a delimiter and concat two without any character.
Changing the delimiter isn't compatible with existing checkpoints, so this change is coupled with SQLConf, with known offset log metadata trick, to apply the change only for new streaming queries.
spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersionWhy are the changes needed?
We found out there is no way to distinguish two cases of 1) put against non-existence value then merge and 2) merge against non-existence value then merge, from the current delimiter. There has been an "implication" that operators do ensure they call merge only when they know the operation is against existing key. This effectively requires GET operation which can be an outstanding performance impact depending on the logic.
Making delimiter to an empty string (none) would eliminate the difference between the two cases, allowing operators to perform blind merge without checking the existence of the key.
Does this PR introduce any user-facing change?
No, the change is internal and there is no user-facing change.
How was this patch tested?
Added UTs.
Was this patch authored or co-authored using generative AI tooling?
Co-authored by claude-4.5-sonnet