From 9670c9eb3ad3c5b13a16a4277bd2279b105303ff Mon Sep 17 00:00:00 2001 From: Vjeran Grozdanic Date: Fri, 12 Jun 2026 12:51:46 +0200 Subject: [PATCH] ref(db): Replace create_or_update with update_or_create in Buffer.process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `elif model:` branch in `Buffer.process` was the last remaining caller of Sentry's custom `create_or_update` utility in non-manager production code. A simple `update_or_create` cannot be used here because `update_kwargs` contains `F()` increment expressions that Django would try to evaluate on the INSERT path and fail. Instead, use the explicit pattern: `filter().update()` first; if no rows were affected, `create()` with raw column values inside a savepoint, retrying as an update on `IntegrityError` in case a concurrent writer beat us to the insert. All models that reach this branch — `GroupTombstone`, `ReleaseProject`, `ReleaseProjectEnvironment`, and `GroupRelease` — have unique constraints on their filter fields, so the create-with-retry pattern is safe. Adds regression tests covering the update path (F() increment applied, signal fires with `created=False`), the create path (initial value set, signal fires with `created=True`), and the race-condition path (IntegrityError swallowed, no exception propagated). Co-Authored-By: Claude --- src/sentry/buffer/base.py | 19 ++++- src/sentry/options/defaults.py | 10 +++ tests/sentry/buffer/test_base.py | 77 ++++++++++++++++++- .../sentry/test_no_create_or_update_usage.py | 2 +- 4 files changed, 102 insertions(+), 6 deletions(-) diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py index 0b6d33d24a29..f5d26c58d81f 100644 --- a/src/sentry/buffer/base.py +++ b/src/sentry/buffer/base.py @@ -3,11 +3,12 @@ from typing import Any import psycopg2.errors -from django.db import DataError +from django.db import DataError, IntegrityError, router, transaction from django.db.models import F from sentry.db import models from sentry.db.models.fields.bounded import BoundedPositiveIntegerField +from sentry.options.rollout import in_random_rollout from sentry.signals import buffer_incr_complete from sentry.tasks.process_buffer import process_incr from sentry.utils import metrics @@ -187,7 +188,21 @@ def process( raise created = False elif model: - _, created = model.objects.create_or_update(values=update_kwargs, **filters) + if in_random_rollout("buffer.update-or-create.rollout"): + affected = model.objects.filter(**filters).update(**update_kwargs) + if not affected: + create_kwargs = {**filters, **{col: v for col, v in columns.items()}} + try: + # Wrapped in a transaction block to create a savepoint + # so an IntegrityError from a concurrent insert + # only rolls back this attempt, not any outer transaction. + with transaction.atomic(using=router.db_for_write(model)): + model.objects.create(**create_kwargs) + created = True + except IntegrityError: + model.objects.filter(**filters).update(**update_kwargs) + else: + _, created = model.objects.create_or_update(values=update_kwargs, **filters) buffer_incr_complete.send_robust( model=model, diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 82e5f5fdb857..442eb2113799 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3780,3 +3780,13 @@ default=False, flags=FLAG_AUTOMATOR_MODIFIABLE, ) + +# Rollout rate for the new Buffer.process non-Group update path that uses +# filter().update() + create() instead of the legacy create_or_update. +# Set to 1.0 to fully enable; set to 0.0 to kill-switch back to the old path. +register( + "buffer.update-or-create.rollout", + type=Float, + default=0.0, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) diff --git a/tests/sentry/buffer/test_base.py b/tests/sentry/buffer/test_base.py index 8e619495059e..d6a7ca6946eb 100644 --- a/tests/sentry/buffer/test_base.py +++ b/tests/sentry/buffer/test_base.py @@ -2,7 +2,7 @@ from unittest import mock import psycopg2.errors -from django.db import DataError +from django.db import DataError, IntegrityError from django.utils import timezone from sentry.buffer.base import Buffer, BufferField @@ -14,7 +14,9 @@ from sentry.models.releases.release_project import ReleaseProject from sentry.models.team import Team from sentry.receivers import create_default_projects +from sentry.signals import buffer_incr_complete from sentry.testutils.cases import TestCase +from sentry.testutils.helpers import override_options class BufferTest(TestCase): @@ -75,8 +77,7 @@ def test_increments_when_null(self) -> None: release_project_ = ReleaseProject.objects.get(id=release_project.id) assert release_project_.new_groups == 1 - @mock.patch("sentry.models.Group.objects.create_or_update") - def test_signal_only(self, create_or_update: mock.MagicMock) -> None: + def test_signal_only(self) -> None: group = Group.objects.create(project=Project(id=1)) columns = {"times_seen": 1} filters = {"id": group.id, "project_id": 1} @@ -86,6 +87,76 @@ def test_signal_only(self, create_or_update: mock.MagicMock) -> None: group.refresh_from_db() assert group.times_seen == prev_times_seen + @override_options({"buffer.update-or-create.rollout": 1.0}) + def test_process_non_group_update_increments_column(self) -> None: + """Buffer.process() increments a column on an existing non-Group row via F() expression.""" + # self.release already has a ReleaseProject for self.project (created by add_project) + rp = ReleaseProject.objects.get(project=self.project, release=self.release) + ReleaseProject.objects.filter(id=rp.id).update(new_groups=5) + + received: list[bool] = [] + + def on_signal(sender, created, **kw): + received.append(created) + + buffer_incr_complete.connect(on_signal, weak=False) + try: + self.buf.process( + ReleaseProject, + {"new_groups": 3}, + {"project_id": self.project.id, "release_id": self.release.id}, + ) + finally: + buffer_incr_complete.disconnect(on_signal) + + rp.refresh_from_db() + assert rp.new_groups == 8 + assert received == [False] + + @override_options({"buffer.update-or-create.rollout": 1.0}) + def test_process_non_group_create_sets_initial_value(self) -> None: + """Buffer.process() creates a row with the column's raw initial value when it doesn't exist.""" + # Create a release that has no ReleaseProject yet + new_release = Release.objects.create( + version="buffer-test-v1", organization_id=self.project.organization_id + ) + received: list[bool] = [] + + def on_signal(sender, created, **kw): + received.append(created) + + buffer_incr_complete.connect(on_signal, weak=False) + try: + self.buf.process( + ReleaseProject, + {"new_groups": 4}, + {"project_id": self.project.id, "release_id": new_release.id}, + ) + finally: + buffer_incr_complete.disconnect(on_signal) + + rp = ReleaseProject.objects.get(project=self.project, release=new_release) + assert rp.new_groups == 4 + assert received == [True] + + @override_options({"buffer.update-or-create.rollout": 1.0}) + def test_process_race_condition_integrity_error_handled(self) -> None: + """IntegrityError during create (concurrent insert race) is caught and retried as update.""" + # Create a release that has no ReleaseProject so filter().update() returns 0 + new_release = Release.objects.create( + version="buffer-test-race", organization_id=self.project.organization_id + ) + columns = {"new_groups": 2} + filters = {"project_id": self.project.id, "release_id": new_release.id} + + # No row exists → filter().update() returns 0 → triggers create path. + # Mock create to raise IntegrityError (simulating a concurrent insert winning the race). + # The code must catch it and retry with filter().update() — without propagating. + with mock.patch.object( + ReleaseProject.objects, "create", side_effect=IntegrityError("duplicate key") + ): + self.buf.process(ReleaseProject, columns, filters) + def test_process_caps_times_seen_on_overflow(self) -> None: """Test that times_seen is capped to BoundedPositiveIntegerField.MAX_VALUE when increment would cause overflow. diff --git a/tests/sentry/test_no_create_or_update_usage.py b/tests/sentry/test_no_create_or_update_usage.py index 389be56da30b..54688a465a5e 100644 --- a/tests/sentry/test_no_create_or_update_usage.py +++ b/tests/sentry/test_no_create_or_update_usage.py @@ -10,7 +10,7 @@ # create_or_update. Instead, use Django's update_or_create. # Reduce over time as code is refactored. DO NOT add new files here. ALLOWLIST_FILES: set[str] = { - "src/sentry/buffer/base.py", + "src/sentry/buffer/base.py", # fallback path behind buffer.update-or-create.rollout option "src/sentry/db/models/manager/base.py", }