diff --git a/iocore/net/AtomicEvent.h b/iocore/net/AtomicEvent.h new file mode 100644 index 00000000000..de086c5ab75 --- /dev/null +++ b/iocore/net/AtomicEvent.h @@ -0,0 +1,91 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include + +#include "I_EventSystem.h" + +class AtomicEvent +{ +public: + bool + schedule(Continuation *c, EThread *t, int event, void *data, ink_hrtime delay = 0, int periodic = 0) + { + auto new_e = ::eventAllocator.alloc(); + new_e->init(c, delay, periodic); + new_e->callback_event = event; + new_e->cookie = data; + + Event *tmp = nullptr; + if (this->_e.compare_exchange_weak(tmp, new_e, std::memory_order_acq_rel)) { + t->schedule(new_e); + return true; + } else { + // we should not reschedule events when event is -1 or nullptr. Because the connection + // might be closed or already have events in plane. + new_e->free(); + return false; + } + } + + // thread unsafe. only target thread can cancel this event. + void + cancel() + { + Event *tmp = nullptr; + do { + if (tmp != nullptr) { + tmp->cancel(); + } + tmp = this->_e.load(std::memory_order_acquire); + if (tmp == reinterpret_cast(-1)) { + return; + } + } while (!this->_e.compare_exchange_weak(tmp, static_cast(nullptr), std::memory_order_acq_rel)); + + if (tmp != nullptr) { + tmp->cancel(); + } + } + + void + close() + { + Event *tmp = nullptr; + do { + if (tmp != nullptr) { + tmp->cancel(); + } + + tmp = this->_e.load(std::memory_order_acquire); + ink_release_assert(tmp != reinterpret_cast(-1)); + } while (!this->_e.compare_exchange_weak(tmp, reinterpret_cast(-1), std::memory_order_acq_rel)); + + if (tmp != nullptr) { + tmp->cancel(); + } + } + +private: + std::atomic _e{}; +}; diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index 2ec6f4e668c..23586706373 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -37,7 +37,7 @@ AM_CPPFLAGS += \ TESTS = $(check_PROGRAMS) -check_PROGRAMS = test_certlookup test_UDPNet +check_PROGRAMS = test_certlookup test_UDPNet test_UDPAcceptEcho noinst_LIBRARIES = libinknet.a test_certlookup_LDFLAGS = \ @@ -163,7 +163,41 @@ libinknet_a_SOURCES = \ UnixNetVConnection.cc \ UnixUDPConnection.cc \ UnixUDPNet.cc \ - SSLDynlock.cc + SSLDynlock.cc \ + UDPProcessor.cc \ + UDPConnection.cc + +test_UDPAcceptEcho_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(iocore_include_dirs) \ + -I$(abs_top_srcdir)/proxy \ + -I$(abs_top_srcdir)/proxy/hdrs \ + -I$(abs_top_srcdir)/proxy/http \ + -I$(abs_top_srcdir)/proxy/logging \ + -I$(abs_top_srcdir)/mgmt \ + -I$(abs_top_srcdir)/mgmt/utils \ + @OPENSSL_INCLUDES@ + +test_UDPAcceptEcho_LDFLAGS = \ + @AM_LDFLAGS@ \ + @OPENSSL_LDFLAGS@ \ + @YAMLCPP_LDFLAGS@ + +test_UDPAcceptEcho_LDADD = \ + libinknet.a \ + $(top_builddir)/iocore/eventsystem/libinkevent.a \ + $(top_builddir)/mgmt/libmgmt_p.la \ + $(top_builddir)/lib/records/librecords_p.a \ + $(top_builddir)/src/tscore/libtscore.la $(top_builddir)/src/tscpp/util/libtscpputil.la \ + $(top_builddir)/proxy/ParentSelectionStrategy.o \ + @HWLOC_LIBS@ @OPENSSL_LIBS@ @LIBPCRE@ @YAMLCPP_LIBS@ + +test_UDPAcceptEcho_SOURCES = \ + libinknet_stub.cc \ + UnixUDPConnection.cc \ + UnixUDPNet.cc \ + test_UDPAcceptEcho.cc + if ENABLE_QUIC libinknet_a_SOURCES += \ @@ -192,3 +226,4 @@ include $(top_srcdir)/build/tidy.mk clang-tidy-local: $(DIST_SOURCES) $(CXX_Clang_Tidy) + diff --git a/iocore/net/UDPConnection.cc b/iocore/net/UDPConnection.cc new file mode 100644 index 00000000000..d197defeeb1 --- /dev/null +++ b/iocore/net/UDPConnection.cc @@ -0,0 +1,952 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include + +#include "UDPConnection.h" + +#include "tscore/ink_atomic.h" + +static const char * +udp_event_name(UDP2ConnectionImpl::UDPEvents e) +{ + switch (e) { + case UDP2ConnectionImpl::UDPEvents::UDP_START_EVENT: + return "UDP_START_EVENT"; + case UDP2ConnectionImpl::UDPEvents::UDP_CONNECT_EVENT: + return "UDP_CONNECT_EVENT"; + case UDP2ConnectionImpl::UDPEvents::UDP_USER_READ_READY: + return "UDP_USER_READ_READY"; + default: + return "UNKNOWN EVENT"; + }; + + return nullptr; +} + +static const char * +udp_event_name(int e) +{ + return udp_event_name(static_cast(e)); +} + +// +// Reschedule a NetEvent by moving it +// onto or off of the ready_list +// +static inline void +read_reschedule(NetHandler *nh, NetEvent *vc) +{ + vc->ep.refresh(EVENTIO_READ); + if (vc->read.triggered && vc->read.enabled) { + nh->read_ready_list.in_or_enqueue(vc); + } else { + nh->read_ready_list.remove(vc); + } +} + +static inline void +write_reschedule(NetHandler *nh, NetEvent *vc) +{ + vc->ep.refresh(EVENTIO_WRITE); + if (vc->write.triggered && vc->write.enabled) { + nh->write_ready_list.in_or_enqueue(vc); + } else { + nh->write_ready_list.remove(vc); + } +} + +// +// UDP2ConnectionImpl +// +UDP2ConnectionImpl::UDP2ConnectionImpl(Continuation *con, EThread *thread, int fd) : _con(con), _thread(thread), _fd(fd) +{ + this->mutex = con->mutex; + this->read.enabled = 1; // read enabled is always true because we expected all data; + if (thread == nullptr) { + this->_thread = this_ethread(); + } + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } + SET_HANDLER(&UDP2ConnectionImpl::startEvent); +} + +UDP2ConnectionImpl::~UDP2ConnectionImpl() +{ + Debug("udp_con", "destroy"); + + int fd = this->_fd; + + this->_fd = -1; + if (fd != -1) { + ::close(fd); + } +} + +void +UDP2ConnectionImpl::free(EThread *t) +{ + Debug("udp_con", "free connection"); + this->mutex = nullptr; + + this->_close_event(UDPEvents::UDP_USER_READ_READY); + this->_close_event(UDPEvents::UDP_START_EVENT); + this->_close_event(UDPEvents::UDP_CONNECT_EVENT); + + this->read.enabled = 0; + this->read.triggered = 0; + + this->write.enabled = 0; + this->write.triggered = 0; + this->nh->stopIO(this); + + int fd = this->_fd; + + this->_fd = -1; + if (fd != -1) { + ::close(fd); + } + + delete this; +} + +int +UDP2ConnectionImpl::callback(int event, void *data) +{ + if (this->_con == nullptr) { + return 0; + } + + MUTEX_TRY_LOCK(lock, this->_con->mutex == nullptr ? this->mutex : this->_con->mutex, this_ethread()); + if (!lock.is_locked()) { + // TODO reuse cached event + Debug("udpcon", "callback get con lock failed"); + this->_reschedule(UDPEvents::UDP_USER_READ_READY, nullptr); + return 0; + } + return this->_con->handleEvent(event, data); +} + +void +UDP2ConnectionImpl::set_inactivity_timeout(ink_hrtime timeout_in) +{ +} + +EThread * +UDP2ConnectionImpl::get_thread() +{ + return this->_thread; +} + +int +UDP2ConnectionImpl::close() +{ + // detach contiuation. we should not callback to con after `close` has been called + this->_con = nullptr; + this->mutex = this->_thread->mutex; + + this->_recv_list.clear(); + if (this->_is_send_complete()) { + this->free(nullptr); + return 0; + } + + this->_reenable(&this->write.vio); + this->nh->signalActivity(); + return 0; +} + +int +UDP2ConnectionImpl::get_fd() +{ + return this->_fd; +} + +Ptr & +UDP2ConnectionImpl::get_mutex() +{ + return this->mutex; +} + +ContFlags & +UDP2ConnectionImpl::get_control_flags() +{ + return _cont_flags; +} + +bool +UDP2ConnectionImpl::_is_closed() const +{ + return this->_con == nullptr; +} + +int +UDP2ConnectionImpl::startEvent(int event, void *data) +{ + Debug("udp_con", "startEvent %s-%d", udp_event_name(event), event); + this->_close_event(event); + switch (static_cast(event)) { + case UDPEvents::UDP_CONNECT_EVENT: + this->connect(&this->_to.sa); + break; + case UDPEvents::UDP_START_EVENT: { + NetHandler *nh = get_NetHandler(this->_thread); + if (this->_thread == this_ethread()) { + MUTEX_TRY_LOCK(lock, nh->mutex, this->_thread); + if (lock.is_locked()) { + SET_HANDLER(&UDP2ConnectionImpl::mainEvent); + ink_assert(nh->startIO(this) >= 0); + // reenable read since there might be some packets in socket's buffer. + if (!this->_recv_list.empty()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + } + break; + } + } + this->_reschedule(UDPEvents::UDP_START_EVENT, nullptr, net_retry_delay); + break; + } + default: + ink_release_assert(0); + break; + } + + if (this->_is_closed() && this->_is_send_complete()) { + this->free(nullptr); + } else if (this->_is_closed()) { + this->_reenable(&this->write.vio); + this->nh->signalActivity(); + } + return 0; +} + +int +UDP2ConnectionImpl::mainEvent(int event, void *data) +{ + ink_assert(this->mutex->thread_holding == this->_thread); + this->_close_event(event); + switch (static_cast(event)) { + case UDPEvents::UDP_CONNECT_EVENT: + this->connect(&this->_to.sa); + break; + default: + Debug("udp_con", "unknown events: %d", event); + ink_release_assert(0); + break; + } + + if (this->_is_closed() && this->_is_send_complete()) { + this->free(nullptr); + } else if (this->_is_closed()) { + this->_reenable(&this->write.vio); + this->nh->signalActivity(); + } + + return 0; +} + +int +UDP2ConnectionImpl::start_io() +{ + return this->startEvent(0, nullptr); +} + +int +UDP2ConnectionImpl::create_socket(int family, int recv_buf, int send_buf) +{ + int res = 0; + int fd = -1; + + if (this->_fd != -1) { + return 0; + } + + if ((res = socketManager.socket(family, SOCK_DGRAM, 0)) < 0) { + goto Lerror; + } + + fd = res; + if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0) { + goto Lerror; + } + + if (recv_buf > 0) { + if (unlikely(socketManager.set_rcvbuf_size(fd, recv_buf))) { + Debug("udp_con", "set_dnsbuf_size(%d) failed", recv_buf); + } + } + if (send_buf > 0) { + if (unlikely(socketManager.set_sndbuf_size(fd, send_buf))) { + Debug("udp_con", "set_dnsbuf_size(%d) failed", send_buf); + } + } + + if (family == AF_INET) { + bool succeeded = false; + int enable = 1; +#ifdef IP_PKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif +#ifdef IP_RECVDSTADDR + if ((res = safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif + if (!succeeded) { + Debug("udp_con", "setsockeopt for pktinfo failed"); + goto Lerror; + } + } else if (family == AF_INET6) { + bool succeeded = false; + int enable = 1; +#ifdef IPV6_PKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif +#ifdef IPV6_RECVPKTINFO + if ((res = safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast(&enable), sizeof(enable))) == 0) { + succeeded = true; + } +#endif + if (!succeeded) { + Debug("udp_con", "setsockeopt for pktinfo failed"); + goto Lerror; + } + } + + // If this is a class D address (i.e. multicast address), use REUSEADDR. + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(SOCKOPT_ON), sizeof(int)) < 0)) { + goto Lerror; + } + + if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, SOCKOPT_ON, sizeof(int))) < 0) { + goto Lerror; + } + + this->_fd = fd; + Debug("udp_con", "creating a udp socket family = %d---success", family); + return 0; +Lerror: + Debug("udp_con", "creating a udp socket family = %d---soft failure", family); + if (fd != -1) { + socketManager.close(fd); + } + + return -errno; +} + +int +UDP2ConnectionImpl::bind(sockaddr const *addr) +{ + int res = 0; + int local_addr_len = sizeof(this->_from); + + if (addr->sa_family == AF_INET6 && (res = safe_setsockopt(this->_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int))) < 0) { + Debug("udp_con", "safe_setsockopt error IPPROTO_IPV6"); + goto Lerror; + } + + if (-1 == socketManager.ink_bind(this->_fd, addr, ats_ip_size(addr))) { + char buff[INET6_ADDRPORTSTRLEN]; + Debug("udp_con", "ink bind failed on %s %s", ats_ip_nptop(addr, buff, sizeof(buff)), strerror(errno)); + goto Lerror; + } + + if ((res = safe_getsockname(this->_fd, &this->_from.sa, &local_addr_len)) < 0) { + Debug("udp_con", "CreateUdpsocket: getsockname didn't work"); + goto Lerror; + } + + Debug("udp_con", "bind udp socket port = %d---success", ats_ip_port_host_order(addr)); + return 0; +Lerror: + Debug("udp_con", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(addr)); + if (this->_fd != -1) { + socketManager.close(this->_fd); + } + + this->_fd = -1; + return -errno; +} + +IpEndpoint +UDP2ConnectionImpl::from() +{ + return this->_from; +} + +IpEndpoint +UDP2ConnectionImpl::to() +{ + return this->_to; +} + +int +UDP2ConnectionImpl::_connect() +{ + ink_assert(this->_fd != NO_FD); + ink_assert(this->_to.port() != 0); + int res = ::connect(this->_fd, &this->_to.sa, ats_ip_size(&this->_to.sa)); + if (res >= 0) { + this->_connected = true; + return 0; + } + + return -errno; +} + +int +UDP2ConnectionImpl::connect(sockaddr const *addr) +{ + if (this->_to.port() == 0) { + ats_ip_copy(&this->_to, addr); + } + int res = this->_connect(); + if (res < 0) { + if ((res == -EINPROGRESS) || (res == -EWOULDBLOCK)) { + this->_reschedule(UDPEvents::UDP_CONNECT_EVENT, nullptr); + return 0; + } + return this->callback(NET_EVENT_DATAGRAM_CONNECT_ERROR, this); + } + return this->callback(NET_EVENT_DATAGRAM_CONNECT_SUCCESS, this); +} + +bool +UDP2ConnectionImpl::is_connected() const +{ + return this->_connected; +} + +void +UDP2ConnectionImpl::set_continuation(Continuation *con) +{ + // ink_assert(this->mutex == nullptr); + // rebind mutex; + this->_con = con; + this->mutex = con->mutex; + if (this->mutex == nullptr) { + this->mutex = new_ProxyMutex(); + } +} + +void +UDP2ConnectionImpl::bind_thread(EThread *thread) +{ + this->_thread = thread; +} + +void +UDP2ConnectionImpl::_reschedule(UDPEvents e, void *data, int64_t delay) +{ + Debug("udp_con", "schedule event %s", udp_event_name(e)); + Event **event = nullptr; + switch (e) { + case UDPEvents::UDP_START_EVENT: + event = &this->_start_event; + break; + case UDPEvents::UDP_CONNECT_EVENT: + event = &this->_connect_event; + break; + case UDPEvents::UDP_USER_READ_READY: + event = &this->_user_read_ready_event; + break; + default: + ink_release_assert(!"unknown events"); + break; + } + + if (*event != nullptr) { + (*event)->cancel(); + (*event) = nullptr; + } + + if (delay) { + *event = this->_thread->schedule_in(this, delay, static_cast(e), data); + } else { + *event = this->_thread->schedule_imm(this, static_cast(e), data); + } +} + +void +UDP2ConnectionImpl::_close_event(int e) +{ + this->_close_event(static_cast(e)); +} + +void +UDP2ConnectionImpl::_close_event(UDPEvents e) +{ + Event **ptr = nullptr; + switch (e) { + case UDPEvents::UDP_START_EVENT: + ptr = &this->_start_event; + break; + case UDPEvents::UDP_CONNECT_EVENT: + ptr = &this->_connect_event; + break; + case UDPEvents::UDP_USER_READ_READY: + ptr = &this->_user_read_ready_event; + break; + default: + ink_release_assert(!"unknown ptrs"); + break; + } + + if (*ptr != nullptr) { + (*ptr)->cancel(); + *ptr = nullptr; + } +} + +void +UDP2ConnectionImpl::net_read_io(NetHandler *nh, EThread *thread) +{ + ink_assert(this->nh = nh); + ink_assert(this->nh->mutex->thread_holding == thread); + MUTEX_TRY_LOCK(lock, this->mutex, thread); + if (!lock.is_locked()) { + read_reschedule(nh, this); + return; + } + + NetState *s = &this->read; + if (!s->enabled) { + read_disable(nh, this); + return; + } + + this->_read_from_net(nh, thread, true); + + read_reschedule(nh, this); +} + +void +UDP2ConnectionImpl::_read_from_net(NetHandler *nh, EThread *thread, bool callback) +{ + // receive packet and queue onto UDPConnection. + // don't call back connection at this time. + int64_t r = 0; + int count = 0; + + Ptr chain, next_chain; + struct iovec tiovec[MAX_NIOV]; + int64_t size_index = BUFFER_SIZE_INDEX_2K; + int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index); + // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes. + // Because the 'UDP Length' is type of uint16_t defined in RFC 768. + // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes. + do { + // create IOBufferBlock chain to receive data + unsigned int niov; + IOBufferBlock *b, *last; + + // build struct iov + // reuse the block in chain if available + b = chain.get(); + last = nullptr; + for (niov = 0; niov < MAX_NIOV; niov++) { + if (b == nullptr) { + b = new_IOBufferBlock(); + b->alloc(size_index); + if (last == nullptr) { + chain = b; + } else { + last->next = b; + } + } + + tiovec[niov].iov_base = b->buf(); + tiovec[niov].iov_len = b->block_size(); + + last = b; + b = b->next.get(); + } + + UDP2PacketUPtr p = std::make_unique(); + r = this->is_connected() ? this->_read(tiovec, niov, p->from, p->to) : this->_readmsg(tiovec, niov, p->from, p->to); + if (r <= 0) { + if (r == -EAGAIN || r == -ENOTCONN) { + this->read.triggered = 0; + break; + } + + if (callback) { + this->callback(NET_EVENT_DATAGRAM_READ_ERROR, this); + } + return; + } + + // fill the IOBufferBlock chain + int64_t saved = r; + b = chain.get(); + while (b && saved > 0) { + if (saved > buffer_size) { + b->fill(buffer_size); + saved -= buffer_size; + b = b->next.get(); + } else { + b->fill(saved); + saved = 0; + next_chain = b->next.get(); + b->next = nullptr; + } + } + + p->chain = chain; + + // queue onto the UDPConnection + this->_recv_list.push_back(std::move(p)); + + // reload the unused block + chain = next_chain; + next_chain = nullptr; + count++; + } while (r > 0); + + Debug("udp_con", "read %d packets from net", count); + + if (callback && !this->_recv_list.empty()) { + this->callback(NET_EVENT_DATAGRAM_READ_READY, this); + } + return; +} + +int +UDP2ConnectionImpl::_readmsg(struct iovec *iov, int len, IpEndpoint &fromaddr, IpEndpoint &toaddr) +{ + struct msghdr msg; + int toaddr_len = sizeof(toaddr); + char *cbuf[1024]; + msg.msg_name = &fromaddr.sin6; + msg.msg_namelen = sizeof(fromaddr); + msg.msg_iov = iov; + msg.msg_iovlen = len; + msg.msg_control = cbuf; + msg.msg_controllen = sizeof(cbuf); + + int rc = socketManager.recvmsg(this->get_fd(), &msg, 0); + if (rc <= 0) { + return rc; + } + + // truncated check + if (msg.msg_flags & MSG_TRUNC) { + Debug("udp-read", "The UDP packet is truncated"); + ink_assert(!"truncate should not happen, if so please increase MAX_NIOV"); + rc = -0x12345; + return rc; + } + + safe_getsockname(this->get_fd(), &toaddr.sa, &toaddr_len); + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + switch (cmsg->cmsg_type) { +#ifdef IP_PKTINFO + case IP_PKTINFO: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; + } + break; +#endif +#ifdef IP_RECVDSTADDR + case IP_RECVDSTADDR: + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_addr *addr = reinterpret_cast(CMSG_DATA(cmsg)); + reinterpret_cast(&toaddr)->sin_addr.s_addr = addr->s_addr; + } + break; +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too + if (cmsg->cmsg_level == IPPROTO_IPV6) { + struct in6_pktinfo *pktinfo = reinterpret_cast(CMSG_DATA(cmsg)); + memcpy(toaddr.sin6.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); + } + break; +#endif + } + } + + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + Debug("udp_accept", "read packet %s ----> %s", ats_ip_nptop(&fromaddr.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&toaddr.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); + ink_release_assert(!ats_ip_addr_port_eq(&fromaddr.sa, &toaddr.sa)); + return rc; +} + +int +UDP2ConnectionImpl::_read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to) +{ + ink_release_assert(this->_from.isValid() && this->_to.isValid()); + int rc = socketManager.readv(this->get_fd(), iov, len); + if (rc <= 0) { + return rc; + } + + ats_ip_copy(&from, &this->_to.sa); + ats_ip_copy(&to, &this->_from.sa); + return rc; +} + +void +UDP2ConnectionImpl::net_write_io(NetHandler *nh, EThread *thread) +{ + ink_assert(this->nh = nh); + ink_assert(this->nh->mutex->thread_holding == thread); + MUTEX_TRY_LOCK(lock, this->mutex, thread); + if (!lock.is_locked()) { + write_reschedule(nh, this); + return; + } + + MUTEX_TRY_LOCK(lock2, this->nh->mutex, thread); + if (!lock2.is_locked()) { + read_reschedule(nh, this); + return; + } + + NetState *s = &this->write; + if (!s->enabled) { + write_disable(nh, this); + return; + } + + SList(UDP2Packet, link) aq(this->_external_send_list.popall()); + UDP2Packet *tp; + Queue tmp; + while ((tp = aq.pop())) { + tmp.push(tp); + } + + while ((tp = tmp.pop())) { + this->_send_list.push_back(UDP2PacketUPtr(tp)); + } + + int count = 0; + while (!this->_send_list.empty()) { + auto p = this->_send_list.front().get(); + + int rc = this->is_connected() ? this->_send(p) : this->_sendmsg(p); + if (rc >= 0) { + count++; + this->_send_list.pop_front(); + continue; + } + + if (errno == EAGAIN) { + this->write.triggered = 0; + write_reschedule(nh, this); + break; + } else { + this->write.triggered = 0; + this->callback(NET_EVENT_DATAGRAM_WRITE_ERROR, this); + } + } + + if (count > 0) { + this->callback(NET_EVENT_DATAGRAM_WRITE_READY, this); + } + + if (this->_is_closed() && this->_is_send_complete()) { + this->free(nullptr); + } + + return; +} + +bool +UDP2ConnectionImpl::_is_send_complete() +{ + return this->_send_list.empty() && this->_external_send_list.empty(); +} + +int +UDP2ConnectionImpl::_send(UDP2Packet *p) +{ + ink_assert(this->is_connected()); + struct iovec iov[MAX_NIOV]; + int n, iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + iov_len++; + } + + n = socketManager.writev(this->_fd, iov, iov_len); + if (n >= 0) { + return n; + } + + Debug("udp_con", "writev failed: %s", strerror(errno)); + return -errno; +} + +int +UDP2ConnectionImpl::_sendmsg(UDP2Packet *p) +{ + ink_assert(p->to.isValid()); + ink_assert(this->is_connected() == false); + struct msghdr msg; + struct iovec iov[MAX_NIOV]; + int real_len = 0; + int n, iov_len = 0; + +#if !defined(solaris) + msg.msg_control = nullptr; + msg.msg_controllen = 0; + msg.msg_flags = 0; +#endif + msg.msg_name = reinterpret_cast(&p->to.sa); + msg.msg_namelen = ats_ip_size(p->to); + iov_len = 0; + + for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + iov[iov_len].iov_base = static_cast(b->start()); + iov[iov_len].iov_len = b->size(); + real_len += iov[iov_len].iov_len; + iov_len++; + } + + msg.msg_iov = iov; + msg.msg_iovlen = iov_len; + + n = socketManager.sendmsg(this->get_fd(), &msg, 0); + if (n >= 0) { + char buff[INET6_ADDRPORTSTRLEN * 2] = {0}; + Debug("udp_accept", "send packet %s ----> %s", ats_ip_nptop(&p->from.sa, buff, sizeof(buff) - INET6_ADDRPORTSTRLEN), + ats_ip_nptop(&p->to.sa, buff + INET6_ADDRPORTSTRLEN, sizeof(buff) - INET6_ADDRPORTSTRLEN)); + return n; + } + + Debug("udp_conn", "send from external thread failed: %d-%s", errno, strerror(errno)); + return -errno; +} + +UDP2PacketUPtr +UDP2ConnectionImpl::recv() +{ + ink_assert(!this->_is_closed()); + ink_assert(this->mutex->thread_holding == this->_thread); + if (this->_recv_list.empty()) { + return nullptr; + } + + auto p = std::move(this->_recv_list.front()); + this->_recv_list.pop_front(); + return p; +} + +void +UDP2ConnectionImpl::_reenable(VIO *vio) +{ + NetState *state = &this->write; + if (vio != &this->write.vio) { + state = &this->read; + } + + Debug("udp_con", "udp connection reenable %s", vio == &this->read.vio ? "read" : "write"); + state->enabled = 1; + ink_release_assert(!closed); + auto t = this_ethread(); + if (nh->mutex->thread_holding == t) { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } else { + MUTEX_TRY_LOCK(lock, nh->mutex, t); + if (!lock.is_locked()) { + if (vio == &read.vio) { + int isin = ink_atomic_swap(&read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(this); + } + } else { + int isin = ink_atomic_swap(&write.in_enabled_list, 1); + if (!isin) { + nh->write_enable_list.push(this); + } + } + if (likely(nh->thread)) { + nh->thread->tail_cb->signalActivity(); + } else if (nh->trigger_event) { + nh->trigger_event->ethread->tail_cb->signalActivity(); + } + } else { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } + } +} + +int +UDP2ConnectionImpl::send(UDP2PacketUPtr p, bool flush) +{ + ink_assert(!this->_is_closed()); + ink_assert(this->is_connected() || p->to.isValid()); + this->_external_send_list.push(p.get()); + p.release(); + if (flush) { + this->flush(); + } + return 0; +} + +void +UDP2ConnectionImpl::flush() +{ + this->_reenable(&this->write.vio); + this->nh->signalActivity(); +} diff --git a/iocore/net/UDPConnection.h b/iocore/net/UDPConnection.h new file mode 100644 index 00000000000..90c0ac1ab84 --- /dev/null +++ b/iocore/net/UDPConnection.h @@ -0,0 +1,134 @@ +/** @file + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "tscore/ink_sock.h" +#include "I_EventSystem.h" +#include "P_Net.h" +#include "NetEvent.h" +#include "UDPPacket.h" + +#define NET_EVENT_DATAGRAM_CONNECT_SUCCESS (NET_EVENT_EVENTS_START + 170) +#define NET_EVENT_DATAGRAM_CONNECT_ERROR (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) +#define NET_EVENT_DATAGRAM_WRITE_READY (NET_EVENT_DATAGRAM_CONNECT_SUCCESS + 1) + +class Continuation; + +class UDP2ConnectionImpl : public Continuation, public NetEvent +{ +public: + UDP2ConnectionImpl() = delete; + // independent allocate. + UDP2ConnectionImpl(Continuation *con, EThread *ethread = nullptr, int fd = -1); + ~UDP2ConnectionImpl(); + + enum class UDPEvents : uint8_t { + UDP_START_EVENT, + UDP_CONNECT_EVENT, + UDP_USER_READ_READY, + }; + + // NetEventHandler + virtual void net_read_io(NetHandler *nh, EThread *lthread) override; + void net_write_io(NetHandler *nh, EThread *lthread) override; + void free(EThread *t) override; + int callback(int event = CONTINUATION_EVENT_NONE, void *data = nullptr) override; + void set_inactivity_timeout(ink_hrtime timeout_in) override; + EThread *get_thread() override; + int close() override; + int get_fd() override; + Ptr &get_mutex() override; + ContFlags &get_control_flags() override; + int start_io(); + + int send(UDP2PacketUPtr packet, bool flush = true); + void flush(); + UDP2PacketUPtr recv(); + IpEndpoint from(); + IpEndpoint to(); + void set_continuation(Continuation *con); + + int create_socket(int family, int recv_buf = 0, int send_buf = 0); + int bind(sockaddr const *addr); + int connect(sockaddr const *addr); + bool is_connected() const; + void bind_thread(EThread *thread); + + int startEvent(int event, void *data); + int mainEvent(int event, void *data); + + void + set_data(void *data) + { + this->_data = data; + } + + void * + get_data() + { + return this->_data; + } + +protected: + // control max data size per read, This can be calculated as MAX_NIOV * 1024 / read + static constexpr int MAX_NIOV = 1; + + bool _is_closed() const; + void _reschedule(UDPEvents e, void *data, int64_t delay = 0); + void _reenable(VIO *vio); + void _read_from_net(NetHandler *nh, EThread *t, bool callback = true); + virtual int _send(UDP2Packet *p); + virtual int _sendmsg(UDP2Packet *p); + virtual int _read(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); + virtual int _readmsg(struct iovec *iov, int len, IpEndpoint &from, IpEndpoint &to); + bool _is_send_complete(); + + Continuation *_con = nullptr; + EThread *_thread = nullptr; + + ASLL(UDP2Packet, link) _external_send_list; + +private: + // internal schedule. + void _close_event(UDPEvents e); + void _close_event(int e); + int _connect(); + + IpEndpoint _from{}; + IpEndpoint _to{}; + + int _fd = -1; + bool _connected = false; + Event *_start_event = nullptr; + Event *_connect_event = nullptr; + Event *_user_read_ready_event = nullptr; + + // TODO removed + NetVCOptions _options{}; + ContFlags _cont_flags{}; + + std::deque _recv_list; + std::deque _send_list; + + void *_data = nullptr; +}; diff --git a/iocore/net/UDPPacket.h b/iocore/net/UDPPacket.h new file mode 100644 index 00000000000..5c99eaca9fb --- /dev/null +++ b/iocore/net/UDPPacket.h @@ -0,0 +1,52 @@ +/** @file + + ALPNSupport.cc provides implmentations for ALPNSupport methods + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#pragma once + +#include "tscore/ink_sock.h" +#include "tscore/ink_inet.h" +#include "I_EventSystem.h" + +#include + +class UDP2Connection; + +struct UDP2Packet { + UDP2Packet() = default; + UDP2Packet(const IpEndpoint &from, const IpEndpoint &to, Ptr &chain) : from(from), to(to), chain(chain) {} + UDP2Packet(sockaddr const *from, sockaddr *to, Ptr &chain) : chain(chain) + { + ats_ip_copy(&this->from, from); + ats_ip_copy(&this->to, to); + } + + ~UDP2Packet() { this->chain = nullptr; } + IpEndpoint from{}; + IpEndpoint to{}; + Ptr chain; + + SLINK(UDP2Packet, in_link); + SLINK(UDP2Packet, out_link); + LINK(UDP2Packet, link); +}; + +using UDP2PacketUPtr = std::unique_ptr; diff --git a/iocore/net/UDPProcessor.cc b/iocore/net/UDPProcessor.cc new file mode 100644 index 00000000000..1f50141c5f9 --- /dev/null +++ b/iocore/net/UDPProcessor.cc @@ -0,0 +1,74 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "UDPProcessor.h" +#include "P_Net.h" + +UDP2NetProcessor udp2Net; +EventType ET_UDP2; + +void +initialize_thread_for_udp2_net(EThread *thread) +{ + NetHandler *nh = get_NetHandler(thread); + + new (reinterpret_cast(nh)) NetHandler(); + new (reinterpret_cast(get_PollCont(thread))) PollCont(thread->mutex, nh); + nh->mutex = new_ProxyMutex(); + nh->thread = thread; + + PollCont *pc = get_PollCont(thread); + PollDescriptor *pd = pc->pollDescriptor; + + memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config)); + nh->configure_per_thread_values(); + + thread->set_tail_handler(nh); + thread->ep = static_cast(ats_malloc(sizeof(EventIO))); + new (thread->ep) EventIO(); + thread->ep->type = EVENTIO_ASYNC_SIGNAL; +#if HAVE_EVENTFD + thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); +#else + thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); +#endif +} + +int +UDP2NetProcessor::start(int n_upd_threads, size_t stacksize) +{ + if (n_upd_threads < 1) { + return -1; + } + + if (unix_netProcessor.pollCont_offset < 0) { + unix_netProcessor.pollCont_offset = eventProcessor.allocate(sizeof(PollCont)); + } + + if (unix_netProcessor.netHandler_offset < 0) { + unix_netProcessor.netHandler_offset = eventProcessor.allocate(sizeof(NetHandler)); + } + + ET_UDP2 = eventProcessor.register_event_type("ET_UDP2"); + eventProcessor.schedule_spawn(&initialize_thread_for_udp2_net, ET_UDP2); + eventProcessor.spawn_event_threads(ET_UDP2, n_upd_threads, stacksize); + return 0; +} diff --git a/iocore/net/UDPProcessor.h b/iocore/net/UDPProcessor.h new file mode 100644 index 00000000000..074acee98f9 --- /dev/null +++ b/iocore/net/UDPProcessor.h @@ -0,0 +1,35 @@ +/** @file + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "I_EventSystem.h" + +class UDP2ConnectionManager; + +class UDP2NetProcessor : public Processor +{ +public: + int start(int n_upd_threads, size_t stacksize) override; +}; + +extern EventType ET_UDP2; +extern UDP2NetProcessor udp2Net; diff --git a/iocore/net/libinknet_stub.cc b/iocore/net/libinknet_stub.cc index 11fee253e2f..d55e7051612 100644 --- a/iocore/net/libinknet_stub.cc +++ b/iocore/net/libinknet_stub.cc @@ -47,7 +47,7 @@ DNSConnection::trigger() void StatPagesManager::register_http(char const *, Action *(*)(Continuation *, HTTPHdr *)) { - ink_assert(false); + // ink_assert(false); } #include "ParentSelection.h" diff --git a/iocore/net/test_UDPAcceptEcho.cc b/iocore/net/test_UDPAcceptEcho.cc new file mode 100644 index 00000000000..ef27b696073 --- /dev/null +++ b/iocore/net/test_UDPAcceptEcho.cc @@ -0,0 +1,274 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include +#include +#include + +#include "tscore/I_Layout.h" +#include "tscore/TestBox.h" + +#include "I_EventSystem.h" +#include "I_Net.h" +#include "UDPConnection.h" +#include "UDPProcessor.h" +#include "records/I_RecProcess.h" +#include "RecordsConfig.h" + +#include "diags.i" + +static pid_t pid; +const char payload[] = "helloword"; +const char payload1[] = "helloword1"; +const char payload2[] = "helloword2"; + +void +signal_handler(int signum) +{ + std::exit(EXIT_SUCCESS); +} + +in_port_t port = 0; +int pfd[2]; // Pipe used to signal client with transient port. + +class AcceptServer : public Continuation +{ +public: + int + mainEvent(int event, void *data) + { + switch (event) { + case NET_EVENT_DATAGRAM_READ_READY: { + ink_assert(this->_con == static_cast(data)); + while (true) { + auto p = this->_con->recv(); + if (p == nullptr) { + return 0; + } + + if (!this->_first) { + this->_first = true; + ink_release_assert(this->_con->connect(&p->from.sa) >= 0); + } + + this->_closed = std::string(p->chain->start(), p->chain->read_avail()) == payload2; + std::cout << "receive msg from accept: " << std::string(p->chain->start(), p->chain->read_avail()) << std::endl; + auto tmp = p->from; + p->from = p->to; + p->to = tmp; + this->_con->send(std::move(p)); + } + break; + } + case NET_EVENT_DATAGRAM_WRITE_READY: + if (this->_closed) { + std::cout << "accept exit" << std::endl; + signal_handler(0); + } + break; + case NET_EVENT_DATAGRAM_CONNECT_SUCCESS: + break; + default: + ink_release_assert(0); + break; + } + return 0; + } + + AcceptServer() + { + SET_HANDLER(&AcceptServer::mainEvent); + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + this->_con = new UDP2ConnectionImpl(this, eventProcessor.assign_thread(ET_UDP2)); + ink_release_assert(this->_con->create_socket(AF_INET) >= 0); + ink_release_assert(this->_con->bind(reinterpret_cast(&addr)) >= 0); + ink_release_assert(this->_con->start_io() >= 0); + ink_release_assert(this->_con != nullptr); + std::cout << "bind to port: " << ats_ip_port_host_order(this->_con->from()) << std::endl; + int port = ats_ip_port_host_order(this->_con->from()); + ink_release_assert(write(pfd[1], &port, sizeof(port)) == sizeof(port)); + this->mutex = this->_con->mutex; + } + +private: + UDP2ConnectionImpl *_con = nullptr; + bool _first = false; + bool _closed = false; +}; + +void +udp_client(TestBox &box) +{ + int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + std::cout << "Couldn't create socket" << std::endl; + std::exit(EXIT_FAILURE); + } + + struct timeval tv; + tv.tv_sec = 20; + tv.tv_usec = 0; + + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof(tv)); + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = htons(port); + + auto bsend = [sock, addr](const char *payload) { + ssize_t n = sendto(sock, payload, strlen(payload), 0, + reinterpret_cast(const_cast(&addr)), sizeof(addr)); + if (n < 0) { + std::cout << "Couldn't send udp packet" << std::endl; + close(sock); + std::exit(EXIT_FAILURE); + } + }; + + auto brecv = [sock, box](const char *expect) -> bool { + char buf[128] = {0}; + ssize_t l = recv(sock, buf, sizeof(buf), 0); + if (l < 0) { + std::cout << "Couldn't recv udp packet" << std::endl; + close(sock); + const_cast(&box)->check(false, "errno recv"); + return false; + } + std::cout << "client recv payload: " << buf << std::endl; + const_cast(&box)->check(strncmp(buf, expect, sizeof(payload)) == 0, "echo doesn't match"); + if (strncmp(buf, expect, sizeof(payload))) { + kill(pid, SIGINT); + } + return strncmp(buf, expect, sizeof(payload)) == 0; + }; + +#define CHECK_RECV(statement) \ + do { \ + if (!statement) { \ + return; \ + } \ + } while (0) + + std::cout << "client send payload" << std::endl; + bsend(payload); // send payload to accept; + CHECK_RECV(brecv(payload)); // accept reply the payload + // CHECK_RECV(brecv(payload)); // sub udp connection send another one. + + // send to accept udp connection since we are sleeping in one second. + std::cout << "client send payload1" << std::endl; + bsend(payload1); // send to accept udp connection since we are sleeping in one second. + + std::cout << "client send payload2" << std::endl; + bsend(payload2); // send to accept udp again. + + // recv from sub udp connection + CHECK_RECV(brecv(payload1)); + CHECK_RECV(brecv(payload2)); + + std::cout << "client exit" << std::endl; + close(sock); + return; +} + +void +udp_echo_server() +{ + Layout::create(); + RecModeT mode_type = RECM_STAND_ALONE; + RecProcessInit(mode_type); + + Thread *main_thread = new EThread(); + main_thread->set_specific(); + net_config_poll_timeout = 10; + RecProcessInit(RECM_STAND_ALONE); + LibRecordsConfigInit(); + ink_net_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE)); + + // statPagesManager.init(); + init_diags("udp", nullptr); + ink_event_system_init(EVENT_SYSTEM_MODULE_PUBLIC_VERSION); + netProcessor.init(); + eventProcessor.start(1); + udp2Net.start(1, 1048576); + + initialize_thread_for_net(this_ethread()); + + signal(SIGPIPE, SIG_IGN); + signal(SIGTERM, signal_handler); + + AcceptServer *server = new AcceptServer; + (void)server; + + this_thread()->execute(); +} + +REGRESSION_TEST(UDPNet_echo)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus) +{ + TestBox box(t, pstatus); + box = REGRESSION_TEST_PASSED; + + int z = pipe(pfd); + if (z < 0) { + std::cout << "Unable to create pipe" << std::endl; + std::exit(EXIT_FAILURE); + } + + pid = fork(); + if (pid < 0) { + std::cout << "Couldn't fork" << std::endl; + std::exit(EXIT_FAILURE); + } else if (pid == 0) { + close(pfd[0]); + udp_echo_server(); + } else { + close(pfd[1]); + if (read(pfd[0], &port, sizeof(port)) <= 0) { + std::cout << "Failed to get signal with port data [" << errno << ']' << std::endl; + std::exit(EXIT_FAILURE); + } + Debug("udp_echo", "client get ports: %d", port); + udp_client(box); + + // kill(pid, SIGTERM); + int status; + wait(&status); + + if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { + std::cout << "UDP Echo Server exit failure" << std::endl; + std::exit(EXIT_FAILURE); + } + } +} + +int +main(int /* argc ATS_UNUSED */, const char ** /* argv ATS_UNUSED */) +{ + RegressionTest::run("UDPNet", REGRESSION_TEST_QUICK); + return RegressionTest::final_status == REGRESSION_TEST_PASSED ? 0 : 1; +}