Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes timeout issues and garbled output issues #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 41 additions & 41 deletions ghidra_jython_kernel/repl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import signal
import subprocess
import os
import hashlib
import re
import time

from pathlib import Path
from pexpect import spawn

Expand All @@ -11,28 +15,29 @@ def execute(cmd):
# execute command
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = p.communicate()

# check status code is ok
# if it's not, will raise RuntimeError exception
if p.returncode != 0:
raise RuntimeError('"{0}" run fails, err={1}'.format(
cmd, stderr.decode('utf-8', errors='replace')))

# return stdout utf-8 string
return stdout.decode('utf-8').replace('\r\n', '').replace('\n', '')


class GhidraJythonRepl:

def __init__(self, ghidra_home=None):

# those paths come from "$GHIDRA_INSTALL_DIR/support/launch.sh"
# User must define "GHIDRA_INSTALL_DIR" for Ghidra's installation directory
# i.e. GHIDRA_INSTALL_DIR=/path/to/ghidra_9.1_PUBLIC
self.INSTALL_DIR = Path(ghidra_home or os.environ['GHIDRA_INSTALL_DIR'])

self._java_home = None
self._java_vmargs = None

# build pythonRun commandline
run_cmd = '{java_home}/bin/java {java_vmargs} -showversion -cp "{utility_jar}" \
ghidra.GhidraLauncher "ghidra.python.PythonRun"'.format(
Expand All @@ -49,8 +54,9 @@ def __init__(self, ghidra_home=None):
self.prompt2 = r'... '

# wait for first prompt
self.child.expect(self.prompt1)

self.child.expect('>>> ')
self.inital_msg = self.child.before

@property
def java_home(self):
if self._java_home is None:
Expand All @@ -65,43 +71,37 @@ def java_vmargs(self):
self.INSTALL_DIR / 'support/LaunchSupport.jar', self.INSTALL_DIR))
return self._java_vmargs

def read_output(self):
''' Read current output. '''

result = ''

# read output, expect echo content
if self.child.before.splitlines()[1:]:
out = self.child.before.splitlines()[1:]
result += '\n'.join([line for line in out if line])

return result

def _repl(self, code):
self.child.sendline(code)

# idk why tho, Ghidra's jython interpreter should wait twice
self.child.expect_exact([self.prompt1, self.prompt2])
self.child.expect_exact([self.prompt1, self.prompt2])

return self.read_output()

def repl(self, code):
''' Ghidra's Jython Interpreter REPL function. '''

code_lines = code.splitlines()

# if code has new line, should send ENTER('') at last
if '\n' in code:
code_lines.append('')

result = ''
# We could escape only key chars for efficiency, but brute force is safer and easier
# e.g., "do_code()" => exec('\\x64\\x6f\\x5f\\x63\\x6f\\x64\\x65\\x28\\x29')
hex_escaped_code = "exec('{}')".format(''.join(['\\x{:02x}'.format(ord(c)) for c in code]))

# Insert some unique line to signify completion, this should run
# eventually, even in any exceptional cases.
flag = hashlib.md5(str(time.time()).encode("ascii")).hexdigest()
completed_cmd = "print('# comp'+'lete {}')".format(flag) # plus sign injected so terminal echo wont match expect pattern

# Run command
self.child.sendline(hex_escaped_code + "\n" + completed_cmd)

# Wait for completion
exp = re.compile("# complete {}".format(flag))
self.child.expect([exp], timeout=1000*1000*1000)
result = self.child.before

# filter all control chars except newline and tab
ccfiltered = re.sub(r'[\x00-\x08\x0b-\x1F]+', '', result)
# filter our two exec/print lines
exp = re.compile('^(>>> )+(exec|print).*$', re.MULTILINE)
metafiltered = re.sub(exp, '', ccfiltered)
# filter out the completed flag
filtered = re.sub(r'# complete [0-9a-f]{32}\n','',metafiltered)

# REPL each line of code
for c in code_lines:
result += self._repl(c)

return result

# Return everything that's fit to print
return filtered

def kill(self):
self.child.kill(signal.SIGKILL)
self.child.kill(signal.SIGKILL)