Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions pkg/sql/colexec/multi_update/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,140 @@
package multi_update

import (
"encoding/hex"
"fmt"
"os"
"strings"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// debugCpkeyAssertEnv names the environment variable that selects the mode of
// the main-table __mo_cpkey_col consistency check.
const debugCpkeyAssertEnv = "MO_DEBUG_CPKEY_ASSERT"

// debugCpkeyAssert holds the check mode, read once from the environment when
// the package is initialized. The check is off by default so that normal runs
// pay no per-row cost:
//   - "log":   report a mismatch as an error log and keep running
//   - "panic": report a mismatch as an error log, then panic (the stack dump
//     helps locate the cause)
//   - unset or any other value: check disabled
var debugCpkeyAssert = os.Getenv(debugCpkeyAssertEnv)

// debugCheckMainCpkey is a debugging aid for composite-primary-key main
// tables. For each row of insertBatch it re-serializes the primary-key part
// columns with a types.Packer and compares the resulting bytes with the value
// stored in the __mo_cpkey_col vector at the same row. A difference means the
// cpkey column is no longer row-aligned with the data columns somewhere
// upstream (the genesis of the duplicate-primary-key bug).
//
// The check runs only when debugCpkeyAssert is "log" or "panic"; any other
// value makes the function return immediately. It also returns silently when
// the table has no composite primary key, when a required column has no
// matching vector in insertBatch, or when a pk part has a type that
// encodePackForAssert does not handle. Only the first mismatching row is
// reported: it is logged as an error and, in "panic" mode, followed by a
// panic carrying the same message.
//
// proc is accepted for call-site symmetry and is not used by the check.
func debugCheckMainCpkey(proc *process.Process, updateCtx *MultiUpdateCtx, insertBatch *batch.Batch) {
	switch debugCpkeyAssert {
	case "log", "panic":
		// Check enabled; continue below.
	default:
		return
	}

	def := updateCtx.TableDef
	// Only a composite PK has a separate __mo_cpkey_col.
	if def.Pkey == nil || def.Pkey.PkeyColName != catalog.CPrimaryKeyColName {
		return
	}

	// vecIndexOf returns the position of the first column named colName and
	// whether insertBatch has a vector at that position. It relies on
	// insertBatch.Vecs[i] corresponding to def.Cols[i] (rowid is last and
	// skipped).
	vecIndexOf := func(colName string) (int, bool) {
		for pos, c := range def.Cols {
			if c.Name == colName {
				return pos, pos < len(insertBatch.Vecs)
			}
		}
		return -1, false
	}

	cpkeyPos, ok := vecIndexOf(catalog.CPrimaryKeyColName)
	if !ok {
		return
	}
	partPos := make([]int, 0, len(def.Pkey.Names))
	for _, partName := range def.Pkey.Names {
		pos, present := vecIndexOf(partName)
		if !present {
			return
		}
		partPos = append(partPos, pos)
	}

	packer := types.NewPacker()
	defer packer.Close()

	stored := insertBatch.Vecs[cpkeyPos]
	total := insertBatch.RowCount()
	for row := 0; row < total; row++ {
		packer.Reset()
		for _, pos := range partPos {
			if !encodePackForAssert(insertBatch.Vecs[pos], row, packer) {
				// Unsupported pk type: skip rather than risk a false positive.
				return
			}
		}

		want := packer.GetBuf()
		got := stored.GetBytesAt(row)
		if string(want) == string(got) {
			continue
		}

		msg := fmt.Sprintf("CPKEY MISALIGN detected on table %s row %d/%d: "+
			"recomputed serial(pk)=%s but stored __mo_cpkey_col=%s",
			def.Name, row, total, hex.EncodeToString(want), hex.EncodeToString(got))
		logutil.Errorf("%s", msg)
		if debugCpkeyAssert == "panic" {
			panic(msg)
		}
		// In "log" mode stop after the first mismatch.
		return
	}
}

// encodePackForAssert appends the value at row idx of v to ps, choosing the
// Packer encoder from the vector's type Oid. It is meant to mirror getPackFun
// in pkg/sql/plan/function for the types a primary-key column can have.
//
// It returns true when the value was encoded, and false (leaving ps
// untouched) when the Oid is not one of the handled types, so the caller can
// skip the check instead of comparing against an incomplete encoding.
func encodePackForAssert(v *vector.Vector, idx int, ps *types.Packer) bool {
	handled := true
	switch v.GetType().Oid {
	case types.T_char, types.T_varchar, types.T_binary, types.T_varbinary,
		types.T_blob, types.T_text, types.T_json, types.T_datalink:
		// Byte-slice-backed types all go through EncodeStringType.
		ps.EncodeStringType(v.GetBytesAt(idx))

	// Signed integers.
	case types.T_int8:
		ps.EncodeInt8(vector.GetFixedAtNoTypeCheck[int8](v, idx))
	case types.T_int16:
		ps.EncodeInt16(vector.GetFixedAtNoTypeCheck[int16](v, idx))
	case types.T_int32:
		ps.EncodeInt32(vector.GetFixedAtNoTypeCheck[int32](v, idx))
	case types.T_int64:
		ps.EncodeInt64(vector.GetFixedAtNoTypeCheck[int64](v, idx))

	// Unsigned integers.
	case types.T_uint8:
		ps.EncodeUint8(vector.GetFixedAtNoTypeCheck[uint8](v, idx))
	case types.T_uint16:
		ps.EncodeUint16(vector.GetFixedAtNoTypeCheck[uint16](v, idx))
	case types.T_uint32:
		ps.EncodeUint32(vector.GetFixedAtNoTypeCheck[uint32](v, idx))
	case types.T_uint64:
		ps.EncodeUint64(vector.GetFixedAtNoTypeCheck[uint64](v, idx))

	// Date and time types.
	case types.T_date:
		ps.EncodeDate(vector.GetFixedAtNoTypeCheck[types.Date](v, idx))
	case types.T_datetime:
		ps.EncodeDatetime(vector.GetFixedAtNoTypeCheck[types.Datetime](v, idx))
	case types.T_timestamp:
		ps.EncodeTimestamp(vector.GetFixedAtNoTypeCheck[types.Timestamp](v, idx))

	// Decimals.
	case types.T_decimal64:
		ps.EncodeDecimal64(vector.GetFixedAtNoTypeCheck[types.Decimal64](v, idx))
	case types.T_decimal128:
		ps.EncodeDecimal128(vector.GetFixedAtNoTypeCheck[types.Decimal128](v, idx))

	case types.T_uuid:
		ps.EncodeUuid(vector.GetFixedAtNoTypeCheck[types.Uuid](v, idx))

	default:
		handled = false
	}
	return handled
}

// TODO: add a test case that inserts into the hidden table only.

func (update *MultiUpdate) insert_main_table(
Expand Down Expand Up @@ -137,6 +259,7 @@ func (update *MultiUpdate) check_null_and_insert_main_table(
if err = checkMainTableNotNull(proc, updateCtx, insertBatch); err != nil {
return err
}
debugCheckMainCpkey(proc, updateCtx, insertBatch)
tableType := update.ctr.updateCtxInfos[updateCtx.TableDef.Name].tableType
update.addInsertAffectRows(tableType, uint64(newRowCount))
source := update.ctr.updateCtxInfos[updateCtx.TableDef.Name].Source
Expand Down
Loading