From e95f0f72f6a0a0bd3df949086a1e7e857c89fc0b Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Wed, 25 Feb 2026 11:57:42 +1030 Subject: [PATCH 1/9] plugins/sql: remove unneeded TODO entries. 1. Refresh time would say "dont refresh if it's only this old" but better is to simply make refresh more efficient, which we've done and continue to do. 2. time_msec could be used in two places, but floating point suffices. Signed-off-by: Rusty Russell --- plugins/sql.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index 24c3b28c5a40..7712b8be06d2 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -24,8 +24,6 @@ static const char schemas[] = ; /* TODO: - * 2. Refresh time in API. - * 8. time_msec fields. * 10. General pagination API (not just chainmoves and channelmoves) * 11. Normalize account_id fields into another table, as they are highly duplicate, and use views to maintain the current API. */ From 1c8d1b56eba2320cc390930bed7fe9b4668352e4 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:10 +1030 Subject: [PATCH 2/9] pytest: add sql change test. This is trivial now, as the invoice table gets reloaded every time, but is an important check as we improve the implementation. Signed-off-by: Rusty Russell --- tests/test_plugin.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 9644d4161b62..ccb756561711 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4463,7 +4463,7 @@ def test_plugin_startdir_lol(node_factory): def test_autoclean_batch(node_factory): - l1 = node_factory.get_node(1) + l1 = node_factory.get_node() # Many expired invoices for i in range(100): @@ -4527,6 +4527,30 @@ def test_sql_parallel(node_factory, executor): f.result(TIMEOUT) +def test_sql_during_change(node_factory): + l1 = node_factory.get_node() + + labels = [f"test_sql_during_delete{i:02}" for i in range(10)] + for l in labels: + l1.rpc.invoice(100, l, l) + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + + # Should notice extra one (note shorter expiry) + l = f"test_sql_during_delete{11}" + l1.rpc.invoice(100, l, l, expiry=10) + labels.append(l) + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + + # Should notice delete. + l1.rpc.delinvoice(labels[0], 'unpaid') + del labels[0] + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + + # Should notice change once invoice has expired. + wait_for(lambda: only_one(l1.rpc.listinvoices(label=labels[-1])['invoices'])['status'] != 'unpaid', timeout=10 + TIMEOUT) + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels[:-1]] + [[100, labels[-1], 'expired']]} + + def test_listchannels_broken_message(node_factory): """This gave a bogus BROKEN message with deprecated-apis enabled""" l1 = node_factory.get_node(options={'allow-deprecated-apis': True}) From 59039e0f4cd266053ac076cf9438011b51b4bd28 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:15 +1030 Subject: [PATCH 3/9] sql: rename indices_created var to populated. It's a "have we initialized the table" flag, and we're going to use it for more than setting up the indices, so rename it. Signed-off-by: Rusty Russell --- plugins/sql.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index 7712b8be06d2..065f3315b0a8 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -120,8 +120,8 @@ struct table_desc { bool is_subobject; /* Do we use created_index as primary key? Otherwise we create rowid. */ bool has_created_index; - /* Have we created our sql indexes yet? */ - bool indices_created; + /* Have we ever been used? */ + bool populated; /* function to refresh it. */ struct command_result *(*refresh)(struct command *cmd, const struct table_desc *td, @@ -578,9 +578,11 @@ static struct command_result *one_refresh_done(struct command *cmd, (u64)refresh_duration.ts.tv_nsec, td->last_created_index); - if (!td->indices_created) { + if (!td->populated) { + /* Now we've done initial population, install indices: + * much faster than creating them before! */ init_indices(cmd->plugin, td); - td->indices_created = 1; + td->populated = true; refresh_duration = timemono_since(td->refresh_start); plugin_log(cmd->plugin, LOG_DBG, "Time to refresh + create indices for %s: %"PRIu64".%09"PRIu64" seconds", @@ -1655,7 +1657,7 @@ static struct table_desc *new_table_desc(const tal_t *ctx, td->has_created_index = false; td->needs_refresh = true; td->refreshing = false; - td->indices_created = false; + td->populated = false; list_head_init(&td->refresh_waiters); /* Only top-levels have refresh functions */ From 774c3d1cdeec41e8ab324564147546441e66b75f Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:15 +1030 Subject: [PATCH 4/9] plugins/sql: use td->last_created_index directly, don't stash it in dbq. We can actually just make struct table_desc non-const where required, and hand its var directly. Signed-off-by: Rusty Russell --- plugins/sql.c | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index 065f3315b0a8..bc0ac9ddac99 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -91,8 +91,6 @@ struct db_query { struct table_desc **tables; const char *authfail; bool has_wildcard; - /* Update *last_created_index */ - u64 *last_created_index; }; /* Waiting for another command to refresh table */ @@ -901,7 +899,7 @@ static struct command_result *default_list_done(struct command *cmd, struct db_query *dbq) { struct sql *sql = sql_of(cmd->plugin); - const struct table_desc *td = dbq->tables[0]; + struct table_desc *td = dbq->tables[0]; struct command_result *ret; int err; char *errmsg; @@ -914,7 +912,7 @@ static struct command_result *default_list_done(struct command *cmd, td->name, errmsg); } - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, NULL); if (ret) return ret; @@ -994,10 +992,10 @@ static struct command_result *listchannels_one_done(struct command *cmd, const jsmntok_t *result, struct db_query *dbq) { - const struct table_desc *td = dbq->tables[0]; + struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, NULL); if (ret) return ret; @@ -1095,10 +1093,10 @@ static struct command_result *listnodes_one_done(struct command *cmd, const jsmntok_t *result, struct db_query *dbq) { - const struct table_desc *td = dbq->tables[0]; + struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, NULL); if (ret) return ret; @@ -1225,8 +1223,6 @@ static struct command_result *refresh_tables(struct command *cmd, if (tal_count(dbq->tables) == 0) return refresh_complete(cmd, dbq); - /* td is const, but last_created_index needs updating, so we hand - * pointer in dbq. */ td = dbq->tables[0]; /* If it's currently being refreshed, wait */ @@ -1241,7 +1237,6 @@ static struct command_result *refresh_tables(struct command *cmd, if (!td->needs_refresh) return next_refresh(cmd, dbq); - dbq->last_created_index = &dbq->tables[0]->last_created_index; td->refreshing = true; td->refresh_start = time_mono(); return td->refresh(cmd, dbq->tables[0], dbq); @@ -1567,7 +1562,7 @@ static struct command_result *limited_list_done(struct command *cmd, struct command_result *ret; size_t num_entries; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &num_entries); if (ret) return ret; @@ -1586,7 +1581,7 @@ static struct command_result *refresh_by_created_index(struct command *cmd, limited_list_done, forward_error, dbq); json_add_string(req->js, "index", "created"); - json_add_u64(req->js, "start", *dbq->last_created_index + 1); + json_add_u64(req->js, "start", td->last_created_index + 1); json_add_u64(req->js, "limit", LIMIT_PER_LIST); return send_outreq(req); } From 2a0b9c6ca42a9a94da37216576720f27e2971063 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:15 +1030 Subject: [PATCH 5/9] sql: keep persistent watches on tables. We now always watch for deleted/created/updated, and keep multiple flags and indices (though we don't use them yet!). Signed-off-by: Rusty Russell --- plugins/sql.c | 178 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 144 insertions(+), 34 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index bc0ac9ddac99..1258684c6dbe 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -2,6 +2,7 @@ #include "config.h" #include #include +#include #include #include #include @@ -100,6 +101,20 @@ struct refresh_waiter { struct db_query *dbq; }; +enum refresh_needs { + /* Naive tables always need refresh */ + REFRESH_ALWAYS = 0x8, + + /* Up-to-date! */ + REFRESH_UNNECESSARY = 0, + /* We were notified of new created entries */ + REFRESH_CREATED = 0x1, + /* We were notified of new updated entries */ + REFRESH_UPDATED = 0x2, + /* We were notified of new deleted entries */ + REFRESH_DELETED = 0x4, +}; + struct table_desc { /* e.g. listpeers. For sub-tables, the raw name without * parent prepended */ @@ -122,12 +137,13 @@ struct table_desc { bool populated; /* function to refresh it. */ struct command_result *(*refresh)(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); /* some refresh functions maintain changed and created indexes */ u64 last_created_index; + u64 last_updated_index; /* Do we need a refresh? */ - bool needs_refresh; + enum refresh_needs refresh_needs; /* Are we refreshing now? */ bool refreshing; /* When did we start refreshing? */ @@ -144,6 +160,9 @@ struct sql { int gosstore_fd ; size_t gosstore_nodes_off, gosstore_channels_off; u64 next_rowid; + + /* This is an aux_command for all our watches */ + struct command *waitcmd; }; static struct sql *sql_of(struct plugin *plugin) @@ -543,16 +562,7 @@ static struct command_result *next_refresh(struct command *cmd, return refresh_tables(cmd, dbq); } -static struct command_result *wait_done(struct command *cmd, - const char *method, - const char *buf, - const jsmntok_t *result, - struct table_desc *td) -{ - td->needs_refresh = true; - return aux_command_done(cmd); -} - +/* Recursion */ static struct command_result *one_refresh_done(struct command *cmd, struct db_query *dbq, bool was_limited) @@ -589,18 +599,6 @@ static struct command_result *one_refresh_done(struct command *cmd, (u64)refresh_duration.ts.tv_nsec); } - /* We put in a wait command to we get told when we need to refresh */ - if (td->waitname) { - struct out_req *req; - td->needs_refresh = false; - req = jsonrpc_request_start(aux_command(cmd), "wait", wait_done, - plugin_broken_cb, td); - json_add_string(req->js, "subsystem", td->waitname); - json_add_string(req->js, "indexname", "created"); - json_add_u64(req->js, "nextvalue", td->last_created_index+1); - send_outreq(req); - } - /* Transfer refresh waiters onto local list */ list_head_init(&waiters); list_append_list(&waiters, &td->refresh_waiters); @@ -920,7 +918,7 @@ static struct command_result *default_list_done(struct command *cmd, } static struct command_result *default_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq) { struct out_req *req; @@ -983,7 +981,7 @@ static void delete_channel_from_db(struct command *cmd, } static struct command_result *channels_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); static struct command_result *listchannels_one_done(struct command *cmd, @@ -1004,8 +1002,8 @@ static struct command_result *listchannels_one_done(struct command *cmd, } static struct command_result *channels_refresh(struct command *cmd, - const struct table_desc *td, - struct db_query *dbq) + struct table_desc *td, + struct db_query *dbq) { struct sql *sql = sql_of(cmd->plugin); struct out_req *req; @@ -1084,7 +1082,7 @@ static struct command_result *channels_refresh(struct command *cmd, } static struct command_result *nodes_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); static struct command_result *listnodes_one_done(struct command *cmd, @@ -1152,7 +1150,7 @@ static bool extract_node_id(int gosstore_fd, size_t off, u16 type, } static struct command_result *nodes_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq) { struct sql *sql = sql_of(cmd->plugin); @@ -1215,6 +1213,102 @@ static struct command_result *nodes_refresh(struct command *cmd, return one_refresh_done(cmd, dbq, false); } +/* Mutual recursion */ +static void watch_for(struct sql *sql, + struct table_desc *td, + const char *indexname, + u64 next_index); + +static struct command_result *wait_done(struct command *auxcmd, + const char *method, + const char *buf, + const jsmntok_t *result, + struct table_desc *td) +{ + const jsmntok_t *valtok; + const char *indexname; + u64 val; + + if ((valtok = json_get_member(buf, result, "created")) != NULL) { + indexname = "created"; + td->refresh_needs |= REFRESH_CREATED; + } else if ((valtok = json_get_member(buf, result, "updated")) != NULL) { + indexname = "updated"; + td->refresh_needs |= REFRESH_UPDATED; + } else if ((valtok = json_get_member(buf, result, "deleted")) != NULL) { + indexname = "deleted"; + td->refresh_needs |= REFRESH_DELETED; + } else { + plugin_err(auxcmd->plugin, + "Invalid wait_done for %s: '%.*s'", + td->name, + json_tok_full_len(result), + json_tok_full(buf, result)); + } + + if (!json_to_u64(buf, valtok, &val)) { + plugin_err(auxcmd->plugin, + "Invalid wait_done index for %s: '%.*s'", + td->name, + json_tok_full_len(result), + json_tok_full(buf, result)); + } + + /* Keep watching for next one */ + watch_for(sql_of(auxcmd->plugin), td, indexname, val + 1); + return command_still_pending(auxcmd); +} + +static void watch_for(struct sql *sql, + struct table_desc *td, + const char *indexname, + u64 next_index) +{ + struct out_req *req; + + req = jsonrpc_request_start(sql->waitcmd, "wait", wait_done, + plugin_broken_cb, td); + json_add_string(req->js, "subsystem", td->waitname); + json_add_string(req->js, "indexname", indexname); + json_add_u64(req->js, "nextvalue", next_index); + send_outreq(req); +} + +/* First time we initialize counters and figure where we're up to */ +static void watch_init(struct command *cmd, + struct table_desc *td, + const char *indexname, + u64 *max) +{ + struct json_out *params = json_out_new(NULL); + const jsmntok_t *result, *valtok; + const char *buf; + u64 val; + + json_out_start(params, NULL, '{'); + json_out_addstr(params, "subsystem", td->waitname); + json_out_addstr(params, "indexname", indexname); + json_out_add(params, "nextvalue", false, "0"); + json_out_end(params, '}'); + + result = jsonrpc_request_sync(tmpctx, cmd, "wait", take(params), &buf); + + valtok = json_get_member(buf, result, indexname); + if (!valtok || !json_to_u64(buf, valtok, &val)) { + plugin_err(cmd->plugin, + "Invalid wait reply for %s %s: '%.*s'", + td->name, indexname, + json_tok_full_len(result), + json_tok_full(buf, result)); + } + + if (max != NULL) + *max = val; + + /* Place watch for when it increases */ + watch_for(sql_of(cmd->plugin), td, indexname, val + 1); +} + static struct command_result *refresh_tables(struct command *cmd, struct db_query *dbq) { @@ -1234,11 +1328,20 @@ static struct command_result *refresh_tables(struct command *cmd, return command_still_pending(cmd); } - if (!td->needs_refresh) + if (td->refresh_needs == REFRESH_UNNECESSARY) return next_refresh(cmd, dbq); td->refreshing = true; td->refresh_start = time_mono(); + + /* The first time, we may need to install watches */ + if (!td->populated && td->waitname) { + /* We will initialize td->last_created_index as we read them in */ + watch_init(cmd, td, "created", NULL); + watch_init(cmd, td, "updated", &td->last_updated_index); + watch_init(cmd, td, "deleted", NULL); + } + return td->refresh(cmd, dbq->tables[0], dbq); } @@ -1573,10 +1676,15 @@ static struct command_result *limited_list_done(struct command *cmd, /* The simplest case: append-only lists */ static struct command_result *refresh_by_created_index(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq) { struct out_req *req; + + /* Since we're relying on watches, mark refreshing unnecessary to start */ + assert(td->refresh_needs != REFRESH_UNNECESSARY); + td->refresh_needs = REFRESH_UNNECESSARY; + req = jsonrpc_request_start(cmd, td->cmdname, limited_list_done, forward_error, dbq); @@ -1589,7 +1697,7 @@ static struct command_result *refresh_by_created_index(struct command *cmd, struct refresh_funcs { const char *cmdname; struct command_result *(*refresh)(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); const char *waitname; }; @@ -1649,8 +1757,9 @@ static struct table_desc *new_table_desc(const tal_t *ctx, td->arrname = json_strdup(td, schemas, arrname); td->columns = tal_arr(td, struct column *, 0); td->last_created_index = 0; + td->last_updated_index = 0; td->has_created_index = false; - td->needs_refresh = true; + td->refresh_needs = REFRESH_ALWAYS; td->refreshing = false; td->populated = false; list_head_init(&td->refresh_waiters); @@ -1850,6 +1959,7 @@ static const char *init(struct command *init_cmd, struct sql *sql = sql_of(plugin); sql->db = sqlite_setup(plugin); init_tablemap(plugin, &sql->tablemap); + sql->waitcmd = aux_command(init_cmd); plugin_set_memleak_handler(plugin, memleak_mark_tablemap); return NULL; From 3985391640ad31deb8ef03b3a681f1950e7dbbc2 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:15 +1030 Subject: [PATCH 6/9] plugins/sql: keep track of last `updated_index` value in tables. We're going to need this once we start using `wait updated` to track these. Signed-off-by: Rusty Russell --- plugins/sql.c | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index 1258684c6dbe..a279f482d324 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -622,7 +622,8 @@ static struct command_result *process_json_list(struct command *cmd, const jsmntok_t *arr, const u64 *rowid, const struct table_desc *td, - u64 *last_created_index); + u64 *last_created_index, + u64 *last_updated_index); /* Process all subobject columns */ static struct command_result *process_json_subobjs(struct command *cmd, @@ -630,7 +631,8 @@ static struct command_result *process_json_subobjs(struct command *cmd, const jsmntok_t *t, const struct table_desc *td, u64 this_rowid, - u64 *last_created_index) + u64 *last_created_index, + u64 *last_updated_index) { for (size_t i = 0; i < tal_count(td->columns); i++) { const struct column *col = td->columns[i]; @@ -647,10 +649,10 @@ static struct command_result *process_json_subobjs(struct command *cmd, /* If it's an array, use process_json_list */ if (!col->sub->is_subobject) { ret = process_json_list(cmd, buf, coltok, &this_rowid, - col->sub, last_created_index); + col->sub, last_created_index, last_updated_index); } else { ret = process_json_subobjs(cmd, buf, coltok, col->sub, - this_rowid, last_created_index); + this_rowid, last_created_index, last_updated_index); } if (ret) return ret; @@ -668,7 +670,8 @@ static struct command_result *process_json_obj(struct command *cmd, const u64 *parent_rowid, size_t *sqloff, sqlite3_stmt *stmt, - u64 *last_created_index) + u64 *last_created_index, + u64 *last_updated_index) { struct sql *sql = sql_of(cmd->plugin); int err; @@ -696,7 +699,7 @@ static struct command_result *process_json_obj(struct command *cmd, else coltok = json_get_member(buf, t, col->jsonname); ret = process_json_obj(cmd, buf, coltok, col->sub, row, this_rowid, - NULL, sqloff, stmt, last_created_index); + NULL, sqloff, stmt, last_created_index, last_updated_index); if (ret) return ret; continue; @@ -746,6 +749,11 @@ static struct command_result *process_json_obj(struct command *cmd, && val64 > *last_created_index) { *last_created_index = val64; } + /* updated_index -> last_updated_index */ + if (streq(col->dbname, "updated_index") + && val64 > *last_updated_index) { + *last_updated_index = val64; + } break; case FIELD_BOOL: if (!json_to_bool(buf, coltok, &valbool)) { @@ -817,7 +825,7 @@ static struct command_result *process_json_obj(struct command *cmd, sqlite3_errmsg(sql->db)); } - return process_json_subobjs(cmd, buf, t, td, this_rowid, last_created_index); + return process_json_subobjs(cmd, buf, t, td, this_rowid, last_created_index, last_updated_index); } /* A list, such as in the top-level reply, or for a sub-table */ @@ -826,7 +834,8 @@ static struct command_result *process_json_list(struct command *cmd, const jsmntok_t *arr, const u64 *parent_rowid, const struct table_desc *td, - u64 *last_created_index) + u64 *last_created_index, + u64 *last_updated_index) { struct sql *sql = sql_of(cmd->plugin); size_t i; @@ -860,7 +869,7 @@ static struct command_result *process_json_list(struct command *cmd, json_tok_full_len(t), json_tok_full(buf, t)); } - ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, stmt, last_created_index); + ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, stmt, last_created_index, last_updated_index); if (ret) break; sqlite3_reset(stmt); @@ -875,6 +884,7 @@ static struct command_result *process_json_result(struct command *cmd, const jsmntok_t *result, const struct table_desc *td, u64 *last_created_index, + u64 *last_updated_index, size_t *num_entries) { const jsmntok_t *arr; @@ -887,7 +897,7 @@ static struct command_result *process_json_result(struct command *cmd, arr = json_get_member(buf, result, td->arrname); if (num_entries) *num_entries = arr->size; - return process_json_list(cmd, buf, arr, NULL, td, last_created_index); + return process_json_list(cmd, buf, arr, NULL, td, last_created_index, last_updated_index); } static struct command_result *default_list_done(struct command *cmd, @@ -910,7 +920,7 @@ static struct command_result *default_list_done(struct command *cmd, td->name, errmsg); } - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -993,7 +1003,7 @@ static struct command_result *listchannels_one_done(struct command *cmd, struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1094,7 +1104,7 @@ static struct command_result *listnodes_one_done(struct command *cmd, struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1665,7 +1675,9 @@ static struct command_result *limited_list_done(struct command *cmd, struct command_result *ret; size_t num_entries; - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, + ret = process_json_result(cmd, buf, result, td, + &td->last_created_index, + &td->last_updated_index, &num_entries); if (ret) return ret; From da0223d52d543a3a44231b910a2d1dab24b5a1ef Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:15 +1030 Subject: [PATCH 7/9] plugins/sql: rename `update_stmt` to `insert_stmt`. The current variable name is confusing, since it's an INSERT! This becomes more noticible in the next patch, where we add a delete statement. Signed-off-by: Rusty Russell --- plugins/sql.c | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index a279f482d324..5f9a6bc96df2 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -126,7 +126,7 @@ struct table_desc { /* name if we need to wait for changes */ const char *waitname; struct column **columns; - char *update_stmt; + char *insert_stmt; /* If we are a subtable */ struct table_desc *parent; /* Is this a sub object (otherwise, subarray if parent is true) */ @@ -820,7 +820,7 @@ static struct command_result *process_json_obj(struct command *cmd, if (err != SQLITE_DONE) { return command_fail(cmd, LIGHTNINGD, "Error executing %s on row %zu: %s", - td->update_stmt, + td->insert_stmt, row, sqlite3_errmsg(sql->db)); } @@ -841,13 +841,13 @@ static struct command_result *process_json_list(struct command *cmd, size_t i; const jsmntok_t *t; int err; - sqlite3_stmt *stmt; + sqlite3_stmt *insert_stmt; struct command_result *ret = NULL; - err = sqlite3_prepare_v2(sql->db, td->update_stmt, -1, &stmt, NULL); + err = sqlite3_prepare_v2(sql->db, td->insert_stmt, -1, &insert_stmt, NULL); if (err != SQLITE_OK) { return command_fail(cmd, LIGHTNINGD, "preparing '%s' failed: %s", - td->update_stmt, + td->insert_stmt, sqlite3_errmsg(sql->db)); } @@ -859,7 +859,7 @@ static struct command_result *process_json_list(struct command *cmd, if (!td->has_created_index) { this_rowid = sql->next_rowid++; /* First entry is always the rowid */ - sqlite3_bind_int64(stmt, off++, this_rowid); + sqlite3_bind_int64(insert_stmt, off++, this_rowid); } else { if (!json_to_u64(buf, json_get_member(buf, t, "created_index"), @@ -869,12 +869,12 @@ static struct command_result *process_json_list(struct command *cmd, json_tok_full_len(t), json_tok_full(buf, t)); } - ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, stmt, last_created_index, last_updated_index); + ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, insert_stmt, last_created_index, last_updated_index); if (ret) break; - sqlite3_reset(stmt); + sqlite3_reset(insert_stmt); } - sqlite3_finalize(stmt); + sqlite3_finalize(insert_stmt); return ret; } @@ -1518,7 +1518,7 @@ static struct command_result *json_listsqlschemas(struct command *cmd, } /* Adds a sub_object to this sql statement (and sub-sub etc) */ -static void add_sub_object(char **update_stmt, char **create_stmt, +static void add_sub_object(char **insert_stmt, char **create_stmt, const char **sep, struct table_desc *sub) { /* sub-arrays are a completely separate table. */ @@ -1530,11 +1530,11 @@ static void add_sub_object(char **update_stmt, char **create_stmt, const struct column *subcol = sub->columns[j]; if (subcol->sub) { - add_sub_object(update_stmt, create_stmt, sep, + add_sub_object(insert_stmt, create_stmt, sep, subcol->sub); continue; } - tal_append_fmt(update_stmt, "%s?", *sep); + tal_append_fmt(insert_stmt, "%s?", *sep); tal_append_fmt(create_stmt, "%s%s %s", *sep, subcol->dbname, @@ -1569,11 +1569,11 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) goto do_subtables; create_stmt = tal_fmt(tmpctx, "CREATE TABLE %s (", td->name); - td->update_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); + td->insert_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); /* If no created_index, create explicit rowid */ if (!td->has_created_index) { tal_append_fmt(&create_stmt, "rowid INTEGER PRIMARY KEY, "); - tal_append_fmt(&td->update_stmt, "?, "); + tal_append_fmt(&td->insert_stmt, "?, "); } /* If we're a child array, we reference the parent column */ @@ -1586,7 +1586,7 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) "row INTEGER REFERENCES %s(%s) ON DELETE CASCADE," " arrindex INTEGER", parent->name, primary_key_name(parent)); - tal_append_fmt(&td->update_stmt, "?,?"); + tal_append_fmt(&td->insert_stmt, "?,?"); sep = ","; } @@ -1594,11 +1594,11 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) const struct column *col = td->columns[i]; if (col->sub) { - add_sub_object(&td->update_stmt, &create_stmt, + add_sub_object(&td->insert_stmt, &create_stmt, &sep, col->sub); continue; } - tal_append_fmt(&td->update_stmt, "%s?", sep); + tal_append_fmt(&td->insert_stmt, "%s?", sep); tal_append_fmt(&create_stmt, "%s%s %s", sep, col->dbname, @@ -1609,7 +1609,7 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) sep = ","; } tal_append_fmt(&create_stmt, ");"); - tal_append_fmt(&td->update_stmt, ");"); + tal_append_fmt(&td->insert_stmt, ");"); err = sqlite3_exec(sql->db, create_stmt, NULL, NULL, &errmsg); if (err != SQLITE_OK) From 4a65e9247d1b2267009246ad77c55c1a314c9d84 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:16 +1030 Subject: [PATCH 8/9] plugins/sql: add a delete_statement to toplevel tables with created_index values. This will let us (efficiently) delete a single entry, so we can wean tables with created_index off the default "delete all and reload" behavior. Change insert_stmt to a const, too (and use a temporary when we're building it). Signed-off-by: Rusty Russell --- plugins/sql.c | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index 5f9a6bc96df2..313b16e5ef6d 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -126,7 +126,9 @@ struct table_desc { /* name if we need to wait for changes */ const char *waitname; struct column **columns; - char *insert_stmt; + const char *insert_stmt; + /* If we have create_index, we can delete by it */ + const char *delete_stmt; /* If we are a subtable */ struct table_desc *parent; /* Is this a sub object (otherwise, subarray if parent is true) */ @@ -1558,7 +1560,7 @@ static const char *primary_key_name(const struct table_desc *td) static void finish_td(struct plugin *plugin, struct table_desc *td) { struct sql *sql = sql_of(plugin); - char *create_stmt; + char *create_stmt, *insert_stmt; int err; char *errmsg; const char *sep = ""; @@ -1569,11 +1571,14 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) goto do_subtables; create_stmt = tal_fmt(tmpctx, "CREATE TABLE %s (", td->name); - td->insert_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); + insert_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); /* If no created_index, create explicit rowid */ if (!td->has_created_index) { tal_append_fmt(&create_stmt, "rowid INTEGER PRIMARY KEY, "); - tal_append_fmt(&td->insert_stmt, "?, "); + tal_append_fmt(&insert_stmt, "?, "); + td->delete_stmt = NULL; + } else { + td->delete_stmt = tal_fmt(td, "DELETE FROM %s WHERE created_index = ?;", td->name); } /* If we're a child array, we reference the parent column */ @@ -1586,7 +1591,7 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) "row INTEGER REFERENCES %s(%s) ON DELETE CASCADE," " arrindex INTEGER", parent->name, primary_key_name(parent)); - tal_append_fmt(&td->insert_stmt, "?,?"); + tal_append_fmt(&insert_stmt, "?,?"); sep = ","; } @@ -1594,11 +1599,11 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) const struct column *col = td->columns[i]; if (col->sub) { - add_sub_object(&td->insert_stmt, &create_stmt, + add_sub_object(&insert_stmt, &create_stmt, &sep, col->sub); continue; } - tal_append_fmt(&td->insert_stmt, "%s?", sep); + tal_append_fmt(&insert_stmt, "%s?", sep); tal_append_fmt(&create_stmt, "%s%s %s", sep, col->dbname, @@ -1609,7 +1614,8 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) sep = ","; } tal_append_fmt(&create_stmt, ");"); - tal_append_fmt(&td->insert_stmt, ");"); + tal_append_fmt(&insert_stmt, ");"); + td->insert_stmt = insert_stmt; err = sqlite3_exec(sql->db, create_stmt, NULL, NULL, &errmsg); if (err != SQLITE_OK) From 6008f6c4587baed8327036a46acf56bfd91690d1 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 26 Feb 2026 10:48:16 +1030 Subject: [PATCH 9/9] plugins/sql: support updates for tables with `created_index` values. For these, when things change, we simply delete amd recreate the changed entries. Signed-off-by: Rusty Russell Changelog-Changed: Plugins: `sql` plugin tables "htlcs", "forwards", "invoices", "sendpays" and "networkevents" are now updated more efficiently. --- plugins/sql.c | 135 ++++++++++++++++++++++++++++++++++++++----- tests/test_plugin.py | 12 ++++ 2 files changed, 131 insertions(+), 16 deletions(-) diff --git a/plugins/sql.c b/plugins/sql.c index 313b16e5ef6d..cd0a52200d9f 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -624,6 +624,7 @@ static struct command_result *process_json_list(struct command *cmd, const jsmntok_t *arr, const u64 *rowid, const struct table_desc *td, + bool update, u64 *last_created_index, u64 *last_updated_index); @@ -651,7 +652,8 @@ static struct command_result *process_json_subobjs(struct command *cmd, /* If it's an array, use process_json_list */ if (!col->sub->is_subobject) { ret = process_json_list(cmd, buf, coltok, &this_rowid, - col->sub, last_created_index, last_updated_index); + col->sub, false, + last_created_index, last_updated_index); } else { ret = process_json_subobjs(cmd, buf, coltok, col->sub, this_rowid, last_created_index, last_updated_index); @@ -836,6 +838,7 @@ static struct command_result *process_json_list(struct command *cmd, const jsmntok_t *arr, const u64 *parent_rowid, const struct table_desc *td, + bool update, u64 *last_created_index, u64 *last_updated_index) { @@ -843,7 +846,7 @@ static struct command_result *process_json_list(struct command *cmd, size_t i; const jsmntok_t *t; int err; - sqlite3_stmt *insert_stmt; + sqlite3_stmt *insert_stmt, *delete_stmt; struct command_result *ret = NULL; err = sqlite3_prepare_v2(sql->db, td->insert_stmt, -1, &insert_stmt, NULL); @@ -853,6 +856,18 @@ static struct command_result *process_json_list(struct command *cmd, sqlite3_errmsg(sql->db)); } + /* Updating? Delete any previous record */ + if (update) { + err = sqlite3_prepare_v2(sql->db, td->delete_stmt, -1, &delete_stmt, NULL); + if (err != SQLITE_OK) { + return command_fail(cmd, LIGHTNINGD, "preparing '%s' failed: %s", + td->delete_stmt, + sqlite3_errmsg(sql->db)); + } + } else { + delete_stmt = NULL; + } + json_for_each_arr(i, t, arr) { /* sqlite3 columns are 1-based! */ size_t off = 1; @@ -862,6 +877,7 @@ static struct command_result *process_json_list(struct command *cmd, this_rowid = sql->next_rowid++; /* First entry is always the rowid */ sqlite3_bind_int64(insert_stmt, off++, this_rowid); + assert(!delete_stmt); } else { if (!json_to_u64(buf, json_get_member(buf, t, "created_index"), @@ -870,6 +886,23 @@ static struct command_result *process_json_list(struct command *cmd, td->cmdname, json_tok_full_len(t), json_tok_full(buf, t)); + + /* For updates, we simply delete old entry: this + * cascades to subtables. We ignore updates on + * entries we don't have yet, too. */ + if (delete_stmt) { + if (this_rowid > td->last_created_index) + continue; + sqlite3_bind_int64(delete_stmt, 1, this_rowid); + err = sqlite3_step(delete_stmt); + if (err != SQLITE_DONE) { + return command_fail(cmd, LIGHTNINGD, + "Error executing %s on id %"PRIu64": %s", + td->delete_stmt, this_rowid, + sqlite3_errmsg(sql->db)); + } + sqlite3_reset(delete_stmt); + } } ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, insert_stmt, last_created_index, last_updated_index); if (ret) @@ -877,14 +910,23 @@ static struct command_result *process_json_list(struct command *cmd, sqlite3_reset(insert_stmt); } sqlite3_finalize(insert_stmt); + if (delete_stmt) + sqlite3_finalize(delete_stmt); + return ret; } -/* Process top-level JSON result object */ +/* Process top-level JSON result object. + * If update is true, ignore entries > td->last_created_index, and + * delete before insert. + * Put the maximum processed created_index in *last_created_index. + * Put the number of entries in *num_entries, if not NULL; + */ static struct command_result *process_json_result(struct command *cmd, const char *buf, const jsmntok_t *result, const struct table_desc *td, + bool update, u64 *last_created_index, u64 *last_updated_index, size_t *num_entries) @@ -892,14 +934,15 @@ static struct command_result *process_json_result(struct command *cmd, const jsmntok_t *arr; struct timerel so_far = timemono_since(td->refresh_start); plugin_log(cmd->plugin, LOG_DBG, - "Time to call %s: %"PRIu64".%09"PRIu64" seconds", - td->cmdname, + "Time to call %s%s: %"PRIu64".%09"PRIu64" seconds", + td->cmdname, update ? " (updates)" : "", (u64)so_far.ts.tv_sec, (u64)so_far.ts.tv_nsec); arr = json_get_member(buf, result, td->arrname); if (num_entries) *num_entries = arr->size; - return process_json_list(cmd, buf, arr, NULL, td, last_created_index, last_updated_index); + return process_json_list(cmd, buf, arr, NULL, td, update, + last_created_index, last_updated_index); } static struct command_result *default_list_done(struct command *cmd, @@ -922,7 +965,8 @@ static struct command_result *default_list_done(struct command *cmd, td->name, errmsg); } - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &td->last_updated_index, NULL); + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1005,7 +1049,8 @@ static struct command_result *listchannels_one_done(struct command *cmd, struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &td->last_updated_index, NULL); + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1106,7 +1151,8 @@ static struct command_result *listnodes_one_done(struct command *cmd, struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, &td->last_created_index, &td->last_updated_index, NULL); + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1681,7 +1727,7 @@ static struct command_result *limited_list_done(struct command *cmd, struct command_result *ret; size_t num_entries; - ret = process_json_result(cmd, buf, result, td, + ret = process_json_result(cmd, buf, result, td, false, &td->last_created_index, &td->last_updated_index, &num_entries); @@ -1712,6 +1758,63 @@ static struct command_result *refresh_by_created_index(struct command *cmd, return send_outreq(req); } +static struct command_result *updated_list_done(struct command *cmd, + const char *method, + const char *buf, + const jsmntok_t *result, + struct db_query *dbq) +{ + struct table_desc *td = dbq->tables[0]; + struct command_result *ret; + u64 unused = 0; + + /* We don't care what max created_index it processes. */ + ret = process_json_result(cmd, buf, result, td, true, &unused, &td->last_updated_index, + NULL); + if (ret) + return ret; + + /* Now we can process any new ones */ + if (td->refresh_needs & REFRESH_CREATED) { + plugin_log(cmd->plugin, LOG_DBG, + "%s: records created, inserting from %"PRIu64, td->name, td->last_created_index + 1); + return refresh_by_created_index(cmd, td, dbq); + } + + td->refresh_needs = REFRESH_UNNECESSARY; + return one_refresh_done(cmd, dbq, false); +} + +static struct command_result *paginated_refresh(struct command *cmd, + struct table_desc *td, + struct db_query *dbq) +{ + /* In case something was deleted (rare!) we just reload the + * entire thing */ + if (td->refresh_needs & REFRESH_DELETED) { + plugin_log(cmd->plugin, LOG_DBG, "%s: total reload due to delete", td->name); + td->refresh_needs = REFRESH_UNNECESSARY; + return default_refresh(cmd, td, dbq); + } + + if (td->refresh_needs & REFRESH_UPDATED) { + struct out_req *req; + plugin_log(cmd->plugin, LOG_DBG, + "%s: records updated, updating from %"PRIu64, td->name, td->last_updated_index + 1); + req = jsonrpc_request_start(cmd, td->cmdname, + updated_list_done, forward_error, + dbq); + json_add_string(req->js, "index", "updated"); + json_add_u64(req->js, "start", td->last_updated_index + 1); + return send_outreq(req); + } + + /* No updates, no deletes: the simple case! */ + plugin_log(cmd->plugin, LOG_DBG, + "%s: records created, inserting from %"PRIu64, td->name, td->last_created_index + 1); + return refresh_by_created_index(cmd, td, dbq); +} + struct refresh_funcs { const char *cmdname; struct command_result *(*refresh)(struct command *cmd, @@ -1724,11 +1827,12 @@ static const struct refresh_funcs refresh_funcs[] = { /* These are special, using gossmap */ { "listchannels", channels_refresh, NULL }, { "listnodes", nodes_refresh, NULL }, - /* FIXME: These support wait and full pagination, but we need to watch for deletes, too! */ - { "listhtlcs", default_refresh, NULL }, - { "listforwards", default_refresh, NULL }, - { "listinvoices", default_refresh, NULL }, - { "listsendpays", default_refresh, NULL }, + /* These support wait and full pagination. */ + { "listhtlcs", paginated_refresh, "htlcs" }, + { "listforwards", paginated_refresh, "forwards" }, + { "listinvoices", paginated_refresh, "invoices" }, + { "listsendpays", paginated_refresh, "sendpays" }, + { "listnetworkevents", paginated_refresh, "networkevents" }, /* These are never changed or deleted */ { "listchainmoves", refresh_by_created_index, "chainmoves" }, { "listchannelmoves", refresh_by_created_index, "channelmoves" }, @@ -1740,7 +1844,6 @@ static const struct refresh_funcs refresh_funcs[] = { { "listtransactions", default_refresh, NULL }, { "bkpr-listaccountevents", default_refresh, NULL }, { "bkpr-listincome", default_refresh, NULL }, - { "listnetworkevents", default_refresh, NULL }, }; static const struct refresh_funcs *find_command_refresh(const char *cmdname) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index ccb756561711..5543cd2f27ec 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4535,20 +4535,32 @@ def test_sql_during_change(node_factory): l1.rpc.invoice(100, l, l) assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + # It should wait on correct values + l1.daemon.wait_for_logs(['lightningd: waiting on invoices created 11$', + 'lightningd: waiting on invoices updated 1$', + 'lightningd: waiting on invoices deleted 1$']) + # Should notice extra one (note shorter expiry) l = f"test_sql_during_delete{11}" l1.rpc.invoice(100, l, l, expiry=10) labels.append(l) assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + l1.daemon.wait_for_log('invoices: records created, inserting from 11') + assert not l1.daemon.is_in_log('invoices: records updated') + assert not l1.daemon.is_in_log('invoices: total reload due to delete') + # Should notice delete. l1.rpc.delinvoice(labels[0], 'unpaid') del labels[0] assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + l1.daemon.wait_for_log('invoices: total reload due to delete') + # Should notice change once invoice has expired. wait_for(lambda: only_one(l1.rpc.listinvoices(label=labels[-1])['invoices'])['status'] != 'unpaid', timeout=10 + TIMEOUT) assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels[:-1]] + [[100, labels[-1], 'expired']]} + l1.daemon.wait_for_log('invoices: records updated, updating from 1') def test_listchannels_broken_message(node_factory):