forked from estuary/flow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflow.yaml
More file actions
158 lines (140 loc) · 5.26 KB
/
flow.yaml
File metadata and controls
158 lines (140 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Additional spec file imported by this one.
import:
  - tests.flow.yaml
collections:
  # Segmentation events, each representing an add or remove of a user to a
  # segment.
  examples/segment/events:
    schema: event.schema.yaml
    key: [/event]
    projections:
      vendor:
        location: /segment/vendor
        partition: true
      # Project for natural querying (without needing to quote 'segment/name').
      segment_name: /segment/name
      user_id: /user

  # User profiles, mapping from user => {segments: [segment => status]}.
  # This derivation is designed for materialization into a persistent key/value
  # store, which provides backing storage of reduced segment sets for each user.
  # As such, this derivation is stateless and produces partially combined
  # roll-ups for each user, which are then fully reduced upon materialization
  # into a store.
  examples/segment/profiles:
    schema: derived.schema.yaml#/$defs/profile
    key: [/user]
    projections:
      user_id: /user

    derivation:
      transform:
        fromSegmentation:
          # Map segmentation to implied user profile.
          source:
            name: examples/segment/events
          publish:
            lambda: typescript

  # Membership is a mapping of (segment, user) => {segmentation status}.
  # The derivation is suited for materialization into sorted key/value stores
  # like ScyllaDB and BigTable, having efficient range-read operations which
  # can be used to walk the full membership of a segment (even if very large).
  # As with the "pull" user profile, this derivation is stateless and produces
  # partial roll-ups of the current segmentation status for each composite key,
  # which is fully reduced only upon materialization into a store.
  examples/segment/memberships:
    schema: derived.schema.yaml#/$defs/membership
    key: [/segment/vendor, /segment/name, /user]
    projections:
      vendor:
        location: /segment/vendor
        partition: true
      segment_name: /segment/name
      user_id: /user

    derivation:
      transform:
        fromSegmentation:
          # Map segmentation to implied membership segmentation status.
          source:
            name: examples/segment/events
          publish:
            lambda: typescript

  # Toggles are annotated events which change the status of a user, e.g. from
  # "added" to "removed". They do not include events which initially add a user
  # to a segment, but do include subsequent events which remove the user or
  # re-add them. Each event is annotated with a /previous event which is the
  # last event of the user's former status: /previous is a "remove" if
  # the present event is an "add", or vice versa.
  examples/segment/toggles:
    # The event schema, extended with a required /previous event.
    schema:
      $ref: event.schema.yaml
      properties:
        previous: { $ref: event.schema.yaml }
      required: [previous]
    key: [/event]
    projections:
      vendor:
        location: /segment/vendor
        partition: true
      # Additional projections for more natural querying without identifier
      # quotes.
      segment_name: /segment/name
      previous_event: /previous/event
      previous_remove: /previous/remove
      user_id: /user

    derivation:
      # Registers track the last event for each (segment, user), along with
      # a bit indicating whether the user has ever been added to the segment.
      register:
        initial: {}
        schema:
          type: object
          properties:
            event: { $ref: event.schema.yaml }
            firstAdd: { const: true }
          reduce: { strategy: merge }

      transform:
        fromSegmentation:
          source:
            name: examples/segment/events
          # Shuffle on the same (segment, user) composite that the registers
          # are described as tracking.
          shuffle:
            key: [/segment/vendor, /segment/name, /user]
          update:
            lambda: typescript
          publish:
            lambda: typescript

  # Something to try: the profiles collection can be altered to a "push"
  # model by reducing user segments within derivation registers, and then
  # publishing each current, fully-reduced segment set. This works well with
  # stateless materializations, like Webhooks or pub/sub streams, where the
  # full set is required with each POST.
  #
  #examples/segment/profiles:
  #  schema:
  #    $ref: derived.schema.yaml#/$defs/profile
  #    # Published values are already fully reduced. Just take the last.
  #    reduce: { strategy: lastWriteWins }
  #  key: [/user]
  #
  #  derivation:
  #    register:
  #      # Source documents are shuffled to a register on /user.
  #      # Within each register, we accumulate the user's segments.
  #      schema: derived.schema.yaml#SegmentSet
  #      initial: []
  #    transform:
  #      fromSegmentation:
  #        # Update maps the segmentation to its implied segment set, which
  #        # is reduced into the register. Then publish the reduced register
  #        # mapped into a profile.
  #        source:
  #          name: examples/segment/events
  #        shuffle:
  #          key: [/user]
  #        update:
  #          lambda: typescript
  #        publish:
  #          lambda: typescript
# Materialize the derived memberships and profiles collections into tables of
# a SQLite database.
materializations:
  examples/segment/views:
    endpoint:
      sqlite: { path: ../examples.db?_journal=WAL }
    # Each binding pairs a target table with the collection it is sourced from.
    bindings:
      - resource: { table: segment_memberships }
        source: examples/segment/memberships
      - resource: { table: segment_profiles }
        source: examples/segment/profiles