Skip to content

Commit

Permalink
Fix bug with rw errors
Browse files Browse the repository at this point in the history
  • Loading branch information
petuhovskiy committed Dec 14, 2021
1 parent 385680f commit 707ce19
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions src/backend/replication/walproposer.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ static term_t GetHighestTerm(TermHistory *th);
static term_t GetEpoch(WalKeeper *wk);
static void SendProposerElected(WalKeeper *wk);
static void StartStreaming(WalKeeper *wk);
static void SendAppendRequests(WalKeeper *wk, uint32 events);
static bool SendAppendRequests(WalKeeper *wk, uint32 events);


/*
Expand Down Expand Up @@ -199,6 +199,8 @@ InitEventSet(void)
static void
UpdateEventSet(WalKeeper *wk, uint32 events)
{
elog(LOG, "updating event set for [%d], new_events=%d, state=%s, eventPos=%d", (int) (wk - walkeeper), events, FormatWalKeeperState(wk->state), wk->eventPos);

/* eventPos = -1 when we don't have an event */
Assert(wk->eventPos != -1);

Expand Down Expand Up @@ -731,7 +733,8 @@ SendMessageToNode(int i, WalMessage *msg)
wk->state = SS_ACTIVE;

/* TODO: do we need to send messages immediately? */
SendAppendRequests(wk, WL_SOCKET_WRITEABLE);
if (!SendAppendRequests(wk, WL_SOCKET_WRITEABLE))
return;

UpdateEventSet(wk, WL_SOCKET_READABLE | (wk->flushWrite ? WL_SOCKET_WRITEABLE : 0));
}
Expand Down Expand Up @@ -1229,8 +1232,11 @@ WalProposerPoll(void)
/*
* Send queue messages starting from wk->currMsg until the end or non-writable
* socket, whichever comes first.
*
* Can change wk->state if an Async* function encounters an error and resets the
* connection. Returns false in that case, true otherwise.
*/
static void
static bool
SendAppendRequests(WalKeeper *wk, uint32 events)
{
int wki = wk - walkeeper;
Expand All @@ -1240,8 +1246,11 @@ SendAppendRequests(WalKeeper *wk, uint32 events)
if (wk->flushWrite)
{
if (!AsyncFlush(wki, (events & WL_SOCKET_READABLE) != 0))
/* nothing to write, should wait for writeable socket */
return;
/*
* AsyncFlush returned false. This happens either when the socket is
* closed, or when we have nothing to write and should wait for the
* socket to become writeable.
*/
return wk->state == SS_ACTIVE;

wk->currMsg = wk->currMsg->next;
wk->flushWrite = false;
Expand Down Expand Up @@ -1281,8 +1290,7 @@ SendAppendRequests(WalKeeper *wk, uint32 events)
}

elog(LOG,
"sending message msg=%p len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
(void*) msg,
"sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
Expand All @@ -1305,22 +1313,25 @@ SendAppendRequests(WalKeeper *wk, uint32 events)
wk->flushWrite = true;
if (req != &msg->req)
free(req);
return;
return wk->state == SS_ACTIVE;
}
if (req != &msg->req)
free(req);

/* continue writing the next message */
wk->currMsg = wk->currMsg->next;
}

return true;
}

/*
* Receive and process all available feedback.
*
* Receive and process all available feedback. Can change state if Async* functions
* encounter errors and reset the connection. Returns true if the walkeeper is still
* in SS_ACTIVE state afterwards, false otherwise.
*
* NB: This function can call SendMessageToNode and produce new messages.
*/
static void
static bool
RecvAppendResponses(WalKeeper *wk)
{
XLogRecPtr minQuorumLsn;
Expand Down Expand Up @@ -1351,7 +1362,7 @@ RecvAppendResponses(WalKeeper *wk)
}

if (!readAnything)
return;
return wk->state == SS_ACTIVE;

HandleWalKeeperResponse();

Expand All @@ -1367,6 +1378,8 @@ RecvAppendResponses(WalKeeper *wk)
BroadcastMessage(CreateMessageCommitLsnOnly(lastSentLsn));
lastSentCommitLsn = minQuorumLsn;
}

return wk->state == SS_ACTIVE;
}

/* Performs the logic for advancing the state machine of the 'i'th walkeeper,
Expand Down Expand Up @@ -1791,13 +1804,13 @@ AdvancePollState(int i, uint32 events)


case SS_ACTIVE:
/* TODO: how to detect EOF? */

if (events & WL_SOCKET_WRITEABLE)
SendAppendRequests(wk, events);
if (!SendAppendRequests(wk, events))
return;

if (events & WL_SOCKET_READABLE)
RecvAppendResponses(wk);
if (!RecvAppendResponses(wk))
return;

if (wk->currMsg == NULL && wk->ackMsg == NULL)
{
Expand Down

0 comments on commit 707ce19

Please sign in to comment.