Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions pkg/common/rscthrottler/resource_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,21 +612,27 @@ func acquireWithinRSSLimit(throttler *memThrottler, ask int64) (int64, bool) {
// double-counts S3 write buffers already reflected in RSS.
// The pool only needs to bound forward-looking reservations
// against the throttler limit.
//
// No separate RSS-based physical memory gate is needed here:
// - pinnedRate >= 0.80 at the AcquirePolicyForCNFlushS3 level
// provides a hard ceiling on pool utilisation.
// - RSS scavenging (85 % / 92 % thresholds in tryScavengeRSS)
// handles physical memory pressure with a graduated response
// (cache eviction + FreeOSMemory) that is more effective than
// a binary reject.
// The original RSS gate (added in PR #24268 for non-S3 memory
// pressure) has been superseded by RSS scavenging.
avail := limit - reserved
if !throttler.options.allowOutOfMemoryAcquire && avail < ask {
return avail, false
}

// Physical memory gate: guard against total off-heap (mpool)
// consumption exceeding the container limit. mpool tracks all
// mmap'd allocations (vectors, hash tables, S3 buffers, cache)
// — everything that eats RSS. It is real-time accurate, with
// no stale window, and does not double-count the S3 write
// buffers already reflected in reserved. Rejection here triggers
// flushS3WriterOnMemoryPressure in the caller, freeing mpool +
// reserved and creating headroom for retry.
total := int64(throttler.actualTotalMemory.Load())
if total > 0 && throttler.limitRate > 0 {
mpoolBytes := mpool.GlobalStats().NumCurrBytes.Load()
if float64(mpoolBytes+ask) > float64(total)*throttler.limitRate {
return 0, false
}
}

newReserved := reserved + ask
if throttler.reserved.CompareAndSwap(reserved, newReserved) {
return limit - newReserved, true
Expand Down
30 changes: 30 additions & 0 deletions pkg/common/rscthrottler/resource_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,34 @@ func TestAcquirePolicyForCNFlushS3(t *testing.T) {
require.Equal(t, int64(30), left)
require.Equal(t, int64(60), throttler.reserved.Load())
})
t.Run("deny when mpool plus ask exceeds physical limit", func(t *testing.T) {
oldMpool := mpool.GlobalStats().NumCurrBytes.Load()
defer mpool.GlobalStats().NumCurrBytes.Store(oldMpool)

throttler := &memThrottler{limitRate: 0.90}
throttler.actualTotalMemory.Store(100)
throttler.limit.Store(90)
mpool.GlobalStats().NumCurrBytes.Store(89)

left, ok := AcquirePolicyForCNFlushS3(throttler, 2)
require.False(t, ok)
require.Equal(t, int64(0), left)
require.Equal(t, int64(0), throttler.reserved.Load())
})

t.Run("allow when mpool plus ask is at physical boundary", func(t *testing.T) {
oldMpool := mpool.GlobalStats().NumCurrBytes.Load()
defer mpool.GlobalStats().NumCurrBytes.Store(oldMpool)

throttler := &memThrottler{limitRate: 0.90}
throttler.actualTotalMemory.Store(100)
throttler.limit.Store(90)
mpool.GlobalStats().NumCurrBytes.Store(88)

left, ok := AcquirePolicyForCNFlushS3(throttler, 2)
require.True(t, ok)
require.Equal(t, int64(88), left)
require.Equal(t, int64(2), throttler.reserved.Load())
})

}
Loading