diff --git a/docs/release-notes/release-notes-0.21.1.md b/docs/release-notes/release-notes-0.21.1.md index 4741674930..8a243e339b 100644 --- a/docs/release-notes/release-notes-0.21.1.md +++ b/docs/release-notes/release-notes-0.21.1.md @@ -35,6 +35,15 @@ non-SRV record. Non-SRV records are now skipped, and an empty `LookupHost` result for the shim no longer triggers an out-of-bounds index. +- [Fixed on-chain forward interceptor + settlement](https://github.com/lightningnetwork/lnd/pull/10895) after the + incoming channel force closes. Held forwards are now tracked as off-chain or + on-chain entries, allowing an on-chain re-offer to replace the old off-chain + hold so settlement reaches the witness beacon. Go callers of the exported + `htlcswitch.InterceptedPacket` type should use the new `Deadline` field to + distinguish off-chain auto-fail heights from on-chain settlement deadlines, + or `AutoFailHeight()` if they only need the legacy flattened value. + # New Features ## Functional Enhancements @@ -77,4 +86,5 @@ # Contributors (Alphabetical Order) +* Erick Cestari * Ziggie diff --git a/htlcswitch/held_htlc_set.go b/htlcswitch/held_htlc_set.go index c04880dc3d..dd24b99316 100644 --- a/htlcswitch/held_htlc_set.go +++ b/htlcswitch/held_htlc_set.go @@ -5,62 +5,333 @@ import ( "fmt" "github.com/lightningnetwork/lnd/graph/db/models" + "github.com/lightningnetwork/lnd/lnwire" ) -// heldHtlcSet keeps track of outstanding intercepted forwards. It exposes -// several methods to manipulate the underlying map structure in a consistent -// way. +var ( + // ErrCannotResumeOnChain is returned when an on-chain held HTLC is + // resolved with a resume action. + ErrCannotResumeOnChain = errors.New( + "cannot resume held htlc in the on-chain flow", + ) + + // ErrCannotFailOnChain is returned when an on-chain held HTLC is + // resolved with a fail action. + ErrCannotFailOnChain = errors.New( + "cannot fail held htlc in the on-chain flow", + ) + + // errNilHeldForward is returned when a nil forward is used to create a + // held HTLC entry. + errNilHeldForward = errors.New("nil held htlc forward") + + // errInvalidHeldDeadline is returned when a held HTLC has an + // interceptor deadline that is not a positive block height. + errInvalidHeldDeadline = errors.New( + "invalid held htlc interceptor deadline", + ) + + // errInvalidHeldDeadlineType is returned when a held HTLC entry is + // created with the wrong deadline type for its source. + errInvalidHeldDeadlineType = errors.New( + "invalid held htlc interceptor deadline type", + ) +) + +// heldEntry models the behavior of a held HTLC based on whether it is still +// controlled by the off-chain link flow or the on-chain contractcourt flow. +type heldEntry interface { + // interceptedForward returns the forward that should be replayed to the + // external interceptor. + interceptedForward() InterceptedForward + + // resolve applies an interceptor resolution to the held entry. + resolve(*FwdResolution) error + + // expire expires the held entry at the given block height. The boolean + // return value indicates whether the entry should be removed. + expire(height uint32) (bool, error) +} + +// offChainHeld is a held HTLC that is still controlled by the off-chain link +// flow. +type offChainHeld struct { + fwd InterceptedForward + + // autoFailHeight is the block height at which the held off-chain HTLC + // must be failed back to avoid forcing the incoming channel closed. + autoFailHeight uint32 +} + +// Assert that offChainHeld implements heldEntry. +var _ heldEntry = (*offChainHeld)(nil) + +// newOffChainHeld creates a held off-chain HTLC entry and validates that it has +// a positive auto-fail height. +func newOffChainHeld(fwd InterceptedForward) (*offChainHeld, error) { + if fwd == nil { + return nil, errNilHeldForward + } + + autoFailHeight, err := fwd.Packet().Deadline.LeftToSome().UnwrapOrErr( + errInvalidHeldDeadlineType, + ) + if err != nil { + return nil, err + } + + if autoFailHeight <= 0 { + return nil, fmt.Errorf("%w: %v", errInvalidHeldDeadline, + autoFailHeight) + } + + return &offChainHeld{ + fwd: fwd, + autoFailHeight: uint32(autoFailHeight), + }, nil +} + +// interceptedForward returns the intercepted forward backing the off-chain +// entry. +func (h *offChainHeld) interceptedForward() InterceptedForward { + return h.fwd +} + +// release resumes the held off-chain HTLC into the normal link forwarding +// flow. +func (h *offChainHeld) release() error { + return h.fwd.Resume() +} + +// resolve applies an interceptor resolution to the held off-chain HTLC. +func (h *offChainHeld) resolve(res *FwdResolution) error { + switch res.Action { + case FwdActionResume: + return h.fwd.Resume() + + case FwdActionResumeModified: + return h.fwd.ResumeModified( + res.InAmountMsat, res.OutAmountMsat, + res.OutWireCustomRecords, + ) + + case FwdActionSettle: + return h.fwd.Settle(res.Preimage) + + case FwdActionFail: + if len(res.FailureMessage) > 0 { + return h.fwd.Fail(res.FailureMessage) + } + + return h.fwd.FailWithCode(res.FailureCode) + + default: + return fmt.Errorf("unrecognized action %v", res.Action) + } +} + +// expire fails back the held off-chain HTLC once its auto-fail height has been +// reached. +func (h *offChainHeld) expire(height uint32) (bool, error) { + if h.autoFailHeight > height { + return false, nil + } + + err := h.fwd.FailWithCode(lnwire.CodeTemporaryChannelFailure) + if err != nil { + return false, err + } + + return true, nil +} + +// onChainHeld is a held HTLC that is controlled by the on-chain contractcourt +// flow. +type onChainHeld struct { + fwd InterceptedForward + + // settleDeadline is the on-chain HTLC expiry. Once this height is + // reached, the remote party can also sweep the HTLC using the timeout + // path, so any late preimage would race that spend. At that point the + // interceptor entry is pruned locally instead of failed back through + // the link. + settleDeadline uint32 +} + +// Assert that onChainHeld implements heldEntry. +var _ heldEntry = (*onChainHeld)(nil) + +// newOnChainHeld creates a held on-chain HTLC entry and validates that it has a +// positive settlement deadline. +func newOnChainHeld(fwd InterceptedForward) (*onChainHeld, error) { + if fwd == nil { + return nil, errNilHeldForward + } + + settleDeadline, err := fwd.Packet().Deadline.RightToSome().UnwrapOrErr( + errInvalidHeldDeadlineType, + ) + if err != nil { + return nil, err + } + + if settleDeadline <= 0 { + return nil, fmt.Errorf("%w: %v", errInvalidHeldDeadline, + settleDeadline) + } + + return &onChainHeld{ + fwd: fwd, + settleDeadline: uint32(settleDeadline), + }, nil +} + +// interceptedForward returns the intercepted forward backing the on-chain +// entry. +func (h *onChainHeld) interceptedForward() InterceptedForward { + return h.fwd +} + +// resolve applies an interceptor resolution to the held on-chain HTLC. +func (h *onChainHeld) resolve(res *FwdResolution) error { + switch res.Action { + case FwdActionSettle: + return h.fwd.Settle(res.Preimage) + + case FwdActionFail: + return ErrCannotFailOnChain + + case FwdActionResume: + return ErrCannotResumeOnChain + + case FwdActionResumeModified: + return ErrCannotResumeOnChain + + default: + return fmt.Errorf("unrecognized action %v", res.Action) + } +} + +// expire reports whether the held on-chain HTLC should be pruned locally +// because its settlement deadline has been reached. +func (h *onChainHeld) expire(height uint32) (bool, error) { + return h.settleDeadline <= height, nil +} + +// heldHtlcExpireError records an error returned while expiring a held HTLC. +type heldHtlcExpireError struct { + key models.CircuitKey + err error +} + +// heldHtlcReleaseError records an error returned while releasing a held HTLC. +type heldHtlcReleaseError struct { + key models.CircuitKey + err error +} + +// heldHtlcSet keeps track of outstanding intercepted forwards. It models +// whether each forward is still controlled by the off-chain link flow or has +// moved to the on-chain contractcourt flow. type heldHtlcSet struct { - set map[models.CircuitKey]InterceptedForward + set map[models.CircuitKey]heldEntry } func newHeldHtlcSet() *heldHtlcSet { return &heldHtlcSet{ - set: make(map[models.CircuitKey]InterceptedForward), + set: make(map[models.CircuitKey]heldEntry), } } // forEach iterates over all held forwards and calls the given callback for each // of them. func (h *heldHtlcSet) forEach(cb func(InterceptedForward)) { - for _, fwd := range h.set { - cb(fwd) + for _, entry := range h.set { + cb(entry.interceptedForward()) } } -// popAll calls the callback for each forward and removes them from the set. -func (h *heldHtlcSet) popAll(cb func(InterceptedForward)) { - for _, fwd := range h.set { - cb(fwd) +// releaseOffChainHeld releases off-chain entries when the optional interceptor +// disconnects. On-chain entries are kept because there is no link flow to +// resume, preserving the replay/settle handle while contractcourt waits for +// the preimage or on-chain expiry. +func (h *heldHtlcSet) releaseOffChainHeld() []heldHtlcReleaseError { + var errs []heldHtlcReleaseError + + for key, entry := range h.set { + offChain, ok := entry.(*offChainHeld) + if !ok { + continue + } + + if err := offChain.release(); err != nil { + errs = append(errs, heldHtlcReleaseError{ + key: key, + err: err, + }) + + // Keep the entry tracked so it can still be resolved or + // failed back by the normal expiry path. + continue + } + + delete(h.set, key) } - h.set = make(map[models.CircuitKey]InterceptedForward) + return errs +} + +// removeOnChain removes an on-chain held entry by circuit key. Off-chain +// entries are left untouched because their lifecycle is owned by the link flow, +// not contractcourt. +func (h *heldHtlcSet) removeOnChain(key models.CircuitKey) bool { + if _, ok := h.set[key].(*onChainHeld); !ok { + return false + } + + delete(h.set, key) + + return true } -// popAutoFails calls the callback for each forward that has an auto-fail height -// equal or less then the specified pop height and removes them from the set. -func (h *heldHtlcSet) popAutoFails(height uint32, cb func(InterceptedForward)) { - for key, fwd := range h.set { - if uint32(fwd.Packet().AutoFailHeight) > height { +// expire expires held forwards whose deadline has passed. +func (h *heldHtlcSet) expire(height uint32) []heldHtlcExpireError { + var errs []heldHtlcExpireError + + for key, entry := range h.set { + remove, err := entry.expire(height) + if err != nil { + errs = append(errs, heldHtlcExpireError{ + key: key, + err: err, + }) + continue } - cb(fwd) - - delete(h.set, key) + if remove { + delete(h.set, key) + } } + + return errs } -// pop returns the specified forward and removes it from the set. -func (h *heldHtlcSet) pop(key models.CircuitKey) (InterceptedForward, error) { - intercepted, ok := h.set[key] +// resolve applies the given resolution and removes the forward from the set if +// the resolution succeeds. +func (h *heldHtlcSet) resolve(res *FwdResolution) error { + entry, ok := h.set[res.Key] if !ok { - return nil, fmt.Errorf("fwd %v not found", key) + return fmt.Errorf("%w: %v", ErrFwdNotExists, res.Key) } - delete(h.set, key) + if err := entry.resolve(res); err != nil { + return err + } - return intercepted, nil + delete(h.set, res.Key) + + return nil } // exists tests whether the specified forward is part of the set. @@ -70,20 +341,53 @@ func (h *heldHtlcSet) exists(key models.CircuitKey) bool { return ok } -// push adds the specified forward to the set. An error is returned if the -// forward exists already. -func (h *heldHtlcSet) push(key models.CircuitKey, - fwd InterceptedForward) error { - +// addOffChain adds an off-chain forward to the set. Existing forwards are kept +// because replayed packets may already be held. +func (h *heldHtlcSet) addOffChain(fwd InterceptedForward) error { if fwd == nil { - return errors.New("nil fwd pushed") + return errNilHeldForward } + key := fwd.Packet().IncomingCircuit if h.exists(key) { - return errors.New("htlc already exists in set") + return nil + } + + entry, err := newOffChainHeld(fwd) + if err != nil { + return err + } + + h.set[key] = entry + + return nil +} + +// addOnChain adds an on-chain forward to the set. If the same HTLC is currently +// held off-chain, it is replaced so future resolutions go to the witness beacon +// instead of the old link mailbox path. +func (h *heldHtlcSet) addOnChain(fwd InterceptedForward) error { + if fwd == nil { + return errNilHeldForward + } + + key := fwd.Packet().IncomingCircuit + + if _, ok := h.set[key].(*onChainHeld); ok { + return nil + } + + if _, ok := h.set[key].(*offChainHeld); ok { + log.Infof("Promoting held htlc %v from off-chain to "+ + "on-chain resolution", key) + } + + entry, err := newOnChainHeld(fwd) + if err != nil { + return err } - h.set[key] = fwd + h.set[key] = entry return nil } diff --git a/htlcswitch/held_htlc_set_test.go b/htlcswitch/held_htlc_set_test.go index ca1a1750bc..15d541128f 100644 --- a/htlcswitch/held_htlc_set_test.go +++ b/htlcswitch/held_htlc_set_test.go @@ -1,126 +1,513 @@ package htlcswitch import ( + "errors" "testing" + "time" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/graph/db/models" + lntestmock "github.com/lightningnetwork/lnd/lntest/mock" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestHeldHtlcSetEmpty(t *testing.T) { - set := newHeldHtlcSet() +var errTestForward = errors.New("test forward error") - // Test operations on an empty set. - require.False(t, set.exists(models.CircuitKey{})) +// mockInterceptedForward is an InterceptedForward test double that records +// resolution calls and returns configured errors. +type mockInterceptedForward struct { + mock.Mock + + packet InterceptedPacket +} - _, err := set.pop(models.CircuitKey{}) - require.Error(t, err) +// newMockInterceptedForward creates a mock intercepted forward with the given +// circuit key and auto-fail deadline. +func newMockInterceptedForward(key models.CircuitKey, + deadline int32) *mockInterceptedForward { - set.popAll( - func(_ InterceptedForward) { - require.Fail(t, "unexpected fwd") + return &mockInterceptedForward{ + packet: InterceptedPacket{ + IncomingCircuit: key, + Deadline: fn.NewLeft[ + OffChainAutoFailHeight, OnChainSettleDeadline, + ](OffChainAutoFailHeight(deadline)), }, - ) + } } -func TestHeldHtlcSet(t *testing.T) { - set := newHeldHtlcSet() +// newMockOnChainInterceptedForward creates a mock on-chain intercepted forward +// with the given circuit key and settlement deadline. +func newMockOnChainInterceptedForward(key models.CircuitKey, + deadline int32) *mockInterceptedForward { + + return &mockInterceptedForward{ + packet: InterceptedPacket{ + IncomingCircuit: key, + Deadline: fn.NewRight[ + OffChainAutoFailHeight, OnChainSettleDeadline, + ](OnChainSettleDeadline(deadline)), + }, + } +} + +// Packet returns the intercepted packet represented by the mock. +func (m *mockInterceptedForward) Packet() InterceptedPacket { + return m.packet +} + +// Resume records a resume call and returns the configured return error. +func (m *mockInterceptedForward) Resume() error { + args := m.Called() + + return args.Error(0) +} - key := models.CircuitKey{ +// ResumeModified records a modified resume call and returns the configured +// return error. +func (m *mockInterceptedForward) ResumeModified( + _ fn.Option[lnwire.MilliSatoshi], + _ fn.Option[lnwire.MilliSatoshi], + _ fn.Option[lnwire.CustomRecords]) error { + + args := m.Called() + + return args.Error(0) +} + +// Settle records a settle call and returns the configured return error. +func (m *mockInterceptedForward) Settle(preimage lntypes.Preimage) error { + args := m.Called(preimage) + + return args.Error(0) +} + +// Fail records an encrypted failure call and returns the configured return +// error. +func (m *mockInterceptedForward) Fail(reason []byte) error { + args := m.Called(reason) + + return args.Error(0) +} + +// FailWithCode records a failure-code call and returns the configured +// return error. +func (m *mockInterceptedForward) FailWithCode(code lnwire.FailCode) error { + args := m.Called(code) + + return args.Error(0) +} + +// testCircuitKey returns a stable circuit key for held HTLC set tests. +func testCircuitKey() models.CircuitKey { + return models.CircuitKey{ ChanID: lnwire.NewShortChanIDFromInt(1), HtlcID: 2, } +} + +// TestHeldHtlcSetEmpty verifies empty held HTLC set behavior. +func TestHeldHtlcSetEmpty(t *testing.T) { + set := newHeldHtlcSet() - // Test pushing a nil forward. - require.Error(t, set.push(key, nil)) + require.False(t, set.exists(models.CircuitKey{})) + require.ErrorIs(t, set.resolve(&FwdResolution{}), ErrFwdNotExists) + require.Empty(t, set.releaseOffChainHeld()) +} - // Test pushing a forward. - fwd := &interceptedForward{ - htlc: &lnwire.UpdateAddHTLC{}, - } - require.NoError(t, set.push(key, fwd)) +// TestHeldHtlcSetRejectsInvalidDeadline verifies invalid deadlines are +// rejected for both off-chain and on-chain held entries. +func TestHeldHtlcSetRejectsInvalidDeadline(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() - // Re-pushing should fail. - require.Error(t, set.push(key, fwd)) + require.Error(t, set.addOffChain(newMockInterceptedForward(key, 0))) + require.Error(t, set.addOffChain(newMockInterceptedForward(key, -1))) + require.Error(t, set.addOffChain( + newMockOnChainInterceptedForward(key, 100), + )) + require.Error(t, set.addOnChain( + newMockOnChainInterceptedForward(key, 0), + )) + require.Error(t, set.addOnChain( + newMockOnChainInterceptedForward(key, -1), + )) + require.Error(t, set.addOnChain(newMockInterceptedForward(key, 100))) +} - // Test popping the fwd. - poppedFwd, err := set.pop(key) - require.NoError(t, err) - require.Equal(t, fwd, poppedFwd) +// TestHeldHtlcSetOffChainResolve verifies off-chain resolutions call through +// to the backing intercepted forward and remove the held entry. +func TestHeldHtlcSetOffChainResolve(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockInterceptedForward(key, 100) + fwd.On("Resume").Return(nil).Once() - _, err = set.pop(key) - require.Error(t, err) + require.NoError(t, set.addOffChain(fwd)) + require.NoError(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionResume, + })) + fwd.AssertExpectations(t) + require.False(t, set.exists(key)) +} - // Pushing the forward again. - require.NoError(t, set.push(key, fwd)) +// TestHeldHtlcSetResolveKeepsEntryOnError verifies failed resolutions keep the +// held entry available for retry. +func TestHeldHtlcSetResolveKeepsEntryOnError(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockInterceptedForward(key, 100) + fwd.On("Settle", lntypes.Preimage{}).Return(errTestForward).Once() - // Test for each. - var cbCalled bool - set.forEach(func(_ InterceptedForward) { - cbCalled = true + require.NoError(t, set.addOffChain(fwd)) + require.ErrorIs(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionSettle, + }), errTestForward) - require.Equal(t, fwd, poppedFwd) - }) - require.True(t, cbCalled) + require.True(t, set.exists(key)) + fwd.AssertExpectations(t) +} - // Test popping all forwards. - cbCalled = false - set.popAll( - func(_ InterceptedForward) { - cbCalled = true +// TestHeldHtlcSetReleaseOffChainHeld verifies an optional interceptor +// disconnect resumes off-chain entries and clears them from the set. +func TestHeldHtlcSetReleaseOffChainHeld(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockInterceptedForward(key, 100) + fwd.On("Resume").Return(nil).Once() - require.Equal(t, fwd, poppedFwd) - }, - ) - require.True(t, cbCalled) + require.NoError(t, set.addOffChain(fwd)) + require.Empty(t, set.releaseOffChainHeld()) + require.False(t, set.exists(key)) + fwd.AssertExpectations(t) +} + +// TestHeldHtlcSetReleaseOffChainHeldKeepsOnChain verifies an optional +// interceptor disconnect keeps on-chain entries available for replay if the +// interceptor reconnects before expiry. +func TestHeldHtlcSetReleaseOffChainHeldKeepsOnChain(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockOnChainInterceptedForward(key, 100) - _, err = set.pop(key) - require.Error(t, err) + require.NoError(t, set.addOnChain(fwd)) + require.Empty(t, set.releaseOffChainHeld()) + require.True(t, set.exists(key)) + fwd.AssertNotCalled(t, "Resume") } -func TestHeldHtlcSetAutoFails(t *testing.T) { +// TestHeldHtlcSetReleaseOffChainHeldKeepsReleaseErrors verifies release errors +// leave off-chain entries available for later resolution or expiry. +func TestHeldHtlcSetReleaseOffChainHeldKeepsReleaseErrors(t *testing.T) { set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockInterceptedForward(key, 100) + fwd.On("Resume").Return(errTestForward).Once() - key := models.CircuitKey{ - ChanID: lnwire.NewShortChanIDFromInt(1), - HtlcID: 2, + require.NoError(t, set.addOffChain(fwd)) + + errs := set.releaseOffChainHeld() + require.Len(t, errs, 1) + require.Equal(t, key, errs[0].key) + require.ErrorIs(t, errs[0].err, errTestForward) + require.True(t, set.exists(key)) + fwd.AssertExpectations(t) +} + +// TestHeldHtlcSetRemoveOnChain verifies contractcourt teardown only removes +// on-chain entries. +func TestHeldHtlcSetRemoveOnChain(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + + offChain := newMockInterceptedForward(key, 100) + require.NoError(t, set.addOffChain(offChain)) + require.False(t, set.removeOnChain(key)) + require.True(t, set.exists(key)) + + onChain := newMockOnChainInterceptedForward(key, 100) + require.NoError(t, set.addOnChain(onChain)) + require.True(t, set.removeOnChain(key)) + require.False(t, set.exists(key)) + require.False(t, set.removeOnChain(key)) +} + +// TestHeldHtlcSetOffChainExpire verifies off-chain expiry fails the HTLC back. +func TestHeldHtlcSetOffChainExpire(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockInterceptedForward(key, 100) + + require.NoError(t, set.addOffChain(fwd)) + + require.Empty(t, set.expire(99)) + require.True(t, set.exists(key)) + fwd.AssertNotCalled(t, "FailWithCode", mock.Anything) + + fwd.On( + "FailWithCode", + lnwire.CodeTemporaryChannelFailure, + ).Return(nil).Once() + require.Empty(t, set.expire(100)) + require.False(t, set.exists(key)) + fwd.AssertExpectations(t) +} + +// TestHeldHtlcSetOffChainExpireKeepsEntryOnError verifies expiry errors keep +// the off-chain entry available for retry. +func TestHeldHtlcSetOffChainExpireKeepsEntryOnError(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockInterceptedForward(key, 100) + fwd.On( + "FailWithCode", lnwire.CodeTemporaryChannelFailure, + ).Return(errTestForward).Once() + + require.NoError(t, set.addOffChain(fwd)) + + errs := set.expire(100) + require.Len(t, errs, 1) + require.ErrorIs(t, errs[0].err, errTestForward) + require.Equal(t, key, errs[0].key) + require.True(t, set.exists(key)) + fwd.AssertExpectations(t) +} + +// TestHeldHtlcSetOnChainResolve verifies on-chain entries reject non-settle +// resolutions directly and remain held until settlement. +func TestHeldHtlcSetOnChainResolve(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockOnChainInterceptedForward(key, 100) + fwd.On("Settle", lntypes.Preimage{}).Return(nil).Once() + + require.NoError(t, set.addOnChain(fwd)) + require.ErrorIs(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionFail, + FailureCode: lnwire.CodeTemporaryChannelFailure, + }), ErrCannotFailOnChain) + require.True(t, set.exists(key)) + + require.ErrorIs(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionResume, + }), ErrCannotResumeOnChain) + require.True(t, set.exists(key)) + + require.ErrorIs(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionResumeModified, + }), ErrCannotResumeOnChain) + require.True(t, set.exists(key)) + + require.NoError(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionSettle, + })) + fwd.AssertExpectations(t) + fwd.AssertNotCalled(t, "Fail", mock.Anything) + fwd.AssertNotCalled(t, "FailWithCode", mock.Anything) + fwd.AssertNotCalled(t, "Resume") + fwd.AssertNotCalled(t, "ResumeModified", mock.Anything, mock.Anything, + mock.Anything) + require.False(t, set.exists(key)) +} + +// TestHeldHtlcSetOnChainExpirePrunes verifies on-chain expiry only prunes the +// local held entry. +func TestHeldHtlcSetOnChainExpirePrunes(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + fwd := newMockOnChainInterceptedForward(key, 100) + + require.NoError(t, set.addOnChain(fwd)) + + require.Empty(t, set.expire(99)) + require.True(t, set.exists(key)) + + require.Empty(t, set.expire(100)) + require.False(t, set.exists(key)) + fwd.AssertNotCalled(t, "FailWithCode", mock.Anything) +} + +// TestHeldHtlcSetOnChainReplacesOffChain verifies on-chain entries replace +// earlier off-chain entries with the same circuit key. +func TestHeldHtlcSetOnChainReplacesOffChain(t *testing.T) { + set := newHeldHtlcSet() + key := testCircuitKey() + offChain := newMockInterceptedForward(key, 100) + onChain := newMockOnChainInterceptedForward(key, 100) + onChain.On("Settle", lntypes.Preimage{}).Return(nil).Once() + + require.NoError(t, set.addOffChain(offChain)) + require.NoError(t, set.addOnChain(onChain)) + + require.NoError(t, set.resolve(&FwdResolution{ + Key: key, + Action: FwdActionSettle, + })) + + offChain.AssertNotCalled(t, "Settle", mock.Anything) + onChain.AssertExpectations(t) +} + +// TestInterceptableSwitchForwardOnChain verifies on-chain intercept handling +// for fresh and already-held HTLCs. +func TestInterceptableSwitchForwardOnChain(t *testing.T) { + key := testCircuitKey() + + var intercepted []InterceptedPacket + interceptor := func(packet InterceptedPacket) error { + intercepted = append(intercepted, packet) + + return nil + } + + t.Run("fresh on-chain htlc is sent", func(t *testing.T) { + s := &InterceptableSwitch{ + heldHtlcSet: newHeldHtlcSet(), + interceptor: interceptor, + } + fwd := newMockOnChainInterceptedForward(key, 100) + + require.NoError(t, s.interceptOnChain(fwd)) + require.Len(t, intercepted, 1) + require.Equal(t, key, intercepted[0].IncomingCircuit) + }) + + t.Run("on-chain htlc replaces off-chain htlc", func(t *testing.T) { + intercepted = nil + s := &InterceptableSwitch{ + heldHtlcSet: newHeldHtlcSet(), + interceptor: interceptor, + } + offChain := newMockInterceptedForward(key, 100) + onChain := newMockOnChainInterceptedForward(key, 100) + onChain.On("Settle", lntypes.Preimage{}).Return(nil).Once() + + require.NoError(t, s.heldHtlcSet.addOffChain(offChain)) + require.NoError(t, s.interceptOnChain(onChain)) + require.Empty(t, intercepted) + + require.NoError(t, s.resolve(&FwdResolution{ + Key: key, + Action: FwdActionSettle, + })) + offChain.AssertNotCalled(t, "Settle", mock.Anything) + onChain.AssertExpectations(t) + }) + + t.Run("on-chain htlc replays after disconnect", func(t *testing.T) { + intercepted = nil + s := &InterceptableSwitch{ + heldHtlcSet: newHeldHtlcSet(), + interceptor: interceptor, + } + fwd := newMockOnChainInterceptedForward(key, 100) + fwd.On("Settle", lntypes.Preimage{}).Return(nil).Once() + + require.NoError(t, s.interceptOnChain(fwd)) + require.Len(t, intercepted, 1) + + s.setInterceptor(nil) + require.True(t, s.heldHtlcSet.exists(key)) + + intercepted = nil + s.setInterceptor(interceptor) + require.Len(t, intercepted, 1) + require.Equal(t, key, intercepted[0].IncomingCircuit) + + require.NoError(t, s.resolve(&FwdResolution{ + Key: key, + Action: FwdActionSettle, + })) + fwd.AssertExpectations(t) + require.False(t, s.heldHtlcSet.exists(key)) + }) + + t.Run("on-chain htlc removed after teardown", func(t *testing.T) { + intercepted = nil + s := &InterceptableSwitch{ + heldHtlcSet: newHeldHtlcSet(), + interceptor: interceptor, + } + fwd := newMockOnChainInterceptedForward(key, 100) + + require.NoError(t, s.interceptOnChain(fwd)) + require.Len(t, intercepted, 1) + + s.removeOnChainIntercept(key) + require.False(t, s.heldHtlcSet.exists(key)) + + intercepted = nil + s.setInterceptor(interceptor) + require.Empty(t, intercepted) + }) +} + +// TestInterceptableSwitchRemoveOnChainIntercept verifies that the public +// teardown path removes an on-chain hold through the switch run loop. +func TestInterceptableSwitchRemoveOnChainIntercept(t *testing.T) { + notifier := &lntestmock.ChainNotifier{ + EpochChan: make(chan *chainntnfs.BlockEpoch, 1), } + notifier.EpochChan <- &chainntnfs.BlockEpoch{Height: 1} - const autoFailHeight = 100 - fwd := &interceptedForward{ - packet: &htlcPacket{}, - htlc: &lnwire.UpdateAddHTLC{}, - autoFailHeight: autoFailHeight, + s, err := NewInterceptableSwitch(&InterceptableSwitchConfig{ + Notifier: notifier, + CltvRejectDelta: 10, + CltvInterceptDelta: 13, + }) + require.NoError(t, err) + require.NoError(t, s.Start()) + defer func() { + require.NoError(t, s.Stop()) + }() + + intercepted := make(chan InterceptedPacket, 2) + s.SetInterceptor(func(packet InterceptedPacket) error { + intercepted <- packet + + return nil + }) + + key := testCircuitKey() + require.NoError(t, s.ForwardPacket( + newMockOnChainInterceptedForward(key, 100), + )) + select { + case packet := <-intercepted: + require.Equal(t, key, packet.IncomingCircuit) + + case <-time.After(time.Second): + require.Fail(t, "on-chain hold not intercepted") } - require.NoError(t, set.push(key, fwd)) - - // Test popping auto fails up to one block before the auto-fail height - // of our forward. - set.popAutoFails( - autoFailHeight-1, - func(_ InterceptedForward) { - require.Fail(t, "unexpected fwd") - }, - ) - // Popping succeeds at the auto-fail height. - cbCalled := false - set.popAutoFails( - autoFailHeight, - func(poppedFwd InterceptedForward) { - cbCalled = true + require.NoError(t, s.RemoveOnChainIntercept(key)) - require.Equal(t, fwd, poppedFwd) - }, - ) - require.True(t, cbCalled) - - // After this, there should be nothing more to pop. - set.popAutoFails( - autoFailHeight, - func(_ InterceptedForward) { - require.Fail(t, "unexpected fwd") - }, - ) + // Re-registering the interceptor replays all currently held HTLCs. + // The removed on-chain hold should not be replayed. + s.SetInterceptor(func(packet InterceptedPacket) error { + intercepted <- packet + + return nil + }) + + // Synchronize with the switch event loop so any replay triggered by the + // interceptor registration above has already run. + require.NoError(t, s.RemoveOnChainIntercept(models.CircuitKey{})) + + select { + case packet := <-intercepted: + require.Failf(t, "unexpected replay", "packet=%v", packet) + + default: + } } diff --git a/htlcswitch/interceptable_switch.go b/htlcswitch/interceptable_switch.go index 3d0bd90ed4..df1d30c45c 100644 --- a/htlcswitch/interceptable_switch.go +++ b/htlcswitch/interceptable_switch.go @@ -52,6 +52,10 @@ type InterceptableSwitch struct { onchainIntercepted chan InterceptedForward + // onchainInterceptDone receives circuit keys for on-chain intercepted + // forwards whose contractcourt resolver has finished. + onchainInterceptDone chan models.CircuitKey + // interceptorRegistration is a channel that we use to synchronize // client connect and disconnect. interceptorRegistration chan ForwardInterceptor @@ -196,6 +200,7 @@ func NewInterceptableSwitch(cfg *InterceptableSwitchConfig) ( htlcSwitch: cfg.Switch, intercepted: make(chan *interceptedPackets), onchainIntercepted: make(chan InterceptedForward), + onchainInterceptDone: make(chan models.CircuitKey), interceptorRegistration: make(chan ForwardInterceptor), heldHtlcSet: newHeldHtlcSet(), resolutionChan: make(chan *fwdResolution), @@ -318,17 +323,14 @@ func (s *InterceptableSwitch) run() error { } case fwd := <-s.onchainIntercepted: - // For on-chain interceptions, we don't know if it has - // already been offered before. This information is in - // the forwarding package which isn't easily accessible - // from contractcourt. It is likely though that it was - // already intercepted in the off-chain flow. And even - // if not, it is safe to signal replay so that we won't - // unexpectedly skip over this htlc. - if _, err := s.forward(fwd, true); err != nil { - return err + if err := s.interceptOnChain(fwd); err != nil { + log.Errorf("Cannot hold on-chain intercepted "+ + "htlc: %v", err) } + case key := <-s.onchainInterceptDone: + s.removeOnChainIntercept(key) + case res := <-s.resolutionChan: res.errChan <- s.resolve(res.resolution) @@ -339,8 +341,10 @@ func (s *InterceptableSwitch) run() error { s.currentHeight = currentBlock.Height - // A new block is appended. Fail any held htlcs that - // expire at this height to prevent channel force-close. + // A new block is appended. Expire any held HTLCs whose + // deadline has passed. Off-chain HTLCs fail back, while + // on-chain HTLCs are only pruned from the local hold + // set. s.failExpiredHtlcs() case <-s.quit: @@ -350,17 +354,11 @@ func (s *InterceptableSwitch) run() error { } func (s *InterceptableSwitch) failExpiredHtlcs() { - s.heldHtlcSet.popAutoFails( - uint32(s.currentHeight), - func(fwd InterceptedForward) { - err := fwd.FailWithCode( - lnwire.CodeTemporaryChannelFailure, - ) - if err != nil { - log.Errorf("Cannot fail packet: %v", err) - } - }, - ) + errs := s.heldHtlcSet.expire(uint32(s.currentHeight)) + for _, expireErr := range errs { + log.Errorf("Cannot expire held htlc %v: %v", expireErr.key, + expireErr.err) + } } func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) { @@ -394,48 +392,20 @@ func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) { return } - // Interceptor is not required. Release held forwards. + // Interceptor is not required. Release off-chain held forwards. log.Infof("Interceptor disconnected, resolving held packets") - s.heldHtlcSet.popAll(func(fwd InterceptedForward) { - err := fwd.Resume() - if err != nil { - log.Errorf("Failed to resume hold forward %v", err) - } - }) + errs := s.heldHtlcSet.releaseOffChainHeld() + for _, releaseErr := range errs { + log.Errorf("Failed to resume hold forward %v: %v", + releaseErr.key, releaseErr.err) + } } // resolve processes a HTLC given the resolution type specified by the // intercepting client. func (s *InterceptableSwitch) resolve(res *FwdResolution) error { - intercepted, err := s.heldHtlcSet.pop(res.Key) - if err != nil { - return err - } - - switch res.Action { - case FwdActionResume: - return intercepted.Resume() - - case FwdActionResumeModified: - return intercepted.ResumeModified( - res.InAmountMsat, res.OutAmountMsat, - res.OutWireCustomRecords, - ) - - case FwdActionSettle: - return intercepted.Settle(res.Preimage) - - case FwdActionFail: - if len(res.FailureMessage) > 0 { - return intercepted.Fail(res.FailureMessage) - } - - return intercepted.FailWithCode(res.FailureCode) - - default: - return fmt.Errorf("unrecognized action %v", res.Action) - } + return s.heldHtlcSet.resolve(res) } // Resolve resolves an intercepted packet. @@ -501,6 +471,21 @@ func (s *InterceptableSwitch) ForwardPacket( return nil } +// RemoveOnChainIntercept removes an on-chain intercepted forward from the held +// set once its contractcourt resolver has finished. +func (s *InterceptableSwitch) RemoveOnChainIntercept( + key models.CircuitKey) error { + + select { + case s.onchainInterceptDone <- key: + + case <-s.quit: + return errors.New("interceptable switch quit") + } + + return nil +} + // interceptForward forwards the packet to the external interceptor after // checking the interception criteria. func (s *InterceptableSwitch) interceptForward(packet *htlcPacket, @@ -542,15 +527,16 @@ func (s *InterceptableSwitch) interceptForward(packet *htlcPacket, return true, nil } - return s.forward(intercepted, isReplay) + return s.forwardOffChain(intercepted, isReplay) default: return false, nil } } -// forward records the intercepted htlc and forwards it to the interceptor. -func (s *InterceptableSwitch) forward( +// forwardOffChain records an off-chain intercepted htlc and forwards it to the +// interceptor if needed. +func (s *InterceptableSwitch) forwardOffChain( fwd InterceptedForward, isReplay bool) (bool, error) { inKey := fwd.Packet().IncomingCircuit @@ -585,16 +571,16 @@ func (s *InterceptableSwitch) forward( // This packet is a replay. It is not safe to fail back, because the // interceptor may still signal otherwise upon reconnect. Keep the // packet in the queue until then. - if err := s.heldHtlcSet.push(inKey, fwd); err != nil { + if err := s.heldHtlcSet.addOffChain(fwd); err != nil { return false, err } return true, nil } - // There is an interceptor registered. We can forward the packet right now. - // Hold it in the queue too to track what is outstanding. - if err := s.heldHtlcSet.push(inKey, fwd); err != nil { + // There is an interceptor registered. We can notify it right now. Hold + // the packet in the queue too to track what is outstanding. + if err := s.heldHtlcSet.addOffChain(fwd); err != nil { return false, err } @@ -603,6 +589,44 @@ func (s *InterceptableSwitch) forward( return true, nil } +// interceptOnChain records an on-chain intercepted htlc. This doesn't resume or +// forward the htlc through the link. If this HTLC is not already held, the +// interceptor is notified so the client can settle it. If it is already held +// off-chain, we replace the stored entry so future resolutions go to the +// on-chain witness beacon path, and skip re-notifying because the client still +// has the circuit key from the existing hold. In that case, the client may +// still have the original off-chain auto-fail deadline, which can be stale once +// the HTLC has moved on-chain and the useful settle window is bounded by the +// on-chain timeout instead. +// +// Note that the skip only applies while that hold is still present: if the +// off-chain hold was already released or auto-failed before the channel went +// on-chain, wasHeld is false and the client is notified again for the same +// circuit key. Interceptor clients must therefore treat offers as idempotent +// per circuit key (reconnect replays already require this). +func (s *InterceptableSwitch) interceptOnChain(fwd InterceptedForward) error { + inKey := fwd.Packet().IncomingCircuit + wasHeld := s.heldHtlcSet.exists(inKey) + + if err := s.heldHtlcSet.addOnChain(fwd); err != nil { + return err + } + + if s.interceptor != nil && !wasHeld { + s.sendForward(fwd) + } + + return nil +} + +// removeOnChainIntercept removes an on-chain held HTLC after contractcourt no +// longer needs the interceptor replay handle. +func (s *InterceptableSwitch) removeOnChainIntercept(key models.CircuitKey) { + if s.heldHtlcSet.removeOnChain(key) { + log.Debugf("Removed on-chain held htlc %v", key) + } +} + // handleExpired checks that the htlc isn't too close to the channel // force-close broadcast height. If it is, it is cancelled back. func (s *InterceptableSwitch) handleExpired(fwd *interceptedForward) ( @@ -654,8 +678,10 @@ func (f *interceptedForward) Packet() InterceptedPacket { IncomingExpiry: f.packet.incomingTimeout, InOnionCustomRecords: f.packet.inOnionCustomRecords, OnionBlob: f.htlc.OnionBlob, - AutoFailHeight: f.autoFailHeight, - InWireCustomRecords: f.packet.inWireCustomRecords, + Deadline: fn.NewLeft[ + OffChainAutoFailHeight, OnChainSettleDeadline, + ](OffChainAutoFailHeight(f.autoFailHeight)), + InWireCustomRecords: f.packet.inWireCustomRecords, } } diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 75974a748b..53d1c599e3 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -419,9 +419,34 @@ type InterceptedPacket struct { // were defined by the peer that forwarded this HTLC to us. InWireCustomRecords lnwire.CustomRecords - // AutoFailHeight is the block height at which this intercept will be - // failed back automatically. - AutoFailHeight int32 + // Deadline describes how long this intercepted HTLC remains actionable. + // Off-chain forwards are auto-failed at this height, while on-chain + // forwards can be settled until this height. + Deadline fn.Either[OffChainAutoFailHeight, OnChainSettleDeadline] +} + +// OffChainAutoFailHeight is the block height at which an off-chain intercepted +// HTLC will be failed back automatically to prevent the incoming channel from +// force-closing. +type OffChainAutoFailHeight int32 + +// OnChainSettleDeadline is the block height until which an on-chain +// intercepted HTLC can be settled before the timeout path becomes available. +type OnChainSettleDeadline int32 + +// AutoFailHeight returns the legacy RPC auto_fail_height projection for an +// intercepted packet. For on-chain packets, the value is the settlement +// deadline exposed through the existing RPC field for compatibility. +func (p InterceptedPacket) AutoFailHeight() int32 { + return fn.ElimEither( + p.Deadline, + func(h OffChainAutoFailHeight) int32 { + return int32(h) + }, + func(d OnChainSettleDeadline) int32 { + return int32(d) + }, + ) } // InterceptedForward is passed to the ForwardInterceptor for every forwarded diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 2f66fedb34..1584f874eb 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -4210,7 +4210,7 @@ func TestInterceptableSwitchWatchDog(t *testing.T) { require.Equal(t, int32(packet.incomingTimeout-c.cltvRejectDelta), - intercepted.AutoFailHeight, + intercepted.AutoFailHeight(), ) // Htlc expires before a resolution from the interceptor. diff --git a/itest/list_on_test.go b/itest/list_on_test.go index d5b3d5051d..54b203ff97 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -455,6 +455,14 @@ var allTestCases = []*lntest.TestCase{ Name: "forward interceptor restart", TestFunc: testForwardInterceptorRestart, }, + { + Name: "forward interceptor on chain settle after restart", + TestFunc: testForwardInterceptorOnChainSettleAfterRestart, + }, + { + Name: "forward interceptor on chain settle no restart", + TestFunc: testForwardInterceptorOnChainSettleNoRestart, + }, { Name: "delete forwarding history", TestFunc: testDeleteForwardingHistory, diff --git a/itest/lnd_forward_interceptor_test.go b/itest/lnd_forward_interceptor_test.go index 07244295fe..7391d4178e 100644 --- a/itest/lnd_forward_interceptor_test.go +++ b/itest/lnd_forward_interceptor_test.go @@ -484,6 +484,172 @@ func testForwardInterceptorRestart(ht *lntest.HarnessTest) { ) } +// testForwardInterceptorOnChainSettleAfterRestart tests that an HTLC offered +// to the interceptor by the on-chain resolver remains settleable after a new +// block is mined. This reproduces the incident path where Bob restarted after +// the force-close, so only the on-chain interceptor entry exists. +func testForwardInterceptorOnChainSettleAfterRestart(ht *lntest.HarnessTest) { + const ( + chanAmt = btcutil.Amount(300000) + invoiceAmt = int64(100000) + ) + + // Bob requires an interceptor so the forwarded HTLC remains held until + // the test explicitly resolves it. + p := lntest.OpenChannelParams{Amt: chanAmt} + cfgs := [][]string{nil, {"--requireinterceptor"}, nil} + chanPoints, nodes := ht.CreateSimpleNetwork(cfgs, p) + alice, bob, carol := nodes[0], nodes[1], nodes[2] + cpAB := chanPoints[0] + + // Fund Bob so he can publish the on-chain HTLC success sweep once the + // interceptor supplies the preimage. + ht.FundCoins(btcutil.SatoshiPerBitcoin, bob) + + interceptor, cancelInterceptor := bob.RPC.HtlcInterceptor() + + addResp := carol.RPC.AddInvoice(&lnrpc.Invoice{ + Value: invoiceAmt, + }) + invoice := carol.RPC.LookupInvoice(addResp.RHash) + + payHash, err := lntypes.MakeHash(invoice.RHash) + require.NoError(ht, err) + + req := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + FeeLimitMsat: noFeeLimitMsat, + } + ht.SendPaymentAssertInflight(alice, req) + + _ = ht.ReceiveHtlcInterceptor(interceptor) + ht.AssertIncomingHTLCActive(bob, cpAB, invoice.RHash) + ht.AssertPaymentStatus(alice, payHash, lnrpc.Payment_IN_FLIGHT) + + closeStream, _ := ht.CloseChannelAssertPending( + alice, cpAB, true, + ) + ht.AssertStreamChannelForceClosed( + alice, cpAB, false, closeStream, + ) + ht.AssertChannelPendingForceClose(bob, cpAB) + + cancelInterceptor() + ht.RestartNode(bob) + + // Re-register the interceptor after restart. The previous stream was + // cancelled before Bob went down. The incoming contest resolver only + // re-offers the on-chain HTLC to the active stream. + interceptor, cancelInterceptor = bob.RPC.HtlcInterceptor() + defer cancelInterceptor() + + // After restart, the incoming contest resolver re-offers the HTLC to + // the interceptor through the on-chain path. + intercepted := ht.ReceiveHtlcInterceptor(interceptor) + + // Mine one block after the on-chain intercept has been offered. With + // the current bug, the held entry is evicted here because the on-chain + // packet has no auto-fail height. + ht.MineEmptyBlocks(1) + + ht.AssertNumTxsInMempool(0) + err = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{ + IncomingCircuitKey: intercepted.IncomingCircuitKey, + Action: routerrpc.ResolveHoldForwardAction_SETTLE, + Preimage: invoice.RPreimage, + }) + require.NoError(ht, err, "failed to settle intercepted HTLC") + + // The preimage should reach the contest resolver and register Bob's + // HTLC success input with the sweeper. + ht.AssertAtLeastNumPendingSweeps(bob, 1) + + // Give the sweeper another blockbeat to publish the sweep transaction. + ht.MineEmptyBlocks(1) + + ht.MineBlocksAndAssertNumTxes(1, 1) + ht.AssertPaymentStatus(alice, payHash, lnrpc.Payment_SUCCEEDED) + + // Bob's sweep is mined above. Clean up Alice's force close so the next + // test starts with an empty mempool. + ht.CleanupForceClose(alice) +} + +// testForwardInterceptorOnChainSettleNoRestart tests that an HTLC which was +// first held off-chain can still be settled after the incoming channel +// force-closes without restarting Bob. This covers the duplicate-entry path: +// the old off-chain held entry must not prevent settlement from reaching the +// on-chain contest resolver. +func testForwardInterceptorOnChainSettleNoRestart(ht *lntest.HarnessTest) { + const ( + chanAmt = btcutil.Amount(300000) + invoiceAmt = int64(100000) + ) + + // Bob requires an interceptor so the forwarded HTLC remains held until + // the test explicitly resolves it. + p := lntest.OpenChannelParams{Amt: chanAmt} + cfgs := [][]string{nil, {"--requireinterceptor"}, nil} + chanPoints, nodes := ht.CreateSimpleNetwork(cfgs, p) + alice, bob, carol := nodes[0], nodes[1], nodes[2] + cpAB := chanPoints[0] + + // Fund Bob so he can publish the on-chain HTLC success sweep once the + // interceptor supplies the preimage. + ht.FundCoins(btcutil.SatoshiPerBitcoin, bob) + + interceptor, cancelInterceptor := bob.RPC.HtlcInterceptor() + defer cancelInterceptor() + + addResp := carol.RPC.AddInvoice(&lnrpc.Invoice{ + Value: invoiceAmt, + }) + invoice := carol.RPC.LookupInvoice(addResp.RHash) + + payHash, err := lntypes.MakeHash(invoice.RHash) + require.NoError(ht, err) + + req := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + FeeLimitMsat: noFeeLimitMsat, + } + ht.SendPaymentAssertInflight(alice, req) + + intercepted := ht.ReceiveHtlcInterceptor(interceptor) + ht.AssertIncomingHTLCActive(bob, cpAB, invoice.RHash) + ht.AssertPaymentStatus(alice, payHash, lnrpc.Payment_IN_FLIGHT) + + closeStream, _ := ht.CloseChannelAssertPending( + alice, cpAB, true, + ) + ht.AssertStreamChannelForceClosed( + alice, cpAB, false, closeStream, + ) + ht.AssertChannelPendingForceClose(bob, cpAB) + + ht.AssertNumTxsInMempool(0) + err = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{ + IncomingCircuitKey: intercepted.IncomingCircuitKey, + Action: routerrpc.ResolveHoldForwardAction_SETTLE, + Preimage: invoice.RPreimage, + }) + require.NoError(ht, err, "failed to settle intercepted HTLC") + + // The preimage should reach the contest resolver and register Bob's + // HTLC success input with the sweeper. + ht.AssertAtLeastNumPendingSweeps(bob, 1) + + // Give the sweeper another blockbeat to publish the sweep transaction. + ht.MineEmptyBlocks(1) + + ht.MineBlocksAndAssertNumTxes(1, 1) + ht.AssertPaymentStatus(alice, payHash, lnrpc.Payment_SUCCEEDED) + + // Bob's sweep is mined above. Clean up Alice's force close so the next + // test starts with an empty mempool. + ht.CleanupForceClose(alice) +} + // interceptorTestScenario is a helper struct to hold the test context and // provide the needed functionality. type interceptorTestScenario struct { diff --git a/lnrpc/routerrpc/forward_interceptor.go b/lnrpc/routerrpc/forward_interceptor.go index 6d6b3cf185..61adf8f2bf 100644 --- a/lnrpc/routerrpc/forward_interceptor.go +++ b/lnrpc/routerrpc/forward_interceptor.go @@ -96,7 +96,7 @@ func (r *forwardInterceptor) onIntercept( IncomingExpiry: htlc.IncomingExpiry, CustomRecords: htlc.InOnionCustomRecords, OnionBlob: htlc.OnionBlob[:], - AutoFailHeight: htlc.AutoFailHeight, + AutoFailHeight: htlc.AutoFailHeight(), InWireCustomRecords: htlc.InWireCustomRecords, } diff --git a/lnrpc/routerrpc/router.pb.go b/lnrpc/routerrpc/router.pb.go index 3008177d2b..78f21e244c 100644 --- a/lnrpc/routerrpc/router.pb.go +++ b/lnrpc/routerrpc/router.pb.go @@ -2812,6 +2812,10 @@ type ForwardHtlcInterceptRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // The key of this forwarded htlc. It defines the incoming channel id and // the index in this channel. + // + // Interceptor clients should handle requests for the same circuit key + // idempotently. Requests may be replayed after reconnect, and an htlc that was + // previously offered off-chain may be offered again after it moves on-chain. IncomingCircuitKey *CircuitKey `protobuf:"bytes,1,opt,name=incoming_circuit_key,json=incomingCircuitKey,proto3" json:"incoming_circuit_key,omitempty"` // The incoming htlc amount. IncomingAmountMsat uint64 `protobuf:"varint,5,opt,name=incoming_amount_msat,json=incomingAmountMsat,proto3" json:"incoming_amount_msat,omitempty"` @@ -2834,7 +2838,8 @@ type ForwardHtlcInterceptRequest struct { // The onion blob for the next hop OnionBlob []byte `protobuf:"bytes,9,opt,name=onion_blob,json=onionBlob,proto3" json:"onion_blob,omitempty"` // The block height at which this htlc will be auto-failed to prevent the - // channel from force-closing. + // channel from force-closing. For on-chain htlcs, this field is the + // settlement deadline instead and no automatic fail-back is attempted. AutoFailHeight int32 `protobuf:"varint,10,opt,name=auto_fail_height,json=autoFailHeight,proto3" json:"auto_fail_height,omitempty"` // The custom records of the peer's incoming p2p wire message. InWireCustomRecords map[uint64][]byte `protobuf:"bytes,11,rep,name=in_wire_custom_records,json=inWireCustomRecords,proto3" json:"in_wire_custom_records,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` @@ -2957,6 +2962,14 @@ func (x *ForwardHtlcInterceptRequest) GetInWireCustomRecords() map[uint64][]byte // field modifications. // - `Reject`: Fail the htlc backwards. // - `Settle`: Settle this htlc with a given preimage. +// +// Once the incoming channel has force-closed and the HTLC is being resolved +// on-chain (see auto_fail_height), only `Settle` has any effect. The HTLC can no +// longer be resumed or failed back off-chain, so `Resume`, `ResumeModified`, and +// `Fail` return a stream-terminating error. The HTLC stays held until it is +// settled with a preimage, the on-chain resolver completes, or it expires +// on-chain. Clients should reconnect to receive any held HTLCs that remain +// unresolved. type ForwardHtlcInterceptResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // * diff --git a/lnrpc/routerrpc/router.proto b/lnrpc/routerrpc/router.proto index a8aebe98fe..7b422bd841 100644 --- a/lnrpc/routerrpc/router.proto +++ b/lnrpc/routerrpc/router.proto @@ -910,6 +910,10 @@ message ForwardHtlcInterceptRequest { /* The key of this forwarded htlc. It defines the incoming channel id and the index in this channel. + + Interceptor clients should handle requests for the same circuit key + idempotently. Requests may be replayed after reconnect, and an htlc that was + previously offered off-chain may be offered again after it moves on-chain. */ CircuitKey incoming_circuit_key = 1; @@ -944,7 +948,8 @@ message ForwardHtlcInterceptRequest { bytes onion_blob = 9; // The block height at which this htlc will be auto-failed to prevent the - // channel from force-closing. + // channel from force-closing. For on-chain htlcs, this field is the + // settlement deadline instead and no automatic fail-back is attempted. int32 auto_fail_height = 10; // The custom records of the peer's incoming p2p wire message. @@ -959,6 +964,14 @@ forward. The caller can choose either to: field modifications. - `Reject`: Fail the htlc backwards. - `Settle`: Settle this htlc with a given preimage. + +Once the incoming channel has force-closed and the HTLC is being resolved +on-chain (see auto_fail_height), only `Settle` has any effect. The HTLC can no +longer be resumed or failed back off-chain, so `Resume`, `ResumeModified`, and +`Fail` return a stream-terminating error. The HTLC stays held until it is +settled with a preimage, the on-chain resolver completes, or it expires +on-chain. Clients should reconnect to receive any held HTLCs that remain +unresolved. */ message ForwardHtlcInterceptResponse { /** diff --git a/lnrpc/routerrpc/router.swagger.json b/lnrpc/routerrpc/router.swagger.json index 121c50d1a7..88b4761789 100644 --- a/lnrpc/routerrpc/router.swagger.json +++ b/lnrpc/routerrpc/router.swagger.json @@ -111,7 +111,7 @@ "parameters": [ { "name": "body", - "description": "*\nForwardHtlcInterceptResponse enables the caller to resolve a previously hold\nforward. The caller can choose either to:\n- `Resume`: Execute the default behavior (usually forward).\n- `ResumeModified`: Execute the default behavior (usually forward) with HTLC\nfield modifications.\n- `Reject`: Fail the htlc backwards.\n- `Settle`: Settle this htlc with a given preimage. (streaming inputs)", + "description": "*\nForwardHtlcInterceptResponse enables the caller to resolve a previously hold\nforward. The caller can choose either to:\n- `Resume`: Execute the default behavior (usually forward).\n- `ResumeModified`: Execute the default behavior (usually forward) with HTLC\nfield modifications.\n- `Reject`: Fail the htlc backwards.\n- `Settle`: Settle this htlc with a given preimage.\n\nOnce the incoming channel has force-closed and the HTLC is being resolved\non-chain (see auto_fail_height), only `Settle` has any effect. The HTLC can no\nlonger be resumed or failed back off-chain, so `Resume`, `ResumeModified`, and\n`Fail` return a stream-terminating error. The HTLC stays held until it is\nsettled with a preimage, the on-chain resolver completes, or it expires\non-chain. Clients should reconnect to receive any held HTLCs that remain\nunresolved. (streaming inputs)", "in": "body", "required": true, "schema": { @@ -1552,7 +1552,7 @@ "properties": { "incoming_circuit_key": { "$ref": "#/definitions/routerrpcCircuitKey", - "description": "The key of this forwarded htlc. It defines the incoming channel id and\nthe index in this channel." + "description": "The key of this forwarded htlc. It defines the incoming channel id and\nthe index in this channel.\n\nInterceptor clients should handle requests for the same circuit key\nidempotently. Requests may be replayed after reconnect, and an htlc that was\npreviously offered off-chain may be offered again after it moves on-chain." }, "incoming_amount_msat": { "type": "string", @@ -1600,7 +1600,7 @@ "auto_fail_height": { "type": "integer", "format": "int32", - "description": "The block height at which this htlc will be auto-failed to prevent the\nchannel from force-closing." + "description": "The block height at which this htlc will be auto-failed to prevent the\nchannel from force-closing. For on-chain htlcs, this field is the\nsettlement deadline instead and no automatic fail-back is attempted." }, "in_wire_custom_records": { "type": "object", @@ -1656,7 +1656,7 @@ "description": "Any custom records that should be set on the p2p wire message message of\nthe resumed HTLC. This field is ignored if the action is not\nRESUME_MODIFIED.\n\nThis map will merge with the existing set of custom records (if any),\nreplacing any conflicting types. Note that there currently is no support\nfor deleting existing custom records (they can only be replaced)." } }, - "description": "*\nForwardHtlcInterceptResponse enables the caller to resolve a previously hold\nforward. The caller can choose either to:\n- `Resume`: Execute the default behavior (usually forward).\n- `ResumeModified`: Execute the default behavior (usually forward) with HTLC\nfield modifications.\n- `Reject`: Fail the htlc backwards.\n- `Settle`: Settle this htlc with a given preimage." + "description": "*\nForwardHtlcInterceptResponse enables the caller to resolve a previously hold\nforward. The caller can choose either to:\n- `Resume`: Execute the default behavior (usually forward).\n- `ResumeModified`: Execute the default behavior (usually forward) with HTLC\nfield modifications.\n- `Reject`: Fail the htlc backwards.\n- `Settle`: Settle this htlc with a given preimage.\n\nOnce the incoming channel has force-closed and the HTLC is being resolved\non-chain (see auto_fail_height), only `Settle` has any effect. The HTLC can no\nlonger be resumed or failed back off-chain, so `Resume`, `ResumeModified`, and\n`Fail` return a stream-terminating error. The HTLC stays held until it is\nsettled with a preimage, the on-chain resolver completes, or it expires\non-chain. Clients should reconnect to receive any held HTLCs that remain\nunresolved." }, "routerrpcGetMissionControlConfigResponse": { "type": "object", diff --git a/server.go b/server.go index 078f9299f8..378dfa5510 100644 --- a/server.go +++ b/server.go @@ -935,6 +935,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, s.witnessBeacon = newPreimageBeacon( dbs.ChanStateDB.NewWitnessCache(), s.interceptableSwitch.ForwardPacket, + s.interceptableSwitch.RemoveOnChainIntercept, ) chanStatusMgrCfg := &netann.ChanStatusConfig{ diff --git a/witness_beacon.go b/witness_beacon.go index 6c315d0c18..a552c8d13f 100644 --- a/witness_beacon.go +++ b/witness_beacon.go @@ -6,6 +6,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hop" @@ -44,15 +45,19 @@ type preimageBeacon struct { subscribers map[uint64]*preimageSubscriber interceptor func(htlcswitch.InterceptedForward) error + + cancelInterceptor func(models.CircuitKey) error } func newPreimageBeacon(wCache witnessCache, - interceptor func(htlcswitch.InterceptedForward) error) *preimageBeacon { + interceptor func(htlcswitch.InterceptedForward) error, + cancelInterceptor func(models.CircuitKey) error) *preimageBeacon { return &preimageBeacon{ - wCache: wCache, - interceptor: interceptor, - subscribers: make(map[uint64]*preimageSubscriber), + wCache: wCache, + interceptor: interceptor, + cancelInterceptor: cancelInterceptor, + subscribers: make(map[uint64]*preimageSubscriber), } } @@ -64,48 +69,61 @@ func (p *preimageBeacon) SubscribeUpdates( nextHopOnionBlob []byte) (*contractcourt.WitnessSubscription, error) { p.Lock() - defer p.Unlock() - clientID := p.clientCounter client := &preimageSubscriber{ updateChan: make(chan lntypes.Preimage, 10), quit: make(chan struct{}), } - p.subscribers[p.clientCounter] = client + p.subscribers[clientID] = client p.clientCounter++ + p.Unlock() srvrLog.Debugf("Creating new witness beacon subscriber, id=%v", - p.clientCounter) + clientID) + + inKey := models.CircuitKey{ + ChanID: chanID, + HtlcID: htlc.HtlcIndex, + } sub := &contractcourt.WitnessSubscription{ WitnessUpdates: client.updateChan, CancelSubscription: func() { p.Lock() - defer p.Unlock() delete(p.subscribers, clientID) close(client.quit) + p.Unlock() + + err := p.cancelInterceptor(inKey) + if err != nil { + srvrLog.Errorf("Cannot remove on-chain "+ + "intercept %v: %v", inKey, err) + } }, } // Notify the htlc interceptor. There may be a client connected // and willing to supply a preimage. packet := &htlcswitch.InterceptedPacket{ - Hash: htlc.RHash, - IncomingExpiry: htlc.RefundTimeout, - IncomingAmount: htlc.Amt, - IncomingCircuit: models.CircuitKey{ - ChanID: chanID, - HtlcID: htlc.HtlcIndex, - }, + Hash: htlc.RHash, + IncomingExpiry: htlc.RefundTimeout, + IncomingAmount: htlc.Amt, + IncomingCircuit: inKey, OutgoingChanID: payload.FwdInfo.NextHop, OutgoingExpiry: payload.FwdInfo.OutgoingCTLV, OutgoingAmount: payload.FwdInfo.AmountToForward, InOnionCustomRecords: payload.CustomRecords(), InWireCustomRecords: htlc.CustomRecords, + // Keep the on-chain intercept available to the + // interceptor until the HTLC expires on chain. + Deadline: fn.NewRight[ + htlcswitch.OffChainAutoFailHeight, + htlcswitch.OnChainSettleDeadline, + ](htlcswitch.OnChainSettleDeadline(htlc.RefundTimeout)), } copy(packet.OnionBlob[:], nextHopOnionBlob) @@ -113,6 +131,8 @@ func (p *preimageBeacon) SubscribeUpdates( err := p.interceptor(fwd) if err != nil { + sub.CancelSubscription() + return nil, err } diff --git a/witness_beacon_test.go b/witness_beacon_test.go index d98c276f52..1edbada930 100644 --- a/witness_beacon_test.go +++ b/witness_beacon_test.go @@ -1,9 +1,11 @@ package lnd import ( + "errors" "testing" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/lntypes" @@ -20,9 +22,15 @@ func TestWitnessBeaconIntercept(t *testing.T) { return nil } + var canceledKey models.CircuitKey + cancelInterceptor := func(key models.CircuitKey) error { + canceledKey = key + + return nil + } p := newPreimageBeacon( - &mockWitnessCache{}, interceptor, + &mockWitnessCache{}, interceptor, cancelInterceptor, ) preimage := lntypes.Preimage{1, 2, 3} @@ -37,12 +45,56 @@ func TestWitnessBeaconIntercept(t *testing.T) { []byte{2}, ) require.NoError(t, err) - t.Cleanup(subscription.CancelSubscription) require.NoError(t, interceptedFwd.Settle(preimage)) update := <-subscription.WitnessUpdates require.Equal(t, preimage, update) + + subscription.CancelSubscription() + require.Equal(t, interceptedFwd.Packet().IncomingCircuit, canceledKey) +} + +// TestWitnessBeaconInterceptErrorCancels tests that a failed interceptor offer +// tears down the witness subscription and on-chain intercept handle. +func TestWitnessBeaconInterceptErrorCancels(t *testing.T) { + errInterceptor := errors.New("interceptor error") + + interceptor := func(htlcswitch.InterceptedForward) error { + return errInterceptor + } + + var canceledKey models.CircuitKey + cancelInterceptor := func(key models.CircuitKey) error { + canceledKey = key + + return nil + } + + p := newPreimageBeacon( + &mockWitnessCache{}, interceptor, cancelInterceptor, + ) + + chanID := lnwire.NewShortChanIDFromInt(1) + htlc := &channeldb.HTLC{ + HtlcIndex: 2, + RHash: lntypes.Hash{3}, + } + + subscription, err := p.SubscribeUpdates( + chanID, htlc, &hop.Payload{}, []byte{2}, + ) + require.ErrorIs(t, err, errInterceptor) + require.Nil(t, subscription) + + require.Equal(t, models.CircuitKey{ + ChanID: chanID, + HtlcID: htlc.HtlcIndex, + }, canceledKey) + + p.RLock() + require.Empty(t, p.subscribers) + p.RUnlock() } type mockWitnessCache struct {