diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 06ac02837ba47..8f437f7df23b0 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3828,6 +3828,8 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* ss[i].setRootOperator(insertArg) } c.anal.isFirst = false + // keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) return ss, nil } @@ -3836,6 +3838,10 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* // todo : pipelines with sink scan ,must refactor this in the future currentFirstFlag := c.anal.isFirst c.anal.isFirst = false + // dataScope merges the buckets, but dataScope.MergeRun still sends each bucket as an + // individual RemoteRun unit, so a cross-CN shuffle dispatch here would hit the same + // convert-to-local hang. Group same-CN buckets into one per-CN send unit first (issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) dataScope := c.newMergeScope(ss) if c.anal.qry.LoadTag { // reset the channel buffer of sink for load @@ -3903,6 +3909,11 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* ss[i].setRootOperator(insertArg) } currentFirstFlag = false + // Group a CN's dop shuffle buckets (and the shuffle dispatch nested under them) into one + // per-CN send unit before the coordinator merge, so the cross-CN shuffle dispatch is sent + // to and executed at its own CN instead of being converted to local on the coordinator + // (which mispairs the cross-CN receiver handshake and hangs -- issue #24919). 
+ ss = c.groupShuffleBucketsByCNIfNeeded(ss) rs := c.newMergeScope(ss) rs.Magic = MergeInsert mergeInsertArg := constructMergeblock(c.e, n) @@ -3952,6 +3963,11 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) ss[i].setRootOperator(multiUpdateArg) } + // Group a CN's dop shuffle buckets (and the shuffle dispatch nested under them) into one + // per-CN send unit before the coordinator merge, so the cross-CN shuffle dispatch is sent + // to and executed at its own CN instead of being converted to local on the coordinator + // (which mispairs the cross-CN receiver handshake and hangs -- issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) rs := ss[0] if len(ss) > 1 || ss[0].NodeInfo.Mcpu > 1 { rs = c.newMergeScope(ss) @@ -3966,6 +3982,8 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) ss = []*Scope{rs} } else { if !c.IsTpQuery() { + // keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919). + ss = c.groupShuffleBucketsByCNIfNeeded(ss) rs := c.newMergeScope(ss) ss = []*Scope{rs} } @@ -4389,6 +4407,64 @@ func (c *Compile) mergeShuffleScopesIfNeeded(ss []*Scope, force bool) []*Scope { return rs } +// scopeTreeHasCrossCNDispatch reports whether the scope tree rooted at s contains a +// dispatch operator that sends to remote (cross-CN) receivers. A non-empty RemoteRegs is +// the signature of a cross-CN shuffle dispatch (see constructDispatchLocalAndRemote). +// +// Precondition: it only inspects each scope's RootOp, assuming a shuffle dispatch is always +// the root operator of its scope (constructDispatch results are attached via setRootOperator +// with IsEnd=true, so nothing is appended on top of them). A dispatch nested as a child of +// another operator would be missed -- which does not happen for shuffle dispatches today. 
+func scopeTreeHasCrossCNDispatch(s *Scope) bool {
+	// Pre-order walk over s and its PreScopes, driven by an explicit stack instead of recursion.
+	stack := []*Scope{s}
+	for len(stack) > 0 {
+		cur := stack[len(stack)-1]
+		stack = stack[:len(stack)-1]
+		// A dispatch root operator with at least one remote receiver is the match.
+		if disp, isDispatch := cur.RootOp.(*dispatch.Dispatch); isDispatch && len(disp.RemoteRegs) != 0 {
+			return true
+		}
+		// Push children right-to-left so they are popped, and therefore visited, left-to-right.
+		for i := len(cur.PreScopes) - 1; i >= 0; i-- {
+			stack = append(stack, cur.PreScopes[i])
+		}
+	}
+	return false
+}
+
+// groupShuffleBucketsByCNIfNeeded groups the same-CN shuffle buckets (together with the
+// shuffle dispatch nested under them) into one per-CN send unit, so a cross-CN shuffle
+// dispatch always travels in the same pipeline tree as all of its dop local buckets.
+//
+// Background (issue #24919): newShuffleJoinScopeList leaves a CN's dop join buckets in
+// separate RemoteRun trees while the shuffle dispatch only attaches to the first bucket.
+// When the consumer (compileInsert / compileMultiUpdate) sends each bucket individually,
+// RemoteRun -> checkPipelineStandaloneExecutableAtRemote sees the dispatch.LocalRegs pointing
+// to the sibling out-of-tree buckets, converts the pipeline to local on the coordinator, and
+// the dispatch then runs on the coordinator instead of its compile-time CN -- mispaired with
+// the cross-CN receiver's FromAddr -> the remote receiver's GetProcByUuid spins / merge
+// WaitingEnd waits forever -> hang. Regrouping by CN keeps all of a CN's buckets in one
+// tree, so the whole group is really executed at the remote CN and the pairing is correct.
+//
+// It is a no-op unless we are multi-CN, ss holds more scopes than there are CNs, and ss
+// actually carries a cross-CN shuffle dispatch, so single-CN and non-shuffle inserts are
+// completely unaffected.
+//
+// Operator-chain note: callers attach their own root operator to each bucket first (e.g.
+// the insert / multiUpdate operator). mergeScopesByCN (via newMergeScopeByCN ->
+// doSetRootOperator) appends a connector *on top of* that existing root using AppendChild
+// semantics, so the caller's operator is preserved as the connector's child, not replaced.
+func (c *Compile) groupShuffleBucketsByCNIfNeeded(ss []*Scope) []*Scope {
+	// Gate 1: nothing to regroup on a single CN, or when there are no more scopes than CNs.
+	if len(c.cnList) <= 1 || len(ss) <= len(c.cnList) {
+		return ss
+	}
+	// Gate 2: regroup as soon as one scope tree is found to carry a cross-CN dispatch.
+	for _, bucket := range ss {
+		if scopeTreeHasCrossCNDispatch(bucket) {
+			return c.mergeScopesByCN(ss)
+		}
+	}
+	// No cross-CN dispatch anywhere: hand the scopes back untouched.
+	return ss
+}
+
 func (c *Compile) mergeScopesByCN(ss []*Scope) []*Scope {
 	rs := make([]*Scope, 0, len(c.cnList))
 	for i := range c.cnList {
diff --git a/pkg/sql/compile/remoterun_test.go b/pkg/sql/compile/remoterun_test.go
index 65c3d6bb1de83..04b9ff76cee44 100644
--- a/pkg/sql/compile/remoterun_test.go
+++ b/pkg/sql/compile/remoterun_test.go
@@ -86,6 +86,7 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/testutil"
 	"github.com/matrixorigin/matrixone/pkg/txn/client"
 	"github.com/matrixorigin/matrixone/pkg/vm"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine"
 	"github.com/matrixorigin/matrixone/pkg/vm/process"
 )
 
@@ -803,3 +804,111 @@ func Test_checkPipelineStandaloneExecutableAtRemote(t *testing.T) {
 		require.False(t, checkPipelineStandaloneExecutableAtRemote(s0))
 	}
 }
+
+// newDispatchSrcScopeForTest builds a cross-CN shuffle dispatch source scope:
+// its dispatch sends to localBuckets via LocalRegs (same CN) and to remoteBuckets
+// via RemoteRegs (other CN), exactly like constructDispatchLocalAndRemote does.
+func newDispatchSrcScopeForTest(proc *process.Process, addr string, localBuckets, remoteBuckets []*Scope) *Scope {
+	src := &Scope{
+		Magic:    Remote,
+		NodeInfo: engine.Node{Addr: addr, Mcpu: 1},
+		Proc:     proc.NewContextChildProc(0),
+	}
+	d := dispatch.NewArgument()
+	d.FuncId = dispatch.ShuffleToAllFunc
+	// Same-CN receivers: wire the dispatch to each local bucket's first merge receiver.
+	for _, b := range localBuckets {
+		d.LocalRegs = append(d.LocalRegs, b.Proc.Reg.MergeReceivers[0])
+	}
+	// Other-CN receivers: only the uuid and the target address are recorded.
+	for _, b := range remoteBuckets {
+		// Fail loudly instead of silently registering a zero UUID if generation fails.
+		uid := uuid.Must(uuid.NewV7())
+		d.RemoteRegs = append(d.RemoteRegs, colexec.ReceiveInfo{Uuid: uid, NodeAddr: b.NodeInfo.Addr})
+	}
+	src.setRootOperator(d)
+	src.IsEnd = true
+	return src
+}
+
+// TestGroupShuffleBucketsByCNIfNeeded reproduces the issue #24919 root cause and
+// verifies the per-CN regrouping fix:
+//
+// before regrouping, the bucket that carries a cross-CN shuffle dispatch is wrongly
+// judged non-standalone-executable (its dispatch LocalRegs point to a sibling bucket
+// that lives in a separate send tree) -> RemoteRun converts it to local -> the dispatch
+// lands on the coordinator, mispaired with the compile-time cross-CN receiver FromAddr
+// -> hang.
+//
+// after regrouping, the dop same-CN buckets (and the nested dispatch) become one per-CN
+// send unit, so checkPipelineStandaloneExecutableAtRemote returns true and the whole
+// group is really executed at the remote CN.
+func TestGroupShuffleBucketsByCNIfNeeded(t *testing.T) {
+	c := NewMockCompile(t)
+	c.cnList = engine.Nodes{
+		engine.Node{Addr: "cn1:6001", Mcpu: 2},
+		engine.Node{Addr: "cn2:6001", Mcpu: 2},
+	}
+	c.addr = "cn1:6001"
+	c.anal = &AnalyzeModule{qry: &plan.Query{}}
+	c.proc.Base.TxnOperator = fakeTxnOperator{}
+	proc := c.proc
+
+	// dop=2, 2 CN -> bucketNum=4. buckets[0,1] on cn1, buckets[2,3] on cn2.
+	addrs := []string{"cn1:6001", "cn1:6001", "cn2:6001", "cn2:6001"}
+	buckets := make([]*Scope, 4)
+	for i := range buckets {
+		buckets[i] = &Scope{
+			Magic:    Remote,
+			NodeInfo: engine.Node{Addr: addrs[i], Mcpu: 1},
+			Proc:     proc.NewContextChildProc(1),
+		}
+		buckets[i].setRootOperator(merge.NewArgument())
+	}
+
+	// each CN's dispatch source is attached to that CN's first bucket only, mirroring the
+	// layout this fix targets (the shuffle dispatch attaches to the first bucket).
+	srcCN1 := newDispatchSrcScopeForTest(proc, "cn1:6001",
+		[]*Scope{buckets[0], buckets[1]}, []*Scope{buckets[2], buckets[3]})
+	buckets[0].PreScopes = append(buckets[0].PreScopes, srcCN1)
+	srcCN2 := newDispatchSrcScopeForTest(proc, "cn2:6001",
+		[]*Scope{buckets[2], buckets[3]}, []*Scope{buckets[0], buckets[1]})
+	buckets[2].PreScopes = append(buckets[2].PreScopes, srcCN2)
+
+	// before regrouping: the dispatch-carrying buckets are wrongly judged not standalone.
+	require.False(t, checkPipelineStandaloneExecutableAtRemote(buckets[0]))
+	require.False(t, checkPipelineStandaloneExecutableAtRemote(buckets[2]))
+
+	// after regrouping: one per-CN container each, all standalone-executable at remote.
+	grouped := c.groupShuffleBucketsByCNIfNeeded(buckets)
+	require.Equal(t, 2, len(grouped))
+	for _, container := range grouped {
+		require.Equal(t, Remote, container.Magic)
+		require.True(t, checkPipelineStandaloneExecutableAtRemote(container))
+	}
+}
+
+// TestGroupShuffleBucketsByCNIfNeeded_Gating verifies the regrouping is a no-op when
+// there is no cross-CN shuffle dispatch, and when there is only a single CN even though
+// a cross-CN dispatch is present, so non-shuffle / single-CN inserts are unaffected.
+func TestGroupShuffleBucketsByCNIfNeeded_Gating(t *testing.T) {
+	c := NewMockCompile(t)
+	c.cnList = engine.Nodes{
+		engine.Node{Addr: "cn1:6001", Mcpu: 2},
+		engine.Node{Addr: "cn2:6001", Mcpu: 2},
+	}
+	c.anal = &AnalyzeModule{qry: &plan.Query{}}
+	proc := c.proc
+
+	// scopes without any cross-CN dispatch -> returned unchanged.
+	ss := make([]*Scope, 4)
+	for i := range ss {
+		ss[i] = &Scope{
+			Magic:    Remote,
+			NodeInfo: engine.Node{Addr: "cn1:6001", Mcpu: 1},
+			Proc:     proc.NewContextChildProc(0),
+		}
+		ss[i].setRootOperator(merge.NewArgument())
+	}
+	require.Equal(t, 4, len(c.groupShuffleBucketsByCNIfNeeded(ss)))
+
+	// Attach a dispatch with a non-empty RemoteRegs, so the single-CN check below really
+	// exercises the CN-count gate instead of passing through the no-dispatch path again.
+	remote := &Scope{NodeInfo: engine.Node{Addr: "cn2:6001", Mcpu: 1}}
+	src := newDispatchSrcScopeForTest(proc, "cn1:6001", nil, []*Scope{remote})
+	ss[0].PreScopes = append(ss[0].PreScopes, src)
+	require.True(t, scopeTreeHasCrossCNDispatch(ss[0]))
+
+	// single CN -> returned unchanged even if a cross-CN dispatch is present.
+	c.cnList = engine.Nodes{engine.Node{Addr: "cn1:6001", Mcpu: 2}}
+	require.Equal(t, 4, len(c.groupShuffleBucketsByCNIfNeeded(ss)))
+}