Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3828,6 +3828,8 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]*
ss[i].setRootOperator(insertArg)
}
c.anal.isFirst = false
// keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919).
ss = c.groupShuffleBucketsByCNIfNeeded(ss)
return ss, nil
}

Expand All @@ -3836,6 +3838,10 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]*
// TODO: pipelines with sink scan must be refactored in the future.
currentFirstFlag := c.anal.isFirst
c.anal.isFirst = false
// dataScope merges the buckets, but dataScope.MergeRun still sends each bucket as an
// individual RemoteRun unit, so a cross-CN shuffle dispatch here would hit the same
// convert-to-local hang. Group same-CN buckets into one per-CN send unit first (issue #24919).
ss = c.groupShuffleBucketsByCNIfNeeded(ss)
dataScope := c.newMergeScope(ss)
if c.anal.qry.LoadTag {
// reset the channel buffer of sink for load
Expand Down Expand Up @@ -3903,6 +3909,11 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]*
ss[i].setRootOperator(insertArg)
}
currentFirstFlag = false
// Group a CN's dop shuffle buckets (and the shuffle dispatch nested under them) into one
// per-CN send unit before the coordinator merge, so the cross-CN shuffle dispatch is sent
// to and executed at its own CN instead of being converted to local on the coordinator
// (which mispairs the cross-CN receiver handshake and hangs -- issue #24919).
ss = c.groupShuffleBucketsByCNIfNeeded(ss)
rs := c.newMergeScope(ss)
rs.Magic = MergeInsert
mergeInsertArg := constructMergeblock(c.e, n)
Expand Down Expand Up @@ -3952,6 +3963,11 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope)
ss[i].setRootOperator(multiUpdateArg)
}

// Group a CN's dop shuffle buckets (and the shuffle dispatch nested under them) into one
// per-CN send unit before the coordinator merge, so the cross-CN shuffle dispatch is sent
// to and executed at its own CN instead of being converted to local on the coordinator
// (which mispairs the cross-CN receiver handshake and hangs -- issue #24919).
ss = c.groupShuffleBucketsByCNIfNeeded(ss)
rs := ss[0]
if len(ss) > 1 || ss[0].NodeInfo.Mcpu > 1 {
rs = c.newMergeScope(ss)
Expand All @@ -3966,6 +3982,8 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope)
ss = []*Scope{rs}
} else {
if !c.IsTpQuery() {
// keep a cross-CN shuffle dispatch in the same send unit as all its local buckets (issue #24919).
ss = c.groupShuffleBucketsByCNIfNeeded(ss)
rs := c.newMergeScope(ss)
ss = []*Scope{rs}
}
Expand Down Expand Up @@ -4389,6 +4407,64 @@ func (c *Compile) mergeShuffleScopesIfNeeded(ss []*Scope, force bool) []*Scope {
return rs
}

// scopeTreeHasCrossCNDispatch walks s and every scope reachable through PreScopes and
// returns true as soon as it finds a scope whose root operator is a dispatch with at
// least one entry in RemoteRegs. A non-empty RemoteRegs is treated as the signature of a
// cross-CN shuffle dispatch (see constructDispatchLocalAndRemote).
//
// Precondition: only RootOp of each scope is examined. This relies on a shuffle dispatch
// always being the root operator of its scope (constructDispatch results are attached via
// setRootOperator with IsEnd=true, so nothing is stacked on top of them). A dispatch that
// sits below another operator would not be detected -- which does not happen for shuffle
// dispatches today.
func scopeTreeHasCrossCNDispatch(s *Scope) bool {
	// Explicit work list instead of recursion; visiting order does not matter because
	// the answer is a plain "any scope matches".
	pending := []*Scope{s}
	for len(pending) > 0 {
		last := len(pending) - 1
		cur := pending[last]
		pending = pending[:last]

		if d, isDispatch := cur.RootOp.(*dispatch.Dispatch); isDispatch && len(d.RemoteRegs) > 0 {
			return true
		}
		pending = append(pending, cur.PreScopes...)
	}
	return false
}

// groupShuffleBucketsByCNIfNeeded regroups ss by CN (via mergeScopesByCN) when, and only
// when, ss carries a cross-CN shuffle dispatch. The goal is that such a dispatch always
// travels in the same pipeline tree -- one per-CN send unit -- as all of its dop local
// buckets.
//
// Background (issue #24919): newShuffleJoinScopeList leaves a CN's dop join buckets in
// separate RemoteRun trees while the shuffle dispatch only attaches to the first bucket.
// When the consumer (e.g. compileInsert) sends each bucket individually, RemoteRun ->
// checkPipelineStandaloneExecutableAtRemote sees dispatch.LocalRegs pointing at sibling
// buckets outside the tree and converts the pipeline to local on the coordinator. The
// dispatch then runs on the coordinator instead of its compile-time CN, so it no longer
// matches the cross-CN receiver's FromAddr; the remote receiver's GetProcByUuid spins and
// merge WaitingEnd waits forever, i.e. the query hangs. Keeping all of a CN's buckets in
// one tree makes the whole group really execute at the remote CN with correct pairing.
//
// ss is returned untouched when any of the following holds, so single-CN and non-shuffle
// callers are unaffected:
//   - there is at most one CN in c.cnList;
//   - ss has no more scopes than there are CNs (presumably nothing to group in that
//     case -- TODO confirm the intent of this gate);
//   - no scope tree in ss contains a cross-CN dispatch.
//
// Operator-chain note: callers attach their own root operator to each bucket first (e.g.
// the insert / multiUpdate operator). mergeScopesByCN (via newMergeScopeByCN ->
// doSetRootOperator) appends a connector *on top of* that existing root using AppendChild
// semantics, so the caller's operator is kept as the connector's child, not replaced.
func (c *Compile) groupShuffleBucketsByCNIfNeeded(ss []*Scope) []*Scope {
	cnCount := len(c.cnList)
	if cnCount <= 1 || len(ss) <= cnCount {
		return ss
	}
	for _, bucket := range ss {
		if scopeTreeHasCrossCNDispatch(bucket) {
			// One hit is enough: regroup the whole list by CN.
			return c.mergeScopesByCN(ss)
		}
	}
	return ss
}

func (c *Compile) mergeScopesByCN(ss []*Scope) []*Scope {
rs := make([]*Scope, 0, len(c.cnList))
for i := range c.cnList {
Expand Down
109 changes: 109 additions & 0 deletions pkg/sql/compile/remoterun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

Expand Down Expand Up @@ -803,3 +804,111 @@ func Test_checkPipelineStandaloneExecutableAtRemote(t *testing.T) {
require.False(t, checkPipelineStandaloneExecutableAtRemote(s0))
}
}

// newDispatchSrcScopeForTest builds a cross-CN shuffle dispatch source scope for tests.
//
// The returned scope is a Remote scope located at addr whose root operator is a
// ShuffleToAllFunc dispatch that sends
//   - to every scope in localBuckets through LocalRegs (the first merge receiver of the
//     bucket's process, i.e. same-CN delivery), and
//   - to every scope in remoteBuckets through RemoteRegs (a fresh UUID paired with the
//     bucket's node address, i.e. other-CN delivery),
//
// mirroring what constructDispatchLocalAndRemote produces.
//
// Each scope in localBuckets must have a Proc with at least one merge receiver; scopes in
// remoteBuckets only need NodeInfo.Addr. The helper panics if a UUID cannot be generated
// rather than silently registering a zero UUID.
func newDispatchSrcScopeForTest(proc *process.Process, addr string, localBuckets, remoteBuckets []*Scope) *Scope {
	src := &Scope{
		Magic:    Remote,
		NodeInfo: engine.Node{Addr: addr, Mcpu: 1},
		Proc:     proc.NewContextChildProc(0),
	}
	d := dispatch.NewArgument()
	d.FuncId = dispatch.ShuffleToAllFunc
	for _, b := range localBuckets {
		d.LocalRegs = append(d.LocalRegs, b.Proc.Reg.MergeReceivers[0])
	}
	for _, b := range remoteBuckets {
		d.RemoteRegs = append(d.RemoteRegs, colexec.ReceiveInfo{
			// Must turns a generation failure into a panic instead of a zero UUID.
			Uuid:     uuid.Must(uuid.NewV7()),
			NodeAddr: b.NodeInfo.Addr,
		})
	}
	src.setRootOperator(d)
	// The dispatch is the last operator of this scope.
	src.IsEnd = true
	return src
}

// TestGroupShuffleBucketsByCNIfNeeded reproduces the root cause of issue #24919 and checks
// that regrouping by CN fixes it.
//
// Before regrouping: a bucket carrying a cross-CN shuffle dispatch is judged not
// standalone-executable at remote, because the dispatch's LocalRegs point to a sibling
// bucket living in a separate send tree. RemoteRun would then convert it to local, the
// dispatch would land on the coordinator, mismatch the compile-time cross-CN receiver
// FromAddr, and hang.
//
// After regrouping: the dop same-CN buckets (and the nested dispatch) form one per-CN
// send unit, checkPipelineStandaloneExecutableAtRemote returns true, and the whole group
// can really be executed at the remote CN.
func TestGroupShuffleBucketsByCNIfNeeded(t *testing.T) {
	const (
		cn1 = "cn1:6001"
		cn2 = "cn2:6001"
	)

	c := NewMockCompile(t)
	c.cnList = engine.Nodes{
		engine.Node{Addr: cn1, Mcpu: 2},
		engine.Node{Addr: cn2, Mcpu: 2},
	}
	c.addr = cn1
	c.anal = &AnalyzeModule{qry: &plan.Query{}}
	c.proc.Base.TxnOperator = fakeTxnOperator{}
	proc := c.proc

	// dop=2 on 2 CNs gives 4 buckets: the first two on cn1, the last two on cn2.
	bucketAddrs := [...]string{cn1, cn1, cn2, cn2}
	buckets := make([]*Scope, 0, len(bucketAddrs))
	for _, addr := range bucketAddrs {
		b := &Scope{
			Magic:    Remote,
			NodeInfo: engine.Node{Addr: addr, Mcpu: 1},
			Proc:     proc.NewContextChildProc(1),
		}
		b.setRootOperator(merge.NewArgument())
		buckets = append(buckets, b)
	}
	cn1Buckets, cn2Buckets := buckets[:2], buckets[2:]

	// Each CN's dispatch source hangs off that CN's first bucket only, which is the shape
	// the shuffle-join scope list produces.
	cn1Src := newDispatchSrcScopeForTest(proc, cn1, cn1Buckets, cn2Buckets)
	cn1Buckets[0].PreScopes = append(cn1Buckets[0].PreScopes, cn1Src)
	cn2Src := newDispatchSrcScopeForTest(proc, cn2, cn2Buckets, cn1Buckets)
	cn2Buckets[0].PreScopes = append(cn2Buckets[0].PreScopes, cn2Src)

	// Ungrouped, the dispatch-carrying buckets are not standalone-executable at remote.
	require.False(t, checkPipelineStandaloneExecutableAtRemote(cn1Buckets[0]))
	require.False(t, checkPipelineStandaloneExecutableAtRemote(cn2Buckets[0]))

	// Grouped, there is one Remote container per CN and each is standalone-executable.
	grouped := c.groupShuffleBucketsByCNIfNeeded(buckets)
	require.Len(t, grouped, 2)
	for _, perCN := range grouped {
		require.Equal(t, Remote, perCN.Magic)
		require.True(t, checkPipelineStandaloneExecutableAtRemote(perCN))
	}
}

// TestGroupShuffleBucketsByCNIfNeeded_Gating verifies every early-return gate of
// groupShuffleBucketsByCNIfNeeded, so non-shuffle / single-CN inserts are unaffected:
//
//  1. multi-CN, but no scope carries a cross-CN dispatch;
//  2. multi-CN with a cross-CN dispatch, but no more scopes than CNs;
//  3. single CN, even though a cross-CN dispatch really is present.
//
// Cases 2 and 3 nest an actual dispatch with RemoteRegs under ss[0]; without it the
// "dispatch is present" part of the scenario would not be exercised at all.
func TestGroupShuffleBucketsByCNIfNeeded_Gating(t *testing.T) {
	c := NewMockCompile(t)
	c.cnList = engine.Nodes{
		engine.Node{Addr: "cn1:6001", Mcpu: 2},
		engine.Node{Addr: "cn2:6001", Mcpu: 2},
	}
	c.anal = &AnalyzeModule{qry: &plan.Query{}}
	proc := c.proc

	// Case 1: scopes without any cross-CN dispatch -> returned unchanged.
	ss := make([]*Scope, 4)
	for i := range ss {
		ss[i] = &Scope{
			Magic:    Remote,
			NodeInfo: engine.Node{Addr: "cn1:6001", Mcpu: 1},
			Proc:     proc.NewContextChildProc(0),
		}
		ss[i].setRootOperator(merge.NewArgument())
	}
	require.Equal(t, 4, len(c.groupShuffleBucketsByCNIfNeeded(ss)))

	// From here on ss really carries a cross-CN dispatch: a source scope on cn1 whose
	// dispatch has one remote receiver on cn2 (no local receivers are needed).
	remoteBucket := &Scope{NodeInfo: engine.Node{Addr: "cn2:6001", Mcpu: 1}}
	ss[0].PreScopes = append(ss[0].PreScopes,
		newDispatchSrcScopeForTest(proc, "cn1:6001", nil, []*Scope{remoteBucket}))
	require.True(t, scopeTreeHasCrossCNDispatch(ss[0]))

	// Case 2: two CNs and only two scopes -> nothing to group, returned unchanged.
	few := c.groupShuffleBucketsByCNIfNeeded(ss[:2])
	require.Equal(t, 2, len(few))
	require.True(t, few[0] == ss[0], "scope must not be wrapped when len(ss) <= len(cnList)")

	// Case 3: single CN -> returned unchanged even though a cross-CN dispatch is present.
	c.cnList = engine.Nodes{engine.Node{Addr: "cn1:6001", Mcpu: 2}}
	single := c.groupShuffleBucketsByCNIfNeeded(ss)
	require.Equal(t, 4, len(single))
	require.True(t, single[0] == ss[0], "scope must not be wrapped on a single CN")
}
Loading