Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions sdk/python/feast/infra/key_encoding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ def _serialize_val(
if 0 <= entity_key_serialization_version <= 1:
return struct.pack("<l", v.int64_val), ValueType.INT64
return struct.pack("<q", v.int64_val), ValueType.INT64
elif value_type == "unix_timestamp_val":
return struct.pack("<q", v.unix_timestamp_val), ValueType.UNIX_TIMESTAMP
elif value_type == "float_val": # 32-bit float
return struct.pack("<f", float(v.float_val)), ValueType.FLOAT
elif value_type == "double_val": # 64-bit float
return struct.pack("<d", float(v.double_val)), ValueType.DOUBLE
else:
raise ValueError(f"Value type not supported for feast feature store: {v}")

Expand All @@ -35,6 +41,15 @@ def _deserialize_value(value_type, value_bytes) -> ValueProto:
return ValueProto(string_val=value)
elif value_type == ValueType.BYTES:
return ValueProto(bytes_val=value_bytes)
elif value_type == ValueType.UNIX_TIMESTAMP:
value = struct.unpack("<q", value_bytes)[0]
return ValueProto(unix_timestamp_val=value)
elif value_type == ValueType.FLOAT:
value = struct.unpack("<f", value_bytes)[0]
return ValueProto(float_val=value)
elif value_type == ValueType.DOUBLE:
value = struct.unpack("<d", value_bytes)[0]
return ValueProto(double_val=value)
else:
raise ValueError(f"Unsupported value type: {value_type}")

Expand Down
136 changes: 92 additions & 44 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.sorted_feature_view import SortedFeatureView

try:
from redis import Redis
Expand Down Expand Up @@ -292,50 +293,97 @@ def online_write_batch(
keys = []
# redis pipelining optimization: send multiple commands to redis server without waiting for every reply
with client.pipeline(transaction=False) as pipe:
# check if a previous record under the key bin exists
# TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting
# it may be significantly slower but avoids potential (rare) race conditions
for entity_key, _, _, _ in data:
redis_key_bin = _redis_key(
project,
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
keys.append(redis_key_bin)
pipe.hmget(redis_key_bin, ts_key)
prev_event_timestamps = pipe.execute()
# flattening the list of lists. `hmget` does the lookup assuming a list of keys in the key bin
prev_event_timestamps = [i[0] for i in prev_event_timestamps]

for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip(
keys, prev_event_timestamps, data
):
event_time_seconds = int(utils.make_tzaware(timestamp).timestamp())

# ignore if event_timestamp is before the event features that are currently in the feature store
if prev_event_time:
prev_ts = Timestamp()
prev_ts.ParseFromString(prev_event_time)
if prev_ts.seconds and event_time_seconds <= prev_ts.seconds:
# TODO: somehow signal that it's not overwriting the current record?
if progress:
progress(1)
continue

ts = Timestamp()
ts.seconds = event_time_seconds
entity_hset = dict()
entity_hset[ts_key] = ts.SerializeToString()

for feature_name, val in values.items():
f_key = _mmh3(f"{feature_view}:{feature_name}")
entity_hset[f_key] = val.SerializeToString()

pipe.hset(redis_key_bin, mapping=entity_hset)

ttl = online_store_config.key_ttl_seconds
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
if isinstance(table, SortedFeatureView):
if len(table.sort_keys) == 1:
sort_key_name = table.sort_keys[0].name
for entity_key, values, timestamp, _ in data:
redis_key_bin = _redis_key(
project,
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
entity_key.join_keys.append(sort_key_name)
entity_key.entity_values.append(values[sort_key_name])
redis_key_bin_with_sort_keys = _redis_key(
project,
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)

event_time_seconds = int(
utils.make_tzaware(timestamp).timestamp()
)
ts = Timestamp()
ts.seconds = event_time_seconds
entity_hset = dict()
entity_hset[ts_key] = ts.SerializeToString()

for feature_name, val in values.items():
f_key = _mmh3(f"{feature_view}:{feature_name}")
entity_hset[f_key] = val.SerializeToString()
if feature_name == sort_key_name:
feast_value_type = val.WhichOneof("val")
if feast_value_type == "unix_timestamp_val":
feature_value = (
val.unix_timestamp_val * 1000
) # Convert to milliseconds
else:
feature_value = getattr(val, str(feast_value_type))
score = feature_value
member = redis_key_bin_with_sort_keys
zset_key = f"{project}:{table.name}:{feature_name}:{redis_key_bin}"
pipe.zadd(zset_key, {member: score})
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should do some validation here to check whether the score is a numeric type, rather than waiting for Redis to throw the error. What do you think?

Copy link
Copy Markdown
Author

@kpulipati29 kpulipati29 Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agreed — it was on my to-do list. I think we should fail the materialization job if the feature view has multiple sort keys or non-numeric types.


pipe.hset(redis_key_bin_with_sort_keys, mapping=entity_hset)

ttl = online_store_config.key_ttl_seconds
if ttl:
pipe.expire(name=redis_key_bin_with_sort_keys, time=ttl)
else:
# check if a previous record under the key bin exists
# TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting
# it may be significantly slower but avoids potential (rare) race conditions
for entity_key, _, _, _ in data:
redis_key_bin = _redis_key(
project,
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
keys.append(redis_key_bin)
pipe.hmget(redis_key_bin, ts_key)
prev_event_timestamps = pipe.execute()
# flattening the list of lists. `hmget` does the lookup assuming a list of keys in the key bin
prev_event_timestamps = [i[0] for i in prev_event_timestamps]

for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip(
keys, prev_event_timestamps, data
):
event_time_seconds = int(utils.make_tzaware(timestamp).timestamp())

# ignore if event_timestamp is before the event features that are currently in the feature store
if prev_event_time:
prev_ts = Timestamp()
prev_ts.ParseFromString(prev_event_time)
if prev_ts.seconds and event_time_seconds <= prev_ts.seconds:
# TODO: somehow signal that it's not overwriting the current record?
if progress:
progress(1)
continue

ts = Timestamp()
ts.seconds = event_time_seconds
entity_hset = dict()
entity_hset[ts_key] = ts.SerializeToString()

for feature_name, val in values.items():
f_key = _mmh3(f"{feature_view}:{feature_name}")
entity_hset[f_key] = val.SerializeToString()

pipe.hset(redis_key_bin, mapping=entity_hset)

ttl = online_store_config.key_ttl_seconds
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
results = pipe.execute()
if progress:
progress(len(results))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Dict

from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.redis import RedisContainer

from tests.integration.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
)


class RedisOnlineStoreCreator(OnlineStoreCreator):
    """Provisions a disposable Redis container (via testcontainers) for integration tests."""

    def __init__(self, project_name: str, **kwargs):
        super().__init__(project_name)
        # Only construct the container here; it is started in
        # create_online_store() and stopped in teardown(). The previous
        # `with RedisContainer(...)` form started the container on __enter__
        # and immediately stopped it on __exit__ at the end of __init__,
        # leaving self.container referencing a stopped container and doing a
        # wasted start/stop cycle.
        self.container = RedisContainer("redis:latest")

    def create_online_store(self) -> Dict[str, str]:
        """Start the Redis container, wait until it is ready, and return its config.

        Returns:
            A dict with a single "connection_string" key of the form "host:port",
            suitable for the Redis online store's `connection_string` option.
        """
        self.container.start()
        log_string_to_wait_for = "Server initialized"
        # Block until Redis reports readiness (or raise after the timeout),
        # so tests never race the container startup.
        wait_for_logs(
            container=self.container, predicate=log_string_to_wait_for, timeout=120
        )
        host = self.container.get_container_host_ip()
        exposed_port = int(self.container.get_exposed_port(self.container.port))
        connection_string = f"{host}:{exposed_port}"
        print(f"connection_string: {connection_string}")
        return {
            "connection_string": connection_string,
        }

    def teardown(self):
        """Stop and remove the Redis container started by create_online_store()."""
        self.container.stop()
Loading
Loading