From 69aafdb7fd7acdbc88ca3dc5a2aa8feeb8eb5145 Mon Sep 17 00:00:00 2001 From: bRong Njam Date: Wed, 17 Jun 2026 17:28:15 +0800 Subject: [PATCH] fix(rscthrottler): add mpool-based physical memory gate for S3 writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the removed RSS gate with a check on mpool.GlobalStats().NumCurrBytes. mpool tracks all mmap'd off-heap allocations (vectors, hash tables, S3 buffers, cache) and is real-time accurate — no stale window, no RSS/s3-buffer double-count. The pool check (limit - reserved) bounds S3 write concurrency. The mpool gate (mpoolBytes + ask > total * limitRate) bounds physical memory. Rejection triggers flushS3WriterOnMemoryPressure, freeing mpool + reserved. Fixes OOM on TPCC regression caused by #24892 removing the RSS gate. --- pkg/common/rscthrottler/resource_throttler.go | 26 +++++++++------- .../rscthrottler/resource_throttler_test.go | 30 +++++++++++++++++++ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/pkg/common/rscthrottler/resource_throttler.go b/pkg/common/rscthrottler/resource_throttler.go index ac9c717f3d224..b5658a033bc62 100644 --- a/pkg/common/rscthrottler/resource_throttler.go +++ b/pkg/common/rscthrottler/resource_throttler.go @@ -612,21 +612,27 @@ func acquireWithinRSSLimit(throttler *memThrottler, ask int64) (int64, bool) { // double-counts S3 write buffers already reflected in RSS. // The pool only needs to bound forward-looking reservations // against the throttler limit. - // - // No separate RSS-based physical memory gate is needed here: - // - pinnedRate >= 0.80 at the AcquirePolicyForCNFlushS3 level - // provides a hard ceiling on pool utilisation. - // - RSS scavenging (85 % / 92 % thresholds in tryScavengeRSS) - // handles physical memory pressure with a graduated response - // (cache eviction + FreeOSMemory) that is more effective than - // a binary reject. - // The original RSS gate (added in PR #24268 for non-S3 memory - // pressure) has been superseded by RSS scavenging. avail := limit - reserved if !throttler.options.allowOutOfMemoryAcquire && avail < ask { return avail, false } + // Physical memory gate: guard against total off-heap (mpool) + // consumption exceeding the container limit. mpool tracks all + // mmap'd allocations (vectors, hash tables, S3 buffers, cache) + // — everything that eats RSS. It is real-time accurate, with + // no stale window, and does not double-count the S3 write + // buffers already reflected in reserved. Rejection here triggers + // flushS3WriterOnMemoryPressure in the caller, freeing mpool + + // reserved and creating headroom for retry. + total := int64(throttler.actualTotalMemory.Load()) + if total > 0 && throttler.limitRate > 0 { + mpoolBytes := mpool.GlobalStats().NumCurrBytes.Load() + if float64(mpoolBytes+ask) > float64(total)*throttler.limitRate { + return 0, false + } + } + newReserved := reserved + ask if throttler.reserved.CompareAndSwap(reserved, newReserved) { return limit - newReserved, true diff --git a/pkg/common/rscthrottler/resource_throttler_test.go b/pkg/common/rscthrottler/resource_throttler_test.go index 46f8b6f910343..33d1ba309c481 100644 --- a/pkg/common/rscthrottler/resource_throttler_test.go +++ b/pkg/common/rscthrottler/resource_throttler_test.go @@ -491,4 +491,34 @@ func TestAcquirePolicyForCNFlushS3(t *testing.T) { require.Equal(t, int64(30), left) require.Equal(t, int64(60), throttler.reserved.Load()) }) + t.Run("deny when mpool plus ask exceeds physical limit", func(t *testing.T) { + oldMpool := mpool.GlobalStats().NumCurrBytes.Load() + defer mpool.GlobalStats().NumCurrBytes.Store(oldMpool) + + throttler := &memThrottler{limitRate: 0.90} + throttler.actualTotalMemory.Store(100) + throttler.limit.Store(90) + mpool.GlobalStats().NumCurrBytes.Store(89) + + left, ok := AcquirePolicyForCNFlushS3(throttler, 2) + require.False(t, ok) + require.Equal(t, int64(0), left) + require.Equal(t, int64(0), throttler.reserved.Load()) + }) + + t.Run("allow when mpool plus ask is at physical boundary", func(t *testing.T) { + oldMpool := mpool.GlobalStats().NumCurrBytes.Load() + defer mpool.GlobalStats().NumCurrBytes.Store(oldMpool) + + throttler := &memThrottler{limitRate: 0.90} + throttler.actualTotalMemory.Store(100) + throttler.limit.Store(90) + mpool.GlobalStats().NumCurrBytes.Store(88) + + left, ok := AcquirePolicyForCNFlushS3(throttler, 2) + require.True(t, ok) + require.Equal(t, int64(88), left) + require.Equal(t, int64(2), throttler.reserved.Load()) + }) + }