-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathservice.js
163 lines (150 loc) · 4.71 KB
/
service.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import * as Server from '@ucanto/server'
import * as Client from '@ucanto/client'
import * as CAR from '@ucanto/transport/car'
import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator'
import * as DealerCaps from '@web3-storage/capabilities/filecoin/dealer'
// eslint-disable-next-line no-unused-vars
import * as API from '../types.js'
import {
QueueOperationFailed,
StoreOperationFailed,
UnexpectedState,
UnsupportedCapability,
} from '../errors.js'
/**
 * Handler for `piece/offer`.
 *
 * Rejects the invocation unless its resource (`with`) is the service's own
 * DID. Otherwise the `{ piece, group }` pair is looked up in
 * `context.pieceStore`; when the store does not report it as present, the pair
 * is added to `context.pieceQueue`. On success the result carries the offered
 * piece and is joined with the link of a `piece/accept` invocation that the
 * service issues to itself.
 *
 * @param {API.Input<AggregatorCaps.pieceOffer>} input
 * @param {import('./api.js').ServiceContext} context
 * @returns {Promise<API.UcantoInterface.Result<API.PieceOfferSuccess, API.PieceOfferFailure> | API.UcantoInterface.JoinBuilder<API.PieceOfferSuccess>>}
 */
export const pieceOffer = async ({ capability }, context) => {
  // The offer must be addressed to the service principal itself.
  const isServiceResource = capability.with === context.id.did()
  if (!isServiceResource) {
    return { error: new UnsupportedCapability({ capability }) }
  }

  const {
    nb: { piece, group },
  } = capability

  // Dedupe: only enqueue pairs the piece store does not already hold.
  const lookup = await context.pieceStore.has({ piece, group })
  if (lookup.error) {
    return { error: new StoreOperationFailed(lookup.error.message) }
  }
  const alreadyStored = Boolean(lookup.ok)
  if (!alreadyStored) {
    const enqueued = await context.pieceQueue.add({ piece, group })
    if (enqueued.error) {
      return { error: new QueueOperationFailed(enqueued.error.message) }
    }
  }

  // Follow-up effect: a `piece/accept` invocation from the service to itself.
  const acceptInvocation = AggregatorCaps.pieceAccept.invoke({
    issuer: context.id,
    audience: context.id,
    with: context.id.did(),
    nb: { piece, group },
    expiration: Infinity,
  })
  const effect = await acceptInvocation.delegate()

  /** @type {API.UcantoInterface.OkBuilder<API.PieceOfferSuccess, API.PieceOfferFailure>} */
  const receipt = Server.ok({ piece })
  return receipt.join(effect.link())
}
/**
 * Handler for `piece/accept`.
 *
 * Rejects the invocation unless its resource (`with`) is the service's own
 * DID. It then queries `context.inclusionStore` for the `{ piece, group }`
 * pair, takes the first returned record, and loads that record's aggregate
 * from `context.aggregateStore` to obtain its pieces. On success the result
 * carries the piece, aggregate and inclusion, and is joined with the link of
 * an `aggregate/offer` invocation addressed to `context.dealerId`.
 *
 * @param {API.Input<AggregatorCaps.pieceAccept>} input
 * @param {import('./api.js').ServiceContext} context
 * @returns {Promise<API.UcantoInterface.Result<API.PieceAcceptSuccess, API.PieceAcceptFailure> | API.UcantoInterface.JoinBuilder<API.PieceAcceptSuccess>>}
 */
export const pieceAccept = async ({ capability }, context) => {
  // The accept must be addressed to the service principal itself.
  const isServiceResource = capability.with === context.id.did()
  if (!isServiceResource) {
    return { error: new UnsupportedCapability({ capability }) }
  }

  const {
    nb: { piece, group },
  } = capability

  // Look up the inclusion record(s) tying this piece to the group.
  const inclusionLookup = await context.inclusionStore.query({ piece, group })
  if (inclusionLookup.error) {
    return { error: new StoreOperationFailed(inclusionLookup.error.message) }
  }
  const { results: inclusionRecords } = inclusionLookup.ok
  if (!inclusionRecords.length) {
    const reason = `no inclusion proof found for pair {${piece}, ${group}}`
    return { error: new UnexpectedState(reason) }
  }

  // Only the first record is used; fetch the pieces of its aggregate.
  const [firstRecord] = inclusionRecords
  const { aggregate, inclusion } = firstRecord
  const aggregateLookup = await context.aggregateStore.get({ aggregate })
  if (aggregateLookup.error) {
    return { error: new StoreOperationFailed(aggregateLookup.error.message) }
  }
  const { pieces } = aggregateLookup.ok

  // Follow-up effect: an `aggregate/offer` invocation for the dealer.
  const offerInvocation = DealerCaps.aggregateOffer.invoke({
    issuer: context.id,
    audience: context.dealerId,
    with: context.id.did(),
    nb: { aggregate, pieces },
    expiration: Infinity,
  })
  const effect = await offerInvocation.delegate()

  /** @type {API.UcantoInterface.OkBuilder<API.PieceAcceptSuccess, API.PieceAcceptFailure>} */
  const receipt = Server.ok({ piece, aggregate, inclusion })
  return receipt.join(effect.link())
}
/**
 * Builds the service definition exposing the `piece.offer` and `piece.accept`
 * providers, each bound to the given context.
 *
 * @param {import('./api.js').ServiceContext} context
 */
export function createService(context) {
  // Each provider forwards the invocation input plus the shared context.
  const offer = Server.provideAdvanced({
    capability: AggregatorCaps.pieceOffer,
    handler: (invocation) => pieceOffer(invocation, context),
  })
  const accept = Server.provideAdvanced({
    capability: AggregatorCaps.pieceAccept,
    handler: (invocation) => pieceAccept(invocation, context),
  })

  return { piece: { offer, accept } }
}
/**
 * Creates a ucanto server for the service built from the given context.
 *
 * The codec falls back to `CAR.inbound` when `context.codec` is falsy. Errors
 * are forwarded to `context.errorReporter.catch`, and authorization checks to
 * `context.validateAuthorization`.
 *
 * @param {API.UcantoServerContext & import('./api.js').ServiceContext} context
 */
export const createServer = (context) => {
  return Server.create({
    id: context.id,
    codec: context.codec || CAR.inbound,
    service: createService(context),
    // Looked up on the context at call time rather than captured up front.
    catch: (failure) => context.errorReporter.catch(failure),
    validateAuthorization: (authorization) =>
      context.validateAuthorization(authorization),
  })
}
/**
 * Opens a ucanto client connection to the aggregator service.
 *
 * When no codec is supplied, `CAR.outbound` is used.
 *
 * @param {object} options
 * @param {API.UcantoInterface.Principal} options.id
 * @param {API.UcantoInterface.Transport.Channel<API.AggregatorService>} options.channel
 * @param {API.UcantoInterface.OutboundCodec} [options.codec]
 */
export const connect = ({ id, channel, codec = CAR.outbound }) => {
  const connectionOptions = { id, channel, codec }
  return Client.connect(connectionOptions)
}