diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 55b8b145b3..3664a457d8 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -309,47 +309,55 @@ async def connect_to_fastest(self) -> Optional[ClientSession]: return async def network_loop(self): - sleep_delay = 30 + def reset_sleep(): + return 10 + random.uniform(0, 5) + sleep_delay = reset_sleep() while self.running: await asyncio.wait( - [asyncio.sleep(30), self._urgent_need_reconnect.wait()], return_when=asyncio.FIRST_COMPLETED + [asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()], + return_when=asyncio.FIRST_COMPLETED ) - if self._urgent_need_reconnect.is_set(): - sleep_delay = 30 self._urgent_need_reconnect.clear() if not self.is_connected: client = await self.connect_to_fastest() if not client: - log.warning("failed to connect to any spv servers, retrying later") sleep_delay *= 2 - sleep_delay = min(sleep_delay, 300) + sleep_delay = min(sleep_delay, 120) + log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay) continue - log.debug("get spv server features %s:%i", *client.server) - features = await client.send_request('server.features', []) - self.client, self.server_features = client, features - log.debug("discover other hubs %s:%i", *client.server) - await self._update_hubs(await client.send_request('server.peers.subscribe', [])) - log.info("subscribe to headers %s:%i", *client.server) - self._update_remote_height((await self.subscribe_headers(),)) - self._on_connected_controller.add(True) + sleep_delay = reset_sleep() server_str = "%s:%i" % client.server - log.info("maintaining connection to spv server %s", server_str) - self._keepalive_task = asyncio.create_task(self.client.keepalive_loop()) try: - if not self._urgent_need_reconnect.is_set(): - await asyncio.wait( - [self._keepalive_task, self._urgent_need_reconnect.wait()], - return_when=asyncio.FIRST_COMPLETED - ) - else: - await self._keepalive_task + # Perform initial sequence of RPCs. + log.debug("get spv server features %s:%i", *client.server) + features = await client.send_request('server.features', []) + log.debug("discover other hubs %s:%i", *client.server) + await self._update_hubs(await client.send_request('server.peers.subscribe', [])) + log.info("subscribe to headers %s:%i", *client.server) + self._update_remote_height((await client.send_request('blockchain.headers.subscribe', [True]),)) + + # All initial RPCs were successful. We're now connected. + self.client, self.server_features = client, features + self._urgent_need_reconnect.clear() + sleep_delay = reset_sleep() + # Release any waiters. + self._on_connected_controller.add(True) + + log.info("maintaining connection to spv server %s", server_str) + self._keepalive_task = asyncio.create_task(self.client.keepalive_loop()) + await asyncio.wait( + [self._keepalive_task, self._urgent_need_reconnect.wait()], + return_when=asyncio.FIRST_COMPLETED + ) if self._urgent_need_reconnect.is_set(): log.warning("urgent reconnect needed") + except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError): + sleep_delay *= 2 + sleep_delay = min(sleep_delay, 120) + log.warning("failed to connect to spv server %s, retrying after %.2fs", server_str, sleep_delay) + finally: if self._keepalive_task and not self._keepalive_task.done(): self._keepalive_task.cancel() - except asyncio.CancelledError: - pass - finally: self._keepalive_task = None self.client = None self.server_features = None @@ -381,9 +389,10 @@ def rpc(self, list_or_method, args, restricted=True, session: Optional[ClientSes async def retriable_call(self, function, *args, **kwargs): while self.running: if not self.is_connected: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") + log.warning("%s: Wallet server unavailable, waiting for it to come back and retry.", function.__name__) self._urgent_need_reconnect.set() await self.on_connected.first + log.warning("%s: Wallet server available, proceeding.", function.__name__) try: return await function(*args, **kwargs) except asyncio.TimeoutError: @@ -436,9 +445,6 @@ def get_history(self, address): def broadcast(self, raw_transaction): return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) - def subscribe_headers(self): - return self.rpc('blockchain.headers.subscribe', [True], True) - async def subscribe_address(self, address, *addresses): addresses = list((address, ) + addresses) server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None