Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions apps/indexer/lib/indexer/fetcher/empty_blocks_sanitizer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizer do
{non_empty_blocks, empty_blocks} = classify_blocks_from_result(result)
process_non_empty_blocks(non_empty_blocks)
process_empty_blocks(empty_blocks)
process_missing_blocks(unprocessed_empty_blocks_list, non_empty_blocks, empty_blocks)

Logger.info("Batch of empty blocks is sanitized",
fetcher: :empty_blocks_to_refetch
Expand All @@ -114,18 +115,51 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizer do
end

defp classify_blocks_from_result(result) do
result
|> Enum.reduce({[], []}, fn %{id: _id, result: block}, {non_empty_blocks, empty_blocks} ->
transactions = Map.get(block, "transactions") || []

if Enum.empty?(transactions) do
{non_empty_blocks, [block_fields(block, transactions) | empty_blocks]}
else
{[block_fields(block, transactions) | non_empty_blocks], empty_blocks}
end
# A spec-compliant JSON-RPC server returns `result: null` for blocks it
# cannot find (e.g. pruned or reorged). Skip those without crashing — the
# caller reconciles which requested blocks are missing and flags them for
# refetch.
Enum.reduce(result, {[], []}, fn
%{id: _id, result: nil}, acc ->
acc

%{id: _id, result: block}, {non_empty_blocks, empty_blocks} ->
transactions = Map.get(block, "transactions") || []

if Enum.empty?(transactions) do
{non_empty_blocks, [block_fields(block, transactions) | empty_blocks]}
else
{[block_fields(block, transactions) | non_empty_blocks], empty_blocks}
end
end)
end

# Blocks the RPC returned nil for stay in `is_empty: nil, refetch_needed: false`,
# so without intervention the sanitizer's query would re-select them every cycle.
# Flag them `refetch_needed: true` to remove them from the query set and let the
# regular refetch path handle them.
defp process_missing_blocks(requested, non_empty_blocks, empty_blocks) do
returned = MapSet.new(non_empty_blocks ++ empty_blocks, & &1.number)

missing =
requested
|> Enum.map(& &1.number)
|> Enum.reject(&MapSet.member?(returned, &1))

case missing do
[] ->
:ok

numbers ->
Logger.warning(
"JSON-RPC returned nil for block numbers #{inspect(numbers)}; marking as refetch_needed",
fetcher: :empty_blocks_to_refetch
)

Block.set_refetch_needed(numbers)
end
end

defp block_fields(block, transactions) do
%{
number: quantity_to_integer(block["number"]),
Expand Down
47 changes: 47 additions & 0 deletions apps/indexer/test/indexer/fetcher/empty_blocks_sanitizer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,53 @@ defmodule Indexer.Fetcher.EmptyBlocksSanitizerTest do
assert processed_block.refetch_needed == true, "invalid `refetch_needed` value set for processed block"
end

test "marks block as refetch_needed when JSON-RPC returns nil result",
%{json_rpc_named_arguments: json_rpc_named_arguments} do
# Setup
block_to_process = insert(:block, is_empty: nil)
populate_database_with_dummy_blocks()
assert Repo.get!(Block, block_to_process.hash).is_empty == nil, "precondition to check setup correctness"
assert Repo.get!(Block, block_to_process.hash).refetch_needed == false, "precondition to check setup correctness"

encoded_expected_block_number = "0x" <> Integer.to_string(block_to_process.number, 16)

if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
EthereumJSONRPC.Mox
|> stub(
:json_rpc,
fn [
%{
id: id,
method: "eth_getBlockByNumber",
params: [^encoded_expected_block_number, false]
}
],
_options ->
{:ok, [%{id: id, result: nil}]}
end
)
end

EmptyBlocksSanitizer.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)

# Wait for the sanitizer to flag the nil-result block as refetch_needed.
# On the un-fixed code this never happens (the GenServer crashes on
# BadMapError), so `wait_for_results` would time out and fail the test.
processed_block =
wait_for_results(fn ->
Repo.one!(
from(block in Block,
where: block.hash == ^block_to_process.hash and block.refetch_needed == true
)
)
end)

assert processed_block.is_empty == nil, "is_empty should remain untouched for unresolved blocks"

assert processed_block.refetch_needed == true,
"refetch_needed should be set so the block exits the sanitizer's query set"
end

test "only old enough blocks are sanitized", %{json_rpc_named_arguments: json_rpc_named_arguments} do
# Setup
block_to_process = insert(:block, is_empty: nil)
Expand Down
Loading