Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async read and write #10

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn main():
try:
var server = SysServer()
let handler = TechEmpowerRouter()
server.listen_and_serve("0.0.0.0:8080", handler)
server.listen_and_serve_async("0.0.0.0:8080", handler)
except e:
print("Error starting server: " + e.__str__())
return
Expand Down
432 changes: 432 additions & 0 deletions external/b64.mojo

Large diffs are not rendered by default.

104 changes: 104 additions & 0 deletions lightbug_http/io/bytes.mojo
Original file line number Diff line number Diff line change
@@ -1,6 +1,110 @@
from python import PythonObject
from base64 import b64encode
from memory.unsafe import bitcast

alias ByteDType = DType.int8
alias Bytes = DynamicVector[Int8]
alias Byte = Int8


fn to_bytes(string: String) -> Bytes:
return b64encode(string)._buffer


fn to_bytes[type: DType, nelts: Int = 1](simd: SIMD[type, nelts]) -> Bytes:
let simd_bytes = bitcast[ByteDType, nelts * sizeof[type](), type, nelts](simd)

var bytes = Bytes(nelts * sizeof[type]())

@unroll
for i in range(nelts * sizeof[type]()):
bytes.append(simd_bytes[i])

return bytes


fn to_string(bytes: Bytes) -> String:
return b64decode(bytes)


fn rstrip_unsafe(content: String, chars: String = " ") -> String:
var strip_pos: Int = len(content)
for i in range(len(content)):
let c = content[len(content) - i - 1 : len(content) - i]
if chars.find(c) == -1:
strip_pos = len(content) - i
break

return content[:strip_pos]


# Temporary until stdlib base64 decode is implemented
fn b64decode(s: String) -> String:
alias base64: String = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

let padding = s.count("=")

let s_strip = rstrip_unsafe(s, "=")

# base64 decode
var binary_string: String = ""
for i in range(len(s_strip)):
let index: Byte = base64.find(s_strip[i : i + 1])
binary_string += byte_to_binary_string(index)

if padding:
binary_string = binary_string[: -padding * 2]

var decoded_string: String = ""
for i in range(0, len(binary_string), 8):
let byte = binary_string[i : i + 8]
decoded_string += chr(binary_string_to_byte(byte).to_int())

return decoded_string


fn byte_to_binary_string(byte: Byte) -> String:
var binary_string: String = ""
for i in range(8):
let bit = (byte >> i) & 1
binary_string += String(bit)

# find significant bits
var significant_binary_string: String = ""
var found_significant_bit: Bool = False
for i in range(len(binary_string)):
let bit = binary_string[len(binary_string) - i - 1 : len(binary_string) - i]
if bit == "1":
found_significant_bit = True
if found_significant_bit:
significant_binary_string += bit

# left pad to 6 bits if less than 6 bits
if len(significant_binary_string) < 6:
let padding = 6 - len(significant_binary_string)
for i in range(padding):
significant_binary_string = "0" + significant_binary_string

return significant_binary_string


fn binary_string_to_byte(binary_string: String) -> Byte:
var total = 0
let length = len(binary_string)
for i in range(length):
# Get the value at the current position (0 or 1)
let bit = binary_string[length - 1 - i]

let bit_value: Int
if bit == "1":
bit_value = 1
else:
bit_value = 0

# Add to the total, considering its position (2^i)
total += bit_value * (2**i)

return total


@value
Expand Down
9 changes: 9 additions & 0 deletions lightbug_http/strings.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ alias rChar = String("\r")._buffer
alias nChar = String("\n")._buffer


struct S[lifetime: MutLifetime]:
var s: Reference[String, __mlir_attr.`1: i1`, lifetime] # zero is immut

fn __init__(
inout self, s: Reference[String, __mlir_attr.`1: i1`, lifetime]
) -> None:
self.s = s


# TODO: tuples don't work with strings in Mojo currently, to be replaced with a tuple
@value
struct TwoLines:
Expand Down
36 changes: 34 additions & 2 deletions lightbug_http/sys/net.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ from lightbug_http.net import (
default_tcp_keep_alive,
)
from lightbug_http.strings import NetworkType
from lightbug_http.io.bytes import Bytes
from lightbug_http.io.bytes import Bytes, to_string, to_bytes, b64decode
from lightbug_http.io.sync import Duration
from external.libc import (
c_void,
Expand All @@ -24,6 +24,7 @@ from external.libc import (
SOCK_STREAM,
SOL_SOCKET,
SO_REUSEADDR,
O_NONBLOCK,
SHUT_RDWR,
htons,
inet_pton,
Expand All @@ -38,6 +39,9 @@ from external.libc import (
shutdown,
close,
)
from external.b64 import encode as b64_encode

# from external.b64 import decode as b64_decode


@value
Expand All @@ -57,7 +61,7 @@ struct SysListener(Listener):
self.__addr = addr
self.fd = fd

@always_inline
# @always_inline
fn accept[T: Connection](self) raises -> T:
let their_addr_ptr = Pointer[sockaddr].alloc(1)
var sin_size = socklen_t(sizeof[socklen_t]())
Expand Down Expand Up @@ -162,12 +166,40 @@ struct SysConnection(Connection):
buf = bytes_str._buffer
return bytes_recv

async fn read_async(self, inout buf: Bytes) raises -> Int:
@parameter
async fn task() -> Int:
try:
_ = self.read(buf)
return buf[0].__int__()
except e:
print("Failed to read from connection: " + e.__str__())
return -1

let routine: Coroutine[Int] = task()
return await routine

fn write(self, buf: Bytes) raises -> Int:
let msg = String(buf)
print("Sending response: " + msg)
if send(self.fd, to_char_ptr(msg).bitcast[c_void](), len(msg), 0) == -1:
print("Failed to send response")
return len(buf)

# This has to be a def for now because of a weird bug in the Mojo compiler
async def write_async(self, buf: Bytes) -> Int:
@parameter
async def task(task_buf: Bytes) -> Int:
try:
let write_len = self.write(task_buf)
return write_len
except e:
print("Failed to write to connection: " + e.__str__())
return -1

let routine: RaisingCoroutine[Int] = task(buf)
return await routine

fn close(self) raises:
_ = shutdown(self.fd, SHUT_RDWR)
let close_status = close(self.fd)
Expand Down
86 changes: 83 additions & 3 deletions lightbug_http/sys/server.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ from lightbug_http.header import RequestHeader
from lightbug_http.sys.net import SysListener, SysConnection, SysNet
from lightbug_http.service import HTTPService
from lightbug_http.io.sync import Duration
from lightbug_http.io.bytes import Bytes
from lightbug_http.io.bytes import Bytes, to_bytes, to_string
from lightbug_http.error import ErrorHandler
from lightbug_http.strings import next_line, NetworkType
from lightbug_http.strings import next_line, NetworkType, S
from external.b64 import encode as b64_encode


struct SysServer:
Expand Down Expand Up @@ -54,10 +55,89 @@ struct SysServer:
let listener = __net.listen(NetworkType.tcp4.value, address)
self.serve(listener, handler)

fn serve[T: HTTPService](inout self, ln: SysListener, handler: T) raises -> None:
fn listen_and_serve_async[
T: HTTPService
](inout self, address: String, handler: T) raises -> None:
var __net = SysNet()
let listener = __net.listen(NetworkType.tcp4.value, address)
self.serve_async(listener, handler)

fn serve_async[
T: HTTPService
](inout self, ln: SysListener, handler: T) raises -> None:
self.ln = ln
# let max_worker_count = self.get_concurrency()
# TODO: logic for non-blocking read and write here, see for example https://github.com/valyala/fasthttp/blob/9ba16466dfd5d83e2e6a005576ee0d8e127457e2/server.go#L1789

async fn handle_connection(conn: SysConnection, handler: T) -> None:
var buf = Bytes()
try:
let read_len = await conn.read_async(buf)
except e:
try:
conn.close()
except e:
print("Failed to close connection")
print("Failed to read from connection")
try:
let first_line_and_headers = next_line(buf)
let request_line = first_line_and_headers.first_line
let rest_of_headers = first_line_and_headers.rest

var uri = URI(request_line)
try:
uri.parse()
except:
try:
conn.close()
except e:
print("Failed to close connection")
print("Failed to parse request line")

var header = RequestHeader(buf)
try:
header.parse()
except:
try:
conn.close()
except e:
print("Failed to close connection")
print("Failed to parse request header")

let res = handler.func(
HTTPRequest(
uri,
buf,
header,
)
)
var res_encoded = encode(res)
try:
_ = await conn.write_async(res_encoded)
except e:
print("Ooph! " + e.__str__())
try:
conn.close()
except e:
print("Failed to close connection")
print("Failed to read from connection")
try:
conn.close()
except e:
print("Failed to close connection")
except e:
print("Failed to parse request line")
try:
conn.close()
except e:
print("Failed to close connection")

while True:
let conn = self.ln.accept[SysConnection]()
let coroutine: Coroutine[NoneType] = handle_connection(conn, handler)
_ = coroutine() # Execute the coroutine synchronously

fn serve[T: HTTPService](inout self, ln: SysListener, handler: T) raises -> None:
self.ln = ln

while True:
Expand Down
2 changes: 1 addition & 1 deletion lightbug_http/tests/utils.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ from lightbug_http.client import Client

alias default_server_host = "localhost"
alias default_server_port = 8080
alias default_server_conn_string = "http://" + default_server_host + ":" + default_server_port.__str__()
alias default_server_conn_string = String("http://localhost:8080")
saviorand marked this conversation as resolved.
Show resolved Hide resolved

alias getRequest = String(
"GET /foobar?baz HTTP/1.1\r\nHost: google.com\r\nUser-Agent: aaa/bbb/ccc/ddd/eee"
Expand Down
1 change: 0 additions & 1 deletion lightbug_http/uri.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ from lightbug_http.strings import (
)


# TODO: convenience type, not currently used properly but will be helpful in the future
@value
struct URI:
var __path_original: Bytes
Expand Down