Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/frontend/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"sync"
"time"

"github.com/google/uuid"
"go.uber.org/zap"
Expand Down Expand Up @@ -432,6 +433,15 @@ func (th *TxnHandler) createTxnOpUnsafe(execCtx *ExecCtx) error {
}
}

// Attach session-level lock_wait_timeout to the txn so the lock service
// uses it instead of the global config.
if varVal, err := execCtx.ses.GetSessionSysVar("lock_wait_timeout"); err == nil {
if seconds, ok := varVal.(int64); ok && seconds > 0 {
opts = append(opts,
txnclient.WithTxnLockWaitTimeout(time.Duration(seconds)*time.Second))
}
}

tempCtx, tempCancel := context.WithTimeoutCause(th.txnCtx, pu.SV.CreateTxnOpTimeout.Duration, moerr.CauseCreateTxnOpUnsafe)
defer tempCancel()

Expand Down
37 changes: 36 additions & 1 deletion pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lockservice
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -102,6 +103,9 @@ func (l *localLockTable) doLock(
var oldOffset int
var err error
table := l.bind.Table
// Session-level SET lock_wait_timeout takes highest priority (passed via
// pb.LockOptions). The budget only counts time actually spent waiting.
leftTimeout := time.Duration(c.opts.LockWaitTimeout) * time.Second
for {
// blocked used for async callback, waiter is created, and added to wait list.
// So only need wait notify.
Expand Down Expand Up @@ -163,12 +167,40 @@ func (l *localLockTable) doLock(
l.options.beforeWait(c)()
}

v := c.w.wait(c.ctx, l.logger)
waitCtx := c.ctx
var cancel context.CancelFunc
if leftTimeout > 0 {
waitCtx, cancel = context.WithTimeoutCause(c.ctx, leftTimeout, ErrLockTimeout)
}
waitStart := time.Now()
v := c.w.wait(waitCtx, l.logger)
lockWaitTimeoutHit := leftTimeout > 0 &&
errors.Is(v.err, context.DeadlineExceeded) &&
context.Cause(waitCtx) == ErrLockTimeout
if cancel != nil {
cancel()
}

if l.options.afterWait != nil {
l.options.afterWait(c)()
}

// Update the remaining lock_wait_timeout budget using only wait time.
if leftTimeout > 0 {
waited := time.Since(waitStart)
if waited < leftTimeout {
leftTimeout -= waited
} else {
leftTimeout = 0
}
if lockWaitTimeoutHit {
// lock_wait_timeout expired: return ErrLockTimeout directly
// (not errors.Join) so upper layers can recognize it via
// moerr.IsMoErrCode(err, moerr.ErrInvalidState).
v.err = ErrLockTimeout
}
}

c.txn.Lock()

logLocalLockWaitOnResult(l.logger, c.txn, table, c.rows[c.idx], c.opts, c.w, v)
Expand Down Expand Up @@ -534,6 +566,9 @@ func (l *localLockTable) handleLockConflictLocked(
if c.opts.Policy == pb.WaitPolicy_FastFail {
return ErrLockConflict
}
if c.opts.async && !c.lockWaitDeadline.IsZero() && !time.Now().Before(c.lockWaitDeadline) {
return ErrLockTimeout
}

if c.opts.Granularity == pb.Granularity_Range {
l.closeRangeLastWaiterLocked(c)
Expand Down
35 changes: 33 additions & 2 deletions pkg/lockservice/lock_table_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ var (
remoteRetryMaxBackoff = 5 * time.Second
)

const (
// lockRpcSlack is the extra budget added to the RPC deadline beyond
// LockWaitTimeout. The lock-table owner starts its own wait budget only
// after receiving the RPC, so the client-side RPC deadline must outlive the
// server-side wait timer for the owner to observe and return ErrLockTimeout.
// Without this slack, the client deadline can fire before the owner returns
// ErrLockTimeout, causing the client to see a retryable connectivity error
// instead of a lock-timeout result.
lockRpcSlack = 30 * time.Second
)

// remoteLockTable the lock corresponding to the Table is managed by a remote LockTable.
// And the remoteLockTable acts as a proxy for this LockTable locally.
type remoteLockTable struct {
Expand Down Expand Up @@ -97,7 +108,26 @@ func (l *remoteLockTable) lock(
// rpc maybe wait too long, to avoid deadlock, we need unlock txn, and lock again
// after rpc completed
txn.Unlock()
resp, err := l.client.Send(ctx, req)

// When session-level lock_wait_timeout is set, bound the RPC by that
// timeout plus slack so the lock-table owner has enough time to observe
// and return ErrLockTimeout before the client-side RPC deadline fires.
// Without a session timeout, use the caller context as-is.
var rpcCtx context.Context
var rpcCancel context.CancelFunc
if d := time.Duration(opts.LockWaitTimeout) * time.Second; d > 0 {
lockRpcTimeout := d + lockRpcSlack
rpcCtx, rpcCancel = context.WithTimeout(ctx, lockRpcTimeout)
} else {
rpcCtx = ctx
}
defer func() {
if rpcCancel != nil {
rpcCancel()
}
}()
resp, err := l.client.Send(rpcCtx, req)

txn.Lock()

// txn closed
Expand Down Expand Up @@ -455,7 +485,8 @@ func skipTrackLockOnError(ctx context.Context, err error) bool {
}

func retryRemoteLockError(err error) bool {
if e, ok := err.(net.Error); ok && e.Timeout() {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return true
}
if errors.Is(err, io.EOF) ||
Expand Down
111 changes: 111 additions & 0 deletions pkg/lockservice/service_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,3 +780,114 @@ func checkBind(
l := s.tableGroups.get(0, bind.Table)
assert.Equal(t, bind, l.getBind())
}

// TestRemoteLockWaitTimeout_PrecisionIndependentOfLazyCheck verifies that
// the async lock_wait_timeout fires at the waiter's own deadline, NOT on
// the coarse defaultLazyCheckDuration tick. It temporarily sets the
// global lazy-check interval to 10s, so without the precise AfterFunc
// timer the test would timeout or take >10s.
func TestRemoteLockWaitTimeout_PrecisionIndependentOfLazyCheck(t *testing.T) {
// Force the lazy-check interval to 10s so any enforcement that relies
// only on the periodic check() tick would take ≥10s.
orig := defaultLazyCheckDuration.Load().(time.Duration)
defaultLazyCheckDuration.Store(10 * time.Second)
defer defaultLazyCheckDuration.Store(orig)

runLockServiceTests(
t,
[]string{"s1", "s2"},
func(alloc *lockTableAllocator, s []*service) {
tableID := uint64(10)
l1 := s[0] // lock-table owner
l2 := s[1] // remote CN
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

txn1 := []byte("txn1")
txn2 := []byte("txn2")
row1 := []byte{1}

// txn1 holds the lock.
mustAddTestLock(t, ctx, l1, tableID, txn1, [][]byte{row1}, pb.Granularity_Row)

// txn2 on remote CN requests with 1-second timeout.
// With the check tick at 10s, a coarse-tick-only approach
// would need >10s. The precise AfterFunc timer must fire at ~1s.
start := time.Now()
_, err := l2.Lock(
ctx, tableID, [][]byte{row1}, txn2,
pb.LockOptions{
Granularity: pb.Granularity_Row,
Mode: pb.LockMode_Exclusive,
Policy: pb.WaitPolicy_Wait,
LockWaitTimeout: 1,
})
elapsed := time.Since(start)

require.Error(t, err)
require.True(t, moerr.IsMoErrCode(err, moerr.ErrInvalidState),
"expected lock-timeout, got %v", err)
// Must fire well before the 10s coarse tick.
require.Less(t, elapsed, 3*time.Second,
"precise timer should fire at ~1s, elapsed=%v", elapsed)
},
)
}

// TestRemoteLockWaitTimeout_ReturnsLockTimeout ensures that in a multi-CN
// deployment, when txn2 on a remote CN waits on a lock held by txn1 on the
// lock-table owner CN, txn2 receives ErrLockTimeout (lock timeout) after
// LockWaitTimeout elapses, NOT a retryable connectivity/backend error.
//
// Before the fix, the client-side RPC deadline was set to exactly
// LockWaitTimeout. The lock-table owner started its own wait budget only
// after receiving the RPC, so the client deadline could fire before the
// owner returned ErrLockTimeout. When that happened the client saw a
// retryable error instead of lock-timeout.
func TestRemoteLockWaitTimeout_ReturnsLockTimeout(t *testing.T) {
runLockServiceTests(
t,
[]string{"s1", "s2"},
func(alloc *lockTableAllocator, s []*service) {
tableID := uint64(10)
l1 := s[0] // lock-table owner
l2 := s[1] // remote CN
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

txn1 := []byte("txn1")
txn2 := []byte("txn2")
row1 := []byte{1}

// txn1 acquires and holds the lock on the owner.
mustAddTestLock(t, ctx, l1, tableID, txn1, [][]byte{row1}, pb.Granularity_Row)

// txn2 on the remote CN tries to lock the same row with a 1-second
// LockWaitTimeout.
opt := pb.LockOptions{
Granularity: pb.Granularity_Row,
Mode: pb.LockMode_Exclusive,
Policy: pb.WaitPolicy_Wait,
LockWaitTimeout: 1, // 1 second
}

start := time.Now()
_, err := l2.Lock(ctx, tableID, [][]byte{row1}, txn2, opt)
elapsed := time.Since(start)

require.Error(t, err)
// Must receive lock-timeout, not connectivity/backend error.
require.True(t, moerr.IsMoErrCode(err, moerr.ErrInvalidState),
"expected ErrLockTimeout (InvalidState), got %v", err)
require.Contains(t, err.Error(), "lock timeout",
"expected lock timeout message, got %v", err)
require.GreaterOrEqual(t, elapsed, time.Second,
"should have waited at least LockWaitTimeout")
// With the lazy check interval at 50ms in tests, the async timeout
// should be detected very close to LockWaitTimeout. Allow a
// generous upper bound to absorb scheduling jitter.
require.Less(t, elapsed, 5*time.Second,
"should not wait beyond LockWaitTimeout + check interval")
},
)
}
Loading
Loading