diff --git a/backend/modules/evaluation/application/experiment_app.go b/backend/modules/evaluation/application/experiment_app.go index f350f1fca..9734d025e 100644 --- a/backend/modules/evaluation/application/experiment_app.go +++ b/backend/modules/evaluation/application/experiment_app.go @@ -1254,16 +1254,14 @@ func (e *experimentApplication) RetryExperiment(ctx context.Context, req *expt.R switch runMode { case entity.EvaluationModeRetryItems: - rid, retried, err := e.manager.LogRetryItemsRun(ctx, req.GetExptID(), runMode, req.GetWorkspaceID(), req.GetItemIds(), session) + rid, _, err := e.manager.LogRetryItemsRun(ctx, req.GetExptID(), runMode, req.GetWorkspaceID(), req.GetItemIds(), session) if err != nil { return nil, err } runID = rid - if !retried { - if err := e.manager.RetryItems(ctx, req.GetExptID(), runID, req.GetWorkspaceID(), gptr.Indirect(got.EvalConf.ItemRetryNum), req.GetItemIds(), session, req.GetExt()); err != nil { - return nil, err - } + if err := e.manager.RetryItems(ctx, req.GetExptID(), runID, req.GetWorkspaceID(), gptr.Indirect(got.EvalConf.ItemRetryNum), req.GetItemIds(), session, req.GetExt()); err != nil { + return nil, err } default: if runID, err = e.idgen.GenID(ctx); err != nil { diff --git a/backend/modules/evaluation/application/experiment_app_test.go b/backend/modules/evaluation/application/experiment_app_test.go index d70d9df3b..6df8e2e75 100644 --- a/backend/modules/evaluation/application/experiment_app_test.go +++ b/backend/modules/evaluation/application/experiment_app_test.go @@ -6895,7 +6895,8 @@ func TestExperimentApplication_RetryExperiment_Branches(t *testing.T) { mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(baseExpt, nil) mockAuth.EXPECT().AuthorizationWithoutSPI(gomock.Any(), gomock.Any()).Return(nil) mockManager.EXPECT().LogRetryItemsRun(gomock.Any(), validExptID, entity.EvaluationModeRetryItems, validWorkspaceID, []int64{1}, gomock.Any()).Return(validRunID, true, nil) - // RetryItems should NOT be called since retried=true + // RetryItems IS called even when retried=true, so the scheduler picks up appended items + mockManager.EXPECT().RetryItems(gomock.Any(), validExptID, validRunID, validWorkspaceID, itemRetryNum, []int64{1}, gomock.Any(), gomock.Any()).Return(nil) resp, err := app.RetryExperiment(context.Background(), &exptpb.RetryExperimentRequest{ WorkspaceID: gptr.Of(validWorkspaceID), diff --git a/backend/modules/evaluation/domain/service/expt_manage_execution_impl.go b/backend/modules/evaluation/domain/service/expt_manage_execution_impl.go index 12fc7a591..09eb5c93b 100644 --- a/backend/modules/evaluation/domain/service/expt_manage_execution_impl.go +++ b/backend/modules/evaluation/domain/service/expt_manage_execution_impl.go @@ -1039,7 +1039,7 @@ func (e *ExptMangerImpl) unlockCompletingRun(ctx context.Context, exptID, exptRu func (e *ExptMangerImpl) LogRun(ctx context.Context, exptID, exptRunID int64, mode entity.ExptRunMode, spaceID int64, itemIDs []int64, session *entity.Session) error { duration := time.Duration(e.configer.GetExptExecConf(ctx, spaceID).GetZombieIntervalSecond()) * time.Second - locked, err := e.mutex.LockBackoff(ctx, e.makeExptMutexLockKey(exptID), duration, time.Second) + locked, _, err := e.mutex.BackoffLockWithValue(ctx, e.makeExptMutexLockKey(exptID), strconv.FormatInt(exptRunID, 10), duration, time.Second) if err != nil { return err } diff --git a/backend/modules/evaluation/domain/service/expt_manage_execution_impl_test.go b/backend/modules/evaluation/domain/service/expt_manage_execution_impl_test.go index b2cb7c85a..e44358c09 100755 --- a/backend/modules/evaluation/domain/service/expt_manage_execution_impl_test.go +++ b/backend/modules/evaluation/domain/service/expt_manage_execution_impl_test.go @@ -554,8 +554,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) { setup: func() { mgr.mutex.(*lockMocks.MockILocker). EXPECT(). - LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second). - Return(true, nil) + BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second). + Return(true, "", nil) mgr.mtr.(*metricsMocks.MockExptMetric). EXPECT(). @@ -595,8 +595,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) { setup: func() { mgr.mutex.(*lockMocks.MockILocker). EXPECT(). - LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second). - Return(false, nil) + BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second). + Return(false, "", nil) }, wantErr: true, }, @@ -609,8 +609,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) { setup: func() { mgr.mutex.(*lockMocks.MockILocker). EXPECT(). - LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second). - Return(true, nil) + BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second). + Return(true, "", nil) mgr.mtr.(*metricsMocks.MockExptMetric). EXPECT(). @@ -632,8 +632,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) { setup: func() { mgr.mutex.(*lockMocks.MockILocker). EXPECT(). - LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second). - Return(true, nil) + BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second). + Return(true, "", nil) mgr.mtr.(*metricsMocks.MockExptMetric). EXPECT(). diff --git a/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl.go b/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl.go index 855f769e5..2fdf7fe44 100644 --- a/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl.go +++ b/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl.go @@ -5,7 +5,10 @@ package service import ( "context" + "crypto/sha1" "fmt" + "sort" + "strconv" "time" "github.com/bytedance/gg/gptr" @@ -1305,6 +1308,22 @@ func makeStartIdemKey(event *entity.ExptScheduleEvent) string { return fmt.Sprintf("expt_start:%v%v", event.ExptID, event.ExptRunID) } +// makeRetryItemsStartIdemKey 为 RetryItems 模式生成 idem key。 +// RetryItems 允许同一个 expt_run_id 上反复追加不同 item,因此 key 必须把 +// ExecEvalSetItemIDs 纳入指纹,否则前一次 retry 写下的 key 会让后续 retry +// 在 ExptStart 短路返回,跳过 resetEvalItems。MQ 重投递场景下 itemIDs 完全 +// 一致 → 指纹相同 → 仍然能正确去重。 +func makeRetryItemsStartIdemKey(event *entity.ExptScheduleEvent) string { + ids := append([]int64(nil), event.ExecEvalSetItemIDs...) + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + h := sha1.New() + for _, id := range ids { + _, _ = h.Write(strconv.AppendInt(nil, id, 10)) + _, _ = h.Write([]byte{','}) + } + return fmt.Sprintf("expt_start:%v%v:%x", event.ExptID, event.ExptRunID, h.Sum(nil)) +} + func makeEndIdemKey(event *entity.ExptScheduleEvent) string { return fmt.Sprintf("expt_end:%v%v", event.ExptID, event.ExptRunID) } @@ -1600,7 +1619,7 @@ func (e *ExptRetryItemsExec) Mode() entity.ExptRunMode { } func (e *ExptRetryItemsExec) ExptStart(ctx context.Context, event *entity.ExptScheduleEvent, expt *entity.Experiment) error { - idemKey := makeStartIdemKey(event) + idemKey := makeRetryItemsStartIdemKey(event) exist, err := e.idem.Exist(ctx, idemKey) if err != nil { return err diff --git a/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl_test.go b/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl_test.go index 34962922a..26d7c0b52 100644 --- a/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl_test.go +++ b/backend/modules/evaluation/domain/service/expt_run_scheduler_mode_impl_test.go @@ -5682,3 +5682,33 @@ func TestExptTrialRunExec_ExptStart_OriginalPath(t *testing.T) { }) } } + +func TestMakeRetryItemsStartIdemKey(t *testing.T) { + base := &entity.ExptScheduleEvent{ExptID: 100, ExptRunID: 200} + + // 同 (exptID, runID, itemIDs) → 同 key(MQ 重投递保护不变) + a := *base + a.ExecEvalSetItemIDs = []int64{1, 2, 3} + b := *base + b.ExecEvalSetItemIDs = []int64{1, 2, 3} + assert.Equal(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&b)) + + // itemIDs 顺序变了 → 同 key(依赖排序) + c := *base + c.ExecEvalSetItemIDs = []int64{3, 1, 2} + assert.Equal(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&c)) + + // 同 (exptID, runID) 不同 itemIDs → 不同 key(修复点) + d := *base + d.ExecEvalSetItemIDs = []int64{1, 2, 4} + assert.NotEqual(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&d)) + + // 不同 (exptID, runID) → 不同 key + e := entity.ExptScheduleEvent{ExptID: 100, ExptRunID: 999, ExecEvalSetItemIDs: []int64{1, 2, 3}} + assert.NotEqual(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&e)) + + // 空 itemIDs 不 panic + empty := *base + empty.ExecEvalSetItemIDs = nil + assert.NotEmpty(t, makeRetryItemsStartIdemKey(&empty)) +}