Skip to content

Commit

Permalink
Merge pull request #139 from textileio/asutula/stage-chunks
Browse files Browse the repository at this point in the history
Send chunked data in stage
  • Loading branch information
asutula authored Sep 29, 2020
2 parents e9ee5b4 + c247d79 commit 1677d8a
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 32 deletions.
1 change: 1 addition & 0 deletions modules.d.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
declare module 'ipfs-http-client'
declare module 'wait-on'
declare module 'it-block'
77 changes: 60 additions & 17 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@
}
},
"dependencies": {
"@improbable-eng/grpc-web-node-http-transport": "^0.13.0",
"@textile/grpc-powergate-client": "0.6.2",
"ipfs-http-client": "^45.0.0"
"@textile/grpc-powergate-client": "0.6.8",
"@textile/grpc-transport": "0.0.3",
"ipfs-http-client": "^45.0.0",
"it-block": "^2.0.0"
}
}
28 changes: 20 additions & 8 deletions src/ffs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ import {
} from "@textile/grpc-powergate-client/dist/ffs/rpc/rpc_pb_service"
import fs from "fs"
import ipfsClient from "ipfs-http-client"
import block from "it-block"
import path from "path"
import { Config } from "../types"
import { promise } from "../util"
import { File, normaliseInput } from "./normalize"
import {
GetFolderOptions,
ListDealRecordsOptions,
Expand Down Expand Up @@ -553,9 +555,11 @@ export const createFFS = (
)
},

stage: (input: Uint8Array) => {
// TODO: figure out how to stream data in here, or at least stream to the server
return new Promise<StageResponse.AsObject>((resolve, reject) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
stage: async (input: any) => {
// Only process the first input if there are more than one
const source: File | undefined = (await normaliseInput(input).next()).value
return new Promise<StageResponse.AsObject>(async (resolve, reject) => {
const client = grpc.client(RPCService.Stage, config)
client.onMessage((message) => {
resolve(message.toObject() as StageResponse.AsObject)
Expand All @@ -567,11 +571,19 @@ export const createFFS = (
reject("ended with no message")
}
})
client.start(getMeta())
const req = new StageRequest()
req.setChunk(input)
client.send(req)
client.finishSend()
if (source?.content) {
client.start(getMeta())
const process = await block({ size: 32000, noPad: true })
for await (const chunk of process(source.content)) {
const buf = chunk.slice()
const req = new StageRequest()
req.setChunk(buf as Buffer)
client.send(req)
}
client.finishSend()
} else {
reject(new Error("no content to stage"))
}
})
},

Expand Down
Loading

0 comments on commit 1677d8a

Please sign in to comment.