33const core = require ( 'datastore-core' )
44const ShardingStore = core . ShardingDatastore
55const Block = require ( 'ipld-block' )
6- const { cidToKey, keyToCid } = require ( './blockstore-utils' )
6+ const { cidToKey } = require ( './blockstore-utils' )
77const map = require ( 'it-map' )
8- const pipe = require ( 'it-pipe' )
8+ const drain = require ( 'it-drain' )
9+ const pushable = require ( 'it-pushable' )
910
1011module . exports = async ( filestore , options ) => {
1112 const store = await maybeWithSharding ( filestore , options )
@@ -32,6 +33,7 @@ function createBaseStore (store) {
3233 async * query ( query , options ) { // eslint-disable-line require-await
3334 yield * store . query ( query , options )
3435 } ,
36+
3537 /**
3638 * Get a single block by CID.
3739 *
@@ -41,27 +43,11 @@ function createBaseStore (store) {
4143 */
4244 async get ( cid , options ) {
4345 const key = cidToKey ( cid )
44- let blockData
45- try {
46- blockData = await store . get ( key , options )
47- return new Block ( blockData , cid )
48- } catch ( err ) {
49- if ( err . code === 'ERR_NOT_FOUND' ) {
50- const otherCid = cidToOtherVersion ( cid )
51-
52- if ( ! otherCid ) {
53- throw err
54- }
55-
56- const otherKey = cidToKey ( otherCid )
57- const blockData = await store . get ( otherKey , options )
58- await store . put ( key , blockData )
59- return new Block ( blockData , cid )
60- }
46+ const blockData = await store . get ( key , options )
6147
62- throw err
63- }
48+ return new Block ( blockData , cid )
6449 } ,
50+
6551 /**
6652 * Like get, but for more.
6753 *
@@ -74,6 +60,7 @@ function createBaseStore (store) {
7460 yield this . get ( cid , options )
7561 }
7662 } ,
63+
7764 /**
7865 * Write a single block to the store.
7966 *
@@ -86,59 +73,69 @@ function createBaseStore (store) {
8673 throw new Error ( 'invalid block' )
8774 }
8875
89- const exists = await this . has ( block . cid )
76+ const key = cidToKey ( block . cid )
77+ const exists = await store . has ( key , options )
9078
91- if ( exists ) {
92- return this . get ( block . cid , options )
79+ if ( ! exists ) {
80+ await store . put ( key , block . data , options )
9381 }
9482
95- await store . put ( cidToKey ( block . cid ) , block . data , options )
96-
9783 return block
9884 } ,
9985
10086 /**
101- * Like put, but for more.
87+ * Like put, but for more
10288 *
10389 * @param {AsyncIterable<Block>|Iterable<Block> } blocks
10490 * @param {Object } options
10591 * @returns {AsyncIterable<Block> }
10692 */
10793 async * putMany ( blocks , options ) { // eslint-disable-line require-await
108- yield * pipe (
109- blocks ,
110- ( source ) => {
111- // turn them into a key/value pair
112- return map ( source , ( block ) => {
113- return { key : cidToKey ( block . cid ) , value : block . data }
114- } )
115- } ,
116- ( source ) => {
117- // put them into the datastore
118- return store . putMany ( source , options )
119- } ,
120- ( source ) => {
121- // map the returned key/value back into a block
122- return map ( source , ( { key, value } ) => {
123- return new Block ( value , keyToCid ( key ) )
124- } )
94+ // we cannot simply chain to `store.putMany` because we convert a CID into
95+ // a key based on the multihash only, so we lose the version & codec and
96+ // cannot give the user back the CID they used to create the block
97+ // nb. we want to use `store.putMany` here so bitswap can control batching
98+ // up block HAVEs to send to the network - if we use multiple `store.put`s
99+ // it will not be able to guess we are about to `store.put` more blocks
100+ const output = pushable ( )
101+
102+ setImmediate ( async ( ) => {
103+ try {
104+ await drain ( store . putMany ( async function * ( ) {
105+ for await ( const block of blocks ) {
106+ const key = cidToKey ( block . cid )
107+ const exists = await store . has ( key , options )
108+
109+ if ( ! exists ) {
110+ yield { key, value : block . data }
111+ }
112+
113+ // there is an assumption here that after the yield has completed
114+ // the underlying datastore has finished writing the block
115+ output . push ( block )
116+ }
117+ } ( ) ) )
118+
119+ output . end ( )
120+ } catch ( err ) {
121+ output . end ( err )
125122 }
126- )
123+ } )
124+
125+ yield * output
127126 } ,
127+
128128 /**
129- * Does the store contain block with this cid ?
129+ * Does the store contain block with this CID ?
130130 *
131131 * @param {CID } cid
132132 * @param {Object } options
133133 * @returns {Promise<bool> }
134134 */
135- async has ( cid , options ) {
136- const exists = await store . has ( cidToKey ( cid ) , options )
137- if ( exists ) return exists
138- const otherCid = cidToOtherVersion ( cid )
139- if ( ! otherCid ) return false
140- return store . has ( cidToKey ( otherCid ) , options )
135+ async has ( cid , options ) { // eslint-disable-line require-await
136+ return store . has ( cidToKey ( cid ) , options )
141137 } ,
138+
142139 /**
143140 * Delete a block from the store
144141 *
@@ -149,6 +146,7 @@ function createBaseStore (store) {
149146 async delete ( cid , options ) { // eslint-disable-line require-await
150147 return store . delete ( cidToKey ( cid ) , options )
151148 } ,
149+
152150 /**
153151 * Delete a block from the store
154152 *
@@ -157,12 +155,9 @@ function createBaseStore (store) {
157155 * @returns {Promise<void> }
158156 */
159157 async * deleteMany ( cids , options ) { // eslint-disable-line require-await
160- yield * store . deleteMany ( ( async function * ( ) {
161- for await ( const cid of cids ) {
162- yield cidToKey ( cid )
163- }
164- } ( ) ) , options )
158+ yield * store . deleteMany ( map ( cids , cid => cidToKey ( cid ) ) , options )
165159 } ,
160+
166161 /**
167162 * Close the store
168163 *
@@ -173,11 +168,3 @@ function createBaseStore (store) {
173168 }
174169 }
175170}
176-
177- function cidToOtherVersion ( cid ) {
178- try {
179- return cid . version === 0 ? cid . toV1 ( ) : cid . toV0 ( )
180- } catch ( err ) {
181- return null
182- }
183- }
0 commit comments