Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Recording Static Resources #7886

Merged
merged 4 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/client-core/src/recording/RecordingService.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { RecordingResult } from '@etherealengine/common/src/interfaces/Recording'
import { IKSerialization } from '@etherealengine/engine/src/avatar/IKSerialization'
import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine'
import { ECSRecordingActions } from '@etherealengine/engine/src/ecs/ECSRecording'
import { mocapDataChannelType } from '@etherealengine/engine/src/mocap/MotionCaptureSystem'
import { webcamVideoDataChannelType } from '@etherealengine/engine/src/networking/NetworkState'
import { PhysicsSerialization } from '@etherealengine/engine/src/physics/PhysicsSerialization'
import { createActionQueue, defineState, getMutableState, getState, removeActionQueue } from '@etherealengine/hyperflux'

import { API } from '../API'
import { NotificationService } from '../common/services/NotificationService'

export const RecordingState = defineState({
Expand Down Expand Up @@ -41,7 +41,7 @@ export const RecordingFunctions = {
if (state.config.video) {
schema.push(webcamVideoDataChannelType)
}
const recording = (await API.instance.client.service('recording').create({
const recording = (await Engine.instance.api.service('recording').create({
schema: JSON.stringify(schema)
})) as RecordingResult
return recording.id
Expand All @@ -50,7 +50,7 @@ export const RecordingFunctions = {
}
},
getRecordings: async () => {
const recordings = (await API.instance.client.service('recording').find()).data as RecordingResult[]
const recordings = (await Engine.instance.api.service('recording').find()).data as RecordingResult[]
const recordingState = getMutableState(RecordingState)
recordingState.recordings.set(recordings)
}
Expand Down
4 changes: 4 additions & 0 deletions packages/common/src/dbmodels/Recording.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ export interface RecordingInterface {
ended: boolean
schema: Array<string>
}

export interface RecordingResourceInterface {
id: string
}
7 changes: 7 additions & 0 deletions packages/common/src/interfaces/Recording.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,11 @@ export interface RecordingResult {
userId: UserId
ended: boolean
schema: string
resources?: Array<string> // storage provider keys
}

export interface RecordingResourceResult {
id: string
recordingId: string
staticResourceId: string
}
42 changes: 24 additions & 18 deletions packages/instanceserver/src/MediaRecordingFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createHash } from 'crypto'
import { Router } from 'mediasoup/node/lib/types'

import { PeerID } from '@etherealengine/common/src/interfaces/PeerID'
Expand All @@ -12,10 +13,10 @@ import {
webcamVideoDataChannelType
} from '@etherealengine/engine/src/networking/NetworkState'
import { localConfig } from '@etherealengine/server-core/src/config'
import { getStorageProvider } from '@etherealengine/server-core/src/media/storageprovider/storageprovider'
import serverLogger from '@etherealengine/server-core/src/ServerLogger'

import { startFFMPEG } from './FFMPEG'
import { uploadRecordingStaticResource } from './ServerRecordingSystem'
import { SocketWebRTCServerNetwork } from './SocketWebRTCServerFunctions'

const logger = serverLogger.child({ module: 'instanceserver:MediaRecording' })
Expand Down Expand Up @@ -84,7 +85,11 @@ export type MediaTrackPair = {
audioConsumer?: any
}

export const startMediaRecordingPair = async (peerID: PeerID, mediaType: string, tracks: MediaTrackPair) => {
export const startMediaRecordingPair = async (
peerID: PeerID,
mediaType: 'webcam' | 'screenshare',
tracks: MediaTrackPair
) => {
const network = Engine.instance.mediaNetwork as SocketWebRTCServerNetwork

const promises = [] as Promise<any>[]
Expand Down Expand Up @@ -139,7 +144,7 @@ export const startMediaRecordingPair = async (peerID: PeerID, mediaType: string,
}

/** start ffmpeg */
const isH264 = !!tracks.video?.encodings.find((encoding) => encoding.mimeType === 'video/h264')
const isH264 = !!tracks.video && !!tracks.video?.encodings.find((encoding) => encoding.mimeType === 'video/h264')
const ffmpegProcess = await startFFMPEG(!!tracks.audio, !!tracks.video, onExit, isH264)

/** resume consumers */
Expand All @@ -156,7 +161,7 @@ export const startMediaRecordingPair = async (peerID: PeerID, mediaType: string,
stream: ffmpegProcess.stream,
peerID,
mediaType,
format: isH264 ? 'h264' : 'vp8'
format: isH264 ? 'h264' : ((tracks.video ? 'vp8' : 'mp3') as 'h264' | 'vp8' | 'mp3')
}
}

Expand Down Expand Up @@ -196,28 +201,29 @@ export const startMediaRecording = async (recordingID: string, userID: UserId, m

for (const [peer, media] of Object.entries(mediaStreams)) {
for (const [mediaType, tracks] of Object.entries(media)) {
recordingPromises.push(startMediaRecordingPair(peer as PeerID, mediaType, tracks))
recordingPromises.push(startMediaRecordingPair(peer as PeerID, mediaType as 'webcam' | 'screenshare', tracks))
}
}

const recordings = await Promise.all(recordingPromises)

const storageprovider = getStorageProvider()

const activeUploads = recordings.map((recording) => {
const stream = recording.stream
const upload = storageprovider.putObject({
Key:
'recordings/' +
recordingID +
'/' +
recording.peerID +
'-' +
recording.mediaType +
(recording.format === 'vp8' ? '.webm' : '.mp4'),
Body: stream,
ContentType: recording.format === 'vp8' ? 'video/webm' : 'video/mp4'
const format = recording.format === 'mp3' ? 'audio/opus' : recording.format === 'vp8' ? 'video/webm' : 'video/mp4'
const ext = recording.format === 'mp3' ? 'mp3' : recording.format === 'vp8' ? 'webm' : 'mp4'
const key = `recordings/${recordingID}/${recording.peerID}-${recording.mediaType}.${ext}`

const upload = uploadRecordingStaticResource({
recordingID,
key,
body: stream,
mimeType: format,
staticResourceType: recording.format === 'mp3' ? 'audio' : 'video',
hash: createHash('sha3-256').update(key.split('/').pop()!.split('.')[0]).digest('hex')
}).then(() => {
logger.info('Uploaded media file' + key)
})

return upload
})

Expand Down
103 changes: 75 additions & 28 deletions packages/instanceserver/src/ServerRecordingSystem.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { decode, encode } from 'msgpackr'
import { PassThrough } from 'stream'

import { EntityUUID } from '@etherealengine/common/src/interfaces/EntityUUID'
import { PeerID } from '@etherealengine/common/src/interfaces/PeerID'
import { RecordingResult } from '@etherealengine/common/src/interfaces/Recording'
import { StaticResourceInterface } from '@etherealengine/common/src/interfaces/StaticResourceInterface'
import { UserId } from '@etherealengine/common/src/interfaces/UserId'
import multiLogger from '@etherealengine/common/src/logger'
import { Engine } from '@etherealengine/engine/src/ecs/classes/Engine'
Expand Down Expand Up @@ -30,6 +32,7 @@ import { Application } from '@etherealengine/server-core/declarations'
import { checkScope } from '@etherealengine/server-core/src/hooks/verify-scope'
import { getStorageProvider } from '@etherealengine/server-core/src/media/storageprovider/storageprovider'
import { StorageObjectInterface } from '@etherealengine/server-core/src/media/storageprovider/storageprovider.interface'
import { createStaticResourceHash } from '@etherealengine/server-core/src/media/upload-asset/upload-asset.service'

import { startMediaRecording } from './MediaRecordingFunctions'
import { getServerNetwork, SocketWebRTCServerNetwork } from './SocketWebRTCServerFunctions'
Expand Down Expand Up @@ -60,11 +63,46 @@ export const activeRecordings = new Map<string, ActiveRecording>()
export const activePlaybacks = new Map<string, ActivePlayback>()

export const dispatchError = (error: string, targetUser: UserId) => {
const app = Engine.instance.api as Application as Application
const app = Engine.instance.api as Application
logger.error('Recording Error: ' + error)
dispatchAction(ECSRecordingActions.error({ error, $to: targetUser, $topic: getServerNetwork(app).topic }))
}

export const uploadRecordingStaticResource = async (props: {
recordingID: string
key: string
body: Buffer | PassThrough
mimeType: string
staticResourceType: string
hash: string
}) => {
const app = Engine.instance.api as Application
const storageProvider = getStorageProvider()

const upload = await storageProvider.putObject({
Key: props.key,
Body: props.body,
ContentType: props.mimeType
})

const staticResource = (await app.service('static-resource').create(
{
hash: props.hash,
key: props.key,
mimeType: props.mimeType,
staticResourceType: props.staticResourceType
},
{ isInternal: true }
)) as StaticResourceInterface

await app.service('recording-resource').create({
staticResourceId: staticResource.id,
recordingId: props.recordingID
})

return upload
}

const mediaDataChannels = [
webcamVideoDataChannelType,
webcamAudioDataChannelType,
Expand All @@ -73,7 +111,7 @@ const mediaDataChannels = [
]

export const onStartRecording = async (action: ReturnType<typeof ECSRecordingActions.startRecording>) => {
const app = Engine.instance.api as Application as Application
const app = Engine.instance.api as Application

console.log('onStartRecording', action)

Expand Down Expand Up @@ -136,28 +174,33 @@ export const onStartRecording = async (action: ReturnType<typeof ECSRecordingAct
schema: serializationSchema,
chunkLength,
onCommitChunk(chunk, chunkIndex) {
storageProvider
.putObject({
Key: 'recordings/' + recording.id + '/entities-' + chunkIndex + '.ee',
Body: encode(chunk),
ContentType: 'application/octet-stream'
})
.then(() => {
logger.info('Uploaded entities chunk', chunkIndex)
})
const key = 'recordings/' + recording.id + '/entities-' + chunkIndex + '.ee'
const buffer = encode(chunk)
uploadRecordingStaticResource({
recordingID: recording.id,
key,
body: buffer,
mimeType: 'application/octet-stream',
staticResourceType: 'data',
hash: createStaticResourceHash(buffer, { assetURL: key })
}).then(() => {
logger.info('Uploaded entities chunk', chunkIndex)
})

for (const [dataChannel, data] of dataChannelsRecording.entries()) {
if (data.length) {
const count = chunkIndex
storageProvider
.putObject({
Key: 'recordings/' + recording.id + '/' + dataChannel + '-' + chunkIndex + '.ee',
Body: encode(data),
ContentType: 'application/octet-stream'
})
.then(() => {
logger.info('Uploaded raw chunk', count)
})
const key = 'recordings/' + recording.id + '/' + dataChannel + '-' + chunkIndex + '.ee'
const buffer = encode(data)
uploadRecordingStaticResource({
recordingID: recording.id,
key,
body: buffer,
mimeType: 'application/octet-stream',
staticResourceType: 'data',
hash: createStaticResourceHash(buffer, { assetURL: key })
}).then(() => {
logger.info('Uploaded raw chunk', chunkIndex)
})
}
dataChannelsRecording.set(dataChannel, [])
}
Expand Down Expand Up @@ -243,7 +286,7 @@ export const onStopRecording = async (action: ReturnType<typeof ECSRecordingActi
export const onStartPlayback = async (action: ReturnType<typeof ECSRecordingActions.startPlayback>) => {
const app = Engine.instance.api as Application

const recording = (await app.service('recording').get(action.recordingID)) as RecordingResult
const recording = (await app.service('recording').get(action.recordingID, { internal: true })) as RecordingResult

const isClone = !action.targetUser

Expand All @@ -257,6 +300,8 @@ export const onStartPlayback = async (action: ReturnType<typeof ECSRecordingActi
const hasScopes = await checkScope(user, app, 'recording', 'read')
if (!hasScopes) return dispatchError('User does not have record:read scope', recording.userId)

if (!recording.resources?.length) return dispatchError('Recording has no resources', recording.userId)

const schema = JSON.parse(recording.schema) as string[]

const activePlayback = {
Expand All @@ -265,23 +310,25 @@ export const onStartPlayback = async (action: ReturnType<typeof ECSRecordingActi

const storageProvider = getStorageProvider()

const files = await storageProvider.listObjects('recordings/' + action.recordingID + '/', true)
const entityFiles = recording.resources.filter((key) => key.includes('entities-'))

const entityFiles = files.Contents.filter((file) => file.Key.includes('entities-'))
const rawFiles = recording.resources.filter(
(key) => !key.includes('entities-') && !new RegExp(mediaDataChannels.join('|')).test(key)
)

const rawFiles = files.Contents.filter((file) => !file.Key.includes('entities-'))
const mediaFiles = recording.resources.filter((key) => new RegExp(mediaDataChannels.join('|')).test(key))

const entityChunks = (await Promise.all(entityFiles.map((file) => storageProvider.getObject(file.Key)))).map((data) =>
const entityChunks = (await Promise.all(entityFiles.map((key) => storageProvider.getObject(key)))).map((data) =>
decode(data.Body)
)

const dataChannelChunks = new Map<DataChannelType, DataChannelFrame<any>[][]>()

await Promise.all(
rawFiles.map(async (file) => {
const dataChannel = file.Key.split('/')[2].split('-')[0] as DataChannelType
const dataChannel = file.split('/')[2].split('-')[0] as DataChannelType
if (!dataChannelChunks.has(dataChannel)) dataChannelChunks.set(dataChannel, [])
const data = await storageProvider.getObject(file.Key)
const data = await storageProvider.getObject(file)
dataChannelChunks.get(dataChannel)!.push(decode(data.Body))
})
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ const uploadLOD = async (file: Buffer, mimeType: string, key: string, storagePro
)
}

export const createStaticResourceHash = (file: Buffer, props: { name?: string; assetURL?: string }) => {
return createHash('sha3-256')
.update(file.length.toString())
.update(props.name || props.assetURL!.split('/').pop()!.split('.')[0])
.digest('hex')
}

export const addGenericAssetToS3AndStaticResources = async (
app: Application,
file: Buffer,
Expand All @@ -87,12 +94,7 @@ export const addGenericAssetToS3AndStaticResources = async (

let promises: Promise<any>[] = []
const assetURL = getCachedURL(key, provider.cacheDomain)
const hash =
args.hash ||
createHash('sha3-256')
.update(file.length.toString())
.update(args.name || assetURL.split('/').pop()!.split('.')[0])
.digest('hex')
const hash = args.hash || createStaticResourceHash(file, { name: args.name, assetURL })
if (!args.numLODs) args.numLODs = 1
const body = {
hash,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { SequelizeServiceOptions, Service } from 'feathers-sequelize'

import { RecordingResourceResult } from '@etherealengine/common/src/interfaces/Recording'

import { Application } from '../../../declarations'

export type RecordingResourceDataType = RecordingResourceResult

export class RecordingResource<T = RecordingResourceDataType> extends Service<T> {
app: Application
docs: any
constructor(options: Partial<SequelizeServiceOptions>, app: Application) {
super(options)
this.app = app
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* An object for swagger documentation configuration
*/
export default {
definitions: {
recordingResource: {
type: 'object',
properties: {}
}
},
securities: ['create', 'update', 'patch', 'remove'],
operations: {
find: {
security: [{ bearer: [] }]
}
}
}
Loading