Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions backend/modules/evaluation/application/experiment_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,16 +1254,14 @@ func (e *experimentApplication) RetryExperiment(ctx context.Context, req *expt.R

switch runMode {
case entity.EvaluationModeRetryItems:
rid, retried, err := e.manager.LogRetryItemsRun(ctx, req.GetExptID(), runMode, req.GetWorkspaceID(), req.GetItemIds(), session)
rid, _, err := e.manager.LogRetryItemsRun(ctx, req.GetExptID(), runMode, req.GetWorkspaceID(), req.GetItemIds(), session)
if err != nil {
return nil, err
}
runID = rid

if !retried {
if err := e.manager.RetryItems(ctx, req.GetExptID(), runID, req.GetWorkspaceID(), gptr.Indirect(got.EvalConf.ItemRetryNum), req.GetItemIds(), session, req.GetExt()); err != nil {
return nil, err
}
if err := e.manager.RetryItems(ctx, req.GetExptID(), runID, req.GetWorkspaceID(), gptr.Indirect(got.EvalConf.ItemRetryNum), req.GetItemIds(), session, req.GetExt()); err != nil {
return nil, err
}
default:
if runID, err = e.idgen.GenID(ctx); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6895,7 +6895,8 @@ func TestExperimentApplication_RetryExperiment_Branches(t *testing.T) {
mockManager.EXPECT().Get(gomock.Any(), validExptID, validWorkspaceID, gomock.Any()).Return(baseExpt, nil)
mockAuth.EXPECT().AuthorizationWithoutSPI(gomock.Any(), gomock.Any()).Return(nil)
mockManager.EXPECT().LogRetryItemsRun(gomock.Any(), validExptID, entity.EvaluationModeRetryItems, validWorkspaceID, []int64{1}, gomock.Any()).Return(validRunID, true, nil)
// RetryItems should NOT be called since retried=true
// RetryItems IS called even when retried=true, so the scheduler picks up appended items
mockManager.EXPECT().RetryItems(gomock.Any(), validExptID, validRunID, validWorkspaceID, itemRetryNum, []int64{1}, gomock.Any(), gomock.Any()).Return(nil)

resp, err := app.RetryExperiment(context.Background(), &exptpb.RetryExperimentRequest{
WorkspaceID: gptr.Of(validWorkspaceID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ func (e *ExptMangerImpl) unlockCompletingRun(ctx context.Context, exptID, exptRu

func (e *ExptMangerImpl) LogRun(ctx context.Context, exptID, exptRunID int64, mode entity.ExptRunMode, spaceID int64, itemIDs []int64, session *entity.Session) error {
duration := time.Duration(e.configer.GetExptExecConf(ctx, spaceID).GetZombieIntervalSecond()) * time.Second
locked, err := e.mutex.LockBackoff(ctx, e.makeExptMutexLockKey(exptID), duration, time.Second)
locked, _, err := e.mutex.BackoffLockWithValue(ctx, e.makeExptMutexLockKey(exptID), strconv.FormatInt(exptRunID, 10), duration, time.Second)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) {
setup: func() {
mgr.mutex.(*lockMocks.MockILocker).
EXPECT().
LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second).
Return(true, nil)
BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second).
Return(true, "", nil)

mgr.mtr.(*metricsMocks.MockExptMetric).
EXPECT().
Expand Down Expand Up @@ -595,8 +595,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) {
setup: func() {
mgr.mutex.(*lockMocks.MockILocker).
EXPECT().
LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second).
Return(false, nil)
BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second).
Return(false, "", nil)
},
wantErr: true,
},
Expand All @@ -609,8 +609,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) {
setup: func() {
mgr.mutex.(*lockMocks.MockILocker).
EXPECT().
LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second).
Return(true, nil)
BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second).
Return(true, "", nil)

mgr.mtr.(*metricsMocks.MockExptMetric).
EXPECT().
Expand All @@ -632,8 +632,8 @@ func TestExptMangerImpl_LogRun(t *testing.T) {
setup: func() {
mgr.mutex.(*lockMocks.MockILocker).
EXPECT().
LockBackoff(ctx, gomock.Any(), gomock.Any(), time.Second).
Return(true, nil)
BackoffLockWithValue(ctx, gomock.Any(), "456", gomock.Any(), time.Second).
Return(true, "", nil)

mgr.mtr.(*metricsMocks.MockExptMetric).
EXPECT().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ package service

import (
"context"
"crypto/sha1"
"fmt"
"sort"
"strconv"
"time"

"github.com/bytedance/gg/gptr"
Expand Down Expand Up @@ -1305,6 +1308,22 @@ func makeStartIdemKey(event *entity.ExptScheduleEvent) string {
return fmt.Sprintf("expt_start:%v%v", event.ExptID, event.ExptRunID)
}

// makeRetryItemsStartIdemKey builds the start idempotency key used in
// RetryItems mode.
//
// RetryItems allows different items to be appended repeatedly under the same
// expt_run_id, so the key has to include a fingerprint of ExecEvalSetItemIDs.
// Without it, the key written by an earlier retry would make a later retry
// short-circuit in ExptStart and skip resetEvalItems. When MQ redelivers the
// same event the item IDs are identical, the fingerprint is identical, and the
// duplicate is still detected.
//
// The fingerprint is the hex SHA-1 of the item IDs sorted ascending, each
// written in decimal and followed by a comma, so the order of IDs in the event
// does not affect the key. ExptID and ExptRunID are formatted back to back
// with no separator, the same layout makeStartIdemKey uses.
func makeRetryItemsStartIdemKey(event *entity.ExptScheduleEvent) string {
	// Sort a private copy so the event's own slice is left untouched.
	sorted := make([]int64, len(event.ExecEvalSetItemIDs))
	copy(sorted, event.ExecEvalSetItemIDs)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })

	// Serialize as "id,id,...," and hash the whole payload in one shot.
	var payload []byte
	for i := range sorted {
		payload = strconv.AppendInt(payload, sorted[i], 10)
		payload = append(payload, ',')
	}
	digest := sha1.Sum(payload)

	return fmt.Sprintf("expt_start:%v%v:%x", event.ExptID, event.ExptRunID, digest[:])
}

// makeEndIdemKey returns the end idempotency key for a schedule event: the
// literal prefix "expt_end:" followed by the event's ExptID and ExptRunID,
// formatted back to back with no separator.
func makeEndIdemKey(event *entity.ExptScheduleEvent) string {
	exptID, runID := event.ExptID, event.ExptRunID
	return fmt.Sprintf("expt_end:%v%v", exptID, runID)
}
Expand Down Expand Up @@ -1600,7 +1619,7 @@ func (e *ExptRetryItemsExec) Mode() entity.ExptRunMode {
}

func (e *ExptRetryItemsExec) ExptStart(ctx context.Context, event *entity.ExptScheduleEvent, expt *entity.Experiment) error {
idemKey := makeStartIdemKey(event)
idemKey := makeRetryItemsStartIdemKey(event)
exist, err := e.idem.Exist(ctx, idemKey)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5682,3 +5682,33 @@ func TestExptTrialRunExec_ExptStart_OriginalPath(t *testing.T) {
})
}
}

func TestMakeRetryItemsStartIdemKey(t *testing.T) {
base := &entity.ExptScheduleEvent{ExptID: 100, ExptRunID: 200}

// 同 (exptID, runID, itemIDs) → 同 key(MQ 重投递保护不变)
a := *base
a.ExecEvalSetItemIDs = []int64{1, 2, 3}
b := *base
b.ExecEvalSetItemIDs = []int64{1, 2, 3}
assert.Equal(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&b))

// itemIDs 顺序变了 → 同 key(依赖排序)
c := *base
c.ExecEvalSetItemIDs = []int64{3, 1, 2}
assert.Equal(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&c))

// 同 (exptID, runID) 不同 itemIDs → 不同 key(修复点)
d := *base
d.ExecEvalSetItemIDs = []int64{1, 2, 4}
assert.NotEqual(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&d))

// 不同 (exptID, runID) → 不同 key
e := entity.ExptScheduleEvent{ExptID: 100, ExptRunID: 999, ExecEvalSetItemIDs: []int64{1, 2, 3}}
assert.NotEqual(t, makeRetryItemsStartIdemKey(&a), makeRetryItemsStartIdemKey(&e))

// 空 itemIDs 不 panic
empty := *base
empty.ExecEvalSetItemIDs = nil
assert.NotEmpty(t, makeRetryItemsStartIdemKey(&empty))
}
Loading