Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Fixed bugs in instanceserver connections #10887

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 86 additions & 27 deletions packages/client-core/src/transports/SocketWebRTCClientFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import { getSearchParamFromURL } from '@etherealengine/common/src/utils/getSearc
import { Engine } from '@etherealengine/ecs/src/Engine'
import { defineSystem, destroySystem } from '@etherealengine/ecs/src/SystemFunctions'
import { PresentationSystemGroup } from '@etherealengine/ecs/src/SystemGroups'
import { AuthTask } from '@etherealengine/engine/src/avatar/functions/receiveJoinWorld'
import { AuthTask, ReadyTask } from '@etherealengine/engine/src/avatar/functions/receiveJoinWorld'
import { Identifiable, PeerID, State, dispatchAction, getMutableState, getState, none } from '@etherealengine/hyperflux'
import {
Action,
Expand Down Expand Up @@ -98,6 +98,8 @@ import {
stopFaceTracking,
stopLipsyncTracking
} from '../media/webcam/WebcamInput'
import { ChannelState } from '../social/services/ChannelService'
import { LocationState } from '../social/services/LocationService'
import { AuthState } from '../user/services/AuthService'
import { MediaStreamState, MediaStreamService as _MediaStreamService } from './MediaStreams'
import { clearPeerMediaChannels } from './PeerMediaChannelState'
Expand Down Expand Up @@ -221,7 +223,7 @@ export const connectToInstance = (
if (instanceStillProvisioned(instanceID, locationID, channelID)) _connect()
}, 3000)

const onConnect = () => {
const onConnect = async () => {
if (aborted || !primus) return
connecting = false
primus.off('incoming::open', onConnect)
Expand All @@ -230,31 +232,49 @@ export const connectToInstance = (
clearTimeout(connectionFailTimeout)

const topic = locationID ? NetworkTopics.world : NetworkTopics.media
authenticatePrimus(primus, instanceID, topic)

/** Server closed the connection. */
const onDisconnect = () => {
if (aborted) return
if (primus) {
primus.off('incoming::end', onDisconnect)
primus.off('end', onDisconnect)
const instanceserverReady = await checkInstanceserverReady(primus, instanceID, topic)
if (instanceserverReady) {
await authenticatePrimus(primus, instanceID, topic)

/** Server closed the connection. */
const onDisconnect = () => {
if (aborted) return
if (primus) {
primus.off('incoming::end', onDisconnect)
primus.off('end', onDisconnect)
}
const network = getState(NetworkState).networks[instanceID] as SocketWebRTCClientNetwork
if (!network) return logger.error('Disconnected from unconnected instance ' + instanceID)

logger.info('Disconnected from network %o', { topic: network.topic, id: network.id })
/**
* If we are disconnected (server closes our socket) rather than leave the network,
* we just need to destroy and recreate the transport
*/
closeNetwork(network)
/** If we still have the instance provisioned, we should try again */
if (instanceStillProvisioned(instanceID, locationID, channelID)) _connect()
}
// incoming::end is emitted when the server closes the connection
primus.on('incoming::end', onDisconnect)
// end is emitted when the client closes the connection
primus.on('end', onDisconnect)
} else {
if (locationID) {
const currentLocation = getMutableState(LocationState).currentLocation.location
const currentLocationId = currentLocation.id.value
currentLocation.id.set(undefined as unknown as LocationID)
currentLocation.id.set(currentLocationId)
} else {
const channelState = getMutableState(ChannelState)
const targetChannelId = channelState.targetChannelId.value
channelState.targetChannelId.set(undefined as unknown as ChannelID)
channelState.targetChannelId.set(targetChannelId)
}
const network = getState(NetworkState).networks[instanceID] as SocketWebRTCClientNetwork
if (!network) return logger.error('Disconnected from unconnected instance ' + instanceID)

logger.info('Disonnected from network %o', { topic: network.topic, id: network.id })
/**
* If we are disconnected (server closes our socket) rather than leave the network,
* we just need to destroy and recreate the transport
*/
closeNetwork(network)
/** If we still have the instance provisioned, we should try again */
if (instanceStillProvisioned(instanceID, locationID, channelID)) _connect()
primus.removeAllListeners()
primus.end()
console.log('PRIMUS GONE')
}
// incoming::end is emitted when the server closes the connection
primus.on('incoming::end', onDisconnect)
// end is emitted when the client closes the connection
primus.on('end', onDisconnect)
}
primus!.on('incoming::open', onConnect)
}
Expand Down Expand Up @@ -283,6 +303,45 @@ export const getChannelIdFromTransport = (network: SocketWebRTCClientNetwork) =>
return isWorldConnection ? null : currentChannelInstanceConnection?.channelId
}

export async function checkInstanceserverReady(primus: Primus, instanceID: InstanceID, topic: Topic) {
logger.info('Checking that instanceserver is ready')
const { instanceReady } = await new Promise<ReadyTask>((resolve) => {
const onStatus = (response: ReadyTask) => {
// eslint-disable-next-line no-prototype-builtins
if (response.hasOwnProperty('instanceReady')) {
clearInterval(interval)
resolve(response)
primus.off('data', onStatus)
primus.removeListener('incoming::end', onDisconnect)
}
}

primus.on('data', onStatus)

let disconnected = false
const interval = setInterval(() => {
if (disconnected) {
clearInterval(interval)
resolve({ instanceReady: false })
primus.removeAllListeners()
primus.end()
return
}
}, 100)

const onDisconnect = () => {
disconnected = true
}
primus.addListener('incoming::end', onDisconnect)
})

if (!instanceReady) {
unprovisionInstance(topic, instanceID)
}

return instanceReady
}

export async function authenticatePrimus(primus: Primus, instanceID: InstanceID, topic: Topic) {
logger.info('Authenticating instance ' + instanceID)

Expand Down Expand Up @@ -325,10 +384,10 @@ export async function authenticatePrimus(primus: Primus, instanceID: InstanceID,
/** We failed to connect to be authenticated, we do not want to try again */
// TODO: do we want to unprovision here?
unprovisionInstance(topic, instanceID)
return logger.error(new Error('Unable to connect with credentials' + error))
return logger.error(new Error('Unable to connect with credentials ' + error))
}

connectToNetwork(primus, instanceID, topic, hostPeerID!, routerRtpCapabilities!, cachedActions!)
await connectToNetwork(primus, instanceID, topic, hostPeerID!, routerRtpCapabilities!, cachedActions!)
}

export const connectToNetwork = async (
Expand Down
4 changes: 4 additions & 0 deletions packages/engine/src/avatar/functions/receiveJoinWorld.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ export type AuthTask = {
error?: AuthError
}

export type ReadyTask = {
instanceReady: boolean
}

export type JoinWorldRequestData = {
inviteCode?: InviteCode
}
Expand Down
18 changes: 16 additions & 2 deletions packages/instanceserver/src/SocketFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import { getServerNetwork } from './SocketWebRTCServerFunctions'

const logger = multiLogger.child({ component: 'instanceserver:spark' })

const NON_READY_INTERVALS = 100 //100 tenths of a second, i.e. 10 seconds

export const setupSocketFunctions = async (app: Application, spark: any) => {
let authTask: AuthTask | undefined

Expand All @@ -50,15 +52,27 @@ export const setupSocketFunctions = async (app: Application, spark: any) => {
*
* Authorize user and make sure everything is valid before allowing them to join the world
**/
await new Promise<void>((resolve) => {
const ready = await new Promise<boolean>((resolve) => {
let counter = 0
const interval = setInterval(() => {
counter++
if (getState(InstanceServerState).ready) {
clearInterval(interval)
resolve()
resolve(true)
}
if (counter > NON_READY_INTERVALS) {
clearInterval(interval)
resolve(false)
}
}, 100)
})

if (!ready) {
app.primus.write({ instanceReady: false })
return
}

app.primus.write({ instanceReady: true })
const network = getServerNetwork(app)

const onAuthenticationRequest = async (data) => {
Expand Down
10 changes: 7 additions & 3 deletions packages/instanceserver/src/channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,13 @@ const updateInstance = async ({
if (isNeedingNewServer && !instanceStarted) {
instanceStarted = true
const initialized = await initializeInstance({ app, status, headers, userId })
if (initialized) await loadEngine({ app, sceneId, headers })
else instanceStarted = false
return true
if (initialized) {
await loadEngine({ app, sceneId, headers })
return true
} else {
instanceStarted = false
return false
}
} else {
try {
if (!getState(InstanceServerState).ready)
Expand Down
Loading