-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.c
165 lines (145 loc) · 5.54 KB
/
reader.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Functions copied from backend/access/transam/xlogreader.c -- not exported
#include "postgres.h"
#include "access/xlogreader.h"
#define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
DecodedXLogRecord *XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized);
void report_invalid_record(XLogReaderState *state, const char *fmt, ...);
bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr);
DecodedXLogRecord *XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
{
size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
DecodedXLogRecord *decoded = NULL;
/* Allocate a circular decode buffer if we don't have one already. */
if (unlikely(state->decode_buffer == NULL))
{
if (state->decode_buffer_size == 0)
state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
state->decode_buffer = palloc(state->decode_buffer_size);
state->decode_buffer_head = state->decode_buffer;
state->decode_buffer_tail = state->decode_buffer;
state->free_decode_buffer = true;
}
/* Try to allocate space in the circular decode buffer. */
if (state->decode_buffer_tail >= state->decode_buffer_head)
{
/* Empty, or tail is to the right of head. */
if (state->decode_buffer_tail + required_space <=
state->decode_buffer + state->decode_buffer_size)
{
/* There is space between tail and end. */
decoded = (DecodedXLogRecord *)state->decode_buffer_tail;
decoded->oversized = false;
return decoded;
}
else if (state->decode_buffer + required_space <
state->decode_buffer_head)
{
/* There is space between start and head. */
decoded = (DecodedXLogRecord *)state->decode_buffer;
decoded->oversized = false;
return decoded;
}
}
else
{
/* Tail is to the left of head. */
if (state->decode_buffer_tail + required_space <
state->decode_buffer_head)
{
/* There is space between tail and head. */
decoded = (DecodedXLogRecord *)state->decode_buffer_tail;
decoded->oversized = false;
return decoded;
}
}
/* Not enough space in the decode buffer. Are we allowed to allocate? */
if (allow_oversized)
{
decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
if (decoded == NULL)
return NULL;
decoded->oversized = true;
return decoded;
}
return NULL;
}
void report_invalid_record(XLogReaderState *state, const char *fmt, ...)
{
#define MAX_ERRORMSG_LEN 1000
va_list args;
fmt = _(fmt);
va_start(args, fmt);
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
va_end(args);
state->errormsg_deferred = true;
}
bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess)
{
if (record->xl_tot_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%X: expected at least %u, got %u",
LSN_FORMAT_ARGS(RecPtr),
(uint32)SizeOfXLogRecord, record->xl_tot_len);
return false;
}
if (!RmgrIdIsValid(record->xl_rmid))
{
report_invalid_record(state,
"invalid resource manager ID %u at %X/%X",
record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
return false;
}
if (randAccess)
{
/*
* We can't exactly verify the prev-link, but surely it should be less
* than the record's own address.
*/
if (!(record->xl_prev < RecPtr))
{
report_invalid_record(state,
"record with incorrect prev-link %X/%X at %X/%X",
LSN_FORMAT_ARGS(record->xl_prev),
LSN_FORMAT_ARGS(RecPtr));
return false;
}
}
else
{
/*
* Record's prev-link should exactly match our previous location. This
* check guards against torn WAL pages where a stale but valid-looking
* WAL record starts on a sector boundary.
*/
if (record->xl_prev != PrevRecPtr)
{
report_invalid_record(state,
"record with incorrect prev-link %X/%X at %X/%X",
LSN_FORMAT_ARGS(record->xl_prev),
LSN_FORMAT_ARGS(RecPtr));
return false;
}
}
return true;
}
bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
{
pg_crc32c crc;
Assert(record->xl_tot_len >= SizeOfXLogRecord);
/* Calculate the CRC */
INIT_CRC32C(crc);
COMP_CRC32C(crc, ((char *)record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
/* include the record header last */
COMP_CRC32C(crc, (char *)record, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(crc);
if (!EQ_CRC32C(record->xl_crc, crc))
{
report_invalid_record(state,
"incorrect resource manager data checksum in record at %X/%X",
LSN_FORMAT_ARGS(recptr));
return false;
}
return true;
}