Skip to content

WIP add retries to create data for consistency#167

Open
DanielJanicek wants to merge 1 commit intomainfrom
djanicek/create-data-threading
Open

WIP add retries to create data for consistency#167
DanielJanicek wants to merge 1 commit intomainfrom
djanicek/create-data-threading

Conversation

@DanielJanicek
Copy link
Copy Markdown
Contributor

This error shows in some CI jobs https://github.com/weaviate/weaviate-e2e-tests/actions/runs/24739340905/job/72374934517#step:15:651

==================================== ERRORS ====================================
_ ERROR at setup of test_delete_read_repair_http_request[novector_timebasedresolution_class] _

self = <weaviate.connect.v4.ConnectionSync object at 0x7fa8bed27150>
request = collection: "Read_repair_testpy_test_delete_read_repair_http_requestnovector_timebasedresolution_class_2"
tenant: "test-tenant-0"
objects_count: true

def grpc_aggregate(
    self, request: aggregate_pb2.AggregateRequest
) -> aggregate_pb2.AggregateReply:
    try:
        assert self.grpc_stub is not None
      res = _Retry(4).with_exponential_backoff(
            0,
            f"Searching in collection {request.collection}",
            self.grpc_stub.Aggregate,
            request,
            metadata=self.grpc_headers(),
            timeout=self.timeout_config.query,
        )

/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/connect/v4.py:1083:


/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/retry.py:54: in with_exponential_backoff
raise e
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/retry.py:50: in with_exponential_backoff
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/grpc/_channel.py:1159: in call
return _end_unary_response_blocking(state, call, False, None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


state = <grpc._channel._RPCState object at 0x7fa8a4635e90>
call = <grpc._cython.cygrpc.SegregatedCall object at 0x7fa8a46365c0>
with_call = False, deadline = None

def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    if state.code is grpc.StatusCode.OK:
        if with_call:
            rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
            return state.response, rendezvous
        return state.response
  raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

E grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
E status = StatusCode.UNKNOWN
E details = "aggregate: shard not found"
E debug_error_string = "UNKNOWN:Error received from peer {grpc_status:2, grpc_message:"aggregate: shard not found"}"
E >

/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/grpc/_channel.py:990: _InactiveRpcError

During handling of the above exception, another exception occurred:

request = <SubRequest 'collection_fixture' for <Function test_delete_read_repair_http_request[novector_timebasedresolution_class]>>

@pytest.fixture
def collection_fixture(request):
  return request.getfixturevalue(request.param)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/recovery/python_recovery/read_repair_test.py:245:


/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/_pytest/fixtures.py:539: in getfixturevalue
fixturedef = self._get_active_fixturedef(argname)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/_pytest/fixtures.py:627: in _get_active_fixturedef
fixturedef.execute(request=subrequest)
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/_pytest/fixtures.py:1110: in execute
result: FixtureValue = ihook.pytest_fixture_setup(
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/pluggy/_hooks.py:512: in call
return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/pluggy/_manager.py:120: in _hookexec
return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/pluggy/_callers.py:53: in run_old_style_hookwrapper
return result.get_result()
^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/pluggy/_callers.py:38: in run_old_style_hookwrapper
res = yield
^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/_pytest/setuponly.py:36: in pytest_fixture_setup
return (yield)
^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/_pytest/fixtures.py:1202: in pytest_fixture_setup
result = call_fixture_func(fixturefunc, request, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/_pytest/fixtures.py:908: in call_fixture_func
fixture_result = next(generator)
^^^^^^^^^^^^^^^
tests/recovery/python_recovery/read_repair_test.py:148: in novector_timebasedresolution_class
collection = _create_base_collection(
tests/recovery/python_recovery/read_repair_test.py:98: in _create_base_collection
DataManager(client).create_data(
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate_cli/managers/data_manager.py:978: in create_data
inserted, collection = _ingest_one_tenant(tenant)
^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate_cli/managers/data_manager.py:923: in _ingest_one_tenant
_initial = len(col.with_tenant(tenant))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/collections/collection/sync.py:148: in len
total = self.aggregate.over_all(total_count=True).total_count
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/collections/aggregations/over_all/executor.py:115: in over_all
return executor.execute(
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/connect/executor.py:99: in execute
return cast(T, exception_callback(e))
^^^^^^^^^^^^^^^^^^^^^
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/connect/executor.py:38: in raise_exception
raise e
/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/connect/executor.py:80: in execute
call = method(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^


self = <weaviate.connect.v4.ConnectionSync object at 0x7fa8bed27150>
request = collection: "Read_repair_testpy_test_delete_read_repair_http_requestnovector_timebasedresolution_class_2"
tenant: "test-tenant-0"
objects_count: true

def grpc_aggregate(
    self, request: aggregate_pb2.AggregateRequest
) -> aggregate_pb2.AggregateReply:
    try:
        assert self.grpc_stub is not None
        res = _Retry(4).with_exponential_backoff(
            0,
            f"Searching in collection {request.collection}",
            self.grpc_stub.Aggregate,
            request,
            metadata=self.grpc_headers(),
            timeout=self.timeout_config.query,
        )
        return cast(aggregate_pb2.AggregateReply, res)
    except RpcError as e:
        error = cast(Call, e)
        if error.code() == StatusCode.PERMISSION_DENIED:
            raise InsufficientPermissionsError(error)
      raise WeaviateQueryError(str(e), "GRPC search")  # pyright: ignore
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

E weaviate.exceptions.WeaviateQueryError: Query call with protocol GRPC search failed with message <_InactiveRpcError of RPC that terminated with:
E status = StatusCode.UNKNOWN
E details = "aggregate: shard not found"
E debug_error_string = "UNKNOWN:Error received from peer {grpc_status:2, grpc_message:"aggregate: shard not found"}"
E >.

/opt/hostedtoolcache/Python/3.11.15/x64/lib/python3.11/site-packages/weaviate/connect/v4.py:1096: WeaviateQueryError

I believe there is a race in create_data when auto tenant creation is on, as the producer-consumer task queue can have either a stale tenant list or a tenant list with tenants that have not propogated yet.

Copy link
Copy Markdown

@orca-security-eu orca-security-eu Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Orca Security Scan Summary

Status Check Issues by priority
Passed Passed Infrastructure as Code high 0   medium 0   low 0   info 0 View in Orca
Passed Passed SAST high 0   medium 0   low 0   info 0 View in Orca
Passed Passed Secrets high 0   medium 0   low 0   info 0 View in Orca
Passed Passed Vulnerabilities high 0   medium 0   low 0   info 0 View in Orca

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant