Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: udp recvfrom #9418

Merged
merged 3 commits into from
Jan 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ Library improvements

* A new `Val{T}` type allows one to dispatch on bits-type values ([#9452])

* Added `recvfrom` to get source address of UDP packets ([#9418])

Deprecated or removed
---------------------

Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,7 @@ export
redirect_stdin,
redirect_stdout,
recv,
recvfrom,
reset,
seek,
seekend,
Expand Down
24 changes: 21 additions & 3 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ _bind(sock::UDPSocket, host::IPv6, port::UInt16, flags::UInt32 = uint32(0)) = cc

function bind(sock::UDPSocket, host::IPv6, port::UInt16; ipv6only = false)
@assert sock.status == StatusInit
err = _bind(sock,host,ipv6only ? UV_UDP_IPV6ONLY : 0)
err = _bind(sock,host,port, uint32(ipv6only ? UV_UDP_IPV6ONLY : 0))
if err < 0
if err != UV_EADDRINUSE && err != UV_EACCES
error(UVError("bind",err))
Expand Down Expand Up @@ -470,22 +470,40 @@ end
_recv_stop(sock::UDPSocket) = uv_error("recv_stop",ccall(:uv_udp_recv_stop,Cint,(Ptr{Void},),sock.handle))

function recv(sock::UDPSocket)
addr, data = recvfrom(sock)
data
end

function recvfrom(sock::UDPSocket)
# If the socket has not been bound, it will be bound implicitly to ::0 and a random port
if sock.status != StatusInit && sock.status != StatusOpen
error("Invalid socket state")
end
_recv_start(sock)
stream_wait(sock,sock.recvnotify)::Vector{UInt8}
stream_wait(sock,sock.recvnotify)::(Union(IPv4, IPv6), Vector{UInt8})
end


function _uv_hook_recv(sock::UDPSocket, nread::Int, buf_addr::Ptr{Void}, buf_size::UInt, addr::Ptr{Void}, flags::Int32)
# C signature documented as (*uv_udp_recv_cb)(...)
if flags & UV_UDP_PARTIAL > 0
# TODO: Decide what to do in this case. For now throw an error
c_free(buf_addr)
notify_error(sock.recvnotify,"Partial message received")
end

# need to check the address type in order to convert to a Julia IPAddr
addrout = if (addr == C_NULL)
IPv4(0)
elseif ccall(:jl_sockaddr_in_is_ip4, Cint, (Ptr{Void},), addr) == 1
IPv4(ntoh(ccall(:jl_sockaddr_host4, Uint32, (Ptr{Void},), addr)))
else
tmp = [uint128(0)]
ccall(:jl_sockaddr_host6, Uint32, (Ptr{Void}, Ptr{Uint8}), addr, pointer(tmp))
IPv6(ntoh(tmp[1]))
end
buf = pointer_to_array(convert(Ptr{UInt8},buf_addr),int(buf_size),true)
notify(sock.recvnotify,buf[1:nread])
notify(sock.recvnotify,(addrout,buf[1:nread]))
end

function _send(sock::UDPSocket,ipaddr::IPv4,port::UInt16,buf)
Expand Down
4 changes: 4 additions & 0 deletions doc/stdlib/io-network.rst
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,10 @@ Network I/O

Read a UDP packet from the specified socket, and return the bytes received. This call blocks.

.. function:: recvfrom(socket::UDPSocket) -> (address, data)

Read a UDP packet from the specified socket, returning a tuple of (address, data), where address will be either IPv4 or IPv6 as appropriate.

.. function:: setopt(sock::UDPSocket; multicast_loop = nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)

Set UDP socket options. ``multicast_loop``: loopback for multicast packets (default: true). ``multicast_ttl``: TTL for multicast packets. ``enable_broadcast``: flag must be set to true if socket will be used for broadcast messages, or else the UDP system will return an access error (default: false). ``ttl``: Time-to-live of packets sent on the socket.
Expand Down
54 changes: 39 additions & 15 deletions test/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,49 @@ close(server)

@test_throws Base.UVError connect(".invalid",80)

a = UDPSocket()
b = UDPSocket()
bind(a,ip"127.0.0.1",port)
bind(b,ip"127.0.0.1",port+1)
begin
a = UDPSocket()
b = UDPSocket()
bind(a,ip"127.0.0.1",port)
bind(b,ip"127.0.0.1",port+1)

c = Condition()
@async begin
@test bytestring(recv(a)) == "Hello World"
# Issue 6505
c = Condition()
@async begin
@test bytestring(recv(a)) == "Hello World"
notify(c)
# Issue 6505
@async begin
@test bytestring(recv(a)) == "Hello World"
notify(c)
end
send(b,ip"127.0.0.1",port,"Hello World")
end
send(b,ip"127.0.0.1",port,"Hello World")
end
send(b,ip"127.0.0.1",port,"Hello World")
wait(c)
wait(c)

@test_throws MethodError bind(UDPSocket(),port)
@async begin
@test begin
(addr,data) = recvfrom(a)
addr == ip"127.0.0.1" && bytestring(data) == "Hello World"
end
end
send(b, ip"127.0.0.1",port,"Hello World")

close(a)
close(b)
@test_throws MethodError bind(UDPSocket(),port)

close(a)
close(b)
end
begin
a = UDPSocket()
b = UDPSocket()
bind(a, ip"::1", uint16(port))
bind(b, ip"::1", uint16(port+1))

@async begin
@test begin
(addr, data) = recvfrom(a)
addr == ip"::1" && bytestring(data) == "Hello World"
end
end
send(b, ip"::1", port, "Hello World")
end