Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pkg/lockservice/lock_table_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
Expand All @@ -40,8 +41,11 @@ type lockTableAllocator struct {
client Client
inactiveService sync.Map // lock service id -> inactive time
ctl sync.Map // lock service id -> *commitCtl
version uint64
mu struct {
allocatorID string
// version is the allocator process epoch. It is set once when the
// allocator is constructed; production code must not mutate it at runtime.
version uint64
mu struct {
sync.RWMutex
services map[string]*serviceBinds
lockTables map[uint32]map[uint64]pb.LockTable
Expand Down Expand Up @@ -83,6 +87,7 @@ func NewLockTableAllocator(
stopper.WithLogger(logger.RawLogger().Named(tag))),
keepBindTimeout: keepBindTimeout,
client: rpcClient,
allocatorID: uuid.New().String(),
version: uint64(time.Now().UnixNano()),
}
la.mu.lockTables = make(map[uint32]map[uint64]pb.LockTable)
Expand Down Expand Up @@ -461,6 +466,9 @@ func (l *lockTableAllocator) tryRebindLocked(

// If not a newer version of the same service, create new binding
old.ServiceID = binds.serviceID
if old.AllocatorID == "" {
old.AllocatorID = l.allocatorID
}
old.Version++
old.Valid = true
l.getLockTablesLocked(group)[tableID] = old
Expand Down Expand Up @@ -499,6 +507,7 @@ func (l *lockTableAllocator) createBindLocked(
Valid: true,
Sharding: sharding,
Group: group,
AllocatorID: l.allocatorID,
}
l.getLockTablesLocked(group)[tableID] = b
l.logger.Info("bind created",
Expand Down Expand Up @@ -843,6 +852,8 @@ func (l *lockTableAllocator) handleGetBind(
req *pb.Request,
resp *pb.Response,
cs morpc.ClientSession) {
resp.GetBind.AllocatorID = l.allocatorID
resp.GetBind.AllocatorVersion = l.version
if !l.canGetBind(req.GetBind.ServiceID) {
writeResponse(l.logger, cancel, resp, moerr.NewNewTxnInCNRollingRestart(), cs)
return
Expand All @@ -862,6 +873,8 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
req *pb.Request,
resp *pb.Response,
cs morpc.ClientSession) {
resp.KeepLockTableBind.AllocatorID = l.allocatorID
resp.KeepLockTableBind.AllocatorVersion = l.version
resp.KeepLockTableBind.OK = l.KeepLockTableBind(req.KeepLockTableBind.ServiceID)
if !resp.KeepLockTableBind.OK {
// resp.KeepLockTableBind.Status = pb.Status_ServiceCanRestart
Expand Down
8 changes: 4 additions & 4 deletions pkg/lockservice/lock_table_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestGetWithNoBind(t *testing.T) {
time.Hour,
func(a *lockTableAllocator) {
assert.Equal(t,
pb.LockTable{Valid: true, ServiceID: "s1", Table: 1, OriginTable: 1, Version: a.version},
pb.LockTable{Valid: true, ServiceID: "s1", Table: 1, OriginTable: 1, Version: a.version, AllocatorID: a.allocatorID},
a.Get("s1", 0, 1, 0, pb.Sharding_None))
})
}
Expand All @@ -98,7 +98,7 @@ func TestGetWithAlreadyBind(t *testing.T) {
// register s1 first
a.Get("s1", 0, 1, 0, pb.Sharding_None)
assert.Equal(t,
pb.LockTable{Valid: true, ServiceID: "s1", Table: 1, OriginTable: 1, Version: a.version},
pb.LockTable{Valid: true, ServiceID: "s1", Table: 1, OriginTable: 1, Version: a.version, AllocatorID: a.allocatorID},
a.Get("s2", 0, 1, 0, pb.Sharding_None))
})
}
Expand All @@ -112,7 +112,7 @@ func TestGetWithBindInvalid(t *testing.T) {
a.Get("s1", 0, 1, 0, pb.Sharding_None)
a.disableTableBinds(a.getServiceBinds("s1"))
assert.Equal(t,
pb.LockTable{Valid: true, ServiceID: "s2", Table: 1, OriginTable: 1, Version: a.version + 1},
pb.LockTable{Valid: true, ServiceID: "s2", Table: 1, OriginTable: 1, Version: a.version + 1, AllocatorID: a.allocatorID},
a.Get("s2", 0, 1, 0, pb.Sharding_None))
})
}
Expand All @@ -131,7 +131,7 @@ func TestGetWithBindAndServiceBothInvalid(t *testing.T) {
a.getServiceBinds("s2").disable()

assert.Equal(t,
pb.LockTable{Valid: false, ServiceID: "s1", Table: 1, OriginTable: 1, Version: a.version},
pb.LockTable{Valid: false, ServiceID: "s1", Table: 1, OriginTable: 1, Version: a.version, AllocatorID: a.allocatorID},
a.Get("s2", 0, 1, 0, pb.Sharding_None))
})
}
Expand Down
34 changes: 21 additions & 13 deletions pkg/lockservice/lock_table_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
req.KeepLockTableBind.TxnIDs = k.service.activeTxnHolder.getAllTxnID()
}

requestAllocator := k.service.allocatorStateSnapshot()
ctx, cancel := context.WithTimeoutCause(ctx, k.keepLockTableBindInterval, moerr.CauseDoKeepLockTableBind)
defer cancel()
resp, err := k.client.Send(ctx, req)
Expand All @@ -196,6 +197,17 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
defer releaseResponse(resp)

if resp.KeepLockTableBind.OK {
if _, accepted := k.service.observeAllocatorStateWithHoldersFromSnapshot(
"keepalive",
allocatorState{
id: resp.KeepLockTableBind.AllocatorID,
version: resp.KeepLockTableBind.AllocatorVersion,
},
requestAllocator,
true,
k.groupTables); !accepted {
return
}
switch resp.KeepLockTableBind.Status {
case pb.Status_ServiceLockEnable:
if !k.service.isStatus(pb.Status_ServiceLockEnable) {
Expand All @@ -219,19 +231,15 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
return
}

n := 0
k.groupTables.removeWithFilter(func(_ uint64, v lockTable) bool {
newVersion := k.groupTables.getVersion()
if oldVersion != newVersion {
return false
}
bind := v.getBind()
if bind.ServiceID == k.serviceID {
n++
return true
}
return false
})
n := k.service.handleKeepBindFailed(
k.serviceID,
k.groupTables,
oldVersion,
allocatorState{
id: resp.KeepLockTableBind.AllocatorID,
version: resp.KeepLockTableBind.AllocatorVersion,
},
requestAllocator)

if n > 0 {
// Keep bind receiving an explicit failure means that all the binds of the local
Expand Down
4 changes: 2 additions & 2 deletions pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (l *localLockTable) getBind() pb.LockTable {
return l.bind
}

func (l *localLockTable) close() {
func (l *localLockTable) close(reasons ...closeReason) {
l.mu.Lock()
defer l.mu.Unlock()
l.mu.closed = true
Expand All @@ -405,7 +405,7 @@ func (l *localLockTable) close() {
return true
})
l.mu.store.Clear()
logLockTableClosed(l.logger, l.bind, false)
logLockTableClosed(l.logger, l.bind, false, closeReasonOrDefault(reasons))
}

func (l *localLockTable) doAcquireLock(c *lockContext) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/lockservice/lock_table_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func (lp *localLockTableProxy) getBind() pb.LockTable {
return lp.remote.getBind()
}

func (lp *localLockTableProxy) close() {
lp.remote.close()
func (lp *localLockTableProxy) close(reasons ...closeReason) {
lp.remote.close(reasons...)
}

type sharedOps struct {
Expand Down
40 changes: 34 additions & 6 deletions pkg/lockservice/lock_table_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,25 @@ func (l *remoteLockTable) lock(
cb(pb.Result{}, ErrTxnNotFound)
return
}
if txn.bindChanged {
cb(pb.Result{}, ErrLockTableBindChanged)
return
}

if err == nil {
defer releaseResponse(resp)
if err := l.maybeHandleBindChanged(resp); err != nil {
if resp.NewBind != nil {
txn.Unlock()
err = l.maybeHandleBindChanged(resp)
txn.Lock()
if !bytes.Equal(req.Lock.TxnID, txn.txnID) {
cb(pb.Result{}, ErrTxnNotFound)
return
}
if txn.bindChanged {
cb(pb.Result{}, ErrLockTableBindChanged)
return
}
logRemoteLockFailed(l.logger, txn, rows, opts, l.bind, err)
cb(pb.Result{}, err)
return
Expand All @@ -150,14 +165,27 @@ func (l *remoteLockTable) lock(
return
}

// encounter any error, we also added lock to txn, because we need unlock on remote
// The request may have reached the remote owner and acquired locks even if
// the response was lost or the client-side context timed out. Keep local
// bookkeeping so normal transaction close can send the remote unlock.
_ = txn.lockAdded(l.bind.Group, l.bind, rows, l.logger)
logRemoteLockFailed(l.logger, txn, rows, opts, l.bind, err)
// On any error we still need to check whether the bind is valid.
// Return the original error, because if handleError swallows the
// error, the transaction will not be aborted.
originalErr := err
if e := l.handleError(err, true); e != nil {
txn.Unlock()
e := l.handleError(err, true)
txn.Lock()
if !bytes.Equal(req.Lock.TxnID, txn.txnID) {
cb(pb.Result{}, ErrTxnNotFound)
return
}
if txn.bindChanged {
cb(pb.Result{}, ErrLockTableBindChanged)
return
}
if e != nil {
err = e
} else {
// handleError returned nil, meaning bind changed and error was swallowed
Expand Down Expand Up @@ -318,8 +346,8 @@ func (l *remoteLockTable) getBind() pb.LockTable {
return l.bind
}

func (l *remoteLockTable) close() {
logLockTableClosed(l.logger, l.bind, true)
func (l *remoteLockTable) close(reasons ...closeReason) {
logLockTableClosed(l.logger, l.bind, true, closeReasonOrDefault(reasons))
}

func (l *remoteLockTable) handleError(
Expand All @@ -339,7 +367,7 @@ func (l *remoteLockTable) handleError(
// Note. Since the cn where the remote lock table is located may
// be permanently gone, we need to go to the allocator to check if
// the bind is valid.
new, err := getLockTableBind(
new, _, err := getLockTableBind(
l.client,
l.bind.Group,
l.bind.Table,
Expand Down
68 changes: 68 additions & 0 deletions pkg/lockservice/lock_table_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,74 @@ func TestIssue20747(t *testing.T) {
)
}

func TestLockRemoteWithContextTimeoutTracksLockForUnlock(t *testing.T) {
unlockCalled := make(chan struct{})
runRemoteLockTableTests(
t,
pb.LockTable{ServiceID: "s1"},
func(s Server) {
s.RegisterMethodHandler(
pb.Method_Lock,
func(
ctx context.Context,
cancel context.CancelFunc,
req *pb.Request,
resp *pb.Response,
cs morpc.ClientSession) {
// Simulate a slow or dropped response. The server just waits for
// the client context to expire.
<-ctx.Done()
},
)
s.RegisterMethodHandler(
pb.Method_Unlock,
func(
ctx context.Context,
cancel context.CancelFunc,
req *pb.Request,
resp *pb.Response,
cs morpc.ClientSession) {
close(unlockCalled)
writeResponse(getLogger(""), cancel, resp, nil, cs)
},
)
},
func(l *remoteLockTable, s Server) {
txnID := []byte("txn-timeout")
txn := newActiveTxn(txnID, string(txnID), newFixedSlicePool(32), "")
closed := false
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
txn.Lock()
defer func() {
txn.Unlock()
if !closed {
reuse.Free(txn, nil)
}
}()

l.lock(ctx, txn, [][]byte{{1}}, LockOptions{}, func(r pb.Result, err error) {
require.Error(t, err)
require.True(t, moerr.IsMoErrCode(err, moerr.ErrBackendCannotConnect))
})
holder := txn.getHoldLocksLocked(l.bind.Group)
require.Contains(t, holder.tableKeys, l.bind.Table)
require.Contains(t, holder.tableBinds, l.bind.Table)

require.NoError(t, txn.close(txnID, timestamp.Timestamp{}, func(uint32, uint64) (lockTable, error) {
return l, nil
}, l.logger))
closed = true
select {
case <-unlockCalled:
default:
require.Fail(t, "expected remote unlock")
}
},
func(lt pb.LockTable) {},
)
}

func TestLockRemoteWithNeedUpgrade(t *testing.T) {
runRemoteLockTableTests(
t,
Expand Down
Loading
Loading