metrics: add DML rows affected metric #3909
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of the required files. The full list of commands accepted by this bot can be found here.
Summary of Changes

Hello @wk989898, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances TiCDC's monitoring capabilities by introducing a new metric that precisely tracks the number of rows affected by Data Manipulation Language (DML) operations as they are flushed to the downstream system. This provides critical insights into the volume and type of data changes being replicated, improving observability and enabling more accurate performance analysis of the data synchronization process. The new metric is also integrated into the existing Grafana dashboards for immediate visualization.
Code Review
This pull request introduces a new metric, ticdc_sink_dml_event_affected_row_count, to monitor the number of rows affected by DML operations in the sink. The changes are well-implemented, including the definition of the new Prometheus counter, the logic in the MySQL sink to track DML row types, and the recording of both expected and actual affected row counts. The Grafana dashboards have also been updated to visualize this new metric. I have a couple of suggestions for pkg/sink/mysql/mysql_writer_dml.go to improve code maintainability and cleanliness.
pkg/sink/mysql/mysql_writer_dml.go
Outdated
```diff
 // Considering the batch algorithm in safe mode is O(n^3), which n is the number of rows.
 // So we need to limit the number of rows in one batch to avoid performance issues.
-func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) {
+func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) {
```
Many SQL generation functions in this file, such as this one, now return three separate slices: []string, [][]interface{}, and []common.RowType. This makes the function signatures complex and harder to maintain. To improve code clarity, consider encapsulating these related values into a struct. For example:
```go
type dmlGroup struct {
	sqls     []string
	values   [][]interface{}
	rowTypes []common.RowType
}
```

This would simplify function signatures to something like `func (w *Writer) generateBatchSQL(...) *dmlGroup`. You could even potentially reuse or adapt the existing `preparedDMLs` struct for this purpose, which would further improve consistency.
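As a hedged illustration of the suggested refactor (the `add` helper below is hypothetical, not existing code in this repository):

```go
// dmlGroup bundles the statement texts, their arguments, and the row type of
// each statement, keeping the three parallel slices together in one value.
type dmlGroup struct {
	sqls     []string
	values   [][]interface{}
	rowTypes []common.RowType
}

// add appends one statement together with its arguments and row type, so a
// length mismatch between the slices cannot occur at call sites.
func (g *dmlGroup) add(sql string, args []interface{}, rt common.RowType) {
	g.sqls = append(g.sqls, sql)
	g.values = append(g.values, args)
	g.rowTypes = append(g.rowTypes, rt)
}
```

A constructor-plus-append style like this makes it structurally impossible for `sqls` and `rowTypes` to drift apart, which is the root cause of several issues flagged later in this review.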
pkg/sink/mysql/mysql_writer_dml.go
Outdated
```go
// if err != nil {
// 	log.Warn("get rows affected rows failed", zap.Error(err))
// } else {
// 	w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i])
// }
```
Please paste screenshots of your newly added metrics in the PR description.
pkg/eventservice/event_scanner.go
Outdated
```go
if err != nil {
	return err
}
shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
```
Do we have a type like 'PkChanged'?
No. If the PK changed, there should be two raw events.
Signed-off-by: wk989898 <nhsmwk@gmail.com>
📝 Walkthrough

Adds Grafana panels for DML event types and affected rows; introduces a new Prometheus counter for EventService DML types and a sink counter for affected rows; threads a `mode` parameter and per-statement row types through event scanning and SQL generation so affected-row counts can be attributed per DML type.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Source as EventSource
    participant ES as EventService
    participant Metrics as Metrics
    participant Sink as Sink
    Source->>ES: send raw KV events (include mode)
    ES->>ES: classify event via RawKVEntry.GetType()
    ES->>Metrics: update EventServiceSendDMLTypeCount(mode, dml_type)
    ES->>Sink: forward DML events (include mode, raw type)
    Sink->>Sink: generate batched SQLs and collect rowTypes
    Sink->>Metrics: update ExecDMLEventRowsAffectedCounter(keyspace,changefeed,type,row_type)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: 3 passed, 2 did not pass (1 warning, 1 inconclusive).
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pkg/sink/mysql/sql_builder.go (1)
144-150: ⚠️ Potential issue | 🔴 Critical

Bug: `rowTypes` is not cleared in `reset()`, causing stale data when the object is reused from `dmlsPool`.

Since `preparedDMLs` instances are pooled via `dmlsPool` (line 134), omitting `rowTypes` from `reset()` means a recycled object will carry over the previous caller's row types, inflating `RowsAffected()` and corrupting metrics.

Proposed fix

```diff
 func (d *preparedDMLs) reset() {
 	d.sqls = d.sqls[:0]
 	d.values = d.values[:0]
 	d.tsPairs = d.tsPairs[:0]
+	d.rowTypes = d.rowTypes[:0]
 	d.rowCount = 0
 	d.approximateSize = 0
 }
```
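To see why the missing line matters, here is a small self-contained sketch of the reuse hazard; the `prepared` type below is a simplified stand-in for `preparedDMLs`, not the repo's actual code:

```go
package main

import (
	"fmt"
	"sync"
)

type prepared struct {
	sqls     []string
	rowTypes []string
}

// reset forgets to clear rowTypes, mirroring the reported bug.
func (p *prepared) reset() {
	p.sqls = p.sqls[:0]
	// p.rowTypes = p.rowTypes[:0] // without this line, stale entries survive
}

var pool = sync.Pool{New: func() interface{} { return &prepared{} }}

func main() {
	p := pool.Get().(*prepared)
	p.sqls = append(p.sqls, "INSERT ...")
	p.rowTypes = append(p.rowTypes, "insert")
	p.reset()
	pool.Put(p)

	q := pool.Get().(*prepared)                // may be the very same object
	fmt.Println(len(q.sqls), len(q.rowTypes))  // likely prints: 0 1 — rowTypes leaked
}
```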
pkg/sink/mysql/mysql_writer_dml.go (1)

112-123: ⚠️ Potential issue | 🔴 Critical

`genActiveActiveSQL` does not return `rowTypes`, causing an index-out-of-bounds panic in `sequenceExecute`.

When `EnableActiveActive` is true (line 112), `genActiveActiveSQL` returns only `([]string, [][]interface{})` without row types. The `rowTypesList` remains empty, but `dmls.sqls` is populated. Later in `sequenceExecute` at line 777, `dmls.rowTypes[i]` will panic.

Either `genActiveActiveSQL` needs to be updated to return `[]common.RowType`, or `sequenceExecute` needs a bounds check on `dmls.rowTypes`.

Proposed fix: add a bounds check in `sequenceExecute`

```diff
 rowsAffected, err := res.RowsAffected()
 if err != nil {
 	log.Warn("get rows affected rows failed", zap.Error(err))
-} else {
+} else if i < len(dmls.rowTypes) {
 	w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i])
+} else {
+	w.statistics.RecordTotalRowsAffected(rowsAffected, 1)
 }
```
🤖 Fix all issues with AI agents
In `@pkg/eventservice/event_scanner.go`:
- Around line 760-778: The change causes event.IsUKChanged to be called for all
updates, changing semantics and cost when p.outputRawChangeEvent is true; revert
to the original behavior by short-circuiting when outputRawChangeEvent is true:
call updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) and return
p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter) without
invoking event.IsUKChanged; only call event.IsUKChanged and
updateMetricEventServiceSendDMLTypeCount with the real shouldSplit when
outputRawChangeEvent is false so UK-change decoding happens only when splitting
is actually allowed.
In `@pkg/metrics/event_service.go`:
- Around line 173-178: The Help string for the Prometheus metric created in
EventServiceSendDMLTypeCount (prometheus.NewCounterVec with CounterOpts) has a
grammar mistake; update the CounterOpts.Help value from "The number of different
dml events type sent by the event service" to "The number of different dml event
types sent by the event service" so the metric help text reads correctly.
In `@pkg/metrics/statistics.go`:
- Around line 111-124: The Close() method currently omits cleanup for
ExecDMLEventRowsAffectedCounter causing metric label leaks; update Close() to
call ExecDMLEventRowsAffectedCounter.Delete(...) for the same label sets you use
elsewhere by iterating/removing combinations of keyspace
(b.changefeedID.Keyspace()), changefeedID (b.changefeedID.Name()), the
"actual"/"expected" value dimension and the four row_type values ("total",
"insert", "update", "delete") so that any time-series created by
RecordTotalRowsAffected and RecordRowsAffected are deleted when Close() runs.
In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 571-589: There is a duplicate method generateNormalSQLs; remove
this redundant declaration from this file and rely on the single canonical
implementation in mysql_writer_dml_batch.go (or vice versa if you prefer to keep
this one) so the build no longer sees two definitions. If you remove this file's
generateNormalSQLs, ensure any callers still resolve (they can call the
remaining generateNormalSQLs) and, if needed, replace direct logic here with a
call to the kept function (generateNormalSQLs) or to the helper
generateNormalSQL to avoid duplicating behavior; also clean up any now-unused
imports/variables in this file.
- Around line 366-371: The current conflict-detection path in
mysql_writer_dml.go logs full SQL and values with log.Info immediately before
log.Panic (see variables/functions nextRowType, compareKeys, rowKey, rowLists,
and generateBatchSQLsPerEvent), which is noisy and may leak sensitive data;
change the log.Info call to log.Error or log.Warn and remove or redact sensitive
payloads (avoid logging full values array or replace with a safe summary like
value counts/types or a hashed/redacted representation) while keeping contextual
fields (e.g., writer id, i, j, Events metadata) so the panic still includes
useful non-PII diagnostics.
- Around line 870-891: genUpdateSQL may return multiple SQL strings but the TiDB
branch only appends a single common.RowTypeUpdate per call, causing len(sqls) >
len(rowTypes) and panics in sequenceExecute; fix by appending one RowTypeUpdate
per SQL returned: after s, v := w.genUpdateSQL(rows...) and appending s and v to
sqls/values, append len(s) copies of common.RowTypeUpdate to rowTypes (e.g.,
loop or use slice multiplication) so rowTypes, sqls, and values stay aligned.
- Around line 813-818: The current code calls res.RowsAffected() and passes that
to w.statistics.RecordTotalRowsAffected, but res.RowsAffected() on
multi-statement transactions returns only the final statement's count (often 0);
change the logic to use the aggregated count from dmls.RowsAffected() (or if you
can access the driver raw result, use mysql.Result.AllRowsAffected()) when
calling RecordTotalRowsAffected so total rows across all statements are
recorded; update the block around res.RowsAffected() (referencing
res.RowsAffected(), dmls.RowsAffected(), and
w.statistics.RecordTotalRowsAffected()) to prefer dmls.RowsAffected() and fall
back to per-statement totals only if available from the raw mysql result.
🧹 Nitpick comments (7)
pkg/sink/mysql/sql_builder.go (1)
134-142: Pool's `New` func doesn't pre-allocate `rowTypes`, unlike the other slices.

For consistency and to avoid an initial nil-slice allocation on first append, consider pre-allocating `rowTypes` the same way `sqls`, `values`, and `tsPairs` are.

Proposed fix

```diff
 var dmlsPool = sync.Pool{
 	New: func() interface{} {
 		return &preparedDMLs{
 			sqls:     make([]string, 0, 128),
 			values:   make([][]interface{}, 0, 128),
+			rowTypes: make([]common.RowType, 0, 128),
 			tsPairs:  make([]tsPair, 0, 128),
 		}
 	},
 }
```

pkg/common/kv_entry.go (1)
78-93: Minor: redundant condition in `else if`.

On line 84, since we already know `len(v.OldValue) != 0` (the `if` on line 82 was false), the `len(v.OldValue) > 0` check is always true here. You can simplify to just `len(v.Value) > 0`.

Also note: an `OpTypePut` with `OldValue` present but `Value` empty will fall through to `"unknown"`. This matches the existing `IsInsert`/`IsUpdate` helpers, so it seems intentional, but worth a comment if that state is truly impossible.

Simplification

```diff
 func (v *RawKVEntry) GetType() string {
 	switch v.OpType {
 	case OpTypePut:
 		if len(v.OldValue) == 0 {
 			return "insert"
-		} else if len(v.OldValue) > 0 && len(v.Value) > 0 {
+		} else if len(v.Value) > 0 {
 			return "update"
 		}
 	case OpTypeDelete:
```
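For reference, a runnable sketch of the classification logic; the pared-down `rawKVEntry` below is an illustrative stand-in for the repo's `RawKVEntry`, and the `"delete"` branch is assumed from the row-type labels used elsewhere in this PR:

```go
package main

import "fmt"

type OpType int

const (
	OpTypePut OpType = iota
	OpTypeDelete
)

// rawKVEntry is a simplified stand-in for the real RawKVEntry.
type rawKVEntry struct {
	OpType   OpType
	Value    []byte
	OldValue []byte
}

// getType mirrors the simplified branch structure suggested above.
func (v *rawKVEntry) getType() string {
	switch v.OpType {
	case OpTypePut:
		if len(v.OldValue) == 0 {
			return "insert"
		} else if len(v.Value) > 0 {
			return "update"
		}
	case OpTypeDelete:
		return "delete"
	}
	return "unknown"
}

func main() {
	fmt.Println((&rawKVEntry{OpType: OpTypePut, Value: []byte("v")}).getType())                        // insert
	fmt.Println((&rawKVEntry{OpType: OpTypePut, Value: []byte("v"), OldValue: []byte("o")}).getType()) // update
	fmt.Println((&rawKVEntry{OpType: OpTypeDelete, OldValue: []byte("o")}).getType())                  // delete
}
```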
pkg/metrics/statistics.go (1)

111-116: Exported methods missing doc comments.

`RecordTotalRowsAffected` and `RecordRowsAffected` are exported but lack Go doc comments. The rest of the exported methods in this file (`RecordBatchExecution`, `RecordDDLExecution`, `Close`) all have doc comments.

Add doc comments

```diff
+// RecordTotalRowsAffected records the total actual and expected rows affected by DML execution.
 func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) {
```

```diff
+// RecordRowsAffected records the actual rows affected for a specific row type and updates the total counters.
 func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) {
```

pkg/sink/mysql/mysql_writer_dml.go (1)
681-685: `tx.Commit()` error not wrapped with `errors.Trace`.

Per coding guidelines, errors from library calls should be wrapped immediately. The error from `tx.Commit()` is returned bare.

Proposed fix

```diff
 if err = tx.Commit(); err != nil {
-	return 0, 0, err
+	return 0, 0, errors.Trace(err)
 }
```

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` to attach a stack trace".

pkg/sink/mysql/mysql_writer_dml_test.go (2)
149-156: `rowTypes` values not asserted beyond length checks.

Throughout the tests, `rowTypes` is captured but only its length is sometimes asserted (e.g., line 152). The actual content of `rowTypes` (e.g., whether it contains `RowTypeInsert`, `RowTypeDelete`, `RowTypeUpdate`) is never verified. Adding value assertions would strengthen the test coverage for the new metric propagation.

For example, at line 152 you could additionally check:

```go
require.Equal(t, common.RowTypeInsert, rowTypes[0])
require.Equal(t, common.RowTypeInsert, rowTypes[1])
```
160-163: Missing `len(rowTypes)` assertion for safe-mode batch SQL test.

Line 160 captures `rowTypes` from `generateBatchSQL` in safe mode, but unlike the unsafe-mode test at line 152, there is no `require.Equal(t, 1, len(rowTypes))` assertion here. This should be consistent with other test cases.

Proposed fix

```diff
 sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3})
 require.Equal(t, 1, len(sql))
 require.Equal(t, 1, len(args))
+require.Equal(t, 1, len(rowTypes))
 require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0])
```

pkg/eventservice/event_scanner.go (1)
143: Redundant `mode` parameter; `scanAndMergeEvents` already has access via the receiver.

`scanAndMergeEvents` is a method on `*eventScanner`, which already stores `mode` as a field (`s.mode`). Passing it as an explicit parameter is unnecessary.

♻️ Proposed simplification

```diff
-func (s *eventScanner) scanAndMergeEvents(
-	session *session,
-	merger *eventMerger,
-	iter eventstore.EventIterator,
-	mode int64,
-) (bool, error) {
+func (s *eventScanner) scanAndMergeEvents(
+	session *session,
+	merger *eventMerger,
+	iter eventstore.EventIterator,
+) (bool, error) {
 	tableID := session.dataRange.Span.TableID
 	dispatcher := session.dispatcherStat
-	processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), mode)
+	processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), s.mode)
```

And update the call site:

```diff
-	interrupted, err := s.scanAndMergeEvents(sess, merger, iter, s.mode)
+	interrupted, err := s.scanAndMergeEvents(sess, merger, iter)
```

Also applies to: 186-194
```diff
+rawType := rawEvent.GetType()
 if !rawEvent.IsUpdate() {
+	updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false)
 	return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
 }

 var (
 	shouldSplit bool
 	err         error
 )
-if !p.outputRawChangeEvent {
-	shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
-	if err != nil {
-		return err
-	}
-}
+shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
+if err != nil {
+	return err
+}
+updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit)

-if !shouldSplit {
+if p.outputRawChangeEvent && !shouldSplit {
 	return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
 }
```
Behavior change: updates with UK changes are now split even when `outputRawChangeEvent` is true.

Previously, when `outputRawChangeEvent` was true, update events were always appended directly without checking for UK changes. Now, `IsUKChanged` is always evaluated, and if the UK changed, the update falls through to the split-into-delete+insert path (lines 780–788) regardless of `outputRawChangeEvent`.

This is a semantic change beyond adding metrics:

- Correctness concern: Consumers expecting raw change events will now receive split delete+insert pairs when UK changes, which may break downstream expectations.
- Performance: `IsUKChanged` decodes both old and new row values for every update event, even when `outputRawChangeEvent=true`, where previously no decoding was needed. This could noticeably increase CPU/memory cost on update-heavy workloads.
Please confirm this behavioral change is intentional. If only the metric is needed and the original append behavior for outputRawChangeEvent=true should be preserved, the logic would need restructuring:
♻️ Suggested restructuring if the old behavior should be preserved

```diff
 rawType := rawEvent.GetType()
 if !rawEvent.IsUpdate() {
 	updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false)
 	return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
 }
-var (
-	shouldSplit bool
-	err         error
-)
-shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
-if err != nil {
-	return err
-}
-updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit)
-
-if p.outputRawChangeEvent && !shouldSplit {
-	return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
-}
+shouldSplit, err := event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
+if err != nil {
+	return err
+}
+updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit)
+
+if p.outputRawChangeEvent {
+	return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
+}
```
```go
EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "ticdc",
	Subsystem: "event_service",
	Name:      "send_dml_type_count",
	Help:      "The number of different dml events type sent by the event service",
}, []string{"mode", "dml_type"})
```
Minor grammar nit in Help text.

"The number of different dml events type sent by the event service" should be "The number of different dml event types sent by the event service".

Proposed fix

```diff
 EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
 	Namespace: "ticdc",
 	Subsystem: "event_service",
 	Name:      "send_dml_type_count",
-	Help:      "The number of different dml events type sent by the event service",
+	Help:      "The number of different dml event types sent by the event service",
 }, []string{"mode", "dml_type"})
```
}, []string{"mode", "dml_type"})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ | |
| Namespace: "ticdc", | |
| Subsystem: "event_service", | |
| Name: "send_dml_type_count", | |
| Help: "The number of different dml events type sent by the event service", | |
| }, []string{"mode", "dml_type"}) | |
| EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{ | |
| Namespace: "ticdc", | |
| Subsystem: "event_service", | |
| Name: "send_dml_type_count", | |
| Help: "The number of different dml event types sent by the event service", | |
| }, []string{"mode", "dml_type"}) |
```go
func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) {
	keyspace := b.changefeedID.Keyspace()
	changefeedID := b.changefeedID.Name()
	ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "actual", "total").Add(float64(actualRowsAffected))
	ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "expected", "total").Add(float64(expectedRowsAffected))
}

func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) {
	keyspace := b.changefeedID.Keyspace()
	changefeedID := b.changefeedID.Name()
	ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "actual", rowType.String()).Add(float64(rowsAffected))
	ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "expected", rowType.String()).Add(1)
	b.RecordTotalRowsAffected(rowsAffected, 1)
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Find RowType constants and their String() method
fd -t f '\.go$' | xargs rg -l 'RowType' | head -20
```

Repository: pingcap/ticdc. Length of output: 676

🏁 Script executed:

```bash
#!/bin/bash
# Find the RowType definition and String() method
rg 'type RowType' -A 20 --type=go
```

Repository: pingcap/ticdc. Length of output: 872

🏁 Script executed:

```bash
#!/bin/bash
# Find the Close() method in statistics.go
rg -n 'func.*Close' pkg/metrics/statistics.go -A 30
```

Repository: pingcap/ticdc. Length of output: 922

🏁 Script executed:

```bash
#!/bin/bash
# Check what counters are cleaned up in Close() and find ExecDMLEventRowsAffectedCounter
rg -n 'ExecDMLEventRowsAffectedCounter|DeleteLabelValues' pkg/metrics/statistics.go -B 2 -A 2
```

Repository: pingcap/ticdc. Length of output: 1638
Add missing ExecDMLEventRowsAffectedCounter cleanup in Close().
The Close() method (lines 127–143) deletes label sets for every other counter in the statistics object (DDL histogram, DML counter, errors, bytes, etc.) but omits cleanup for ExecDMLEventRowsAffectedCounter. This causes metric label leaks after a changefeed is removed, as stale time-series for all (keyspace, changefeed, type, row_type) combinations persist in the Prometheus registry.
The four possible row_type label values are:

- `"total"` (from `RecordTotalRowsAffected`)
- `"insert"`, `"update"`, `"delete"` (from `RowType.String()`)
Proposed fix: add cleanup in `Close()`

```diff
 func (b *Statistics) Close() {
 	keyspace := b.changefeedID.Keyspace()
 	changefeedID := b.changefeedID.Name()
+	for _, typ := range []string{"actual", "expected"} {
+		for _, rowType := range []string{"total", "insert", "update", "delete"} {
+			ExecDMLEventRowsAffectedCounter.DeleteLabelValues(keyspace, changefeedID, typ, rowType)
+		}
+	}
 	ExecDDLHistogram.DeleteLabelValues(keyspace, changefeedID)
```
ExecDDLHistogram.DeleteLabelValues(keyspace, changefeedID)🤖 Prompt for AI Agents
In `@pkg/metrics/statistics.go` around lines 111 - 124, The Close() method
currently omits cleanup for ExecDMLEventRowsAffectedCounter causing metric label
leaks; update Close() to call ExecDMLEventRowsAffectedCounter.Delete(...) for
the same label sets you use elsewhere by iterating/removing combinations of
keyspace (b.changefeedID.Keyspace()), changefeedID (b.changefeedID.Name()), the
"actual"/"expected" value dimension and the four row_type values ("total",
"insert", "update", "delete") so that any time-series created by
RecordTotalRowsAffected and RecordRowsAffected are deleted when Close() runs.
pkg/sink/mysql/mysql_writer_dml.go
Outdated
```go
if nextRowType == common.RowTypeInsert {
	if compareKeys(rowKey, rowLists[j].RowKeys) {
		sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
		log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
		log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
	}
```
log.Info with SQL values on conflict detection paths — noisy and potentially sensitive.
Lines 369 and 408 use log.Info to dump SQL queries and their values right before a log.Panic. Since log.Panic will already terminate the process, this log.Info line should use log.Error or log.Warn at minimum, and consider whether logging full SQL values is appropriate in production (may contain PII).
pkg/sink/mysql/mysql_writer_dml.go
Outdated
```go
rowsAffected, err := res.RowsAffected()
if err != nil {
	log.Warn("get rows affected rows failed", zap.Error(err))
} else {
	w.statistics.RecordTotalRowsAffected(rowsAffected, dmls.RowsAffected())
}
```
🧩 Analysis chain
🌐 Web query:
go-sql-driver/mysql RowsAffected multi-statement execution behavior
💡 Result:
With github.com/go-sql-driver/mysql, when you enable multiStatements=true and execute multiple SQL statements in a single Exec(...) call, the database/sql-level sql.Result only exposes one LastInsertId() / RowsAffected() value (i.e., not a per-statement breakdown). To get per-statement affected-row counts (and insert IDs), you must drop down to the driver result type and read the slices of values. (github.com)
Concretely:

- Enable multi-statements via DSN: `...?multiStatements=true`. (github.com)
- For result sets from multiple statements, iterate with `Rows.NextResultSet()` to reach the 2nd+ statement's results. (github.com)
- For affected rows / last insert ids for multiple statements, use `sql.Conn.Raw()` and cast the returned result to `mysql.Result`, then call:
  - `AllRowsAffected()` → `[]int64` (one entry per statement)
  - `AllLastInsertIds()` → `[]int64` (one entry per statement) (github.com)
- Also note the driver restriction: with `multiStatements`, `?` parameters may only appear in the first statement (unless using `interpolateParams`, with caveats). (github.com)
Citations:
- 1: https://github.com/go-sql-driver/mysql?utm_source=openai
- 2: https://github.com/go-sql-driver/mysql?utm_source=openai
- 3: https://github.com/go-sql-driver/mysql?utm_source=openai
- 4: https://github.com/go-sql-driver/mysql?utm_source=openai
- 5: https://github.com/go-sql-driver/mysql?utm_source=openai
🏁 Script executed:

```bash
# Find the actual code file and examine the context
fd "mysql_writer_dml.go" --exec cat -n {} \; | head -100
```

Repository: pingcap/ticdc. Length of output: 4433

🏁 Script executed:

```bash
# Search for multiStmtExecute function to understand how multi-statement execution is handled
rg "multiStmtExecute" -B 5 -A 20
```

Repository: pingcap/ticdc. Length of output: 7802

🏁 Script executed:

```bash
# Get the specific lines 813-818 from mysql_writer_dml.go
sed -n '800,830p' pkg/sink/mysql/mysql_writer_dml.go
```

Repository: pingcap/ticdc. Length of output: 1267

🏁 Script executed:

```bash
# Search for all RowsAffected calls in the mysql writer files
rg "RowsAffected" pkg/sink/mysql/ -B 3 -A 3
```

Repository: pingcap/ticdc. Length of output: 2008
🌐 Web query:
go-sql-driver/mysql RowsAffected return value last statement aggregate
💡 Result:
With go-sql-driver/mysql, sql.Result.RowsAffected() reflects MySQL’s “affected rows” for the last statement only (because the underlying API reports it “for the last statement”). (dev.mysql.com)
Multi-statement queries (multiStatements=true)
If you send multiple statements in one Exec (e.g., UPDATE ...; UPDATE ...;), RowsAffected() will correspond to the final statement’s affected-row count, not an aggregate/sum across statements. (github.com)
If you need per-statement counts (or want to aggregate them yourself), the driver exposes a MySQL-specific result via sql.Conn.Raw() and mysql.Result.AllRowsAffected() (slice of counts, one per statement). (github.com)
“Aggregate” SELECT (e.g., SELECT COUNT(*) ...)
Even though MySQL can report “affected rows” for a SELECT like “number of rows returned,” an aggregate query like SELECT COUNT(*) ... returns exactly 1 row (the row containing the count). So if you (mis)use Exec+RowsAffected() on such a query, you’ll typically see 1, not the counted value. You must read the count from the result set (use QueryRow/Scan). (dev.mysql.com)
Citations:
- 1: https://dev.mysql.com/doc/c-api/8.4/en/mysql-affected-rows.html?utm_source=openai
- 2: https://github.com/go-sql-driver/mysql?utm_source=openai
- 3: https://github.com/go-sql-driver/mysql?utm_source=openai
- 4: https://dev.mysql.com/doc/c-api/8.4/en/mysql-affected-rows.html?utm_source=openai
RowsAffected() on multi-statement transactions returns only the final statement's value, not the aggregate.
With BEGIN; stmt1; stmt2; ...; COMMIT; execution, res.RowsAffected() returns the affected rows of the last statement (COMMIT), which is 0. The RecordTotalRowsAffected() call should use dmls.RowsAffected() instead of res.RowsAffected() to accurately track total rows affected across all statements, or access per-statement counts via mysql.Result.AllRowsAffected() if available from the connection's raw result.
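A minimal sketch of aggregating per-statement counts via the driver's raw result, following the pattern the query result above describes; the DSN and SQL are placeholders, and `AllRowsAffected` assumes a recent go-sql-driver/mysql release (v1.8+):

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
	"log"

	"github.com/go-sql-driver/mysql"
)

func main() {
	// multiStatements=true lets one Exec call carry several statements.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/test?multiStatements=true")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	conn, err := db.Conn(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Raw exposes the driver connection, whose result can be cast to
	// mysql.Result for per-statement affected-row counts.
	err = conn.Raw(func(dc any) error {
		ex, ok := dc.(driver.Execer)
		if !ok {
			return fmt.Errorf("driver connection does not implement driver.Execer")
		}
		res, err := ex.Exec("UPDATE t SET v = v + 1; DELETE FROM t WHERE v > 10;", nil)
		if err != nil {
			return err
		}
		mres, ok := res.(mysql.Result)
		if !ok {
			return fmt.Errorf("result does not expose per-statement counts")
		}
		var total int64
		for _, n := range mres.AllRowsAffected() { // one entry per statement
			total += n
		}
		fmt.Println("total rows affected:", total)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```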
pkg/sink/mysql/mysql_writer_dml.go
Outdated
```go
if len(updateRows) > 0 {
	if w.cfg.IsTiDB {
		for _, rows := range updateRows {
			s, v := w.genUpdateSQL(rows...)
			sqls = append(sqls, s...)
			values = append(values, v...)
			rowTypes = append(rowTypes, common.RowTypeUpdate)
		}
		// The behavior of update statement differs between TiDB and MySQL.
		// So we don't use batch update statement when downstream is MySQL.
		// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
	} else {
		for _, rows := range updateRows {
			for _, row := range rows {
				sql, value := row.GenSQL(sqlmodel.DMLUpdate)
				sqls = append(sqls, sql)
				values = append(values, value)
				rowTypes = append(rowTypes, common.RowTypeUpdate)
			}
		}
	}
}
```
rowTypes / sqls length mismatch when genUpdateSQL returns multiple SQLs — will cause index-out-of-bounds panic.
genUpdateSQL (line 987) can return multiple SQL strings when the row data size exceeds MaxMultiUpdateRowSize. However, only one RowTypeUpdate is appended to rowTypes per batch, while all returned SQLs are appended to sqls. This means len(sqls) > len(rowTypes), and sequenceExecute (line 777: dmls.rowTypes[i]) will panic with an index-out-of-bounds error.
The same issue exists for the non-TiDB update path (lines 882-889), though there each row produces one SQL and one rowType, so it's correct. The TiDB path at line 872–877 is the problem.
Proposed fix
if w.cfg.IsTiDB {
for _, rows := range updateRows {
s, v := w.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
- rowTypes = append(rowTypes, common.RowTypeUpdate)
+ for range s {
+ rowTypes = append(rowTypes, common.RowTypeUpdate)
+ }
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/mysql/mysql_writer_dml.go (1)
92-107: ⚠️ Potential issue | 🟠 Major

Active-active path omits `rowTypes` tracking; metrics will silently fail.

The active-active SQL generation path (`genActiveActiveSQL` and its callees) returns only `([]string, [][]interface{})` while all other paths return `([]string, [][]interface{}, []common.RowType)`. At line 96, `genActiveActiveSQL` is called but only two values are captured, leaving `rowTypesList` uninitialized. Line 106 then appends `nil` to `dmls.rowTypes`, creating a length mismatch: `dmls.sqls` will be populated while `dmls.rowTypes` remains empty. The `RowsAffected()` method (sql_builder.go:120) will iterate an empty slice and return 0, causing metrics collection to silently fail for active-active replication.

Update `genActiveActiveSQL`, `generateActiveActiveNormalSQLs`, and `generateActiveActiveBatchSQL` to return `[]common.RowType` alongside the current return values, and capture all three values at line 96.
```diff
 func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) {
 	tableInfo := events[0].TableInfo
-	finalRowLists, _, err := w.buildRowChangesForUnSafeBatch(events, tableInfo)
-	if err != nil {
-		sql, values := w.generateBatchSQLsPerEvent(events)
-		log.Info("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Int("writerID", w.id))
-		log.Panic("invalid rows when generating batch SQL in unsafe mode",
-			zap.Error(err), zap.Any("events", events), zap.Int("writerID", w.id))
-	}
+	type RowChangeWithKeys struct {
+		RowChange  *commonEvent.RowChange
+		RowKeys    []byte
+		PreRowKeys []byte
+	}
+
+	// Step 1 extract all rows in these events to rowLists, and calcuate row key for each row(based on pk value)
+	rowLists := make([]RowChangeWithKeys, 0)
+	for _, event := range events {
+		for {
+			row, ok := event.GetNextRow()
+			if !ok {
+				event.Rewind()
+				break
+			}
+			rowChangeWithKeys := RowChangeWithKeys{RowChange: &row}
+			if !row.Row.IsEmpty() {
+				_, keys := genKeyAndHash(&row.Row, tableInfo)
+				rowChangeWithKeys.RowKeys = keys
+			}
+			if !row.PreRow.IsEmpty() {
+				_, keys := genKeyAndHash(&row.PreRow, tableInfo)
+				rowChangeWithKeys.PreRowKeys = keys
+			}
+			rowLists = append(rowLists, rowChangeWithKeys)
+		}
+	}
+
+	// Step 2 combine the rows until there is no change
+	// Consider we will split the event if PK is changed, so the Update will not change the PK
+	// for the rows comparation, there are six situations:
+	// 1. the previous row is Delete A, the next row is Insert A. --- we don't need to combine the rows.
+	// 2. the previous row is Delete A, the next row is Update xx where A . --- we don't need to combine the rows.
+	// 3. the previous row is Insert A, the next row is Delete A. --- remove the row of `Insert A`
+	// 4. the previous row is Insert A, the next row is Update xx where A -- remove the row of `Insert A`, change the row `Update A` to `Insert A`
+	// 5. the previous row is Update xx where A, the next row is Delete A. --- remove the row `Update xx where A`
+	// 6. the previous row is Update xx where A, the next row is Update xx where A. --- we need to remove the row, and change the second Update's preRows = first Update's preRows
+	for {
+		// hasUpdate to determine whether we can break the combine logic
+		hasUpdate := false
+		// flagList used to store the exists or not for this row. True means exists.
+		flagList := make([]bool, len(rowLists))
+		for i := 0; i < len(rowLists); i++ {
+			flagList[i] = true
+		}
+		for i := 0; i < len(rowLists); i++ {
+			if !flagList[i] {
+				continue
+			}
+		innerLoop:
+			for j := i + 1; j < len(rowLists); j++ {
+				if !flagList[j] {
+					continue
+				}
+				rowType := rowLists[i].RowChange.RowType
+				nextRowType := rowLists[j].RowChange.RowType
+				switch rowType {
+				case common.RowTypeInsert:
+					rowKey := rowLists[i].RowKeys
+					if nextRowType == common.RowTypeInsert {
+						if compareKeys(rowKey, rowLists[j].RowKeys) {
+							sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
+							log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
+							log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
+						}
+					} else if nextRowType == common.RowTypeDelete {
+						if compareKeys(rowKey, rowLists[j].PreRowKeys) {
+							// remove the insert one, and break the inner loop for row i
+							flagList[i] = false
+							hasUpdate = true
+							break innerLoop
+						}
+					} else if nextRowType == common.RowTypeUpdate {
+						if !compareKeys(rowLists[j].PreRowKeys, rowLists[j].RowKeys) {
+							log.Panic("The Update Row have different Row Key", zap.Any("Events", events), zap.Int("writerID", w.id))
+						}
+						if compareKeys(rowKey, rowLists[j].PreRowKeys) {
+							// remove insert one, and break the inner loop for row i
+							flagList[i] = false
+							// change update one to insert
+							preRowChange := rowLists[j].RowChange
+							newRowChange := commonEvent.RowChange{
+								Row:     preRowChange.Row,
+								RowType: common.RowTypeInsert,
+							}
+							rowLists[j] = RowChangeWithKeys{
+								RowChange: &newRowChange,
+								RowKeys:   rowLists[j].RowKeys,
+							}
+							hasUpdate = true
+							break innerLoop
+						}
+					}
+				case common.RowTypeUpdate:
+					rowKey := rowLists[i].RowKeys
+					if !compareKeys(rowKey, rowLists[i].PreRowKeys) {
+						log.Panic("The Update Row have different Row Key", zap.Any("Events", events), zap.Int("writerID", w.id))
+					}
+					if nextRowType == common.RowTypeInsert {
+						if compareKeys(rowKey, rowLists[j].RowKeys) {
+							sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
+							log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
+							log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
+						}
+					} else if nextRowType == common.RowTypeDelete {
+						if compareKeys(rowKey, rowLists[j].PreRowKeys) {
+							// remove the update one, and break the inner loop
+							flagList[j] = false
+							// change the update to delete
+							preRowChange := rowLists[i].RowChange
+							newRowChange := commonEvent.RowChange{
+								PreRow:  preRowChange.PreRow,
+								RowType: common.RowTypeDelete,
+							}
+							rowLists[i] = RowChangeWithKeys{
+								RowChange:  &newRowChange,
+								PreRowKeys: rowKey,
+							}
+							hasUpdate = true
+							break innerLoop
+						}
+					} else if nextRowType == common.RowTypeUpdate {
+						if compareKeys(rowKey, rowLists[j].PreRowKeys) {
+							if !compareKeys(rowLists[j].PreRowKeys, rowLists[j].RowKeys) {
+								log.Panic("The Update Row have different Row Key", zap.Any("Events", events), zap.Int("writerID", w.id))
+							}
+							// remove the first one, update the second one, then break
+							newRowChange := commonEvent.RowChange{
+								PreRow:  rowLists[j].RowChange.PreRow,
+								Row:     rowLists[j].RowChange.Row,
+								RowType: common.RowTypeUpdate,
+							}
+							rowLists[j] = RowChangeWithKeys{
+								RowChange:  &newRowChange,
+								PreRowKeys: rowKey,
+								RowKeys:    rowKey,
+							}
+							flagList[i] = false
+							hasUpdate = true
+							break innerLoop
+						}
+					}
+				}
+			}
+		}
+
+		if !hasUpdate {
+			// means no more changes for the rows, break and generate sqls.
+			break
+		} else {
+			newRowLists := make([]RowChangeWithKeys, 0, len(rowLists))
+			for i := 0; i < len(rowLists); i++ {
+				if flagList[i] {
+					newRowLists = append(newRowLists, rowLists[i])
+				}
+			}
+			rowLists = newRowLists
+		}
+	}
+
+	finalRowLists := make([]*commonEvent.RowChange, 0, len(rowLists))
+	for i := 0; i < len(rowLists); i++ {
+		finalRowLists = append(finalRowLists, rowLists[i].RowChange)
+	}
+
+	// Step 3. generate sqls based on finalRowLists
 	return w.batchSingleTxnDmls(finalRowLists, tableInfo, false)
```
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Check if buildRowChangesForUnSafeBatch is still called anywhere
rg -n 'buildRowChangesForUnSafeBatch' --type=go
```

Repository: pingcap/ticdc. Length of output: 598

🏁 Script executed:

```bash
#!/bin/bash
# Get the file size first
wc -l pkg/sink/mysql/mysql_writer_dml_batch.go
# View the buildRowChangesForUnSafeBatch function (lines 246-407)
sed -n '246,407p' pkg/sink/mysql/mysql_writer_dml_batch.go
# View the generateBatchSQLInUnSafeMode function (lines 411-581)
sed -n '411,581p' pkg/sink/mysql/mysql_writer_dml_batch.go
```

Repository: pingcap/ticdc. Length of output: 12717

🏁 Script executed:

```bash
#!/bin/bash
# Check for log.Panic usage in the new function and nearby code
rg -n 'log\.Panic' pkg/sink/mysql/mysql_writer_dml_batch.go -A2
```

Repository: pingcap/ticdc. Length of output: 1445
Code duplication: `generateBatchSQLInUnSafeMode` duplicates the entire merge algorithm and introduces naming and error-handling inconsistencies.

The new `generateBatchSQLInUnSafeMode` (lines 411–581) duplicates the row-merging logic already present in `buildRowChangesForUnSafeBatch` (lines 246–407), including the identical combine loop and switch cases (~165 lines).

Beyond duplication, the new function introduces three issues:

- Struct field naming: `RowChangeWithKeys` uses exported PascalCase fields (`RowChange`, `RowKeys`, `PreRowKeys`), while the original function's equivalent local struct uses unexported lowerCamelCase names (`rowChange`, `rowKeys`, `preRowKeys`), the conventional choice for a type that never leaves the function.
- Error handling regression: The new function uses `log.Panic()` (5 times) for error cases instead of returning proper errors like the original function (`cerror.ErrUnexpected.FastGenByArgs`). This is incompatible with graceful error handling.
- Lost `commitTs` tracking: The original function preserves `commitTs` for each row (used for last-writer-wins semantics); the new function discards it entirely.

Note: `buildRowChangesForUnSafeBatch` is not dead code; it's actively called from mysql_writer_dml_active_active.go line 107.
Consider refactoring to reuse buildRowChangesForUnSafeBatch with the necessary adjustments (e.g., wrapping its return values) rather than duplicating the algorithm.
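One possible shape for that reuse, as a hedged sketch: it assumes `buildRowChangesForUnSafeBatch` keeps the signature visible in the removed lines above, and that `generateBatchSQLsPerEvent` returns the three-slice form used elsewhere in this PR.

```go
// generateBatchSQLInUnSafeMode delegates the row-merging work to the existing
// buildRowChangesForUnSafeBatch instead of duplicating its combine loop, then
// generates the batched SQL from the merged rows.
func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) {
	tableInfo := events[0].TableInfo
	finalRowLists, _, err := w.buildRowChangesForUnSafeBatch(events, tableInfo)
	if err != nil {
		// Keep the original diagnostic path; the redacted values avoid
		// leaking row contents into logs.
		sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
		log.Warn("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
		log.Panic("invalid rows when generating batch SQL in unsafe mode",
			zap.Error(err), zap.Any("events", events), zap.Int("writerID", w.id))
	}
	return w.batchSingleTxnDmls(finalRowLists, tableInfo, false)
}
```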
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/mysql/mysql_writer_dml_batch.go (1)
504-511: ⚠️ Potential issue | 🔴 Critical

Bug: `rowTypes` count mismatches `sqls` count when `genUpdateSQL` returns multiple statements.

`genUpdateSQL` (lines 623–641) returns one SQL per row when the payload exceeds `MaxMultiUpdateRowSize`, but line 510 always appends exactly one `RowTypeUpdate` regardless. This causes `len(rowTypes) < len(sqls)`, which will break any downstream consumer that indexes `rowTypes` alongside `sqls`.

🐛 Proposed fix

```diff
 if w.cfg.IsTiDB {
 	for _, rows := range updateRows {
 		s, v := w.genUpdateSQL(rows...)
 		sqls = append(sqls, s...)
 		values = append(values, v...)
-		rowTypes = append(rowTypes, common.RowTypeUpdate)
+		for range s {
+			rowTypes = append(rowTypes, common.RowTypeUpdate)
+		}
 	}
```
@wk989898: The following tests failed.

Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #3889
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Improvements