Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/sentry/buffer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from typing import Any

import psycopg2.errors
from django.db import DataError
from django.db import DataError, IntegrityError, router, transaction
from django.db.models import F

from sentry.db import models
from sentry.db.models.fields.bounded import BoundedPositiveIntegerField
from sentry.options.rollout import in_random_rollout
from sentry.signals import buffer_incr_complete
from sentry.tasks.process_buffer import process_incr
from sentry.utils import metrics
Expand Down Expand Up @@ -187,7 +188,21 @@
raise
created = False
elif model:
_, created = model.objects.create_or_update(values=update_kwargs, **filters)
if in_random_rollout("buffer.update-or-create.rollout"):
affected = model.objects.filter(**filters).update(**update_kwargs)
if not affected:
create_kwargs = {**filters, **{col: v for col, v in columns.items()}}
try:
# Wrapped in a transaction block to create a savepoint
# so an IntegrityError from a concurrent insert
# only rolls back this attempt, not any outer transaction.
with transaction.atomic(using=router.db_for_write(model)):
model.objects.create(**create_kwargs)
created = True

Check warning on line 201 in src/sentry/buffer/base.py

View check run for this annotation

@sentry/warden / warden: sentry-backend-bugs

`extra` fields silently dropped on INSERT path in Buffer.process

In the `elif model:` rollout branch, when `filter().update()` matches no rows and a new record is created, `create_kwargs` is built from `filters` and `columns` only — `extra` is omitted. The legacy `create_or_update` it replaces did include `extra` on the INSERT path (it resolved both F-expressions and plain values from `values`/`defaults` into `create_kwargs`). As a result, caller-supplied fields like `last_seen` are silently dropped on creation and fall back to model defaults. The create branch only fires when the filtered row is absent (e.g. a `GroupTombstone`/`GroupRelease` row deleted between the buffer enqueue and flush), so this is a narrow correctness regression rather than a crash.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra fields silently dropped on INSERT path in Buffer.process

In the elif model: rollout branch, when filter().update() matches no rows and a new record is created, create_kwargs is built from filters and columns only — extra is omitted. The legacy create_or_update it replaces did include extra on the INSERT path (it resolved both F-expressions and plain values from values/defaults into create_kwargs). As a result, caller-supplied fields like last_seen are silently dropped on creation and fall back to model defaults. The create branch only fires when the filtered row is absent (e.g. a GroupTombstone/GroupRelease row deleted between the buffer enqueue and flush), so this is a narrow correctness regression rather than a crash.

Evidence
  • create_or_update in db/models/query.py builds create_kwargs from kwargs plus iterating values.items() and defaults.items(), so non-F extra values (e.g. last_seen) were written on INSERT.
  • The new branch in buffer/base.py sets create_kwargs = {**filters, **{col: v for col, v in columns.items()}}, omitting extra, so last_seen is not passed to model.objects.create().
  • GroupTombstone caller in event_manager.py:320-331 passes extra={'last_seen': max(event.datetime, group_tombstone.last_seen)}; if the tombstone row is gone at flush time the create path fires and last_seen falls back to auto_now_add.
  • ReleaseProject/ReleaseProjectEnvironment callers pass no extra, and GroupRelease filters by an existing id, so the create path with extra is only reached on a deletion race — limiting real-world impact.

Identified by Warden sentry-backend-bugs · C5Q-Q33

except IntegrityError:
model.objects.filter(**filters).update(**update_kwargs)
else:
_, created = model.objects.create_or_update(values=update_kwargs, **filters)

buffer_incr_complete.send_robust(
model=model,
Expand Down
10 changes: 10 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3780,3 +3780,13 @@
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Rollout rate for the new Buffer.process non-Group update path that uses
# filter().update() + create() instead of the legacy create_or_update.
# Set to 1.0 to fully enable; set to 0.0 to kill-switch back to the old path.
register(
"buffer.update-or-create.rollout",
type=Float,
default=0.0,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
77 changes: 74 additions & 3 deletions tests/sentry/buffer/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from unittest import mock

import psycopg2.errors
from django.db import DataError
from django.db import DataError, IntegrityError
from django.utils import timezone

from sentry.buffer.base import Buffer, BufferField
Expand All @@ -14,7 +14,9 @@
from sentry.models.releases.release_project import ReleaseProject
from sentry.models.team import Team
from sentry.receivers import create_default_projects
from sentry.signals import buffer_incr_complete
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers import override_options


class BufferTest(TestCase):
Expand Down Expand Up @@ -75,8 +77,7 @@ def test_increments_when_null(self) -> None:
release_project_ = ReleaseProject.objects.get(id=release_project.id)
assert release_project_.new_groups == 1

@mock.patch("sentry.models.Group.objects.create_or_update")
def test_signal_only(self, create_or_update: mock.MagicMock) -> None:
def test_signal_only(self) -> None:
group = Group.objects.create(project=Project(id=1))
columns = {"times_seen": 1}
filters = {"id": group.id, "project_id": 1}
Expand All @@ -86,6 +87,76 @@ def test_signal_only(self, create_or_update: mock.MagicMock) -> None:
group.refresh_from_db()
assert group.times_seen == prev_times_seen

@override_options({"buffer.update-or-create.rollout": 1.0})
def test_process_non_group_update_increments_column(self) -> None:
"""Buffer.process() increments a column on an existing non-Group row via F() expression."""
# self.release already has a ReleaseProject for self.project (created by add_project)
rp = ReleaseProject.objects.get(project=self.project, release=self.release)
ReleaseProject.objects.filter(id=rp.id).update(new_groups=5)

received: list[bool] = []

def on_signal(sender, created, **kw):
received.append(created)

buffer_incr_complete.connect(on_signal, weak=False)
try:
self.buf.process(
ReleaseProject,
{"new_groups": 3},
{"project_id": self.project.id, "release_id": self.release.id},
)
finally:
buffer_incr_complete.disconnect(on_signal)

rp.refresh_from_db()
assert rp.new_groups == 8
assert received == [False]

@override_options({"buffer.update-or-create.rollout": 1.0})
def test_process_non_group_create_sets_initial_value(self) -> None:
"""Buffer.process() creates a row with the column's raw initial value when it doesn't exist."""
# Create a release that has no ReleaseProject yet
new_release = Release.objects.create(
version="buffer-test-v1", organization_id=self.project.organization_id
)
received: list[bool] = []

def on_signal(sender, created, **kw):
received.append(created)

buffer_incr_complete.connect(on_signal, weak=False)
try:
self.buf.process(
ReleaseProject,
{"new_groups": 4},
{"project_id": self.project.id, "release_id": new_release.id},
)
finally:
buffer_incr_complete.disconnect(on_signal)

rp = ReleaseProject.objects.get(project=self.project, release=new_release)
assert rp.new_groups == 4
assert received == [True]

@override_options({"buffer.update-or-create.rollout": 1.0})
def test_process_race_condition_integrity_error_handled(self) -> None:
"""IntegrityError during create (concurrent insert race) is caught and retried as update."""
# Create a release that has no ReleaseProject so filter().update() returns 0
new_release = Release.objects.create(
version="buffer-test-race", organization_id=self.project.organization_id
)
columns = {"new_groups": 2}
filters = {"project_id": self.project.id, "release_id": new_release.id}

# No row exists → filter().update() returns 0 → triggers create path.
# Mock create to raise IntegrityError (simulating a concurrent insert winning the race).
# The code must catch it and retry with filter().update() — without propagating.
with mock.patch.object(
ReleaseProject.objects, "create", side_effect=IntegrityError("duplicate key")
):
self.buf.process(ReleaseProject, columns, filters)

def test_process_caps_times_seen_on_overflow(self) -> None:
"""Test that times_seen is capped to BoundedPositiveIntegerField.MAX_VALUE when increment would cause overflow.

Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/test_no_create_or_update_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# create_or_update. Instead, use Django's update_or_create.
# Reduce over time as code is refactored. DO NOT add new files here.
ALLOWLIST_FILES: set[str] = {
"src/sentry/buffer/base.py",
"src/sentry/buffer/base.py", # fallback path behind buffer.update-or-create.rollout option
"src/sentry/db/models/manager/base.py",
}

Expand Down
Loading