Skip to content

metrics: add DML rows affected metric#3909

Open
wk989898 wants to merge 20 commits intopingcap:masterfrom
wk989898:metric-affected
Open

metrics: add DML rows affected metric#3909
wk989898 wants to merge 20 commits intopingcap:masterfrom
wk989898:metric-affected

Conversation

@wk989898
Copy link
Collaborator

@wk989898 wk989898 commented Jan 4, 2026

What problem does this PR solve?

Issue Number: close #3889

What is changed and how it works?

  • New DML Rows Affected Metric: Introduced ExecDMLEventRowsAffectedCounter to track the number of rows affected by DML operations (insert, update, delete) in the sink, providing granular insights into data changes.
  • Grafana Dashboard Integration: Added a new panel named "Sink Event Row Affected Count / s" to the ticdc_new_arch.json, ticdc_new_arch_next_gen.json, and ticdc_new_arch_with_keyspace_name.json Grafana dashboards to visualize this new metric, enhancing monitoring capabilities.
  • Enhanced DML Tracking Logic: Modified the DML preparation and execution logic in mysql_writer_dml.go to record both actual and expected affected rows for each DML type, leveraging a new rowTypes field in preparedDMLs.
  • Statistics Module Update: The pkg/metrics/statistics.go file now includes dedicated functions (RecordTotalRowsAffected and RecordRowsAffected) to accurately record these new row affected metrics, ensuring comprehensive data collection.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Grafana panels: "EventService Output Different DML Event Types / s" and "Row Affected Count / s" visualizations added.
  • Improvements

    • New observability metrics: per-mode DML-type counts and per-row-type affected-row counts exposed.
    • MySQL sink now tracks row-type information across batching, improving accuracy of affected-row metrics and dashboard reports.

@ti-chi-bot ti-chi-bot bot added the release-note Denotes a PR that will be considered when it comes time to generate release notes. label Jan 4, 2026
@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 4, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign sdojjy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jan 4, 2026
@gemini-code-assist
Copy link

Summary of Changes

Hello @wk989898, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances TiCDC's monitoring capabilities by introducing a new metric that precisely tracks the number of rows affected by Data Manipulation Language (DML) operations as they are flushed to the downstream system. This provides critical insights into the volume and type of data changes being replicated, improving observability and enabling more accurate performance analysis of the data synchronization process. The new metric is also integrated into the existing Grafana dashboards for immediate visualization.

Highlights

  • New DML Rows Affected Metric: Introduced ExecDMLEventRowsAffectedCounter to track the number of rows affected by DML operations (insert, update, delete) in the sink, providing granular insights into data changes.
  • Grafana Dashboard Integration: Added a new panel named "Sink Event Row Affected Count / s" to the ticdc_new_arch.json, ticdc_new_arch_next_gen.json, and ticdc_new_arch_with_keyspace_name.json Grafana dashboards to visualize this new metric, enhancing monitoring capabilities.
  • Enhanced DML Tracking Logic: Modified the DML preparation and execution logic in mysql_writer_dml.go to record both actual and expected affected rows for each DML type, leveraging a new rowTypes field in preparedDMLs.
  • Statistics Module Update: The pkg/metrics/statistics.go file now includes dedicated functions (RecordTotalRowsAffected and RecordRowsAffected) to accurately record these new row affected metrics, ensuring comprehensive data collection.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new metric, ticdc_sink_dml_event_affected_row_count, to monitor the number of rows affected by DML operations in the sink. The changes are well-implemented, including the definition of the new Prometheus counter, the logic in the MySQL sink to track DML row types, and the recording of both expected and actual affected row counts. The Grafana dashboards have also been updated to visualize this new metric. I have a couple of suggestions for pkg/sink/mysql/mysql_writer_dml.go to improve code maintainability and cleanliness.

// Considering the batch algorithm in safe mode is O(n^3), which n is the number of rows.
// So we need to limit the number of rows in one batch to avoid performance issues.
func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}) {
func (w *Writer) generateBatchSQL(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Many SQL generation functions in this file, such as this one, now return three separate slices: []string, [][]interface{}, and []common.RowType. This makes the function signatures complex and harder to maintain. To improve code clarity, consider encapsulating these related values into a struct. For example:

type dmlGroup struct {
    sqls     []string
    values   [][]interface{}
    rowTypes []common.RowType
}

This would simplify function signatures to something like func (w *Writer) generateBatchSQL(...) *dmlGroup. You could even potentially reuse or adapt the existing preparedDMLs struct for this purpose, which would further improve consistency.

Comment on lines 780 to 784
// if err != nil {
// log.Warn("get rows affected rows failed", zap.Error(err))
// } else {
// w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i])
// }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This commented-out code appears to be a remnant from development and should be removed to keep the codebase clean.

@wk989898 wk989898 added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jan 8, 2026
@wk989898 wk989898 removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jan 15, 2026
@lidezhu
Copy link
Collaborator

lidezhu commented Jan 15, 2026

Please paste the screenshots of your newly added metrics in the pr description.

if err != nil {
return err
}
shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a type like 'PkChanged'?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. If the PK changed, there should be two raw events.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 12, 2026

📝 Walkthrough

Walkthrough

Adds Grafana panels for DML event types and affected rows; introduces a new Prometheus counter for EventService DML types and a sink counter for affected rows; threads a mode parameter through event scanning and DML processing; propagates per-row-type metadata through MySQL sink SQL generation and tests.

Changes

Cohort / File(s) Summary
Grafana dashboards
metrics/grafana/ticdc_new_arch.json, metrics/nextgengrafana/ticdc_new_arch_next_gen.json, metrics/nextgengrafana/ticdc_new_arch_with_keyspace_name.json
Added panels: "EventService Output Different DML Event Types / s" and "Row Affected Count / s" (PromQL targets). Panels inserted in multiple dashboard sections (duplication observed).
EventService flow & raw entry classification
pkg/common/kv_entry.go, pkg/eventservice/event_scanner.go, pkg/eventservice/metrics_collector.go
Added RawKVEntry.GetType(), threaded mode int64 into scanAndMergeEvents and dmlProcessor, updated constructor/signatures to accept mode, and added helper to emit mode-aware DML-type metrics.
Prometheus metrics & statistics API
pkg/metrics/event_service.go, pkg/metrics/sink.go, pkg/metrics/statistics.go
Added EventServiceSendDMLTypeCount (labels: mode,dml_type) and ExecDMLEventRowsAffectedCounter (labels: keyspace,changefeed,type,row_type); added RecordRowsAffected and RecordTotalRowsAffected to update affected-row metrics and registered the new metrics.
MySQL sink: DML generation, batching, and SQL builders
pkg/sink/mysql/mysql_writer_dml.go, pkg/sink/mysql/mysql_writer_dml_batch.go, pkg/sink/mysql/sql_builder.go, pkg/sink/mysql/mysql_writer_dml_test.go
Propagated per-event rowTypes through SQL generation and batching functions (many function signatures now return []common.RowType); added preparedDMLs.rowTypes and RowsAffected(); updated callers and tests to handle the extra return and to compute rows-affected.
Other small changes
pkg/metrics/..., pkg/eventservice/...
Metric registration and emission call sites adjusted to include mode and row-type mappings; small control-flow updates to ensure mode-aware metrics are emitted consistently.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Source as EventSource
  participant ES as EventService
  participant Metrics as Metrics
  participant Sink as Sink

  Source->>ES: send raw KV events (include mode)
  ES->>ES: classify event via RawKVEntry.GetType()
  ES->>Metrics: update EventServiceSendDMLTypeCount(mode, dml_type)
  ES->>Sink: forward DML events (include mode, raw type)
  Sink->>Sink: generate batched SQLs and collect rowTypes
  Sink->>Metrics: update ExecDMLEventRowsAffectedCounter(keyspace,changefeed,type,row_type)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

lgtm, approved

Suggested reviewers

  • lidezhu
  • hongyunyan
  • asddongmen

Poem

🐇 I hopped through events, mode in tow,
Counted inserts, updates, deletes as they go.
Batches baked with row-types known,
Metrics sing and dashboards grown,
A carrot chart to watch metrics glow. 🥕

🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Out of Scope Changes check ❓ Inconclusive While the PR primarily focuses on adding DML rows affected metrics as specified in issue #3889, it also introduces an additional EventServiceSendDMLTypeCount metric and related infrastructure not explicitly mentioned in the linked issue requirements. Clarify whether the EventServiceSendDMLTypeCount metric additions and EventService-related changes (event_scanner.go, metrics_collector.go, event_service.go) represent scope creep or are complementary enhancements intended to support the primary objective.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title 'metrics: add DML rows affected metric' clearly and concisely describes the main change: introducing a new metric for DML rows affected, which aligns with the primary objective of the PR.
Description check ✅ Passed The PR description provides a clear issue reference (close #3889), explains the changes made (new metric, dashboard integration, enhanced tracking logic, statistics updates), and follows the template structure with sections for problem, changes, test checklist, and release notes.
Linked Issues check ✅ Passed The PR successfully implements the core requirements from issue #3889: it adds the 'affected rows' metric (ExecDMLEventRowsAffectedCounter) and implements related tracking logic for DML operations (insert, update, delete) to improve observability as requested.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pkg/sink/mysql/sql_builder.go (1)

144-150: ⚠️ Potential issue | 🔴 Critical

Bug: rowTypes is not cleared in reset(), causing stale data when the object is reused from dmlsPool.

Since preparedDMLs instances are pooled via dmlsPool (line 134), omitting rowTypes from reset() means a recycled object will carry over the previous caller's row types, inflating RowsAffected() and corrupting metrics.

Proposed fix
 func (d *preparedDMLs) reset() {
 	d.sqls = d.sqls[:0]
 	d.values = d.values[:0]
 	d.tsPairs = d.tsPairs[:0]
+	d.rowTypes = d.rowTypes[:0]
 	d.rowCount = 0
 	d.approximateSize = 0
 }
pkg/sink/mysql/mysql_writer_dml.go (1)

112-123: ⚠️ Potential issue | 🔴 Critical

genActiveActiveSQL does not return rowTypes — causes index-out-of-bounds in sequenceExecute.

When EnableActiveActive is true (line 112), genActiveActiveSQL returns only ([]string, [][]interface{}) without row types. The rowTypesList remains empty, but dmls.sqls is populated. Later in sequenceExecute at line 777, dmls.rowTypes[i] will panic.

Either genActiveActiveSQL needs to be updated to return []common.RowType, or sequenceExecute needs a bounds check on dmls.rowTypes.

Proposed fix — add bounds check in sequenceExecute
 		rowsAffected, err := res.RowsAffected()
 		if err != nil {
 			log.Warn("get rows affected rows failed", zap.Error(err))
-		} else {
+		} else if i < len(dmls.rowTypes) {
 			w.statistics.RecordRowsAffected(rowsAffected, dmls.rowTypes[i])
+		} else {
+			w.statistics.RecordTotalRowsAffected(rowsAffected, 1)
 		}
🤖 Fix all issues with AI agents
In `@pkg/eventservice/event_scanner.go`:
- Around line 760-778: The change causes event.IsUKChanged to be called for all
updates, changing semantics and cost when p.outputRawChangeEvent is true; revert
to the original behavior by short-circuiting when outputRawChangeEvent is true:
call updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) and return
p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter) without
invoking event.IsUKChanged; only call event.IsUKChanged and
updateMetricEventServiceSendDMLTypeCount with the real shouldSplit when
outputRawChangeEvent is false so UK-change decoding happens only when splitting
is actually allowed.

In `@pkg/metrics/event_service.go`:
- Around line 173-178: The Help string for the Prometheus metric created in
EventServiceSendDMLTypeCount (prometheus.NewCounterVec with CounterOpts) has a
grammar mistake; update the CounterOpts.Help value from "The number of different
dml events type sent by the event service" to "The number of different dml event
types sent by the event service" so the metric help text reads correctly.

In `@pkg/metrics/statistics.go`:
- Around line 111-124: The Close() method currently omits cleanup for
ExecDMLEventRowsAffectedCounter causing metric label leaks; update Close() to
call ExecDMLEventRowsAffectedCounter.Delete(...) for the same label sets you use
elsewhere by iterating/removing combinations of keyspace
(b.changefeedID.Keyspace()), changefeedID (b.changefeedID.Name()), the
"actual"/"expected" value dimension and the four row_type values ("total",
"insert", "update", "delete") so that any time-series created by
RecordTotalRowsAffected and RecordRowsAffected are deleted when Close() runs.

In `@pkg/sink/mysql/mysql_writer_dml.go`:
- Around line 571-589: There is a duplicate method generateNormalSQLs; remove
this redundant declaration from this file and rely on the single canonical
implementation in mysql_writer_dml_batch.go (or vice versa if you prefer to keep
this one) so the build no longer sees two definitions. If you remove this file's
generateNormalSQLs, ensure any callers still resolve (they can call the
remaining generateNormalSQLs) and, if needed, replace direct logic here with a
call to the kept function (generateNormalSQLs) or to the helper
generateNormalSQL to avoid duplicating behavior; also clean up any now-unused
imports/variables in this file.
- Around line 366-371: The current conflict-detection path in
mysql_writer_dml.go logs full SQL and values with log.Info immediately before
log.Panic (see variables/functions nextRowType, compareKeys, rowKey, rowLists,
and generateBatchSQLsPerEvent), which is noisy and may leak sensitive data;
change the log.Info call to log.Error or log.Warn and remove or redact sensitive
payloads (avoid logging full values array or replace with a safe summary like
value counts/types or a hashed/redacted representation) while keeping contextual
fields (e.g., writer id, i, j, Events metadata) so the panic still includes
useful non-PII diagnostics.
- Around line 870-891: genUpdateSQL may return multiple SQL strings but the TiDB
branch only appends a single common.RowTypeUpdate per call, causing len(sqls) >
len(rowTypes) and panics in sequenceExecute; fix by appending one RowTypeUpdate
per SQL returned: after s, v := w.genUpdateSQL(rows...) and appending s and v to
sqls/values, append len(s) copies of common.RowTypeUpdate to rowTypes (e.g.,
loop or use slice multiplication) so rowTypes, sqls, and values stay aligned.
- Around line 813-818: The current code calls res.RowsAffected() and passes that
to w.statistics.RecordTotalRowsAffected, but res.RowsAffected() on
multi-statement transactions returns only the final statement's count (often 0);
change the logic to use the aggregated count from dmls.RowsAffected() (or if you
can access the driver raw result, use mysql.Result.AllRowsAffected()) when
calling RecordTotalRowsAffected so total rows across all statements are
recorded; update the block around res.RowsAffected() (referencing
res.RowsAffected(), dmls.RowsAffected(), and
w.statistics.RecordTotalRowsAffected()) to prefer dmls.RowsAffected() and fall
back to per-statement totals only if available from the raw mysql result.
🧹 Nitpick comments (7)
pkg/sink/mysql/sql_builder.go (1)

134-142: Pool's New func doesn't pre-allocate rowTypes, unlike the other slices.

For consistency and to avoid an initial nil-slice allocation on first append, consider pre-allocating rowTypes the same way sqls, values, and tsPairs are.

Proposed fix
 var dmlsPool = sync.Pool{
 	New: func() interface{} {
 		return &preparedDMLs{
 			sqls:    make([]string, 0, 128),
 			values:  make([][]interface{}, 0, 128),
+			rowTypes: make([]common.RowType, 0, 128),
 			tsPairs: make([]tsPair, 0, 128),
 		}
 	},
 }
pkg/common/kv_entry.go (1)

78-93: Minor: redundant condition in else if.

On line 84, since we already know len(v.OldValue) != 0 (the if on line 82 was false), the len(v.OldValue) > 0 check is always true here. You can simplify to just len(v.Value) > 0.

Also note: an OpTypePut with OldValue present but Value empty will fall through to "unknown". This matches the existing IsInsert/IsUpdate helpers, so it seems intentional, but worth a comment if that state is truly impossible.

Simplification
 func (v *RawKVEntry) GetType() string {
 	switch v.OpType {
 	case OpTypePut:
 		if len(v.OldValue) == 0 {
 			return "insert"
-		} else if len(v.OldValue) > 0 && len(v.Value) > 0 {
+		} else if len(v.Value) > 0 {
 			return "update"
 		}
 	case OpTypeDelete:
pkg/metrics/statistics.go (1)

111-116: Exported methods missing doc comments.

RecordTotalRowsAffected and RecordRowsAffected are exported but lack Go doc comments. The rest of the exported methods in this file (RecordBatchExecution, RecordDDLExecution, Close) all have doc comments.

Add doc comments
+// RecordTotalRowsAffected records the total actual and expected rows affected by DML execution.
 func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) {
+// RecordRowsAffected records the actual rows affected for a specific row type and updates the total counters.
 func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) {
pkg/sink/mysql/mysql_writer_dml.go (1)

681-685: tx.Commit() error not wrapped with errors.Trace.

Per coding guidelines, errors from library calls should be wrapped immediately. The error from tx.Commit() is returned bare.

Proposed fix
 		if err = tx.Commit(); err != nil {
-			return 0, 0, err
+			return 0, 0, errors.Trace(err)
 		}

As per coding guidelines, "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace".

pkg/sink/mysql/mysql_writer_dml_test.go (2)

149-156: rowTypes values not asserted beyond length checks.

Throughout the tests, rowTypes is captured but only its length is sometimes asserted (e.g., line 152). The actual content of rowTypes (e.g., whether it contains RowTypeInsert, RowTypeDelete, RowTypeUpdate) is never verified. Adding value assertions would strengthen the test coverage for the new metric propagation.

For example, at line 152 you could additionally check:

require.Equal(t, common.RowTypeInsert, rowTypes[0])
require.Equal(t, common.RowTypeInsert, rowTypes[1])

160-163: Missing len(rowTypes) assertion for safe-mode batch SQL test.

Line 160 captures rowTypes from generateBatchSQL in safe mode, but unlike the unsafe-mode test at line 152, there is no require.Equal(t, 1, len(rowTypes)) assertion here. This should be consistent with other test cases.

Proposed fix
 	sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3})
 	require.Equal(t, 1, len(sql))
 	require.Equal(t, 1, len(args))
+	require.Equal(t, 1, len(rowTypes))
 	require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0])
pkg/eventservice/event_scanner.go (1)

143-143: Redundant mode parameter — scanAndMergeEvents already has access via receiver.

scanAndMergeEvents is a method on *eventScanner, which already stores mode as a field (s.mode). Passing it as an explicit parameter is unnecessary.

♻️ Proposed simplification
-func (s *eventScanner) scanAndMergeEvents(
-	session *session,
-	merger *eventMerger,
-	iter eventstore.EventIterator,
-	mode int64,
-) (bool, error) {
+func (s *eventScanner) scanAndMergeEvents(
+	session *session,
+	merger *eventMerger,
+	iter eventstore.EventIterator,
+) (bool, error) {
 	tableID := session.dataRange.Span.TableID
 	dispatcher := session.dispatcherStat
-	processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), mode)
+	processor := newDMLProcessor(s.mounter, s.schemaGetter, dispatcher.filter, dispatcher.info.IsOutputRawChangeEvent(), s.mode)

And update the call site:

-	interrupted, err := s.scanAndMergeEvents(sess, merger, iter, s.mode)
+	interrupted, err := s.scanAndMergeEvents(sess, merger, iter)

Also applies to: 186-194

Comment on lines 760 to 778
rawType := rawEvent.GetType()
if !rawEvent.IsUpdate() {
updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false)
return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
}

var (
shouldSplit bool
err error
)
if !p.outputRawChangeEvent {
shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
if err != nil {
return err
}
shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
if err != nil {
return err
}
updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit)

if !shouldSplit {
if p.outputRawChangeEvent && !shouldSplit {
return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Behavior change: updates with UK changes are now split even when outputRawChangeEvent is true.

Previously, when outputRawChangeEvent was true, update events were always appended directly without checking for UK changes. Now, IsUKChanged is always evaluated, and if the UK changed, the update falls through to the split-into-delete+insert path (lines 780–788) regardless of outputRawChangeEvent.

This is a semantic change beyond adding metrics:

  1. Correctness concern: Consumers expecting raw change events will now receive split delete+insert pairs when UK changes, which may break downstream expectations.
  2. Performance: IsUKChanged decodes both old and new row values for every update event, even when outputRawChangeEvent=true where previously no decoding was needed. This could noticeably increase CPU/memory cost on update-heavy workloads.

Please confirm this behavioral change is intentional. If only the metric is needed and the original append behavior for outputRawChangeEvent=true should be preserved, the logic would need restructuring:

♻️ Suggested restructuring if the old behavior should be preserved
 	rawType := rawEvent.GetType()
 	if !rawEvent.IsUpdate() {
 		updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false)
 		return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
 	}

-	var (
-		shouldSplit bool
-		err         error
-	)
-	shouldSplit, err = event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
-	if err != nil {
-		return err
-	}
-	updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit)
-
-	if p.outputRawChangeEvent && !shouldSplit {
-		return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
-	}
+	shouldSplit, err := event.IsUKChanged(rawEvent, p.currentTxn.CurrentDMLEvent.TableInfo)
+	if err != nil {
+		return err
+	}
+	updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, shouldSplit)
+
+	if p.outputRawChangeEvent {
+		return p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter)
+	}
🤖 Prompt for AI Agents
In `@pkg/eventservice/event_scanner.go` around lines 760 - 778, The change causes
event.IsUKChanged to be called for all updates, changing semantics and cost when
p.outputRawChangeEvent is true; revert to the original behavior by
short-circuiting when outputRawChangeEvent is true: call
updateMetricEventServiceSendDMLTypeCount(p.mode, rawType, false) and return
p.currentTxn.AppendRow(rawEvent, p.mounter.DecodeToChunk, p.filter) without
invoking event.IsUKChanged; only call event.IsUKChanged and
updateMetricEventServiceSendDMLTypeCount with the real shouldSplit when
outputRawChangeEvent is false so UK-change decoding happens only when splitting
is actually allowed.

Comment on lines +173 to +178
EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "event_service",
Name: "send_dml_type_count",
Help: "The number of different dml events type sent by the event service",
}, []string{"mode", "dml_type"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor grammar nit in Help text.

"The number of different dml events type sent by the event service" should be "The number of different dml event types sent by the event service".

Proposed fix
 	EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "ticdc",
 		Subsystem: "event_service",
 		Name:      "send_dml_type_count",
-		Help:      "The number of different dml events type sent by the event service",
+		Help:      "The number of different dml event types sent by the event service",
 	}, []string{"mode", "dml_type"})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "event_service",
Name: "send_dml_type_count",
Help: "The number of different dml events type sent by the event service",
}, []string{"mode", "dml_type"})
EventServiceSendDMLTypeCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "event_service",
Name: "send_dml_type_count",
Help: "The number of different dml event types sent by the event service",
}, []string{"mode", "dml_type"})
🤖 Prompt for AI Agents
In `@pkg/metrics/event_service.go` around lines 173 - 178, The Help string for the
Prometheus metric created in EventServiceSendDMLTypeCount
(prometheus.NewCounterVec with CounterOpts) has a grammar mistake; update the
CounterOpts.Help value from "The number of different dml events type sent by the
event service" to "The number of different dml event types sent by the event
service" so the metric help text reads correctly.

Comment on lines +111 to +124
func (b *Statistics) RecordTotalRowsAffected(actualRowsAffected, expectedRowsAffected int64) {
keyspace := b.changefeedID.Keyspace()
changefeedID := b.changefeedID.Name()
ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "actual", "total").Add(float64(actualRowsAffected))
ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "expected", "total").Add(float64(expectedRowsAffected))
}

func (b *Statistics) RecordRowsAffected(rowsAffected int64, rowType common.RowType) {
keyspace := b.changefeedID.Keyspace()
changefeedID := b.changefeedID.Name()
ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "actual", rowType.String()).Add(float64(rowsAffected))
ExecDMLEventRowsAffectedCounter.WithLabelValues(keyspace, changefeedID, "expected", rowType.String()).Add(1)
b.RecordTotalRowsAffected(rowsAffected, 1)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find RowType constants and their String() method
fd -t f '\.go$' | xargs rg -l 'RowType' | head -20

Repository: pingcap/ticdc

Length of output: 676


🏁 Script executed:

#!/bin/bash
# Find the RowType definition and String() method
rg 'type RowType' -A 20 --type=go

Repository: pingcap/ticdc

Length of output: 872


🏁 Script executed:

#!/bin/bash
# Find the Close() method in statistics.go
rg -n 'func.*Close' pkg/metrics/statistics.go -A 30

Repository: pingcap/ticdc

Length of output: 922


🏁 Script executed:

#!/bin/bash
# Check what counters are cleaned up in Close() and find ExecDMLEventRowsAffectedCounter
rg -n 'ExecDMLEventRowsAffectedCounter|DeleteLabelValues' pkg/metrics/statistics.go -B 2 -A 2

Repository: pingcap/ticdc

Length of output: 1638


Add missing ExecDMLEventRowsAffectedCounter cleanup in Close().

The Close() method (lines 127–143) deletes label sets for every other counter in the statistics object (DDL histogram, DML counter, errors, bytes, etc.) but omits cleanup for ExecDMLEventRowsAffectedCounter. This causes metric label leaks after a changefeed is removed, as stale time-series for all (keyspace, changefeed, type, row_type) combinations persist in the Prometheus registry.

The four possible row_type label values are:

  • "total" (from RecordTotalRowsAffected)
  • "insert", "update", "delete" (from RowType.String())
Proposed fix — add cleanup in Close()
 func (b *Statistics) Close() {
 	keyspace := b.changefeedID.Keyspace()
 	changefeedID := b.changefeedID.Name()
+	for _, typ := range []string{"actual", "expected"} {
+		for _, rowType := range []string{"total", "insert", "update", "delete"} {
+			ExecDMLEventRowsAffectedCounter.DeleteLabelValues(keyspace, changefeedID, typ, rowType)
+		}
+	}
 	ExecDDLHistogram.DeleteLabelValues(keyspace, changefeedID)
🤖 Prompt for AI Agents
In `@pkg/metrics/statistics.go` around lines 111 - 124, The Close() method
currently omits cleanup for ExecDMLEventRowsAffectedCounter causing metric label
leaks; update Close() to call ExecDMLEventRowsAffectedCounter.Delete(...) for
the same label sets you use elsewhere by iterating/removing combinations of
keyspace (b.changefeedID.Keyspace()), changefeedID (b.changefeedID.Name()), the
"actual"/"expected" value dimension and the four row_type values ("total",
"insert", "update", "delete") so that any time-series created by
RecordTotalRowsAffected and RecordRowsAffected are deleted when Close() runs.

Comment on lines 366 to 371
if nextRowType == common.RowTypeInsert {
if compareKeys(rowKey, rowLists[j].RowKeys) {
sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

log.Info with SQL values on conflict detection paths — noisy and potentially sensitive.

Lines 369 and 408 use log.Info to dump SQL queries and their values right before a log.Panic. Since log.Panic will already terminate the process, this log.Info line should use log.Error or log.Warn at minimum, and consider whether logging full SQL values is appropriate in production (may contain PII).

🤖 Prompt for AI Agents
In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 366 - 371, The current
conflict-detection path in mysql_writer_dml.go logs full SQL and values with
log.Info immediately before log.Panic (see variables/functions nextRowType,
compareKeys, rowKey, rowLists, and generateBatchSQLsPerEvent), which is noisy
and may leak sensitive data; change the log.Info call to log.Error or log.Warn
and remove or redact sensitive payloads (avoid logging full values array or
replace with a safe summary like value counts/types or a hashed/redacted
representation) while keeping contextual fields (e.g., writer id, i, j, Events
metadata) so the panic still includes useful non-PII diagnostics.

Comment on lines 813 to 818
rowsAffected, err := res.RowsAffected()
if err != nil {
log.Warn("get rows affected rows failed", zap.Error(err))
} else {
w.statistics.RecordTotalRowsAffected(rowsAffected, dmls.RowsAffected())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

go-sql-driver/mysql RowsAffected multi-statement execution behavior

💡 Result:

With github.com/go-sql-driver/mysql, when you enable multiStatements=true and execute multiple SQL statements in a single Exec(...) call, the database/sql-level sql.Result only exposes one LastInsertId() / RowsAffected() value (i.e., not a per-statement breakdown). To get per-statement affected-row counts (and insert IDs), you must drop down to the driver result type and read the slices of values. (github.com)

Concretely:

  • Enable multi-statements via DSN: ...?multiStatements=true. (github.com)
  • For result sets from multiple statements, iterate with Rows.NextResultSet() to reach the 2nd+ statement’s results. (github.com)
  • For affected rows / last insert ids for multiple statements, use sql.Conn.Raw() and cast the returned result to mysql.Result, then call:
    • AllRowsAffected()[]int64 (one entry per statement)
    • AllLastInsertIds()[]int64 (one entry per statement) (github.com)
  • Also note the driver restriction: with multiStatements, ? parameters may only appear in the first statement (unless using interpolateParams, with caveats). (github.com)

Citations:


🏁 Script executed:

# Find the actual code file and examine the context
fd "mysql_writer_dml.go" --exec cat -n {} \; | head -100

Repository: pingcap/ticdc

Length of output: 4433


🏁 Script executed:

# Search for multiStmtExecute function to understand how multi-statement execution is handled
rg "multiStmtExecute" -B 5 -A 20

Repository: pingcap/ticdc

Length of output: 7802


🏁 Script executed:

# Get the specific lines 813-818 from mysql_writer_dml.go
sed -n '800,830p' pkg/sink/mysql/mysql_writer_dml.go

Repository: pingcap/ticdc

Length of output: 1267


🏁 Script executed:

# Search for all RowsAffected calls in the mysql writer files
rg "RowsAffected" pkg/sink/mysql/ -B 3 -A 3

Repository: pingcap/ticdc

Length of output: 2008


🌐 Web query:

go-sql-driver/mysql RowsAffected return value last statement aggregate

💡 Result:

With go-sql-driver/mysql, sql.Result.RowsAffected() reflects MySQL’s “affected rows” for the last statement only (because the underlying API reports it “for the last statement”). (dev.mysql.com)

Multi-statement queries (multiStatements=true)

If you send multiple statements in one Exec (e.g., UPDATE ...; UPDATE ...;), RowsAffected() will correspond to the final statement’s affected-row count, not an aggregate/sum across statements. (github.com)

If you need per-statement counts (or want to aggregate them yourself), the driver exposes a MySQL-specific result via sql.Conn.Raw() and mysql.Result.AllRowsAffected() (slice of counts, one per statement). (github.com)

“Aggregate” SELECT (e.g., SELECT COUNT(*) ...)

Even though MySQL can report “affected rows” for a SELECT like “number of rows returned,” an aggregate query like SELECT COUNT(*) ... returns exactly 1 row (the row containing the count). So if you (mis)use Exec+RowsAffected() on such a query, you’ll typically see 1, not the counted value. You must read the count from the result set (use QueryRow/Scan). (dev.mysql.com)

Citations:


RowsAffected() on multi-statement transactions returns only the final statement's value, not the aggregate.

With BEGIN; stmt1; stmt2; ...; COMMIT; execution, res.RowsAffected() returns the affected rows of the last statement (COMMIT), which is 0. The RecordTotalRowsAffected() call should use dmls.RowsAffected() instead of res.RowsAffected() to accurately track total rows affected across all statements, or access per-statement counts via mysql.Result.AllRowsAffected() if available from the connection's raw result.

🤖 Prompt for AI Agents
In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 813 - 818, The current code
calls res.RowsAffected() and passes that to
w.statistics.RecordTotalRowsAffected, but res.RowsAffected() on multi-statement
transactions returns only the final statement's count (often 0); change the
logic to use the aggregated count from dmls.RowsAffected() (or if you can access
the driver raw result, use mysql.Result.AllRowsAffected()) when calling
RecordTotalRowsAffected so total rows across all statements are recorded; update
the block around res.RowsAffected() (referencing res.RowsAffected(),
dmls.RowsAffected(), and w.statistics.RecordTotalRowsAffected()) to prefer
dmls.RowsAffected() and fall back to per-statement totals only if available from
the raw mysql result.

Comment on lines 870 to 891
if len(updateRows) > 0 {
if w.cfg.IsTiDB {
for _, rows := range updateRows {
s, v := w.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
rowTypes = append(rowTypes, common.RowTypeUpdate)
}
// The behavior of update statement differs between TiDB and MySQL.
// So we don't use batch update statement when downstream is MySQL.
// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
} else {
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
rowTypes = append(rowTypes, common.RowTypeUpdate)
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

rowTypes / sqls length mismatch when genUpdateSQL returns multiple SQLs — will cause index-out-of-bounds panic.

genUpdateSQL (line 987) can return multiple SQL strings when the row data size exceeds MaxMultiUpdateRowSize. However, only one RowTypeUpdate is appended to rowTypes per batch, while all returned SQLs are appended to sqls. This means len(sqls) > len(rowTypes), and sequenceExecute (line 777: dmls.rowTypes[i]) will panic with an index-out-of-bounds error.

The same issue exists for the non-TiDB update path (lines 882-889), though there each row produces one SQL and one rowType, so it's correct. The TiDB path at line 872–877 is the problem.

Proposed fix
 	if w.cfg.IsTiDB {
 		for _, rows := range updateRows {
 			s, v := w.genUpdateSQL(rows...)
 			sqls = append(sqls, s...)
 			values = append(values, v...)
-			rowTypes = append(rowTypes, common.RowTypeUpdate)
+			for range s {
+				rowTypes = append(rowTypes, common.RowTypeUpdate)
+			}
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if len(updateRows) > 0 {
if w.cfg.IsTiDB {
for _, rows := range updateRows {
s, v := w.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
rowTypes = append(rowTypes, common.RowTypeUpdate)
}
// The behavior of update statement differs between TiDB and MySQL.
// So we don't use batch update statement when downstream is MySQL.
// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
} else {
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
rowTypes = append(rowTypes, common.RowTypeUpdate)
}
}
}
}
if len(updateRows) > 0 {
if w.cfg.IsTiDB {
for _, rows := range updateRows {
s, v := w.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
for range s {
rowTypes = append(rowTypes, common.RowTypeUpdate)
}
}
// The behavior of update statement differs between TiDB and MySQL.
// So we don't use batch update statement when downstream is MySQL.
// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
} else {
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
rowTypes = append(rowTypes, common.RowTypeUpdate)
}
}
}
}
🤖 Prompt for AI Agents
In `@pkg/sink/mysql/mysql_writer_dml.go` around lines 870 - 891, genUpdateSQL may
return multiple SQL strings but the TiDB branch only appends a single
common.RowTypeUpdate per call, causing len(sqls) > len(rowTypes) and panics in
sequenceExecute; fix by appending one RowTypeUpdate per SQL returned: after s, v
:= w.genUpdateSQL(rows...) and appending s and v to sqls/values, append len(s)
copies of common.RowTypeUpdate to rowTypes (e.g., loop or use slice
multiplication) so rowTypes, sqls, and values stay aligned.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/sink/mysql/mysql_writer_dml.go (1)

92-107: ⚠️ Potential issue | 🟠 Major

Active-active path omits rowTypes tracking — metrics will silently fail.

The active-active SQL generation path (genActiveActiveSQL and its callees) returns only ([]string, [][]interface{}) while all other paths return ([]string, [][]interface{}, []common.RowType). At line 96, genActiveActiveSQL is called but only two values are captured, leaving rowTypesList uninitialized. Line 106 then appends nil to dmls.rowTypes, creating a length mismatch: dmls.sqls will be populated while dmls.rowTypes remains empty. The RowsAffected() method (sql_builder.go:120) will iterate an empty slice and return 0, causing metrics collection to silently fail for active-active replication.

Update genActiveActiveSQL, generateActiveActiveNormalSQLs, and generateActiveActiveBatchSQL to return []common.RowType alongside the current return values, and capture all three values at line 96.

Comment on lines 411 to 581
func (w *Writer) generateBatchSQLInUnSafeMode(events []*commonEvent.DMLEvent) ([]string, [][]interface{}, []common.RowType) {
tableInfo := events[0].TableInfo
finalRowLists, _, err := w.buildRowChangesForUnSafeBatch(events, tableInfo)
if err != nil {
sql, values := w.generateBatchSQLsPerEvent(events)
log.Info("normal sql should be", zap.Any("sql", sql), zap.String("values", util.RedactAny(values)), zap.Int("writerID", w.id))
log.Panic("invalid rows when generating batch SQL in unsafe mode",
zap.Error(err), zap.Any("events", events), zap.Int("writerID", w.id))
type RowChangeWithKeys struct {
RowChange *commonEvent.RowChange
RowKeys []byte
PreRowKeys []byte
}

// Step 1 extract all rows in these events to rowLists, and calcuate row key for each row(based on pk value)
rowLists := make([]RowChangeWithKeys, 0)
for _, event := range events {
for {
row, ok := event.GetNextRow()
if !ok {
event.Rewind()
break
}
rowChangeWithKeys := RowChangeWithKeys{RowChange: &row}
if !row.Row.IsEmpty() {
_, keys := genKeyAndHash(&row.Row, tableInfo)
rowChangeWithKeys.RowKeys = keys
}
if !row.PreRow.IsEmpty() {
_, keys := genKeyAndHash(&row.PreRow, tableInfo)
rowChangeWithKeys.PreRowKeys = keys
}
rowLists = append(rowLists, rowChangeWithKeys)
}
}

// Step 2 combine the rows until there is no change
// Consider we will split the event if PK is changed, so the Update will not change the PK
// for the rows comparation, there are six situations:
// 1. the previous row is Delete A, the next row is Insert A. --- we don't need to combine the rows.
// 2. the previous row is Delete A, the next row is Update xx where A . --- we don't need to combine the rows.
// 3. the previous row is Insert A, the next row is Delete A. --- remove the row of `Insert A`
// 4. the previous row is Insert A, the next row is Update xx where A -- remove the row of `Insert A`, change the row `Update A` to `Insert A`
// 5. the previous row is Update xx where A, the next row is Delete A. --- remove the row `Update xx where A`
// 6. the previous row is Update xx where A, the next row is Update xx where A. --- we need to remove the row, and change the second Update's preRows = first Update's preRows
for {
// hasUpdate to determine whether we can break the combine logic
hasUpdate := false
// flagList used to store the exists or not for this row. True means exists.
flagList := make([]bool, len(rowLists))
for i := 0; i < len(rowLists); i++ {
flagList[i] = true
}
for i := 0; i < len(rowLists); i++ {
if !flagList[i] {
continue
}
innerLoop:
for j := i + 1; j < len(rowLists); j++ {
if !flagList[j] {
continue
}
rowType := rowLists[i].RowChange.RowType
nextRowType := rowLists[j].RowChange.RowType
switch rowType {
case common.RowTypeInsert:
rowKey := rowLists[i].RowKeys
if nextRowType == common.RowTypeInsert {
if compareKeys(rowKey, rowLists[j].RowKeys) {
sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
}
} else if nextRowType == common.RowTypeDelete {
if compareKeys(rowKey, rowLists[j].PreRowKeys) {
// remove the insert one, and break the inner loop for row i
flagList[i] = false
hasUpdate = true
break innerLoop
}
} else if nextRowType == common.RowTypeUpdate {
if !compareKeys(rowLists[j].PreRowKeys, rowLists[j].RowKeys) {
log.Panic("The Update Row have different Row Key", zap.Any("Events", events), zap.Int("writerID", w.id))
}
if compareKeys(rowKey, rowLists[j].PreRowKeys) {
// remove insert one, and break the inner loop for row i
flagList[i] = false
// change update one to insert
preRowChange := rowLists[j].RowChange
newRowChange := commonEvent.RowChange{
Row: preRowChange.Row,
RowType: common.RowTypeInsert,
}
rowLists[j] = RowChangeWithKeys{
RowChange: &newRowChange,
RowKeys: rowLists[j].RowKeys,
}
hasUpdate = true
break innerLoop
}
}
case common.RowTypeUpdate:
rowKey := rowLists[i].RowKeys
if !compareKeys(rowKey, rowLists[i].PreRowKeys) {
log.Panic("The Update Row have different Row Key", zap.Any("Events", events), zap.Int("writerID", w.id))
}
if nextRowType == common.RowTypeInsert {
if compareKeys(rowKey, rowLists[j].RowKeys) {
sql, values, rowTypes := w.generateBatchSQLsPerEvent(events)
log.Info("normal sql should be", zap.Any("sql", sql), zap.Any("values", values), zap.Any("rowTypes", rowTypes), zap.Int("writerID", w.id))
log.Panic("Here are two invalid rows with the same row type and keys", zap.Any("Events", events), zap.Any("i", i), zap.Any("j", j), zap.Int("writerID", w.id))
}
} else if nextRowType == common.RowTypeDelete {
if compareKeys(rowKey, rowLists[j].PreRowKeys) {
// remove the update one, and break the inner loop
flagList[j] = false
// change the update to delete
preRowChange := rowLists[i].RowChange
newRowChange := commonEvent.RowChange{
PreRow: preRowChange.PreRow,
RowType: common.RowTypeDelete,
}
rowLists[i] = RowChangeWithKeys{
RowChange: &newRowChange,
PreRowKeys: rowKey,
}
hasUpdate = true
break innerLoop
}
} else if nextRowType == common.RowTypeUpdate {
if compareKeys(rowKey, rowLists[j].PreRowKeys) {
if !compareKeys(rowLists[j].PreRowKeys, rowLists[j].RowKeys) {
log.Panic("The Update Row have different Row Key", zap.Any("Events", events), zap.Int("writerID", w.id))
}
// remove the first one, update the second one, then break
newRowChange := commonEvent.RowChange{
PreRow: rowLists[j].RowChange.PreRow,
Row: rowLists[j].RowChange.Row,
RowType: common.RowTypeUpdate,
}
rowLists[j] = RowChangeWithKeys{
RowChange: &newRowChange,
PreRowKeys: rowKey,
RowKeys: rowKey,
}
flagList[i] = false
hasUpdate = true
break innerLoop
}
}
}
}
}

if !hasUpdate {
// means no more changes for the rows, break and generate sqls.
break
} else {
newRowLists := make([]RowChangeWithKeys, 0, len(rowLists))
for i := 0; i < len(rowLists); i++ {
if flagList[i] {
newRowLists = append(newRowLists, rowLists[i])
}
}
rowLists = newRowLists
}

}

finalRowLists := make([]*commonEvent.RowChange, 0, len(rowLists))

for i := 0; i < len(rowLists); i++ {
finalRowLists = append(finalRowLists, rowLists[i].RowChange)
}

// Step 3. generate sqls based on finalRowLists
return w.batchSingleTxnDmls(finalRowLists, tableInfo, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if buildRowChangesForUnSafeBatch is still called anywhere
rg -n 'buildRowChangesForUnSafeBatch' --type=go

Repository: pingcap/ticdc

Length of output: 598


🏁 Script executed:

#!/bin/bash
# Get the file size first
wc -l pkg/sink/mysql/mysql_writer_dml_batch.go

# View the buildRowChangesForUnSafeBatch function (lines 246-407)
sed -n '246,407p' pkg/sink/mysql/mysql_writer_dml_batch.go

# View the generateBatchSQLInUnSafeMode function (lines 411-581)
sed -n '411,581p' pkg/sink/mysql/mysql_writer_dml_batch.go

Repository: pingcap/ticdc

Length of output: 12717


🏁 Script executed:

#!/bin/bash
# Check for log.Panic usage in the new function and nearby code
rg -n 'log\.Panic' pkg/sink/mysql/mysql_writer_dml_batch.go -A2

Repository: pingcap/ticdc

Length of output: 1445


Code duplication in generateBatchSQLInUnSafeMode duplicates entire merge algorithm; also introduces naming and error handling inconsistencies.

The new generateBatchSQLInUnSafeMode (lines 411–581) duplicates the row-merging logic already present in buildRowChangesForUnSafeBatch (lines 246–407), including the identical combine loop and switch cases (~165 lines).

Beyond duplication, the new function introduces three issues:

  1. Struct field naming violation: RowChangeWithKeys uses PascalCase fields (RowChange, RowKeys, PreRowKeys), violating Go conventions that require lowerCamelCase for field names. The original function correctly uses rowChange, rowKeys, preRowKeys.
  2. Error handling regression: The new function uses log.Panic() (5 times) for error cases instead of returning proper errors like the original function (cerror.ErrUnexpected.FastGenByArgs). This is incompatible with graceful error handling.
  3. Lost commitTs tracking: The original function preserves commitTs for each row (used for last-writer-wins semantics); the new function discards it entirely.

Note: buildRowChangesForUnSafeBatch is not dead code—it's actively called from mysql_writer_dml_active_active.go line 107.

Consider refactoring to reuse buildRowChangesForUnSafeBatch with the necessary adjustments (e.g., wrapping its return values) rather than duplicating the algorithm.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/sink/mysql/mysql_writer_dml_batch.go (1)

504-511: ⚠️ Potential issue | 🔴 Critical

Bug: rowTypes count mismatches sqls count when genUpdateSQL returns multiple statements.

genUpdateSQL (line 623–641) returns one SQL per row when the payload exceeds MaxMultiUpdateRowSize, but line 510 always appends exactly one RowTypeUpdate regardless. This causes len(rowTypes) < len(sqls), which will break any downstream consumer that indexes rowTypes alongside sqls.

🐛 Proposed fix
 		if w.cfg.IsTiDB {
 			for _, rows := range updateRows {
 				s, v := w.genUpdateSQL(rows...)
 				sqls = append(sqls, s...)
 				values = append(values, v...)
-				rowTypes = append(rowTypes, common.RowTypeUpdate)
+				for range s {
+					rowTypes = append(rowTypes, common.RowTypeUpdate)
+				}
 			}

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot
Copy link

ti-chi-bot bot commented Feb 12, 2026

@wk989898: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-unit-test-next-gen 0a45840 link false /test pull-unit-test-next-gen
pull-unit-test 0a45840 link true /test pull-unit-test
pull-error-log-review 144bd1b link true /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add more metrics to improve observability

2 participants