From 63d71f9624a8c97cf195c857c1c853a476ab3712 Mon Sep 17 00:00:00 2001 From: Cao Kai Date: Thu, 25 Jun 2026 16:35:34 +0800 Subject: [PATCH 1/4] fix: cross-CN shuffle join INSERT...SELECT hang on multi-CN cluster (#24919) Root cause: newShuffleJoinScopeList leaves a CN's dop join buckets in separate RemoteRun trees while the shuffle dispatch attaches to only the first one. In compileMultiUpdate (used by large INSERT...SELECT), newMergeScope sends each bucket individually -> checkPipeline detects dispatch.LocalRegs pointing to out-of-tree sibling buckets -> converts to local on coordinator -> dispatch runs on wrong CN, mispaired with compile-time cross-CN receiver FromAddr -> remote GetProcByUuid spins / merge WaitingEnd waits forever -> 5M+ rows hang. Fix: add groupShuffleBucketsByCNIfNeeded (gating: multi-CN + cross-CN dispatch present) which merges same-CN shuffle buckets and their nested dispatch into one per-CN send unit via mergeScopesByCN. When the whole group is sent as one RemoteRun unit to its target CN, the dispatch runs on the correct CN, checkPipeline returns true, and the cross-CN receiver handshake completes normally. Noop for single-CN / non-shuffle inserts. Wired into compileInsert and compileMultiUpdate (toWriteS3 + !toWriteS3, 4 call sites). Verified on 2-CN docker cluster: 5Mx2 consecutive runs complete in ~17s with count(*)=5,000,000 and correct data; previously hung forever.
Co-Authored-By: Claude --- pkg/sql/compile/compile.go | 62 +++++++++++++++++ pkg/sql/compile/remoterun_test.go | 109 ++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 06ac02837ba47..986c44af2c717 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3828,6 +3828,8 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* ss[i].setRootOperator(insertArg) } c.anal.isFirst = false + // keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) return ss, nil } @@ -3903,6 +3905,11 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* ss[i].setRootOperator(insertArg) } currentFirstFlag = false + // Group a CN's dop shuffle buckets (and the shuffle dispatch nested under them) into one + // per-CN send unit before the coordinator merge, so the cross-CN shuffle dispatch is sent + // to and executed at its own CN instead of being converted to local on the coordinator + // (which mispairs the cross-CN receiver handshake and hangs -- issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) rs := c.newMergeScope(ss) rs.Magic = MergeInsert mergeInsertArg := constructMergeblock(c.e, n) @@ -3952,6 +3959,11 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) ss[i].setRootOperator(multiUpdateArg) } + // Group a CN's dop shuffle buckets (and the shuffle dispatch nested under them) into one + // per-CN send unit before the coordinator merge, so the cross-CN shuffle dispatch is sent + // to and executed at its own CN instead of being converted to local on the coordinator + // (which mispairs the cross-CN receiver handshake and hangs -- issue #24919). 
+ ss = c.groupShuffleBucketsByCNIfNeeded(ss) rs := ss[0] if len(ss) > 1 || ss[0].NodeInfo.Mcpu > 1 { rs = c.newMergeScope(ss) @@ -3966,6 +3978,8 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) ss = []*Scope{rs} } else { if !c.IsTpQuery() { + // keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) rs := c.newMergeScope(ss) ss = []*Scope{rs} } @@ -4389,6 +4403,54 @@ func (c *Compile) mergeShuffleScopesIfNeeded(ss []*Scope, force bool) []*Scope { return rs } +// scopeTreeHasCrossCNDispatch reports whether the scope tree rooted at s contains a +// dispatch operator that sends to remote (cross-CN) receivers. A non-empty RemoteRegs is +// the signature of a cross-CN shuffle dispatch (see constructDispatchLocalAndRemote). +func scopeTreeHasCrossCNDispatch(s *Scope) bool { + if d, ok := s.RootOp.(*dispatch.Dispatch); ok && len(d.RemoteRegs) > 0 { + return true + } + for _, pre := range s.PreScopes { + if scopeTreeHasCrossCNDispatch(pre) { + return true + } + } + return false +} + +// groupShuffleBucketsByCNIfNeeded groups the same-CN shuffle buckets (together with the +// shuffle dispatch nested under them) into one per-CN send unit, so a cross-CN shuffle +// dispatch always travels in the same pipeline tree as all of its dop local buckets. +// +// Background (issue #24919): newShuffleJoinScopeList leaves a CN's dop join buckets in +// separate RemoteRun trees while the shuffle dispatch only attaches to the first bucket. 
+// When the consumer (here compileInsert) sends each bucket individually, RemoteRun -> +// checkPipelineStandaloneExecutableAtRemote sees the dispatch.LocalRegs pointing to the +// sibling out-of-tree buckets, converts the pipeline to local on the coordinator, and the +// dispatch then runs on the coordinator instead of its compile-time CN -- mispaired with +// the cross-CN receiver's FromAddr -> the remote receiver's GetProcByUuid spins / merge +// WaitingEnd waits forever -> hang. Regrouping by CN keeps all of a CN's buckets in one +// tree, so the whole group is really executed at the remote CN and the pairing is correct. +// +// It is a no-op unless we are multi-CN and ss actually carries a cross-CN shuffle dispatch, +// so single-CN and non-shuffle inserts are completely unaffected. +func (c *Compile) groupShuffleBucketsByCNIfNeeded(ss []*Scope) []*Scope { + if len(c.cnList) <= 1 || len(ss) <= len(c.cnList) { + return ss + } + hasCrossCNDispatch := false + for _, s := range ss { + if scopeTreeHasCrossCNDispatch(s) { + hasCrossCNDispatch = true + break + } + } + if !hasCrossCNDispatch { + return ss + } + return c.mergeScopesByCN(ss) +} + func (c *Compile) mergeScopesByCN(ss []*Scope) []*Scope { rs := make([]*Scope, 0, len(c.cnList)) for i := range c.cnList { diff --git a/pkg/sql/compile/remoterun_test.go b/pkg/sql/compile/remoterun_test.go index 65c3d6bb1de83..04b9ff76cee44 100644 --- a/pkg/sql/compile/remoterun_test.go +++ b/pkg/sql/compile/remoterun_test.go @@ -86,6 +86,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/vm" + "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -803,3 +804,111 @@ func Test_checkPipelineStandaloneExecutableAtRemote(t *testing.T) { require.False(t, checkPipelineStandaloneExecutableAtRemote(s0)) } } + +// newDispatchSrcScopeForTest builds a cross-CN shuffle dispatch source 
scope: +// its dispatch sends to localBuckets via LocalRegs (same CN) and to remoteBuckets +// via RemoteRegs (other CN), exactly like constructDispatchLocalAndRemote does. +func newDispatchSrcScopeForTest(proc *process.Process, addr string, localBuckets, remoteBuckets []*Scope) *Scope { + src := &Scope{ + Magic: Remote, + NodeInfo: engine.Node{Addr: addr, Mcpu: 1}, + Proc: proc.NewContextChildProc(0), + } + d := dispatch.NewArgument() + d.FuncId = dispatch.ShuffleToAllFunc + for _, b := range localBuckets { + d.LocalRegs = append(d.LocalRegs, b.Proc.Reg.MergeReceivers[0]) + } + for _, b := range remoteBuckets { + uid, _ := uuid.NewV7() + d.RemoteRegs = append(d.RemoteRegs, colexec.ReceiveInfo{Uuid: uid, NodeAddr: b.NodeInfo.Addr}) + } + src.setRootOperator(d) + src.IsEnd = true + return src +} + +// TestGroupShuffleBucketsByCNIfNeeded reproduces the issue #24919 root cause and +// verifies the per-CN regrouping fix: +// +// before regrouping, the bucket that carries a cross-CN shuffle dispatch is wrongly +// judged non-standalone-executable (its dispatch LocalRegs point to a sibling bucket +// that lives in a separate send tree) -> RemoteRun converts it to local -> the dispatch +// lands on the coordinator, mispaired with the compile-time cross-CN receiver FromAddr +// -> hang. +// +// after regrouping, the dop same-CN buckets (and the nested dispatch) become one per-CN +// send unit, so checkPipelineStandaloneExecutableAtRemote returns true and the whole +// group is really executed at the remote CN. +func TestGroupShuffleBucketsByCNIfNeeded(t *testing.T) { + c := NewMockCompile(t) + c.cnList = engine.Nodes{ + engine.Node{Addr: "cn1:6001", Mcpu: 2}, + engine.Node{Addr: "cn2:6001", Mcpu: 2}, + } + c.addr = "cn1:6001" + c.anal = &AnalyzeModule{qry: &plan.Query{}} + c.proc.Base.TxnOperator = fakeTxnOperator{} + proc := c.proc + + // dop=2, 2 CN -> bucketNum=4. buckets[0,1] on cn1, buckets[2,3] on cn2. 
+ addrs := []string{"cn1:6001", "cn1:6001", "cn2:6001", "cn2:6001"} + buckets := make([]*Scope, 4) + for i := range buckets { + buckets[i] = &Scope{ + Magic: Remote, + NodeInfo: engine.Node{Addr: addrs[i], Mcpu: 1}, + Proc: proc.NewContextChildProc(1), + } + buckets[i].setRootOperator(merge.NewArgument()) + } + + // each CN's dispatch source is attached to that CN's first bucket (like compile.go:4500). + srcCN1 := newDispatchSrcScopeForTest(proc, "cn1:6001", + []*Scope{buckets[0], buckets[1]}, []*Scope{buckets[2], buckets[3]}) + buckets[0].PreScopes = append(buckets[0].PreScopes, srcCN1) + srcCN2 := newDispatchSrcScopeForTest(proc, "cn2:6001", + []*Scope{buckets[2], buckets[3]}, []*Scope{buckets[0], buckets[1]}) + buckets[2].PreScopes = append(buckets[2].PreScopes, srcCN2) + + // before regrouping: the dispatch-carrying buckets are wrongly judged not standalone. + require.False(t, checkPipelineStandaloneExecutableAtRemote(buckets[0])) + require.False(t, checkPipelineStandaloneExecutableAtRemote(buckets[2])) + + // after regrouping: one per-CN container each, all standalone-executable at remote. + grouped := c.groupShuffleBucketsByCNIfNeeded(buckets) + require.Equal(t, 2, len(grouped)) + for _, container := range grouped { + require.Equal(t, Remote, container.Magic) + require.True(t, checkPipelineStandaloneExecutableAtRemote(container)) + } +} + +// TestGroupShuffleBucketsByCNIfNeeded_Gating verifies the regrouping is a no-op when +// there is no cross-CN shuffle dispatch (single CN, or no dispatch), so non-shuffle / +// single-CN inserts are completely unaffected. +func TestGroupShuffleBucketsByCNIfNeeded_Gating(t *testing.T) { + c := NewMockCompile(t) + c.cnList = engine.Nodes{ + engine.Node{Addr: "cn1:6001", Mcpu: 2}, + engine.Node{Addr: "cn2:6001", Mcpu: 2}, + } + c.anal = &AnalyzeModule{qry: &plan.Query{}} + proc := c.proc + + // scopes without any cross-CN dispatch -> returned unchanged. 
+ ss := make([]*Scope, 4) + for i := range ss { + ss[i] = &Scope{ + Magic: Remote, + NodeInfo: engine.Node{Addr: "cn1:6001", Mcpu: 1}, + Proc: proc.NewContextChildProc(0), + } + ss[i].setRootOperator(merge.NewArgument()) + } + require.Equal(t, 4, len(c.groupShuffleBucketsByCNIfNeeded(ss))) + + // single CN -> returned unchanged even if a cross-CN dispatch is present. + c.cnList = engine.Nodes{engine.Node{Addr: "cn1:6001", Mcpu: 2}} + require.Equal(t, 4, len(c.groupShuffleBucketsByCNIfNeeded(ss))) +} From be77d2516496937942b3f754da54a0cdb2ed5be0 Mon Sep 17 00:00:00 2001 From: Cao Kai Date: Thu, 25 Jun 2026 18:26:55 +0800 Subject: [PATCH 2/4] fix: address review for #24919 cross-CN shuffle grouping - document scopeTreeHasCrossCNDispatch precondition (dispatch is always RootOp) - document that grouping preserves callers existing root operator via AppendChild - extend grouping to the compileInsert S3 sink-scan path: dataScope.MergeRun still sends each bucket individually, so a cross-CN shuffle dispatch there would hit the same convert-to-local hang; group same-CN buckets first Co-Authored-By: Claude --- pkg/sql/compile/compile.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 986c44af2c717..8f437f7df23b0 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3838,6 +3838,10 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* // todo : pipelines with sink scan ,must refactor this in the future currentFirstFlag := c.anal.isFirst c.anal.isFirst = false + // dataScope merges the buckets, but dataScope.MergeRun still sends each bucket as an + // individual RemoteRun unit, so a cross-CN shuffle dispatch here would hit the same + // convert-to-local hang. Group same-CN buckets into one per-CN send unit first (issue #24919). 
+ ss = c.groupShuffleBucketsByCNIfNeeded(ss) dataScope := c.newMergeScope(ss) if c.anal.qry.LoadTag { // reset the channel buffer of sink for load @@ -4406,6 +4410,11 @@ func (c *Compile) mergeShuffleScopesIfNeeded(ss []*Scope, force bool) []*Scope { // scopeTreeHasCrossCNDispatch reports whether the scope tree rooted at s contains a // dispatch operator that sends to remote (cross-CN) receivers. A non-empty RemoteRegs is // the signature of a cross-CN shuffle dispatch (see constructDispatchLocalAndRemote). +// +// Precondition: it only inspects each scope's RootOp, assuming a shuffle dispatch is always +// the root operator of its scope (constructDispatch results are attached via setRootOperator +// with IsEnd=true, so nothing is appended on top of them). A dispatch nested as a child of +// another operator would be missed -- which does not happen for shuffle dispatches today. func scopeTreeHasCrossCNDispatch(s *Scope) bool { if d, ok := s.RootOp.(*dispatch.Dispatch); ok && len(d.RemoteRegs) > 0 { return true @@ -4434,6 +4443,11 @@ func scopeTreeHasCrossCNDispatch(s *Scope) bool { // // It is a no-op unless we are multi-CN and ss actually carries a cross-CN shuffle dispatch, // so single-CN and non-shuffle inserts are completely unaffected. +// +// Operator-chain note: callers attach their own root operator to each bucket first (e.g. +// the insert / multiUpdate operator). mergeScopesByCN (via newMergeScopeByCN -> +// doSetRootOperator) appends a connector *on top of* that existing root using AppendChild +// semantics, so the caller's operator is preserved as the connector's child, not replaced. 
func (c *Compile) groupShuffleBucketsByCNIfNeeded(ss []*Scope) []*Scope { if len(c.cnList) <= 1 || len(ss) <= len(c.cnList) { return ss From 6a6ac15cbca1bbf38df71172b39cb2f89f5e2977 Mon Sep 17 00:00:00 2001 From: Cao Kai Date: Fri, 26 Jun 2026 14:55:29 +0800 Subject: [PATCH 3/4] fix: drop CN-grouping from non-S3 ordinary insert path (#24919 review) XuPeng-SH review: in the compileInsert non-S3 path each bucket carries an Insert as its top-level RootOp. groupShuffleBucketsByCNIfNeeded would move those Insert ops into a per-CN Merge container PreScopes, where run()/affectedRows() (which only walk the RootOp child chain, not PreScopes) miss them -> data written but "0 rows affected". The other grouped paths are unaffected: their write op already sits in PreScopes with a top-level aggregator (mergeblock for write-S3 insert, FlushS3Info for multi-update) that reports affected rows, so grouping only deepens PreScopes without changing the accounting. Non-S3 inserts are small and do not produce cross-CN shuffle, so removing grouping here loses no hang coverage. Co-Authored-By: Claude --- pkg/sql/compile/compile.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 8f437f7df23b0..c4f81eb684d79 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3828,8 +3828,14 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* ss[i].setRootOperator(insertArg) } c.anal.isFirst = false - // keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919). - ss = c.groupShuffleBucketsByCNIfNeeded(ss) + // NOTE: intentionally do NOT group shuffle buckets by CN here. Unlike the write-S3 and + // multi-update paths, this non-S3 path keeps a per-bucket Insert as each scope's root + // operator. 
Wrapping the buckets into a per-CN Merge container would move those Insert + // operators into PreScopes, where run()/affectedRows() -- which only walk the RootOp + // child chain, not PreScopes -- would miss them, so the rows get written but the + // statement reports "0 rows affected". This path only handles small (non-S3) inserts, + // which do not produce cross-CN shuffle in practice, so the issue #24919 hang does not + // arise here. return ss, nil } From cf95fb5f21c6daffbfc6cff383823be000d6c936 Mon Sep 17 00:00:00 2001 From: Cao Kai Date: Fri, 26 Jun 2026 15:29:04 +0800 Subject: [PATCH 4/4] fix: group non-S3 insert shuffle buckets before Insert (#24919 review) Critical: my earlier "skip grouping in non-S3 insert" was wrong. toWriteS3 is decided by INSERT output size (~35K-row threshold) while cross-CN shuffle is decided by JOIN input size + CN count -- independent. A large shuffle join with a highly selective filter / low match rate yields few output rows (non-S3) yet still shuffles across CNs, so the non-S3 path can carry a cross-CN dispatch and would silently hang without grouping. Apply groupShuffleBucketsByCNIfNeeded BEFORE attaching Insert, so the per-CN container gets Insert as its RootOp (Insert -> Merge): checkPipeline returns true (no hang) and affectedRows() still finds the Insert on the RootOp chain (correct row count). Noop when no cross-CN dispatch is present. Also from review: - scopeTreeHasCrossCNDispatch: document the intentional narrower scope vs checkPipelineStandaloneExecutableAtRemote (dispatch only, not connector). - isSameCN: log malformed-address fallback at Warn (was Debug) so a wrong-CN merge is visible in ops. 
Co-Authored-By: Claude --- pkg/sql/compile/compile.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index c4f81eb684d79..7574bd1213c6f 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3816,6 +3816,17 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* float64(DistributedThreshold) || c.anal.qry.LoadWriteS3 if !toWriteS3 { + // A non-S3 INSERT can still drive a cross-CN shuffle join: toWriteS3 is decided by the + // INSERT *output* row count, while shuffle is decided by the JOIN *input* table size and + // CN count -- independent decisions. A large shuffle join with a highly selective filter + // can produce few output rows (non-S3) yet still shuffle across CNs. So group the same-CN + // shuffle buckets (with their nested cross-CN dispatch) into one per-CN send unit *before* + // attaching Insert. This (a) keeps the dispatch in the same tree as all its local buckets + // so it is sent to its own CN instead of being converted to local on the coordinator and + // hanging (issue #24919), and (b) puts Insert on the per-CN container's RootOp chain + // (Insert -> Merge), so affectedRows() -- which walks the RootOp chain -- still counts it. + // Noop when ss carries no cross-CN shuffle dispatch. + ss = c.groupShuffleBucketsByCNIfNeeded(ss) currentFirstFlag := c.anal.isFirst // Not write S3 for i := range ss { @@ -3828,14 +3839,6 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* ss[i].setRootOperator(insertArg) } c.anal.isFirst = false - // NOTE: intentionally do NOT group shuffle buckets by CN here. Unlike the write-S3 and - // multi-update paths, this non-S3 path keeps a per-bucket Insert as each scope's root - // operator. 
Wrapping the buckets into a per-CN Merge container would move those Insert - // operators into PreScopes, where run()/affectedRows() -- which only walk the RootOp - // child chain, not PreScopes -- would miss them, so the rows get written but the - // statement reports "0 rows affected". This path only handles small (non-S3) inserts, - // which do not produce cross-CN shuffle in practice, so the issue #24919 hang does not - // arise here. return ss, nil } @@ -4421,6 +4424,12 @@ func (c *Compile) mergeShuffleScopesIfNeeded(ss []*Scope, force bool) []*Scope { // the root operator of its scope (constructDispatch results are attached via setRootOperator // with IsEnd=true, so nothing is appended on top of them). A dispatch nested as a child of // another operator would be missed -- which does not happen for shuffle dispatches today. +// +// Scope of the check (intentionally narrower than checkPipelineStandaloneExecutableAtRemote, +// which rejects out-of-tree dispatch *and* out-of-tree connector targets): this gate only +// looks for a cross-CN dispatch, because per-CN grouping only reorganizes the same-CN shuffle +// dispatch together with its dop buckets. A scope tree that crosses CNs solely via a connector +// is not the shuffle-bucket pattern grouping addresses and is intentionally left untouched. func scopeTreeHasCrossCNDispatch(s *Scope) bool { if d, ok := s.RootOp.(*dispatch.Dispatch); ok && len(d.RemoteRegs) > 0 { return true @@ -5219,12 +5228,12 @@ func isSameCN(addr string, currentCNAddr string) bool { // just a defensive judgment. In fact, we shouldn't have received such data. 
parts1 := strings.Split(addr, ":") if len(parts1) != 2 { - logutil.Debugf("compileScope received a malformed cn address '%s', expected 'ip:port'", addr) + logutil.Warnf("compileScope received a malformed cn address '%s', expected 'ip:port'; treating as same-CN", addr) return true } parts2 := strings.Split(currentCNAddr, ":") if len(parts2) != 2 { - logutil.Debugf("compileScope received a malformed current-cn address '%s', expected 'ip:port'", currentCNAddr) + logutil.Warnf("compileScope received a malformed current-cn address '%s', expected 'ip:port'; treating as same-CN", currentCNAddr) return true } return parts1[0] == parts2[0] && parts1[1] == parts2[1]