-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodel.py
More file actions
1036 lines (894 loc) · 49.8 KB
/
Copy pathmodel.py
File metadata and controls
1036 lines (894 loc) · 49.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
model.py
此模块定义了 FMLSTMAttentionModel 类,该类实现了一个结合了 DeepFM、LSTM 和多头自注意力机制的模型,适用于分类和回归任务。该模型设计灵活,能够捕捉特征之间的复杂交互关系,并且包含了针对不同任务类型的评估指标。此外,还定义了 RMSNorm、Attention 和 FeedForward 类,这些类在 Transformer 块中被使用,以增强模型的表达能力和性能。
"""
import math
from typing import Tuple, Optional
import torch
from torch._tensor import Tensor
from torch.nn import functional
from torch.nn.init import xavier_uniform_, zeros_, orthogonal_
from torch.nn.modules.container import ModuleList
from torch.nn.modules.dropout import Dropout
from torch.nn.modules.linear import Linear
from torch.nn.modules.loss import MSELoss, CrossEntropyLoss
from torch.nn.modules.module import Module
from torch.nn.modules.normalization import LayerNorm
from torch.nn.modules.rnn import LSTM
from torch.nn.modules.sparse import Embedding
from torch.nn.parameter import Parameter
from torchmetrics.classification.accuracy import MulticlassAccuracy
from torchmetrics.regression.r2 import R2Score
class FMLSTMAttentionModel(Module):
"""
基于 DeepFM、LSTM 和多头自注意力模型的分类及回归任务解决方案。
- 对于分类任务,输出层采用 softmax 激活函数和交叉熵损失函数。
- 对于回归任务,输出层采用 sigmoid 激活函数和均方误差损失函数。
- 该模型由三个主要部分组成:
1. DeepFM 组件:能够捕捉低阶和高阶的特征交互关系。
2. LSTM 组件:能够捕捉特征之间的序列关联关系。
3. 多头自注意力组件:能够捕捉特征之间的全局关联关系。
- 该模型设计具有灵活性,能够支持使用深层结构,并可根据所提供的配置灵活设置注意力机制。
- 该模型还包含了针对分类和回归任务的评估指标,使其适用于推荐系统和预测建模等众多领域。
- 该实现确保了权重和偏置的正确初始化,并使用了 dropout 和层归一化技术来防止过拟合并提高训练的稳定性。
- 该模型的设计便于进行扩展和修改,使其能够适应机器学习和深度学习领域的各种数据集和需求。
- 总的来说,该模型融合了 DeepFM、LSTM 和多头自注意力机制的优势,为分类和回归任务提供了强大且灵活的解决方案,使其成为推荐系统和预测建模领域从业者的重要工具。
- 该模型是使用 PyTorch 实现的,利用了其在高效构建和训练深度学习模型方面的强大功能。
- 多头自注意力机制的运用使模型能够捕捉特征之间的复杂相互关系,而 LSTM 组件则有助于捕捉序列数据中的时间依赖性,这使其在诸如时间序列数据或用户行为建模等特征顺序至关重要的任务中表现尤为出色。
构造函数参数:
problem_type: bool - 为分类任务时为 True,为回归任务时为 False
feature_size: int - 特征的规模(用于嵌入)
field_size: int - 每个时间步长的字段数量
seq_size: int - 序列长度(时间步数)
config: 具有众多属性的配置对象(见使用说明)在各处均有应用
所使用的关键配置项:
- class_size
- use_deep, embedding_size, deep_sizes, deep_norm_epses, deep_norm_elementwise_affines, deep_dropouts
- combination_size, combination_norm_eps, combination_norm_elementwise_affine, combination_dropout
- lstm_size, lstm_num_layers, lstm_recurrent_dropout, lstm_norm_eps, lstm_norm_elementwise_affine, lstm_dropout
- attention_size, attention_num_heads, attention_dropout, attention_norm_eps, attention_norm_elementwise_affine
- fm_first_norm_eps, fm_first_norm_elementwise_affine, fm_first_dropout
- fm_second_norm_eps, fm_second_norm_elementwise_affine, fm_second_dropout
"""
def __init__(self, problem_type, feature_size, field_size, seq_size, config):
super(FMLSTMAttentionModel, self).__init__()
self.problem_type = problem_type
self.field_size = field_size
self.seq_size = seq_size
self.class_size = config.class_size
self.use_deep = config.use_deep
self.embedding_size = config.embedding_size
self.deep_sizes = config.deep_sizes
self.deep_layer_num = len(self.deep_sizes)
self.deep_input_size = self.field_size * self.embedding_size
self.combination_size = config.combination_size
self.lstm_size = config.lstm_size
self.lstm_num_layers = config.lstm_num_layers
self.attention_size = config.attention_size
self.attention_num_heads = config.attention_num_heads
# attention_size 必须能够被等分,从而形成相等的“头”大小
assert self.attention_size % self.attention_num_heads == 0, 'The value of "attention_size" must be divisible by "attention_num_heads".'
self.attention_size_per_head = int(self.attention_size / self.attention_num_heads)
# 特征的嵌入(稀疏索引 -> 稠密向量)
self.embeds = Embedding(feature_size, self.embedding_size)
xavier_uniform_(self.embeds.weight)
# FM 中一阶项的嵌入(每个特征一个标量)
self.bias = Embedding(feature_size, 1)
xavier_uniform_(self.bias.weight)
# 用于 FM 项的归一化和丢弃层
# 一阶:对嵌入求和进行 (seq_size, field_size) 的归一化处理
self.fm_first_norm = LayerNorm([self.seq_size, self.field_size], config.fm_first_norm_eps,
config.fm_first_norm_elementwise_affine)
self.fm_first_dropout = Dropout(config.fm_first_dropout)
# 二阶:对交互项进行 (seq_size, embedding_size) 的归一化处理
self.fm_second_norm = LayerNorm([self.seq_size, self.embedding_size], config.fm_second_norm_eps,
config.fm_second_norm_elementwise_affine)
self.fm_second_dropout = Dropout(config.fm_second_dropout)
# 构建可选的深度多层感知器堆栈,该堆栈会使用扁平化的字段嵌入作为输入
combination_input_size = self.field_size + self.embedding_size
if self.use_deep:
i = 0
self.deep_layers = ModuleList()
self.deep_layer_norms = ModuleList()
self.deep_layer_dropouts = ModuleList()
deep_norm_epses = config.deep_norm_epses
deep_norm_elementwise_affines = config.deep_norm_elementwise_affines
deep_dropouts = config.deep_dropouts
deep_size = self.deep_sizes[i]
# 第一深层层将平坦字段嵌入映射到预设的深度大小上
layer = Linear(self.deep_input_size, deep_size, True)
xavier_uniform_(layer.weight)
zeros_(layer.bias)
self.deep_layers.append(layer)
self.deep_layer_norms.append(
LayerNorm([self.seq_size, deep_size], deep_norm_epses[i], deep_norm_elementwise_affines[i]))
self.deep_layer_dropouts.append(Dropout(deep_dropouts[i]))
i += 1
# 更深层的部分(如果有的话)
while i < self.deep_layer_num:
last_deep_size = deep_size
deep_size = self.deep_sizes[i]
layer = Linear(last_deep_size, deep_size, True)
xavier_uniform_(layer.weight)
zeros_(layer.bias)
self.deep_layers.append(layer)
self.deep_layer_norms.append(
LayerNorm([self.seq_size, deep_size], deep_norm_epses[i], deep_norm_elementwise_affines[i]))
self.deep_layer_dropouts.append(Dropout(deep_dropouts[i]))
i += 1
# 与 FM 输出相连接的深层输出会增加组合输入的规模
combination_input_size += deep_size
# 线性组合层用于在每个时间步将 FM 和深度模型的输出进行合并
self.combination = Linear(combination_input_size, self.combination_size, True)
xavier_uniform_(self.combination.weight)
zeros_(self.combination.bias)
self.combination_norm = LayerNorm([self.seq_size, self.combination_size], config.combination_norm_eps,
config.combination_norm_elementwise_affine)
self.combination_dropout = Dropout(config.combination_dropout)
# 在序列维度上使用 LSTM
self.seq_lstm = LSTM(
input_size=self.combination_size,
hidden_size=self.lstm_size,
num_layers=self.lstm_num_layers,
bias=True,
batch_first=True,
dropout=config.lstm_recurrent_dropout,
bidirectional=False
)
# 初始化 LSTM 的权重 (input-hidden, hidden-hidden, biases)
seq_lstm_parameters = self.seq_lstm.named_parameters()
for seq_lstm_param_name, seq_lstm_param in seq_lstm_parameters:
if 'weight_ih' in seq_lstm_param_name:
xavier_uniform_(seq_lstm_param)
elif 'weight_hh' in seq_lstm_param_name:
orthogonal_(seq_lstm_param)
elif 'bias' in seq_lstm_param_name:
zeros_(seq_lstm_param)
self.lstm_norm = LayerNorm([self.seq_size, self.lstm_size], config.lstm_norm_eps,
config.lstm_norm_elementwise_affine)
self.lstm_dropout = Dropout(config.lstm_dropout)
# 注意组件。当 attention_num_heads 小于或等于 1 时,我们将简化为更简单的 attention_query;否则,我们将使用独立的键/值转换以及回溯映射。
self.attention_query = None
self.attention = None
if self.attention_num_heads <= 1:
# 在整个 attention_size 范围内,针对每个位置进行标量查询
self.attention_query = Linear(self.attention_size, 1, False)
else:
# 每个头的查询以及最后的线性运算以将各个头进行混合处理
self.attention_query = Linear(self.attention_size_per_head, 1, False)
self.attention = Linear(self.attention_size, self.attention_size, False)
xavier_uniform_(self.attention.weight)
xavier_uniform_(self.attention_query.weight)
# 键值转换项目将 LSTMs 的隐藏状态转换为 attention_size
self.attention_key = Linear(self.lstm_size, self.attention_size, False)
xavier_uniform_(self.attention_key.weight)
self.attention_value = Linear(self.lstm_size, self.attention_size, False)
xavier_uniform_(self.attention_value.weight)
self.attention_norm = LayerNorm(self.attention_size, config.attention_norm_eps,
config.attention_norm_elementwise_affine)
self.attention_dropout = Dropout(config.attention_dropout)
# 分类或回归的最终预测结果及指标
self.projection = None
self.accuracy = None
self.r2 = None
if self.problem_type:
# 分类
self.projection = Linear(self.attention_size, self.class_size, True)
self.target_loss = CrossEntropyLoss()
self.accuracy = MulticlassAccuracy(num_classes=self.class_size)
else:
# 回归
self.projection = Linear(self.attention_size, 1, True)
self.target_loss = MSELoss()
self.r2 = R2Score(num_outputs=1)
xavier_uniform_(self.projection.weight)
zeros_(self.projection.bias)
def predict(self, feat_index, feat_value):
"""
进行一次前向传播,生成原始输出(在损失计算之前)。
:param feat_index: 一个形状为 (batch_size, seq_size, field_size) 的 LongTensor,其中包含对嵌入表中元素的索引
:param feat_value: 具有相同形状的 Tensor,其中包含特征值 (floats)。通常为 0/1 值或特征值的缩放形式
:return: 对于分类任务:形状为 (batch_size, class_size) 且包含原始对数几率的 Tensor。对于回归任务:形状为 (batch_size,) 的 Tensor,其中包含经过 Sigmoid 函数处理后的预测值。
"""
# 对与嵌入进行元素级乘法运算的值进行重新排列: (batch, seq, field) -> (batch, seq, field, 1)
reshaped_feat_value = feat_value.reshape(-1, self.seq_size, self.field_size, 1)
# 将嵌入向量乘以每个字段的标量特征值
embeddings = torch.multiply(self.embeds(feat_index), reshaped_feat_value)
# 构建 FM 组件:
# 1)一阶项:各字段中特征值与嵌入的乘积之和 -> 求和后的形状: (batch, seq, field) 然后进行层归一化和 dropout 处理
# 2)二阶项:0.5 * (sum(v)^2 - sum(v^2)) ,从而得出每个嵌入维度下的交互向量
outs = [
self.fm_first_dropout(
self.fm_first_norm(
torch.sum(
torch.multiply(
self.bias(
feat_index
),
reshaped_feat_value
),
3
)
)
),
self.fm_second_dropout(
self.fm_second_norm(
0.5 * torch.subtract(
torch.square(
torch.sum(
embeddings,
2
)
),
torch.sum(
torch.square(
embeddings
),
2
)
)
)
)
]
# 可选的深度组件:在每个时间步长处,对已扁平化的字段嵌入运行一个小的多层感知机模型。
if self.use_deep:
i = 0
deep_out = embeddings.reshape(-1, self.seq_size, self.deep_input_size)
while i < self.deep_layer_num:
# 每一层: Linear -> LayerNorm -> ReLU -> Dropout
deep_out = self.deep_layer_dropouts[i](
torch.relu(self.deep_layer_norms[i](self.deep_layers[i](deep_out))))
i += 1
outs.append(deep_out)
# 将 FM 和深度模型的输出沿着最后一个维度(即每个时间步的特征)进行连接
combination_out = self.combination_dropout(self.combination_norm(self.combination(torch.concat(outs, 2))))
batch_size = combination_out.size(0)
device = next(self.parameters()).device
# 将 LSTM 的隐藏状态初始化为零值
h0 = torch.zeros(self.lstm_num_layers, batch_size, self.lstm_size, device=device)
lstm_out, _ = self.seq_lstm(combination_out, (h0, torch.zeros_like(h0, device=device)))
# 在 LSTM 之后应用归一化和丢弃操作
lstm_out = self.lstm_dropout(self.lstm_norm(lstm_out))
# 计算注意力输出。有两个分支:
# - 单头模式:attention_query 接收完整的 attention_size 参数,并针对每个位置返回标量值
# - 多头情况:每个头的查询以及一个额外的线性层用于混合各个头的信息
attention_out = None
if self.attention_num_heads <= 1:
# 将计算出的值投影到 attention_size,然后进行转置以便用于矩阵乘法运算
# attention_query 会针对每个位置输出一个标量分数,并在时间维度上进行 softmax 处理
attention_out = torch.bmm(
torch.transpose(
self.attention_value(
lstm_out
),
1,
2
),
functional.softmax(
self.attention_query(
self.attention_key(
lstm_out
)
) / math.sqrt(self.attention_size),
dim=1
)
).squeeze(
2
)
else:
# 多头注意力流(自定义折叠实现):
# - 项目中的值、键以及查询 (batch, seq, heads, head_dim)
# - 使用 attention_query 来计算权重(然后对序列进行求 softmax)
# - 将数值乘以注意力权重,然后将头层重新合并为一个单一的向量
attention_out = self.attention(torch.matmul(torch.transpose(
self.attention_value(lstm_out).view(batch_size, self.seq_size, self.attention_num_heads,
self.attention_size_per_head).transpose(1, 2), 2, 3),
functional.softmax(self.attention_query(
self.attention_key(lstm_out).view(batch_size, self.seq_size,
self.attention_num_heads,
self.attention_size_per_head).transpose(
1, 2)) / math.sqrt(self.attention_size_per_head),
dim=2)).squeeze(3).view(batch_size,
self.attention_size))
# 最终预测及可选激活(用于回归的 Sigmoid 函数)
result = self.projection(self.attention_dropout(self.attention_norm(attention_out)))
if self.problem_type:
return result.reshape(-1, self.class_size)
else:
# 对于回归任务,会返回一个经过 Sigmoid 函数处理后的形状为 1D 的 Tensor
return torch.sigmoid(result).reshape(-1)
def loss(self, pred, label):
"""
计算预测值与标签之间的损失。
:param pred: 模型输出(分类任务的预测值、回归任务的标量预测值)
:param label: 真实标签
:return: 标量损失 Tensor
"""
return self.target_loss(pred, label)
def forward(self, feat_index, feat_value, label=None):
"""
标准的“torch 模块前向传播”函数:如果提供了 label,则返回损失值;否则返回预测结果。
:param feat_index: 一个形状为 (batch_size, seq_size, field_size) 的 LongTensor,其中包含对嵌入表中元素的索引
:param feat_value: 具有相同形状的 Tensor,其中包含特征值 (floats)。通常为 0/1 值或特征值的缩放形式
:param label: 真实标签
:return: 如果提供了 label,则返回损失值;否则返回预测结果。
"""
pred = self.predict(feat_index, feat_value)
if label is not None:
return self.loss(pred, label)
return pred
def evaluate(self, pred, label):
"""
计算评估分数和损失值。
对于分类任务:返回(accuracy, loss)。
对于回归模型:返回值为 (R^2 score, loss)。
注意:在使用 torchmetrics 对象时,会先对其进行重置操作,以便对所提供的批次进行计算。
:param pred: 模型输出(分类任务的预测值、回归任务的标量预测值)
:param label: 真实标签
:return: 评估分数和损失值
"""
score = None
label = label.reshape(-1)
if self.problem_type:
self.accuracy.reset()
score = self.accuracy(torch.argmax(pred, dim=1), label)
else:
self.r2.reset()
score = self.r2(pred, label)
return score, self.loss(pred, label)
def predict_and_evaluate(self, feat_index, feat_value, label):
"""
简便方法:先进行预测,然后在同一批次数据上进行评估。
:param feat_index: 一个形状为 (batch_size, seq_size, field_size) 的 LongTensor,其中包含对嵌入表中元素的索引
:param feat_value: 具有相同形状的 Tensor,其中包含特征值 (floats)。通常为 0/1 值或特征值的缩放形式
:param label: 真实标签
:return: 评估分数和损失值
"""
return self.evaluate(self.predict(feat_index, feat_value), label)
class RMSNorm(Module):
"""
均方根层归一化 (RMSNorm)。
RMSNorm 是通过对最后一个维度的均方根值进行归一化(不进行均值减法)来实现归一化的,然后通过一个学习得到的权重参数进行缩放。
参数:
- dim: 最后的尺寸以进行标准化处理
- eps: 使用小的“ε”值以避免除以零的情况发生
"""
def __init__(self, dim: int, eps: Optional[float] = 1e-6):
super().__init__()
self.eps = eps
# 尺度参数(已学习的)
self.weight = Parameter(torch.ones(dim))
def _norm(self, x):
# 对 x 进行归一化处理,使其在最后一个维度上按照均方根(RMS)进行标准化: x * rsqrt(mean(x^2) + eps)
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
def forward(self, x):
# 将数据转换为 float 以确保数值稳定性,然后再转换回原始数据类型
output = self._norm(x.float()).type_as(x)
return output * self.weight
def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0):
"""
预先计算复数值的旋转嵌入频率。
生成一个形状为 (end, dim/2) 的 Tensor,该 Tensor 表示旋转嵌入中每个位置的角度(相位)。返回的 Tensor 采用 torch.polar 函数以复数极坐标形式表示。
:param dim: 全尺寸(旋转部件采用成对设计,因此有效角度为尺寸的二分之一)
:param end: 要计算频率所对应的序列长度
:param theta:基础频率缩放(默认值为 10000,类似于常见的旋转嵌入)
:return: 带有复数元素的 Tensor,用于表示每个位置和频率对应的 cos+i sin。形状: (end, dim/2) ,其中维度假定为偶数。
"""
# 每对数据的频率(两个实数维度 -> 一个复数频率)
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
t = torch.arange(end, device=freqs.device)
freqs = torch.outer(t, freqs).float()
# 极坐标表示:模长为 1,角度频率 -> cos + i sin
freqs_cis = torch.polar(torch.ones_like(freqs), freqs)
return freqs_cis
def reshape_for_broadcast(freqs_cis: Tensor, x: Tensor):
"""
将预先计算的 freqs_cis 数据重新排列,使其能够与输入 Tensor x 正确地进行广播操作。
:param freqs_cis: 形状 (seq_len, head_dim)
:param x: Tensor 的第二个维度为 seq_len,最后一个维度为完整的复数对维度
:return: 重新排列后的 freqs_cis 变量,在被视为复数形式后能够与 x 相乘
"""
ndim = x.ndim
assert 0 <= 1 < ndim
assert freqs_cis.shape == (x.shape[1], x.shape[-1])
shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
return freqs_cis.view(*shape)
def apply_rotary_emb(
xq: Tensor,
xk: Tensor,
xq_freqs_cis: Tensor,
xk_freqs_cis: Optional[Tensor] = None
) -> Tuple[Tensor, Tensor]:
"""
将旋转位置嵌入应用到查询 Tensor 和键 Tensor 上。
该实现要求 xq 和 xk 为实值 Tensor,其最后一个维度的排列方式应为表示复数的成对形式: (..., 2)。我们将它们转换为复数形式,乘以预先计算的相位因子,然后返回展平后的实数对。
:param xq: 对形状为 (..., 2) 的查询 Tensor,其中最后一轴为成对的实数
:param xk: 形状为 (..., 2) 的键 Tensor,其中最后一轴为成对的实数
:param xq_freqs_cis: 应用于查询的频率(复数形式);广播后形状必须与之匹配
:param xk_freqs_cis: 可选的键单独频率设置。若未指定,则使用 xq_freqs_cis
:return: tuple (xq_out, xk_out): 经过旋转操作后,生成的均为与输入具有相同形状的实数 Tensor
"""
# 将最后维度的对称表示形式转换为复数形式
xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
# 将频率向量重新排列,以便能够扩展到复数 Tensor 上
xq_freqs_cis = reshape_for_broadcast(xq_freqs_cis, xq_)
xk_freqs_cis = reshape_for_broadcast(xk_freqs_cis, xk_) if xk_freqs_cis is not None else xq_freqs_cis
# 在复数域中通过相位因子(旋转)进行乘法运算
xq_out = torch.view_as_real(xq_ * xq_freqs_cis).flatten(3)
xk_out = torch.view_as_real(xk_ * xk_freqs_cis).flatten(3)
# 恢复为原始数据类型并返回
return xq_out.type_as(xq), xk_out.type_as(xk)
class Attention(Module):
"""
Transformer 模块所使用的多头注意力模块。
此实现支持:
- 旋转位置嵌入(应用于查询和键)
- 为多层场景提供的外部可选因果掩码
构造函数参数:
combination_size: 每个位置的输入维度(模型隐藏维度)
attention_size: 用于生成 Q/K/V 的总维度 (attention_num_heads * head_dim)
attention_num_heads: 注意力头的数量
attention_size_per_head: 每个头的维度 (head_dim)
"""
def __init__(self, combination_size: int, attention_size: int, attention_num_heads: int,
attention_size_per_head: int):
super().__init__()
self.attention_num_heads = attention_num_heads
self.attention_size_per_head = attention_size_per_head
# Q/K/V 的线性变换及输出投影
self.wq = Linear(
combination_size,
attention_size,
False
)
self.wk = Linear(
combination_size,
attention_size,
False
)
self.wv = Linear(
combination_size,
attention_size,
False
)
self.wo = Linear(
attention_size,
combination_size,
False
)
def forward(
self,
x: Tensor,
freqs_cis: Tensor,
mask: Optional[Tensor] = None,
xq_freqs_cis: Optional[Tensor] = None
):
"""
标准的“torch 模块前向传播”函数。
:param x: 输入 Tensor 的形状 (batch, seqlen, combination_size)
:param freqs_cis: 键的预计算旋转频率 (seq_len, head_dim)
:param mask: 可选的附加掩码(例如,对于被遮挡的位置使用 -inf 值)将被添加到注意力得分中
:param xq_freqs_cis: 查询的可选频率(用于因果最后一步注意力模式)
:return: 根据是否提供了 xq_freqs_cis 参数,其形状为 (batch, 1 or seqlen, attention_size) 的上下文向量
"""
xq = None
xk = None
bsz, seqlen, _ = x.shape
# 值始终针对完整序列进行计算
xv = self.wv(x)
if xq_freqs_cis is not None:
# 当提供了 xq_freqs_cis 时,我们仅对最后一个位置计算查询(自回归注意力)
xq, xk = self.wq(x[:, seqlen - 1:seqlen, :]), self.wk(x)
# 重塑为 (batch, 1, heads, head_dim) 和 (batch, seqlen, heads, head_dim)
xq = xq.view(bsz, 1, self.attention_num_heads, self.attention_size_per_head)
xk = xk.view(bsz, seqlen, self.attention_num_heads, self.attention_size_per_head)
# 将旋转嵌入应用到查询和键上
xq, xk = apply_rotary_emb(xq, xk, xq_freqs_cis, xk_freqs_cis=freqs_cis)
else:
# 标准情况:先计算所有位置的 Q 和 K 值,然后对整个序列应用旋转操作
xq, xk = self.wq(x), self.wk(x)
xq = xq.view(bsz, seqlen, self.attention_num_heads, self.attention_size_per_head)
xk = xk.view(bsz, seqlen, self.attention_num_heads, self.attention_size_per_head)
xq, xk = apply_rotary_emb(xq, xk, freqs_cis)
xv = xv.view(bsz, seqlen, self.attention_num_heads, self.attention_size_per_head)
keys = xk
values = xv
# 将数据转置为 (batch, heads, seq, head_dim) 的格式,以便按每个头计算注意力值
xq = xq.transpose(1, 2)
keys = keys.transpose(1, 2)
values = values.transpose(1, 2)
# 分数: (batch, heads, qlen, klen)
scores = torch.matmul(xq, keys.transpose(2, 3)) / math.sqrt(self.attention_size_per_head)
if mask is not None:
# 掩码是累加式的(例如,在使用 softmax 进行归一化之前,对于不允许出现的位置应设置为 -inf)
scores = scores + mask
# 在关键维度上应用 softmax 函数,并计算注意力加权求和结果
scores = functional.softmax(scores.float(), dim=-1).type_as(xq)
output = torch.matmul(scores, values)
# 重新排列为 (batch, seq or 1, attention_size)
output = output.transpose(1, 2).contiguous().view(bsz, 1 if xq_freqs_cis is not None else seqlen, -1)
return self.wo(output)
class FeedForward(Module):
"""
在 Transformer 块内部使用的前馈网络。
采用门控线性单元式的前馈结构:
out = w2(silu(w1(x)) * w3(x))
'hidden_dim' 是根据 combination_size 计算得出的,并且可能会乘以/四舍五入到 `multiple_of` 的整数倍,以符合常见的 Transformer 设计。
构造函数参数:
- combination_size: 输入和输出的维度(Transformer 块的隐藏维度)
- hidden_dim: 前馈网络中间层的维度(通常是 combination_size 的 2-4 倍)
- multiple_of: hidden_dim 应该是这个值的倍数,以确保与特定硬件优化兼容
- ffn_dim_multiplier: 可选的乘数,用于调整 hidden_dim 的规模(例如,某些 Transformer 变体可能会使用更大的前馈层)
"""
def __init__(
self,
combination_size: int,
hidden_dim: int,
multiple_of: int,
ffn_dim_multiplier: Optional[float]
):
super().__init__()
# 使用内部启发式算法来确定隐藏维度,并允许 ffn 乘数的存在
hidden_dim = int(2 * hidden_dim / 3)
if ffn_dim_multiplier is not None:
hidden_dim = int(ffn_dim_multiplier * hidden_dim)
hidden_dim = multiple_of * ((hidden_dim + multiple_of - 1) // multiple_of)
self.w1 = Linear(
combination_size, hidden_dim, False
)
self.w2 = Linear(
hidden_dim, combination_size, False
)
self.w3 = Linear(
combination_size, hidden_dim, False
)
def forward(self, x):
# 使用第二个线性层进行 silu 激活门控以实现乘法门控
return self.w2(functional.silu(self.w1(x)) * self.w3(x))
class TransformerBlock(Module):
"""
单个 Transformer 块由 RMSNorm -> Attention -> Residual -> FeedForward -> Residual 组成。
这种 Transformer 先采用注意力机制,然后是带有 RMSNorm 样式归一化的前馈网络。
该模块支持两种模式:
- 如果提供了 xq_freqs_cis 参数,该块将假定查询仅针对最后一个时间步(即自回归的最后一个时间步)进行
- 否则将使用完整的序列注意力机制(上游可能还会应用掩码操作)
属性:
layer_id: 外部 Transformer 所使用的整数标识符,用于决定特殊情况下顶层的行为
"""
def __init__(self, in_layer_id: int, combination_size: int, attention_size: int, attention_num_heads: int,
attention_size_per_head: int, multiple_of: int, ffn_dim_multiplier: Optional[float],
norm_eps: Optional[float] = 1e-6):
super().__init__()
self.attention = Attention(combination_size, attention_size, attention_num_heads, attention_size_per_head)
self.feed_forward = FeedForward(
combination_size=combination_size,
hidden_dim=4 * combination_size,
multiple_of=multiple_of,
ffn_dim_multiplier=ffn_dim_multiplier
)
self.__layer_id = in_layer_id
self.attention_norm = RMSNorm(combination_size, eps=norm_eps)
self.ffn_norm = RMSNorm(combination_size, eps=norm_eps)
@property
def layer_id(self) -> int:
return self.__layer_id
def forward(
self,
x: Tensor,
freqs_cis: Tensor,
mask: Optional[Tensor] = None,
xq_freqs_cis: Optional[Tensor] = None
):
"""
单个 Transformer 块的前向传播过程。
:param x: (batch, seqlen, combination_size)
:param freqs_cis: 旋转频率
:param mask: 用于全序列注意力的可选附加注意力掩码
:param xq_freqs_cis: 可选的仅用于查询的旋转频率(用于最后一步的注意力计算)
:return: 与输入 x 具有相同形状的 Tensor(经过残差连接后)
"""
h = None
if xq_freqs_cis is not None:
# 最后一步注意力路径:仅针对最后一个位置计算新的输出值
_, seqlen, _ = x.shape
h = x[:, seqlen - 1:seqlen, :] + self.attention.forward(
self.attention_norm(x), freqs_cis, xq_freqs_cis=xq_freqs_cis
)
else:
# 如果有掩码,则采用全序列注意力机制
h = x + self.attention.forward(
self.attention_norm(x), freqs_cis, mask=mask
)
out = h + self.feed_forward.forward(self.ffn_norm(h))
return out
class Transformer(Module):
"""
由多个 TransformerBlock 层组成的 Transformer。
这种 Transformer 会预先计算出针对序列长度的旋转嵌入 (freqs_cis) ,并且在存在多个层的情况下,会构建一个上三角形掩码以实现因果注意力机制。
构造函数参数:
- transformer_num_layers: Transformer 块的数量
- seq_size: 输入序列的长度(用于预计算旋转嵌入和掩码)
- combination_size: 每个位置的输入/输出维度(Transformer 块的隐藏维度)
- attention_size: 注意力机制中查询/键/值的总维度(通常是 combination_size 的 2-4 倍)
- attention_num_heads: 注意力头的数量
- attention_size_per_head: 每个头的维度 (head_dim),必须满足 attention_size % attention_num_heads == 0
- multiple_of: 前馈网络中间层的维度应该是这个值的倍数,以确保与特定硬件优化兼容
- ffn_dim_multiplier: 可选的乘数,用于调整前馈网络中间层的规模(例如,某些 Transformer 变体可能会使用更大的前馈层)
- norm_eps: 用于 RMSNorm 的 epsilon 值,以确保数值稳定性
"""
def __init__(self, transformer_num_layers: int, seq_size: int, combination_size: int, attention_size: int,
attention_num_heads: int, attention_size_per_head: int, multiple_of: int,
ffn_dim_multiplier: Optional[float], norm_eps: Optional[float] = 1e-6):
super().__init__()
self.num_layers = transformer_num_layers
self.top_layer = transformer_num_layers - 1
self.layers = ModuleList()
num_layers_range = range(transformer_num_layers)
for layer_id in num_layers_range:
self.layers.append(TransformerBlock(layer_id, combination_size, attention_size, attention_num_heads,
attention_size_per_head, multiple_of, ffn_dim_multiplier, norm_eps))
self.norm = RMSNorm(combination_size, eps=norm_eps)
# 预先计算整个序列以及最后一个查询的旋转嵌入的余弦值/正弦值(复数值)
self.freqs_cis = precompute_freqs_cis(
attention_size_per_head, seq_size
)
# xq_freqs_cis 是用于自回归查询计算的最后一个时间步的频率行
self.xq_freqs_cis = self.freqs_cis[seq_size - 1:seq_size, :]
# 构建一个因果掩码:将上三角矩阵中的元素设为负无穷大,使其位于对角线之上 -> 以此防止模型关注后续的标记
self.mask = torch.full(
(seq_size, seq_size), float('-inf')
)
self.mask = torch.triu(self.mask, diagonal=1)
def forward(self, combinations: Tensor):
"""
运行堆叠的 Transformer 层。
:param combinations: Tensor 的形状 (batch, seq_len, combination_size)
:return: 形状为 (batch, combination_size) 的 Tensor ——这是最后一个位置的规范化表示(该代码会压缩序列维度,并返回顶层的向量)
"""
device = combinations.device
h = combinations
freqs_cis = self.freqs_cis.to(device)
xq_freqs_cis = self.xq_freqs_cis.to(device)
mask = None
if self.num_layers >= 2:
# 对于多层结构,我们向内部层传递因果掩码(以防止后续信息泄露)
mask = self.mask.to(device)
mask = mask.type_as(h)
for layer in self.layers:
# 仅将 xq_freqs_cis 用于顶层(自回归最后一步注意力),其他层则采用带有掩码的全序列注意力机制
h = layer(h, freqs_cis, xq_freqs_cis=xq_freqs_cis) if layer.layer_id == self.top_layer else layer(h,
freqs_cis,
mask=mask)
# 对最终的末位表示进行归一化处理并返回(消除时间维度)
h = self.norm(h.squeeze(1))
return h.float()
class FMLlamaModel(Module):
"""
基于 DeepFM、Llama 自回归大模型架构模型的分类及回归任务解决方案。
- 该模型由两个主要部分组成:
1. DeepFM 组件:能够捕捉低阶和高阶的特征交互关系。
2. Llama 自回归大模型架构组件:能够捕捉含有旋转嵌入位置编码的增强特征之间的全局关联关系。
- 该模型设计具有灵活性,能够支持使用深层结构,并可根据所提供的配置灵活设置注意力机制。
- 该模型还包含了针对分类和回归任务的评估指标,使其适用于推荐系统和预测建模等众多领域。
- 该实现确保了权重和偏置的正确初始化,并使用了 dropout 和层归一化技术来防止过拟合并提高训练的稳定性。
- 该模型的设计便于进行扩展和修改,使其能够适应机器学习和深度学习领域的各种数据集和需求。
- 总的来说,该模型在较高层面上与 FMLSTMAttentionModel 类似,但将 LSTM + 多头自注意力机制替换为了一个利用旋转嵌入进行位置编码的 Transformer-Decoder 模型,为硬件资源充裕且具有大规模有效数据的场景提供了强大且灵活的解决方案,使其成为序列建模领域从业者的重要工具。
- 该模型是使用 PyTorch 实现的,利用了其在高效构建和训练深度学习模型方面的强大功能。
- DeepFM 可以捕捉特征之间的低阶和高阶交互关系,而 Llama 自回归大模型架构通过堆叠 Transformer-Decoder 充分扩容模型的参数量捕捉含有旋转嵌入位置编码的增强特征之间的全局关联关系,这使其在序列建模任务中表现尤为出色。
前向流程:
- 计算 FM 的一阶和二阶项
- (可选)对按时间步划分后的嵌入 Tensor 进行深度多层感知机运算
- 将这些内容合并成每一步的组合 Tensor
- 将组合 Tensor 的序列输入到 Transformer 中
- 将项目 Transformer 的输出结果映射至目标值(分类预测值或回归值)
这里的投影使用的是来自 Transformer 的 combination_size 输出(而非 attention_size)。
构造函数参数:
- problem_type: 一个布尔值,指示这是一个分类任务(True)还是回归任务(False)
- feature_size: 嵌入表的大小(即特征的总数量)
- field_size: 每个时间步长中的字段数量
- seq_size: 输入序列的长度
- config: 包含模型超参数的配置对象
"""
def __init__(self, problem_type, feature_size, field_size, seq_size, config):
super(FMLlamaModel, self).__init__()
self.problem_type = problem_type
self.field_size = field_size
self.seq_size = seq_size
self.class_size = config.class_size
self.use_deep = config.use_deep
self.embedding_size = config.embedding_size
self.deep_sizes = config.deep_sizes
self.deep_layer_num = len(self.deep_sizes)
self.deep_input_size = self.field_size * self.embedding_size
self.combination_size = config.combination_size
self.attention_size = config.attention_size
self.attention_num_heads = config.attention_num_heads
# 确保头部尺寸的可分割性
assert self.attention_size % self.attention_num_heads == 0, 'The value of "attention_size" must be divisible by "attention_num_heads".'
self.attention_size_per_head = int(self.attention_size / self.attention_num_heads)
# 特征的嵌入(稀疏索引 -> 稠密向量)
self.embeds = Embedding(feature_size, self.embedding_size)
xavier_uniform_(self.embeds.weight)
# FM 中一阶项的嵌入(每个特征一个标量)
self.bias = Embedding(feature_size, 1)
xavier_uniform_(self.bias.weight)
# 用于 FM 项的归一化和丢弃层
self.fm_first_norm = LayerNorm([self.seq_size, self.field_size], config.fm_first_norm_eps,
config.fm_first_norm_elementwise_affine)
self.fm_first_dropout = Dropout(config.fm_first_dropout)
self.fm_second_norm = LayerNorm([self.seq_size, self.embedding_size], config.fm_second_norm_eps,
config.fm_second_norm_elementwise_affine)
self.fm_second_dropout = Dropout(config.fm_second_dropout)
# 深度多层感知机(可选)
combination_input_size = self.field_size + self.embedding_size
if self.use_deep:
i = 0
self.deep_layers = ModuleList()
self.deep_layer_norms = ModuleList()
self.deep_layer_dropouts = ModuleList()
deep_norm_epses = config.deep_norm_epses
deep_norm_elementwise_affines = config.deep_norm_elementwise_affines
deep_dropouts = config.deep_dropouts
deep_size = self.deep_sizes[i]
layer = Linear(self.deep_input_size, deep_size, True)
xavier_uniform_(layer.weight)
zeros_(layer.bias)
self.deep_layers.append(layer)
self.deep_layer_norms.append(
LayerNorm([self.seq_size, deep_size], deep_norm_epses[i], deep_norm_elementwise_affines[i]))
self.deep_layer_dropouts.append(Dropout(deep_dropouts[i]))
i += 1
while i < self.deep_layer_num:
last_deep_size = deep_size
deep_size = self.deep_sizes[i]
layer = Linear(last_deep_size, deep_size, True)
xavier_uniform_(layer.weight)
zeros_(layer.bias)
self.deep_layers.append(layer)
self.deep_layer_norms.append(
LayerNorm([self.seq_size, deep_size], deep_norm_epses[i], deep_norm_elementwise_affines[i]))
self.deep_layer_dropouts.append(Dropout(deep_dropouts[i]))
i += 1
combination_input_size += deep_size
# 每个时间步的组合线性层
self.combination = Linear(combination_input_size, self.combination_size, True)
xavier_uniform_(self.combination.weight)
zeros_(self.combination.bias)
self.combination_norm = LayerNorm([self.seq_size, self.combination_size], config.combination_norm_eps,
config.combination_norm_elementwise_affine)
self.combination_dropout = Dropout(config.combination_dropout)
# 每步时间都使用组合 Tensor 的 Transformer
self.transformer = Transformer(
config.transformer_num_layers,
seq_size,
self.combination_size,
self.attention_size,
self.attention_num_heads,
self.attention_size_per_head,
config.multiple_of,
config.ffn_dim_multiplier,
config.norm_eps
)
self.attention_dropout = Dropout(config.attention_dropout)
# 输出预测与指标
self.projection = None
self.accuracy = None
self.r2 = None
if self.problem_type:
self.projection = Linear(self.combination_size, self.class_size, True)
self.target_loss = CrossEntropyLoss()
self.accuracy = MulticlassAccuracy(num_classes=self.class_size)
else:
self.projection = Linear(self.combination_size, 1, True)
self.target_loss = MSELoss()
self.r2 = R2Score(num_outputs=1)
xavier_uniform_(self.projection.weight)
zeros_(self.projection.bias)
def predict(self, feat_index, feat_value):
"""
进行前向传递:计算每一步的时间步组合 Tensor,将其输入到 Transformer 中,并进行投影。
:param feat_index: LongTensor (batch, seq, field)
:param feat_value: Tensor (batch, seq, field)
:return: 分类概率值 (batch, class_size) 或回归预测值 (batch,)
"""
reshaped_feat_value = feat_value.reshape(-1, self.seq_size, self.field_size, 1)
embeddings = torch.multiply(self.embeds(feat_index), reshaped_feat_value)
# 与 FMLSTMAttentionModel 中相同的 FM 一阶和二阶项
outs = [
self.fm_first_dropout(
self.fm_first_norm(
torch.sum(
torch.multiply(
self.bias(
feat_index
),
reshaped_feat_value
),
3
)
)
),
self.fm_second_dropout(
self.fm_second_norm(
0.5 * torch.subtract(
torch.square(
torch.sum(
embeddings,
2
)
),
torch.sum(
torch.square(
embeddings
),
2
)
)
)
)
]
# 可选的深层堆栈
if self.use_deep:
i = 0
deep_out = embeddings.reshape(-1, self.seq_size, self.deep_input_size)
while i < self.deep_layer_num:
deep_out = self.deep_layer_dropouts[i](
torch.relu(self.deep_layer_norms[i](self.deep_layers[i](deep_out))))
i += 1
outs.append(deep_out)
# 将组合 Tensor 的序列输入到 Transformer 中,并对输出进行投影处理
result = self.projection(self.attention_dropout(self.transformer(
self.combination_dropout(self.combination_norm(self.combination(torch.concat(outs, 2)))))).clone())
if self.problem_type:
return result.reshape(-1, self.class_size)
else:
return torch.sigmoid(result).reshape(-1)
def loss(self, pred, label):
"""
计算所配置的目标损失值。
:param pred: 模型输出(分类任务的预测值、回归任务的标量预测值)
:param label: 真实标签
:return: 标量损失 Tensor
"""
return self.target_loss(pred, label)
def forward(self, feat_index, feat_value, label=None):
"""
前向调用:若提供了标签,则返回损失值,否则返回预测结果。
:param feat_index: 一个形状为 (batch_size, seq_size, field_size) 的 LongTensor,其中包含对嵌入表中元素的索引
:param feat_value: 具有相同形状的 Tensor,其中包含特征值 (floats)。通常为 0/1 值或特征值的缩放形式
:param label: 真实标签
:return: 如果提供了 label,则返回损失值;否则返回预测结果。
"""
pred = self.predict(feat_index, feat_value)
if label is not None:
return self.loss(pred, label)
return pred