From fe05dc3f2cf318af30999899b85ba060dc8f06fd Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 8 Sep 2017 19:54:03 +0930 Subject: [PATCH] channeld: fix sync write to master. We hit: assert(!peer->handle_master_reply); #4 0x000055bba3b030a0 in master_sync_reply (peer=0x55bba41c0030, msg=0x55bba41c6a80 "", replytype=WIRE_CHANNEL_GOT_COMMITSIG_REPLY, handle=0x55bba3b041cf ) at channeld/channel.c:518 #5 0x000055bba3b049bc in handle_peer_commit_sig (conn=0x55bba41c10d0, peer=0x55bba41c0030, msg=0x55bba41c6a80 "") at channeld/channel.c:959 #6 0x000055bba3b05c69 in peer_in (conn=0x55bba41c10d0, peer=0x55bba41c0030, msg=0x55bba41c67c0 "") at channeld/channel.c:1339 #7 0x000055bba3b123eb in peer_decrypt_body (conn=0x55bba41c10d0, pcs=0x55bba41c0030) at common/cryptomsg.c:155 #8 0x000055bba3b2c63b in next_plan (conn=0x55bba41c10d0, plan=0x55bba41c1100) at ccan/ccan/io/io.c:59 We got a commit_sig from the peer while waiting for the master to reply to acknowledge the commitsig we want to send (handle_sending_commitsig_reply). The fix is to go always talk to the master synchronous, and not try to process anything but messages from the master daemon. This avoids the whole class of problems. There's a fairly simple way to do this, as ccan/io lets you override its poll call: we process any outstanding master requests there, or add the master fd to the pollfds array. Signed-off-by: Rusty Russell --- channeld/channel.c | 301 ++++++++++++++++++++++++--------------------- 1 file changed, 162 insertions(+), 139 deletions(-) diff --git a/channeld/channel.c b/channeld/channel.c index 9f70d6618cb7..b8bc05205617 100644 --- a/channeld/channel.c +++ b/channeld/channel.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -44,7 +45,7 @@ #include /* stdin == requests, 3 == peer, 4 = gossip, 5 = HSM */ -#define REQ_FD STDIN_FILENO +#define MASTER_FD STDIN_FILENO #define PEER_FD 3 #define GOSSIP_FD 4 #define HSM_FD 5 @@ -92,12 +93,10 @@ struct peer { struct io_conn *peer_conn; struct daemon_conn gossip_client; - struct daemon_conn master; - /* If we're waiting for a specific reply, defer other messages. */ - enum channel_wire_type master_reply_type; - void (*handle_master_reply)(struct peer *peer, const u8 *msg); - struct msg_queue master_deferred; + /* Messages from master: we queue them since we might be waiting for + * a specific reply. */ + struct msg_queue from_master; struct timers timers; struct oneshot *commit_timer; @@ -337,13 +336,13 @@ static struct io_plan *handle_peer_funding_locked(struct io_conn *conn, "Wrong channel id in %s", tal_hex(trc, msg)); peer->funding_locked[REMOTE] = true; - daemon_conn_send(&peer->master, - take(towire_channel_got_funding_locked(peer, + wire_sync_write(MASTER_FD, + take(towire_channel_got_funding_locked(peer, &peer->remote_per_commit))); if (peer->funding_locked[LOCAL]) { - daemon_conn_send(&peer->master, - take(towire_channel_normal_operation(peer))); + wire_sync_write(MASTER_FD, + take(towire_channel_normal_operation(peer))); } send_announcement_signatures(peer); @@ -357,7 +356,7 @@ static void announce_channel(struct peer *peer) send_channel_update(peer, false); /* Tell the master that we just announced the channel, * so it may announce the node */ - daemon_conn_send(&peer->master, take(towire_channel_announced(peer))); + wire_sync_write(MASTER_FD, take(towire_channel_announced(peer))); } static struct io_plan *handle_peer_announcement_signatures(struct io_conn *conn, @@ -487,41 +486,41 @@ static void maybe_send_shutdown(struct peer *peer) peer->shutdown_sent[LOCAL] = true; } -/* Master has acknowledged that we're sending commitment, so send it. */ -static void handle_sending_commitsig_reply(struct peer *peer, const u8 *msg) +/* This queues other traffic from the master until we get reply. */ +static u8 *master_wait_sync_reply(const tal_t *ctx, + struct peer *peer, const u8 *msg, + enum channel_wire_type replytype) { - status_trace("Sending commit_sig with %zu htlc sigs", - tal_count(peer->next_commit_sigs->htlc_sigs)); + u8 *reply; - peer->next_index[REMOTE]++; + status_trace("Sending master %s", + channel_wire_type_name(fromwire_peektype(msg))); - msg = towire_commitment_signed(peer, &peer->channel_id, - &peer->next_commit_sigs->commit_sig, - peer->next_commit_sigs->htlc_sigs); - msg_enqueue(&peer->peer_out, take(msg)); - peer->next_commit_sigs = tal_free(peer->next_commit_sigs); + if (!wire_sync_write(MASTER_FD, msg)) + status_failed(WIRE_CHANNEL_INTERNAL_ERROR, + "Could not set sync write to master: %s", + strerror(errno)); - maybe_send_shutdown(peer); + status_trace("... , awaiting %s", + channel_wire_type_name(replytype)); - /* Timer now considered expired, you can add a new one. */ - peer->commit_timer = NULL; - start_commit_timer(peer); - - if (shutdown_complete(peer)) - io_break(peer); -} - -/* This blocks other traffic from the master until we get reply. */ -static void master_sync_reply(struct peer *peer, const u8 *msg, - enum channel_wire_type replytype, - void (*handle)(struct peer *peer, const u8 *msg)) -{ - assert(!peer->handle_master_reply); + for (;;) { + reply = wire_sync_read(ctx, MASTER_FD); + if (!reply) + status_failed(WIRE_CHANNEL_INTERNAL_ERROR, + "Could not set sync read from master: %s", + strerror(errno)); + if (fromwire_peektype(reply) == replytype) { + status_trace("Got it!"); + break; + } - peer->handle_master_reply = handle; - peer->master_reply_type = replytype; + status_trace("Nope, got %s instead", + channel_wire_type_name(fromwire_peektype(reply))); + msg_enqueue(&peer->from_master, take(reply)); + } - daemon_conn_send(&peer->master, msg); + return reply; } static struct commit_sigs *calc_commitsigs(const tal_t *ctx, @@ -615,10 +614,8 @@ static void send_commit(struct peer *peer) /* FIXME: Document this requirement in BOLT 2! */ /* We can't send two commits in a row. */ - if (channel_awaiting_revoke_and_ack(peer->channel) - || peer->handle_master_reply) { - status_trace("Can't send commit: waiting for revoke_and_ack %s", - peer->handle_master_reply ? "processing" : "reply"); + if (channel_awaiting_revoke_and_ack(peer->channel)) { + status_trace("Can't send commit: waiting for revoke_and_ack"); /* Mark this as done and try again. */ peer->commit_timer = NULL; start_commit_timer(peer); @@ -655,9 +652,30 @@ static void send_commit(struct peer *peer) changed_htlcs, &peer->next_commit_sigs->commit_sig, peer->next_commit_sigs->htlc_sigs); - master_sync_reply(peer, take(msg), - WIRE_CHANNEL_SENDING_COMMITSIG_REPLY, - handle_sending_commitsig_reply); + /* Message is empty; receiving it is the point. */ + master_wait_sync_reply(tmpctx, peer, take(msg), + WIRE_CHANNEL_SENDING_COMMITSIG_REPLY); + + status_trace("Sending commit_sig with %zu htlc sigs", + tal_count(peer->next_commit_sigs->htlc_sigs)); + + peer->next_index[REMOTE]++; + + msg = towire_commitment_signed(peer, &peer->channel_id, + &peer->next_commit_sigs->commit_sig, + peer->next_commit_sigs->htlc_sigs); + msg_enqueue(&peer->peer_out, take(msg)); + peer->next_commit_sigs = tal_free(peer->next_commit_sigs); + + maybe_send_shutdown(peer); + + /* Timer now considered expired, you can add a new one. */ + peer->commit_timer = NULL; + start_commit_timer(peer); + + if (shutdown_complete(peer)) + io_break(peer); + tal_free(tmpctx); } @@ -834,12 +852,6 @@ static u8 *got_commitsig_msg(const tal_t *ctx, return msg; } -/* Tell peer to continue now master has replied. */ -static void handle_reply_wake_peer(struct peer *peer, const u8 *msg) -{ - io_wake(peer); -} - static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, struct peer *peer, const u8 *msg) { @@ -957,12 +969,10 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, msg = got_commitsig_msg(tmpctx, peer->next_index[LOCAL], &commit_sig, htlc_sigs, changed_htlcs, txs[0]); - master_sync_reply(peer, take(msg), - WIRE_CHANNEL_GOT_COMMITSIG_REPLY, - handle_reply_wake_peer); - - /* And peer waits for reply. */ - return io_wait(conn, peer, send_revocation, peer); + master_wait_sync_reply(tmpctx, peer, take(msg), + WIRE_CHANNEL_GOT_COMMITSIG_REPLY); + tal_free(tmpctx); + return send_revocation(conn, peer); } static u8 *got_revoke_msg(const tal_t *ctx, u64 revoke_num, @@ -1013,6 +1023,7 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, struct privkey privkey; struct channel_id channel_id; struct pubkey per_commit_point, next_per_commit; + tal_t *tmpctx = tal_tmpctx(msg); const struct htlc **changed_htlcs = tal_arr(msg, const struct htlc *, 0); if (!fromwire_revoke_and_ack(msg, NULL, &channel_id, &old_commit_secret, @@ -1059,12 +1070,11 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, status_trace("No commits outstanding after recv revoke_and_ack"); /* Tell master about things this locks in, wait for response */ - msg = got_revoke_msg(msg, peer->next_index[REMOTE] - 2, + msg = got_revoke_msg(tmpctx, peer->next_index[REMOTE] - 2, &old_commit_secret, &next_per_commit, changed_htlcs); - master_sync_reply(peer, take(msg), - WIRE_CHANNEL_GOT_REVOKE_REPLY, - handle_reply_wake_peer); + master_wait_sync_reply(tmpctx, peer, take(msg), + WIRE_CHANNEL_GOT_REVOKE_REPLY); peer->old_remote_per_commit = peer->remote_per_commit; peer->remote_per_commit = next_per_commit; @@ -1075,8 +1085,8 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, type_to_string(trc, struct pubkey, &peer->old_remote_per_commit)); - /* And peer waits for reply. */ - return io_wait(conn, peer, accepted_revocation, peer); + tal_free(tmpctx); + return accepted_revocation(conn, peer); } static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, @@ -1275,8 +1285,8 @@ static struct io_plan *handle_pong(struct io_conn *conn, status_failed(WIRE_CHANNEL_PEER_READ_FAILED, "Unexpected pong"); peer->num_pings_outstanding--; - daemon_conn_send(&peer->master, - take(towire_channel_ping_reply(pong, tal_len(pong)))); + wire_sync_write(MASTER_FD, + take(towire_channel_ping_reply(pong, tal_len(pong)))); return peer_read_message(conn, &peer->pcs, peer_in); } @@ -1291,8 +1301,8 @@ static struct io_plan *handle_peer_shutdown(struct io_conn *conn, status_failed(WIRE_CHANNEL_PEER_READ_FAILED, "Bad shutdown"); /* Tell master, it will tell us what to send (if any). */ - daemon_conn_send(&peer->master, - take(towire_channel_got_shutdown(peer, scriptpubkey))); + wire_sync_write(MASTER_FD, + take(towire_channel_got_shutdown(peer, scriptpubkey))); peer->shutdown_sent[REMOTE] = true; if (shutdown_complete(peer)) @@ -1681,8 +1691,8 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg) peer->funding_locked[LOCAL] = true; if (peer->funding_locked[REMOTE]) { - daemon_conn_send(&peer->master, - take(towire_channel_normal_operation(peer))); + wire_sync_write(MASTER_FD, + take(towire_channel_normal_operation(peer))); } } @@ -1737,7 +1747,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg) /* Tell the master. */ msg = towire_channel_offer_htlc_reply(inmsg, peer->htlc_id, 0, NULL); - daemon_conn_send(&peer->master, take(msg)); + wire_sync_write(MASTER_FD, take(msg)); peer->htlc_id++; return; case CHANNEL_ERR_INVALID_EXPIRY: @@ -1776,7 +1786,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg) /* Note: tal_fmt doesn't set tal_len() to exact length, so fix here. */ tal_resize(&failmsg, strlen(failmsg)+1); msg = towire_channel_offer_htlc_reply(inmsg, 0, failcode, (u8*)failmsg); - daemon_conn_send(&peer->master, take(msg)); + wire_sync_write(MASTER_FD, take(msg)); } static void handle_preimage(struct peer *peer, const u8 *inmsg) @@ -1885,8 +1895,8 @@ static void handle_ping_cmd(struct peer *peer, const u8 *inmsg) * it MUST ignore the `ping`. */ if (num_pong_bytes >= 65532) - daemon_conn_send(&peer->master, - take(towire_channel_ping_reply(peer, 0))); + wire_sync_write(MASTER_FD, + take(towire_channel_ping_reply(peer, 0))); else peer->num_pings_outstanding++; } @@ -1903,51 +1913,31 @@ static void handle_shutdown_cmd(struct peer *peer, const u8 *inmsg) start_commit_timer(peer); } -static struct io_plan *req_in(struct io_conn *conn, struct daemon_conn *master) +static void req_in(struct peer *peer, const u8 *msg) { - struct peer *peer = container_of(master, struct peer, master); - enum channel_wire_type t = fromwire_peektype(master->msg_in); - - /* Waiting for something specific? Defer others. */ - if (peer->handle_master_reply) { - void (*handle)(struct peer *peer, const u8 *msg); - - if (t != peer->master_reply_type) { - msg_enqueue(&peer->master_deferred, - take(master->msg_in)); - master->msg_in = NULL; - goto out_next; - } - - /* Just in case it resets this. */ - handle = peer->handle_master_reply; - peer->handle_master_reply = NULL; - - handle(peer, master->msg_in); - goto out; - } + enum channel_wire_type t = fromwire_peektype(msg); switch (t) { case WIRE_CHANNEL_FUNDING_LOCKED: - handle_funding_locked(peer, master->msg_in); + handle_funding_locked(peer, msg); goto out; case WIRE_CHANNEL_FUNDING_ANNOUNCE_DEPTH: - handle_funding_announce_depth(peer, master->msg_in); + handle_funding_announce_depth(peer, msg); goto out; case WIRE_CHANNEL_OFFER_HTLC: - handle_offer_htlc(peer, master->msg_in); + handle_offer_htlc(peer, msg); goto out; case WIRE_CHANNEL_FULFILL_HTLC: - handle_preimage(peer, master->msg_in); + handle_preimage(peer, msg); goto out; case WIRE_CHANNEL_FAIL_HTLC: - handle_fail(peer, master->msg_in); + handle_fail(peer, msg); goto out; case WIRE_CHANNEL_PING: - handle_ping_cmd(peer, master->msg_in); + handle_ping_cmd(peer, msg); goto out; case WIRE_CHANNEL_SEND_SHUTDOWN: - handle_shutdown_cmd(peer, master->msg_in); + handle_shutdown_cmd(peer, msg); goto out; case WIRE_CHANNEL_BAD_COMMAND: @@ -1978,25 +1968,7 @@ static struct io_plan *req_in(struct io_conn *conn, struct daemon_conn *master) channel_wire_type_name(t)); out: - /* In case we've now processed reply, process packet backlog. */ - if (!peer->handle_master_reply) { - const u8 *msg = msg_dequeue(&peer->master_deferred); - if (msg) { - /* Free old packet exactly like daemon_conn_read_next */ - master->msg_in = tal_free(master->msg_in); - master->msg_in = cast_const(u8 *, tal_steal(peer, msg)); - return req_in(conn, master); - } - } - -out_next: - return daemon_conn_read_next(conn, master); -} - -static void master_gone(struct io_conn *unused, struct daemon_conn *dc) -{ - /* Can't tell master, it's gone. */ - exit(2); + tal_free(msg); } /* We do this synchronously. */ @@ -2023,7 +1995,7 @@ static void init_channel(struct peer *peer) assert(!(fcntl(REQ_FD, F_GETFL) & O_NONBLOCK)); - msg = wire_sync_read(peer, REQ_FD); + msg = wire_sync_read(peer, MASTER_FD); if (!fromwire_channel_init(peer, msg, NULL, &peer->chain_hash, &funding_txid, &funding_txout, @@ -2070,9 +2042,7 @@ static void init_channel(struct peer *peer) status_failed(WIRE_CHANNEL_BAD_COMMAND, "Init: %s", tal_hex(msg, msg)); - /* After this we'll be async, so set up now. */ - daemon_conn_init(peer, &peer->master, REQ_FD, req_in, master_gone); - status_setup_async(&peer->master); + status_setup_sync(MASTER_FD); status_trace("init %s: remote_per_commit = %s, old_remote_per_commit = %s" " next_idx_local = %"PRIu64 @@ -2156,21 +2126,72 @@ static void send_shutdown_complete(struct peer *peer) } /* Now we can tell master shutdown is complete. */ - daemon_conn_send(&peer->master, - take(towire_channel_shutdown_complete(peer, - &peer->pcs.cs))); - daemon_conn_send_fd(&peer->master, PEER_FD); - daemon_conn_send_fd(&peer->master, GOSSIP_FD); - - if (!daemon_conn_sync_flush(&peer->master)) - status_failed(WIRE_CHANNEL_INTERNAL_ERROR, "Flushing master"); + wire_sync_write(MASTER_FD, + take(towire_channel_shutdown_complete(peer, + &peer->pcs.cs))); + fdpass_send(MASTER_FD, PEER_FD); + fdpass_send(MASTER_FD, GOSSIP_FD); + close(MASTER_FD); +} + +static bool process_reqs(struct peer *peer) +{ + const u8 *msg; + bool changed = false; + + /* In case we've deferred, process packet backlog. */ + while ((msg = msg_dequeue(&peer->from_master)) != NULL) { + status_trace("Now dealing with deferred %s", + channel_wire_type_name(fromwire_peektype(msg))); + req_in(peer, msg); + changed = true; + } + + return changed; +} + +static struct peer *peer; + +/* If this becomes a common pattern, we could make it a helper in common/ */ +static int poll_with_masterfd(struct pollfd *fds, nfds_t nfds, int timeout) +{ + struct pollfd *fds_plus; + int r; + + /* This can change things, so return as if poll found nothing. */ + if (process_reqs(peer)) + return 0; + + /* Add master fd to fds. */ + fds_plus = tal_dup_arr(peer, struct pollfd, fds, nfds, 1); + fds_plus[nfds].fd = MASTER_FD; + fds_plus[nfds].events = POLLIN; + fds_plus[nfds].revents = 0; + + r = poll(fds_plus, nfds+1, timeout); + if (r > 0) { + if (fds_plus[nfds].revents & POLLIN) { + u8 *msg = wire_sync_read(peer, MASTER_FD); + + if (!msg) + status_failed(WIRE_CHANNEL_BAD_COMMAND, + "Can't read command: %s", + strerror(errno)); + msg_enqueue(&peer->from_master, take(msg)); + r--; + } else if (fds_plus[nfds].revents & (POLLHUP|POLLNVAL|POLLERR)) + /* Can't report error, master gone. */ + errx(2, "Error polling master fd"); + } + /* Copy back revents values */ + memcpy(fds, fds_plus, nfds * sizeof(*fds)); + tal_free(fds_plus); + return r; } int main(int argc, char *argv[]) { - struct peer *peer = tal(NULL, struct peer); int i; - if (argc == 2 && streq(argv[1], "--version")) { printf("%s\n", version()); exit(0); @@ -2183,14 +2204,13 @@ int main(int argc, char *argv[]) secp256k1_ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY | SECP256K1_CONTEXT_SIGN); + peer = tal(NULL, struct peer); peer->num_pings_outstanding = 0; timers_init(&peer->timers, time_mono()); peer->commit_timer = NULL; peer->have_sigs[LOCAL] = peer->have_sigs[REMOTE] = false; peer->announce_depth_reached = false; - peer->handle_master_reply = NULL; - peer->master_reply_type = 0; - msg_queue_init(&peer->master_deferred, peer); + msg_queue_init(&peer->from_master, peer); msg_queue_init(&peer->peer_out, peer); peer->next_commit_sigs = NULL; peer->shutdown_sent[LOCAL] = false; @@ -2212,6 +2232,9 @@ int main(int argc, char *argv[]) /* Read init_channel message sync. */ init_channel(peer); + /* Make sure we process and listen for master msgs. */ + io_poll_override(poll_with_masterfd); + for (;;) { struct timer *expired = NULL; io_loop(&peer->timers, &expired);