@@ -73,12 +73,10 @@ RecInt aio_io_uring_wq_unbounded = 0;
 RecRawStatBlock *aio_rsb      = nullptr;
 Continuation *aio_err_callbck = nullptr;
 // AIO Stats
-std::atomic<uint64_t> aio_num_read         = 0;
-std::atomic<uint64_t> aio_bytes_read       = 0;
-std::atomic<uint64_t> aio_num_write        = 0;
-std::atomic<uint64_t> aio_bytes_written    = 0;
-std::atomic<uint64_t> io_uring_submissions = 0;
-std::atomic<uint64_t> io_uring_completions = 0;
+std::atomic<uint64_t> aio_num_read      = 0;
+std::atomic<uint64_t> aio_bytes_read    = 0;
+std::atomic<uint64_t> aio_num_write     = 0;
+std::atomic<uint64_t> aio_bytes_written = 0;

 /*
  * Stats
@@ -534,87 +532,12 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info)

 #elif AIO_MODE == AIO_MODE_IO_URING

-std::atomic<int> aio_main_wq_fd;
-
-DiskHandler::DiskHandler()
-{
-  io_uring_params p{};
-
-  if (aio_io_uring_attach_wq > 0) {
-    int wq_fd = get_main_queue_fd();
-    if (wq_fd > 0) {
-      p.flags = IORING_SETUP_ATTACH_WQ;
-      p.wq_fd = wq_fd;
-    }
-  }
-
-  if (aio_io_uring_sq_poll_ms > 0) {
-    p.flags |= IORING_SETUP_SQPOLL;
-    p.sq_thread_idle = aio_io_uring_sq_poll_ms;
-  }
-
-  int ret = io_uring_queue_init_params(aio_io_uring_queue_entries, &ring, &p);
-  if (ret < 0) {
-    throw std::runtime_error(strerror(-ret));
-  }
-
-  /* no sharing for non-fixed either */
-  if (aio_io_uring_sq_poll_ms && !(p.features & IORING_FEAT_SQPOLL_NONFIXED)) {
-    throw std::runtime_error("No SQPOLL sharing with nonfixed");
-  }
-
-  // assign this handler to the thread
-  // TODO(cmcfarlen): maybe a bad place for this!
-  this_ethread()->diskHandler = this;
-}
-
-DiskHandler::~DiskHandler()
-{
-  io_uring_queue_exit(&ring);
-}
-
-void
-DiskHandler::set_main_queue(DiskHandler *dh)
-{
-  dh->set_wq_max_workers(aio_io_uring_wq_bounded, aio_io_uring_wq_unbounded);
-  aio_main_wq_fd.store(dh->ring.ring_fd);
-}
-
-int
-DiskHandler::get_main_queue_fd()
-{
-  return aio_main_wq_fd.load();
-}
-
-int
-DiskHandler::set_wq_max_workers(unsigned int bounded, unsigned int unbounded)
-{
-  if (bounded == 0 && unbounded == 0) {
-    return 0;
-  }
-  unsigned int args[2] = {bounded, unbounded};
-  int result           = io_uring_register_iowq_max_workers(&ring, args);
-  return result;
-}
-
-std::pair<int, int>
-DiskHandler::get_wq_max_workers()
-{
-  unsigned int args[2] = {0, 0};
-  io_uring_register_iowq_max_workers(&ring, args);
-  return std::make_pair(args[0], args[1]);
-}
-
-void
-DiskHandler::submit()
-{
-  io_uring_submissions.fetch_add(io_uring_submit(&ring));
-}
+#include "I_IO_URING.h"

 void
-DiskHandler::handle_cqe(io_uring_cqe *cqe)
+ink_aiocb::handle_complete(io_uring_cqe *cqe)
 {
-  AIOCallback *op = static_cast<AIOCallback *>(io_uring_cqe_get_data(cqe));
+  AIOCallback *op = this_op;

   op->aio_result = static_cast<int64_t>(cqe->res);
   op->link.prev  = nullptr;
@@ -643,68 +566,15 @@ DiskHandler::handle_cqe(io_uring_cqe *cqe)
   }
 }

-void
-DiskHandler::service()
-{
-  io_uring_cqe *cqe = nullptr;
-  io_uring_peek_cqe(&ring, &cqe);
-  while (cqe) {
-    handle_cqe(cqe);
-    io_uring_completions.fetch_add(1);
-    io_uring_cqe_seen(&ring, cqe);
-
-    cqe = nullptr;
-    io_uring_peek_cqe(&ring, &cqe);
-  }
-}
-
-void
-DiskHandler::submit_and_wait(int ms)
-{
-  ink_hrtime t               = ink_hrtime_from_msec(ms);
-  timespec ts                = ink_hrtime_to_timespec(t);
-  __kernel_timespec timeout  = {ts.tv_sec, ts.tv_nsec};
-  io_uring_cqe *cqe          = nullptr;
-
-  io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, nullptr);
-  while (cqe) {
-    handle_cqe(cqe);
-    io_uring_completions.fetch_add(1);
-    io_uring_cqe_seen(&ring, cqe);
-
-    cqe = nullptr;
-    io_uring_peek_cqe(&ring, &cqe);
-  }
-}
-
-int
-DiskHandler::register_eventfd()
-{
-  int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
-
-  io_uring_register_eventfd(&ring, fd);
-
-  return fd;
-}
-
-DiskHandler *
-DiskHandler::local_context()
-{
-  // TODO(cmcfarlen): load config
-  thread_local DiskHandler threadContext;
-
-  return &threadContext;
-}
-
 int
 ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
 {
-  EThread *t      = this_ethread();
-  AIOCallback *op = op_in;
+  IOUringContext *ur = IOUringContext::local_context();
+  AIOCallback *op    = op_in;
   while (op) {
-    io_uring_sqe *sqe = t->diskHandler->next_sqe();
+    op->aiocb.this_op = op;
+    io_uring_sqe *sqe = ur->next_sqe(&op->aiocb);
     io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset);
-    io_uring_sqe_set_data(sqe, op);
     op->aiocb.aio_lio_opcode = LIO_READ;
     if (op->then) {
       sqe->flags |= IOSQE_IO_LINK;
@@ -720,12 +590,12 @@ ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
 int
 ink_aio_write(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
 {
-  EThread *t      = this_ethread();
-  AIOCallback *op = op_in;
+  IOUringContext *ur = IOUringContext::local_context();
+  AIOCallback *op    = op_in;
   while (op) {
-    io_uring_sqe *sqe = t->diskHandler->next_sqe();
+    op->aiocb.this_op = op;
+    io_uring_sqe *sqe = ur->next_sqe(&op->aiocb);
     io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset);
-    io_uring_sqe_set_data(sqe, op);
     op->aiocb.aio_lio_opcode = LIO_WRITE;
     if (op->then) {
       sqe->flags |= IOSQE_IO_LINK;
0 commit comments