Skip to content

Commit 4d811a9

Browse files
committed
Address PR comments
Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent a38d7eb commit 4d811a9

File tree

9 files changed

+128
-104
lines changed

9 files changed

+128
-104
lines changed

api/protoblocktx/block_tx.pb.go

Lines changed: 40 additions & 39 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/protoblocktx/block_tx.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ enum Status {
107107
MALFORMED_MISSING_TX_ID = 102; // Envelope is missing transaction ID.
108108

109109
// Stored in the state database. Prevents submitting a transaction with the same ID.
110-
MALFORMED_UNSUPPORTED_TX_PAYLOAD = 103; // Unsupported transaction payload type.
111-
MALFORMED_BAD_PAYLOAD = 104; // Cannot unmarshal envelope's payload.
110+
MALFORMED_UNSUPPORTED_ENVELOPE_PAYLOAD = 103; // Unsupported envelope payload type.
111+
MALFORMED_BAD_ENVELOPE_PAYLOAD = 104; // Cannot unmarshal envelope's payload.
112112
MALFORMED_TX_ID_CONFLICT = 105; // Envelope's TX ID does not match the payload's TX ID.
113113
MALFORMED_EMPTY_NAMESPACES = 106; // No namespaces provided.
114114
MALFORMED_DUPLICATE_NAMESPACE = 107; // Duplicate namespace detected.

service/coordinator/coordinator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func TestCoordinatorServiceRejectedTx(t *testing.T) {
306306
{
307307
TxNum: 0,
308308
Id: "rejected",
309-
Status: protoblocktx.Status_MALFORMED_UNSUPPORTED_TX_PAYLOAD,
309+
Status: protoblocktx.Status_MALFORMED_UNSUPPORTED_ENVELOPE_PAYLOAD,
310310
},
311311
},
312312
})
@@ -317,11 +317,11 @@ func TestCoordinatorServiceRejectedTx(t *testing.T) {
317317
)
318318

319319
env.requireStatus(ctx, t, map[string]*protoblocktx.StatusWithHeight{
320-
"rejected": {Code: protoblocktx.Status_MALFORMED_UNSUPPORTED_TX_PAYLOAD, BlockNumber: 1},
320+
"rejected": {Code: protoblocktx.Status_MALFORMED_UNSUPPORTED_ENVELOPE_PAYLOAD, BlockNumber: 1},
321321
}, nil)
322322

323323
test.RequireIntMetricValue(t, 1, env.coordinator.metrics.transactionCommittedTotal.WithLabelValues(
324-
protoblocktx.Status_MALFORMED_UNSUPPORTED_TX_PAYLOAD.String(),
324+
protoblocktx.Status_MALFORMED_UNSUPPORTED_ENVELOPE_PAYLOAD.String(),
325325
))
326326
test.RequireIntMetricValue(t, preMetricsValue, env.coordinator.metrics.transactionCommittedTotal.WithLabelValues(
327327
protoblocktx.Status_COMMITTED.String(),

service/sidecar/mapping.go

Lines changed: 64 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const (
4343
statusIdx = int(common.BlockMetadataIndex_TRANSACTIONS_FILTER)
4444
)
4545

46-
func mapBlock(block *common.Block, txIDToHeight *utils.SyncMap[string, types.Height]) *scBlockWithStatus {
46+
func mapBlock(block *common.Block, txIDToHeight *utils.SyncMap[string, types.Height]) (*scBlockWithStatus, error) {
4747
// Prepare block's metadata.
4848
if block.Metadata == nil {
4949
block.Metadata = &common.BlockMetadata{}
@@ -63,7 +63,7 @@ func mapBlock(block *common.Block, txIDToHeight *utils.SyncMap[string, types.Hei
6363
block: block,
6464
},
6565
txIDToHeight: txIDToHeight,
66-
}
66+
}, nil
6767
}
6868

6969
txCount := len(block.Data.Data)
@@ -83,92 +83,95 @@ func mapBlock(block *common.Block, txIDToHeight *utils.SyncMap[string, types.Hei
8383
}
8484
for msgIndex, msg := range block.Data.Data {
8585
logger.Debugf("Mapping transaction [blk,tx] = [%d,%d]", mappedBlock.block.Number, msgIndex)
86-
mappedBlock.mapMessage(uint32(msgIndex), msg) //nolint:gosec // int -> uint32.
86+
err := mappedBlock.mapMessage(uint32(msgIndex), msg) //nolint:gosec // int -> uint32.
87+
if err != nil {
88+
// This can never occur unless there is a bug in the relay.
89+
return nil, err
90+
}
8791
}
88-
return mappedBlock
92+
return mappedBlock, nil
8993
}
9094

91-
func (b *scBlockWithStatus) mapMessage(msgIndex uint32, msg []byte) {
95+
func (b *scBlockWithStatus) mapMessage(msgIndex uint32, msg []byte) error {
9296
data, hdr, envErr := serialization.UnwrapEnvelope(msg)
9397
if envErr != nil {
94-
b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_BAD_ENVELOPE, envErr.Error())
95-
return
98+
return b.rejectNonDBStatusTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_BAD_ENVELOPE, envErr.Error())
9699
}
97100
if hdr.TxId == "" {
98-
b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_MISSING_TX_ID, "no TX ID")
99-
return
101+
return b.rejectNonDBStatusTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_MISSING_TX_ID, "no TX ID")
100102
}
101103

102104
switch common.HeaderType(hdr.Type) {
103105
default:
104-
b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_UNSUPPORTED_TX_PAYLOAD, "message type")
106+
return b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_UNSUPPORTED_ENVELOPE_PAYLOAD, "message type")
105107
case common.HeaderType_CONFIG:
106108
_, err := policy.ParsePolicyFromConfigTx(msg)
107109
if err != nil {
108-
b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_CONFIG_TX_INVALID, err.Error())
109-
return
110+
return b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_CONFIG_TX_INVALID, err.Error())
110111
}
111-
b.appendTx(msgIndex, hdr, configTx(hdr.TxId, msg))
112112
b.isConfig = true
113+
return b.appendTx(msgIndex, hdr, configTx(hdr.TxId, msg))
113114
case common.HeaderType_MESSAGE:
114115
tx, err := serialization.UnmarshalTx(data)
115116
if err != nil {
116-
b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_BAD_PAYLOAD, err.Error())
117-
return
117+
return b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_BAD_ENVELOPE_PAYLOAD, err.Error())
118118
}
119-
if hdr.TxId != tx.Id {
120-
b.rejectTx(msgIndex, hdr, protoblocktx.Status_MALFORMED_TX_ID_CONFLICT, "tx ID does not match")
119+
if status := verifyTxForm(tx, hdr); status != statusNotYetValidated {
120+
return b.rejectTx(msgIndex, hdr, status, "malformed tx")
121121
}
122-
if status := verifyTxForm(tx); status != statusNotYetValidated {
123-
b.rejectTx(msgIndex, hdr, status, "malformed tx")
124-
return
125-
}
126-
b.appendTx(msgIndex, hdr, tx)
122+
return b.appendTx(msgIndex, hdr, tx)
127123
}
128124
}
129125

130-
func (b *scBlockWithStatus) appendTx(txNum uint32, hdr *common.ChannelHeader, tx *protoblocktx.Tx) {
131-
if idAlreadyExists := b.addTxIDMapping(txNum, hdr); idAlreadyExists {
132-
return
126+
func (b *scBlockWithStatus) appendTx(txNum uint32, hdr *common.ChannelHeader, tx *protoblocktx.Tx) error {
127+
if idAlreadyExists, err := b.addTxIDMapping(txNum, hdr); idAlreadyExists || err != nil {
128+
return err
133129
}
134130
b.block.TxsNum = append(b.block.TxsNum, txNum)
135131
b.block.Txs = append(b.block.Txs, tx)
136132
debugTx(hdr, "included: %s", hdr.TxId)
133+
return nil
137134
}
138135

139136
func (b *scBlockWithStatus) rejectTx(
140137
txNum uint32, hdr *common.ChannelHeader, status protoblocktx.Status, reason string,
141-
) {
138+
) error {
142139
if !IsStatusStoredInDB(status) {
143-
err := b.withStatus.setFinalStatus(txNum, status)
144-
if err != nil {
145-
// This should never happen unless there is a bug in the mapper.
146-
logger.Errorf("[BUG] failed to set final-status for tx %d: %v", txNum, err)
147-
return
148-
}
149-
debugTx(hdr, "excluded: %s (%s)", &status, reason)
150-
return
140+
return b.rejectNonDBStatusTx(txNum, hdr, status, reason)
151141
}
152-
if idAlreadyExists := b.addTxIDMapping(txNum, hdr); idAlreadyExists {
153-
return
142+
if idAlreadyExists, err := b.addTxIDMapping(txNum, hdr); idAlreadyExists || err != nil {
143+
return err
154144
}
155145
b.block.Rejected = append(b.block.Rejected, &protocoordinatorservice.TxStatusInfo{
156146
TxNum: txNum,
157147
Id: hdr.TxId,
158148
Status: status,
159149
})
160150
debugTx(hdr, "rejected: %s (%s)", &status, reason)
151+
return nil
161152
}
162153

163-
func (b *scBlockWithStatus) addTxIDMapping(txNum uint32, hdr *common.ChannelHeader) (idAlreadyExists bool) {
154+
func (b *scBlockWithStatus) rejectNonDBStatusTx(
155+
txNum uint32, hdr *common.ChannelHeader, status protoblocktx.Status, reason string,
156+
) error {
157+
if IsStatusStoredInDB(status) {
158+
// This can never occur unless there is a bug in the relay.
159+
return errors.Newf("[BUG] status should be stored [blk:%d,num:%d]: %s", b.block.Number, txNum, &status)
160+
}
161+
err := b.withStatus.setFinalStatus(txNum, status)
162+
debugTx(hdr, "excluded: %s (%s)", &status, reason)
163+
return err
164+
}
165+
166+
func (b *scBlockWithStatus) addTxIDMapping(txNum uint32, hdr *common.ChannelHeader) (idAlreadyExists bool, err error) {
164167
_, idAlreadyExists = b.txIDToHeight.LoadOrStore(hdr.TxId, types.Height{
165168
BlockNum: b.block.Number,
166169
TxNum: txNum,
167170
})
168171
if idAlreadyExists {
169-
b.rejectTx(txNum, hdr, protoblocktx.Status_REJECTED_DUPLICATE_TX_ID, "duplicate tx")
172+
err = b.rejectNonDBStatusTx(txNum, hdr, protoblocktx.Status_REJECTED_DUPLICATE_TX_ID, "duplicate tx")
170173
}
171-
return idAlreadyExists
174+
return idAlreadyExists, err
172175
}
173176

174177
func (b *blockWithStatus) setFinalStatus(txNum uint32, status protoblocktx.Status) error {
@@ -230,13 +233,9 @@ func configTx(id string, value []byte) *protoblocktx.Tx {
230233

231234
// verifyTxForm verifies that a TX is not malformed.
232235
// It returns status MALFORMED_<reason> if it is malformed, or not-validated otherwise.
233-
func verifyTxForm(tx *protoblocktx.Tx) protoblocktx.Status {
234-
if tx.Id == "" || !utf8.ValidString(tx.Id) {
235-
// ASN.1. Marshalling only supports valid UTF8 strings.
236-
// This case is unlikely as the message received via protobuf message which also only support
237-
// valid UTF8 strings.
238-
// Thus, we do not create a designated status for such error.
239-
return protoblocktx.Status_MALFORMED_MISSING_TX_ID
236+
func verifyTxForm(tx *protoblocktx.Tx, hdr *common.ChannelHeader) protoblocktx.Status {
237+
if status := validateTxID(tx, hdr); status != statusNotYetValidated {
238+
return status
240239
}
241240

242241
if len(tx.Namespaces) == 0 {
@@ -268,6 +267,22 @@ func verifyTxForm(tx *protoblocktx.Tx) protoblocktx.Status {
268267
return statusNotYetValidated
269268
}
270269

270+
func validateTxID(tx *protoblocktx.Tx, hdr *common.ChannelHeader) protoblocktx.Status {
271+
if hdr.TxId != tx.Id {
272+
// This is a temporary workaround. Later TX versions will not include the TX ID.
273+
return protoblocktx.Status_MALFORMED_TX_ID_CONFLICT
274+
}
275+
276+
if tx.Id == "" || !utf8.ValidString(tx.Id) {
277+
// ASN.1. Marshalling only supports valid UTF8 strings.
278+
// This case is unlikely as the message received via protobuf message which also only support
279+
// valid UTF8 strings.
280+
// Thus, we do not create a designated status for such error.
281+
return protoblocktx.Status_MALFORMED_MISSING_TX_ID
282+
}
283+
return statusNotYetValidated
284+
}
285+
271286
func checkNamespaceFormation(ns *protoblocktx.TxNamespace) protoblocktx.Status {
272287
if len(ns.ReadWrites) == 0 && len(ns.BlindWrites) == 0 {
273288
return protoblocktx.Status_MALFORMED_NO_WRITES
@@ -320,14 +335,14 @@ func checkMetaNamespace(txNs *protoblocktx.TxNamespace) protoblocktx.Status {
320335

321336
// checkKeys verifies there are no duplicate keys and no nil keys.
322337
func checkKeys(keys [][]byte) protoblocktx.Status {
323-
seenKeys := make(map[string]any, len(keys))
338+
uniqueKeys := make(map[string]any, len(keys))
324339
for _, k := range keys {
325340
if len(k) == 0 {
326341
return protoblocktx.Status_MALFORMED_EMPTY_KEY
327342
}
328-
seenKeys[string(k)] = nil
343+
uniqueKeys[string(k)] = nil
329344
}
330-
if len(seenKeys) != len(keys) {
345+
if len(uniqueKeys) != len(keys) {
331346
return protoblocktx.Status_MALFORMED_DUPLICATE_KEY_IN_READ_WRITE_SET
332347
}
333348
return statusNotYetValidated

0 commit comments

Comments
 (0)