-
Notifications
You must be signed in to change notification settings - Fork 14
/
unreal_engine.py
68 lines (49 loc) · 1.67 KB
/
unreal_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#compatability script
import shared_globals as g
import asyncio
def run_on_sio(future):
if future == None:
pass
asyncio.run_coroutine_threadsafe(future, g.sio_loop)
# if g.sio_loop == None:
# pass
#g.sio_loop.call_soon_threadsafe(lambda: await future)
#if(asyncio.get_event_loop() != None):
#asyncio.run_coroutine_threadsafe(future, g.sio.eio.event_loop)
#asyncio.run_coroutine_threadsafe(future, future.get_loop())
def sio_future():
#return g.sio_loop.create_future()
return asyncio.get_event_loop().create_future()
def emit_wrapper(event, data):
#asyncio.get_event_loop().call_soon_threadsafe(lambda:g.sio.emit(event, data))
#g.sio_loop.call_soon_threadsafe(lambda:g.sio.emit(event, data))
future = g.sio.emit(event, data)
asyncio.run_coroutine_threadsafe(future, g.sio_loop)
#todo: fix events and emitting while in sync mode
def log(text):
print('ue.log: ', text)
if(g.sio != None):
emit_wrapper('log', str(text))
emit_wrapper('chatMessage', 'log:' + str(text))
def get_content_dir():
return './unreal/content/'
def custom_event(event, data, use_json):
if(g.sio != None):
run_on_sio(g.sio.emit('customEvent', {'event':event, 'data':data, 'useJson': use_json}))
def set_sio_link(link_sio, link_app):
print('link set')
g.sio = link_sio
g.app = link_app
g.eio = g.sio.eio
g.custom_event = custom_event
print(g.eio)
#store socket.io loop for callback scheduling, we can get it from calling thread
g.sio_loop = asyncio.get_event_loop()
def set_loop_link():
g.sio_loop = asyncio.get_event_loop()
# wrap around callbacks
def run_on_gt(callback, param=None):
#called directly
print("run_on_gt callback: " + str(param))
if(callback != None):
callback(param)