diff --git a/configure b/configure index af56d450b2..df3e49e681 100755 --- a/configure +++ b/configure @@ -6,6 +6,7 @@ FLAGS="--enable-debug" TCP_UDP_BUFFER_SIZE=1500 INSTALL_PREFIX="/" KERNBUILDDIR="/lib/modules/`uname -r`/build" + BUILD_USER="y" # Option parsing diff --git a/kernel/dtcp-ps-default.c b/kernel/dtcp-ps-default.c index 92b5259ecc..4fbac7f90d 100644 --- a/kernel/dtcp-ps-default.c +++ b/kernel/dtcp-ps-default.c @@ -34,6 +34,7 @@ #include "dtp-utils.h" #include "du.h" #include "logs.h" +#include "rds/rtimer.h" int default_lost_control_pdu(struct dtcp_ps * ps) { @@ -110,9 +111,15 @@ int default_sender_ack(struct dtcp_ps * ps, seq_num_t seq_num) } spin_lock_bh(&dtcp->parent->sv_lock); + tr = dtcp->parent->sv->tr; - spin_unlock_bh(&dtcp->parent->sv_lock); + rtxq_ack(dtcp->parent->rtxq, seq_num, tr); + + /* Update LWE */ + dtcp->sv->snd_lft_win = seq_num + 1; + + spin_unlock_bh(&dtcp->parent->sv_lock); } return 0; @@ -166,7 +173,6 @@ int default_receiving_flow_control(struct dtcp_ps * ps, const struct pci * pci) return -1; LOG_DBG("DTCP Sending FC (CPU: %d)", smp_processor_id()); - dump_we(dtcp, &du->pci); if (dtcp_pdu_send(dtcp, du)) return -1; @@ -179,6 +185,8 @@ int default_rcvr_flow_control(struct dtcp_ps * ps, const struct pci * pci) struct dtcp * dtcp = ps->dm; seq_num_t LWE; seq_num_t RWE; + seq_num_t lwe_p; + seq_num_t rwe_p; if (!dtcp) { LOG_ERR("No instance passed, cannot run policy"); @@ -188,6 +196,8 @@ int default_rcvr_flow_control(struct dtcp_ps * ps, const struct pci * pci) LOG_ERR("No PCI passed, cannot run policy"); return -1; } + lwe_p = pci_control_new_left_wind_edge(pci); + rwe_p = pci_control_new_rt_wind_edge(pci); spin_lock_bh(&dtcp->parent->sv_lock); LWE = dtcp->parent->sv->rcv_left_window_edge; @@ -195,8 +205,10 @@ int default_rcvr_flow_control(struct dtcp_ps * ps, const struct pci * pci) RWE = dtcp->sv->rcvr_rt_wind_edge; spin_unlock_bh(&dtcp->parent->sv_lock); - LOG_DBG("DTCP: %pK", dtcp); - LOG_DBG("LWE: %u RWE: %u", LWE, RWE); + if (dtcp->sv->rendezvous_rcvr) { + LOG_DBG("DTCP: LWE: %u RWE: %u -- PCI: lwe: %u, rwe: %u", + LWE, RWE, lwe_p, rwe_p); + } return 0; } @@ -308,6 +320,9 @@ int default_rtt_estimator(struct dtcp_ps * ps, seq_num_t sn) if (start_time == 0) { LOG_DBG("RTTestimator: PDU %u has been retransmitted", sn); return 0; + } else if (start_time == -1) { + /* The PDU being ACKed is no longer in the RTX queue */ + return -1; } rtt_calculation(ps, start_time); @@ -338,6 +353,73 @@ int default_rtt_estimator_nortx(struct dtcp_ps * ps, seq_num_t sn) return 0; } +int default_rcvr_rendezvous(struct dtcp_ps * ps, const struct pci * pci) +{ + struct dtcp * dtcp; + seq_num_t rcv_lft, rcv_rt, snd_lft, snd_rt; + timeout_t rv; + + if (!ps) + return -1; + dtcp = ps->dm; + if (!dtcp) + return -1; + + spin_lock_bh(&dtcp->parent->sv_lock); + /* TODO: check if retransmission control enabled */ + + if (dtcp->parent->sv->window_based) { + rcv_lft = pci_control_new_left_wind_edge(pci); + rcv_rt = pci_control_new_rt_wind_edge(pci); + snd_lft = pci_control_my_left_wind_edge(pci); + snd_rt = pci_control_my_rt_wind_edge(pci); + + /* Check Consistency of the Receiving Window values with the + * values in the PDU. + */ + if (dtcp->sv->snd_lft_win != rcv_lft) { + /* TODO what to do? */ + } + + if (dtcp->sv->snd_rt_wind_edge != rcv_rt) { + /* TODO what to do? */ + } + LOG_DBG("RCVR rendezvous. RCV LWE: %u | RCV RWE: %u || " + "SND LWE: %u | SND RWE: %u", + dtcp->parent->sv->rcv_left_window_edge, + dtcp->sv->rcvr_rt_wind_edge, snd_lft, snd_rt); + + dtcp->sv->rcvr_rt_wind_edge = snd_lft + dtcp->sv->rcvr_credit; + } + + if (dtcp->sv->flow_ctl && dtcp->parent->sv->rate_based) { + /* TODO implement */ + } + + /* TODO Receiver is in the Rendezvous-at-the-receiver state. The next PDU is + * expected to have DRF bit set to true + */ + + dtcp->sv->rendezvous_rcvr = true; + + spin_unlock_bh(&dtcp->parent->sv_lock); + + /* Send a ControlAck PDU to confirm reception of RendezvousPDU via + * lastControlPDU value or send any other control PDU with Flow Control + * information opening the window. + */ + if (dtcp->sv->rcvr_credit != 0) { + rv = jiffies_to_msecs(dtcp->parent->sv->tr); + rtimer_start(&dtcp->rendezvous_rcv, rv); + } + + atomic_inc(&dtcp->cpdus_in_transit); + LOG_DBG("DTCP 1st Sending FC to stop Rendezvous (CPU: %d)", + smp_processor_id()); + + return ctrl_pdu_send(dtcp, PDU_TYPE_FC, true); +} + struct ps_base * dtcp_ps_default_create(struct rina_component * component) { struct dtcp * dtcp = dtcp_from_component(component); @@ -354,7 +436,7 @@ struct ps_base * dtcp_ps_default_create(struct rina_component * component) ps->lost_control_pdu = default_lost_control_pdu; if (ps->rtx_ctrl) { ps->rtt_estimator = default_rtt_estimator; - } else { + } else if (ps->flow_ctrl) { ps->rtt_estimator = default_rtt_estimator_nortx; } ps->retransmission_timer_expiry = NULL; @@ -376,6 +458,7 @@ struct ps_base * dtcp_ps_default_create(struct rina_component * component) ps->rcvr_control_ack = NULL; ps->no_rate_slow_down = NULL; ps->no_override_default_peak = NULL; + ps->rcvr_rendezvous = default_rcvr_rendezvous; return &ps->base; } diff --git a/kernel/dtcp-ps-default.h b/kernel/dtcp-ps-default.h index ca49019633..804fb1f4cb 100644 --- a/kernel/dtcp-ps-default.h +++ b/kernel/dtcp-ps-default.h @@ -57,4 +57,6 @@ int default_rtt_estimator(struct dtcp_ps * ps, seq_num_t sn); int default_rtt_estimator_nortx(struct dtcp_ps * ps, seq_num_t sn); +int default_rcvr_rendezvous(struct dtcp_ps * ps, const struct pci * pci); + #endif /* RINA_DTCP_PS_COMMON_H */ diff --git a/kernel/dtcp-ps.h b/kernel/dtcp-ps.h index 41c51ea66c..1ad0719d61 100644 --- a/kernel/dtcp-ps.h +++ b/kernel/dtcp-ps.h @@ -81,6 +81,8 @@ struct dtcp_ps { int (* rcvr_control_ack)(struct dtcp_ps * instance); int (* no_rate_slow_down)(struct dtcp_ps * instance); int (* no_override_default_peak)(struct dtcp_ps * instance); + int (* rcvr_rendezvous)(struct dtcp_ps * instance, + const struct pci * pci); /* Parametric policies. */ bool flow_ctrl; diff --git a/kernel/dtcp.c b/kernel/dtcp.c index 3f8c9bafa6..a3624be5b5 100644 --- a/kernel/dtcp.c +++ b/kernel/dtcp.c @@ -171,6 +171,91 @@ RINA_SYSFS_OPS(dtcp); RINA_ATTRS(dtcp, rtt, srtt, rttvar, ps_name); RINA_KTYPE(dtcp); +int ctrl_pdu_send(struct dtcp * dtcp, pdu_type_t type, bool direct) +{ + struct du * du; + + if (dtcp->sv->rendezvous_rcvr) { + LOG_DBG("Generating FC PDU in RV at RCVR"); + } + + du = pdu_ctrl_generate(dtcp, type); + if (!du) { + atomic_dec(&dtcp->cpdus_in_transit); + return -1; + } + if (dtcp->sv->rendezvous_rcvr) { + LOG_DBG("Generated FC PDU in RV at RCVR"); + } + + if (direct) { + if (dtp_pdu_send(dtcp->parent, dtcp->rmt, du)){ + atomic_dec(&dtcp->cpdus_in_transit); + du_destroy(du); + return -1; + } + } else { + if (dtcp_pdu_send(dtcp, du)){ + atomic_dec(&dtcp->cpdus_in_transit); + du_destroy(du); + return -1; + } + } + + atomic_dec(&dtcp->cpdus_in_transit); + + return 0; +} + +/* Runs the Rendezvous-at-receiver timer */ +#if LINUX_VERSION_CODE < KERNEL_VERSION(4,15,0) +static void tf_rendezvous_rcv(void * data) +#else +static void tf_rendezvous_rcv(struct timer_list * tl) +#endif +{ + struct dtcp * dtcp; + struct dtp * dtp; + bool start_rv_rcv_timer; + timeout_t rv; + + LOG_DBG("Running rendezvous-at-receiver timer..."); +#if LINUX_VERSION_CODE < KERNEL_VERSION(4,15,0) + dtcp = (struct dtcp *) data; +#else + dtcp = from_timer(dtcp, tl, rendezvous_rcv); +#endif + if (!dtcp) { + LOG_ERR("No dtcp to work with"); + return; + } + dtp = dtcp->parent; + + /* Check if the reliable ACK PDU needs to be sent*/ + start_rv_rcv_timer = false; + spin_lock_bh(&dtp->sv_lock); + if (dtcp->sv->rendezvous_rcvr) { + /* Start rendezvous-at-receiver timer, wait for Tr to fire */ + start_rv_rcv_timer = true; + rv = jiffies_to_msecs(dtp->sv->tr); + } + spin_unlock_bh(&dtp->sv_lock); + + if (start_rv_rcv_timer) { + LOG_DBG("DTCP Sending FC: RCV LWE: %u | RCV RWE: %u", + dtp->sv->rcv_left_window_edge, + dtcp->sv->rcvr_rt_wind_edge); + + /* Send rendezvous PDU and start timer */ + atomic_inc(&dtcp->cpdus_in_transit); + ctrl_pdu_send(dtcp, PDU_TYPE_FC, true); + + rtimer_start(&dtcp->rendezvous_rcv, rv); + } + + return; +} + static int push_pdus_rmt(struct dtcp * dtcp) { ASSERT(dtcp); @@ -261,6 +346,7 @@ static int populate_ctrl_pci(struct pci * pci, return -1; } return 0; + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: if (pci_control_last_seq_num_rcvd_set(pci, last_rcv_ctl_seq)) { LOG_ERR("Could not set last ctrl sn rcvd"); @@ -310,51 +396,6 @@ struct du * pdu_ctrl_generate(struct dtcp * dtcp, pdu_type_t type) } EXPORT_SYMBOL(pdu_ctrl_generate); -void dump_we(struct dtcp * dtcp, struct pci * pci) -{ - seq_num_t snd_rt_we; - seq_num_t snd_lf_we; - seq_num_t rcv_rt_we; - seq_num_t rcv_lf_we; - seq_num_t new_rt_we; - seq_num_t new_lf_we; - seq_num_t pci_seqn; - seq_num_t my_rt_we; - seq_num_t my_lf_we; - seq_num_t ack; - uint_t rate; - uint_t tframe; - - ASSERT(dtcp); - ASSERT(pci); - - spin_lock_bh(&dtcp->parent->sv_lock); - snd_rt_we = dtcp->sv->snd_rt_wind_edge; - snd_lf_we = dtcp->sv->snd_lft_win; - rcv_rt_we = dtcp->sv->rcvr_rt_wind_edge; - rcv_lf_we = dtcp->parent->sv->rcv_left_window_edge; - spin_unlock_bh(&dtcp->parent->sv_lock); - - new_rt_we = pci_control_new_rt_wind_edge(pci); - new_lf_we = pci_control_new_left_wind_edge(pci); - my_lf_we = pci_control_my_left_wind_edge(pci); - my_rt_we = pci_control_my_rt_wind_edge(pci); - pci_seqn = pci_sequence_number_get(pci); - ack = pci_control_ack_seq_num(pci); - rate = pci_control_sndr_rate(pci); - tframe = pci_control_time_frame(pci); - - LOG_DBG("SEQN: %u N/Ack: %u SndRWE: %u SndLWE: %u " - "RcvRWE: %u RcvLWE: %u " - "newRWE: %u newLWE: %u " - "myRWE: %u myLWE: %u " - "rate: %u tframe: %u", - pci_seqn, ack, snd_rt_we, snd_lf_we, rcv_rt_we, rcv_lf_we, - new_rt_we, new_lf_we, my_rt_we, my_lf_we, - rate, tframe); -} -EXPORT_SYMBOL(dump_we); - /* not a policy according to specs */ static int rcv_nack_ctl(struct dtcp * dtcp, struct du * du) @@ -386,7 +427,6 @@ static int rcv_nack_ctl(struct dtcp * dtcp, rcu_read_unlock(); LOG_DBG("DTCP received NACK (CPU: %d)", smp_processor_id()); - dump_we(dtcp, &du->pci); du_destroy(du); @@ -405,14 +445,22 @@ static int rcv_ack(struct dtcp * dtcp, rcu_read_lock(); ps = container_of(rcu_dereference(dtcp->base.ps), struct dtcp_ps, base); - if (ps->rtx_ctrl && ps->rtt_estimator) - ps->rtt_estimator(ps, pci_control_ack_seq_num(&du->pci)); + if (ps->rtx_ctrl && ps->rtt_estimator) { + if (ps->rtt_estimator(ps, + pci_control_ack_seq_num(&du->pci)) == -1) { + /* Don't process this PDU anymore, it is ACKing a + * DT PDU that is not in the RTX queue (was already + * discarded) + */ + rcu_read_unlock(); + return 0; + } + } + ret = ps->sender_ack(ps, seq); rcu_read_unlock(); - LOG_DBG("DTCP received ACK (CPU: %d)", smp_processor_id()); - dump_we(dtcp, &du->pci); du_destroy(du); @@ -424,6 +472,7 @@ static int update_window_and_rate(struct dtcp * dtcp, { uint_t rt; uint_t tf; + bool cancel_rv_timer; spin_lock_bh(&dtcp->parent->sv_lock); if(dtcp_window_based_fctrl(dtcp->cfg)) { @@ -446,8 +495,19 @@ static int update_window_and_rate(struct dtcp * dtcp, LOG_DBG("New SND RWE: %u, New LWE: %u, DTCP: %pK", dtcp->sv->snd_rt_wind_edge, dtcp->sv->snd_lft_win, dtcp); + + /* Check if rendezvous timer is active */ + cancel_rv_timer = false; + if (dtcp->sv->rendezvous_sndr) { + dtcp->sv->rendezvous_sndr = false; + cancel_rv_timer = true; + LOG_DBG("Stopping rendezvous timer"); + } spin_unlock_bh(&dtcp->parent->sv_lock); + if (cancel_rv_timer) + rtimer_stop(&dtcp->parent->timers.rendezvous); + push_pdus_rmt(dtcp); du_destroy(du); @@ -495,11 +555,22 @@ static int rcv_ack_and_flow_ctl(struct dtcp * dtcp, rcu_read_lock(); ps = container_of(rcu_dereference(dtcp->base.ps), struct dtcp_ps, base); + + if (ps->rtx_ctrl && ps->rtt_estimator) { + if (ps->rtt_estimator(ps, seq) == -1) { + /* Don't process this PDU anymore, it is ACKing a + * DT PDU that is not in the RTX queue (was already + * discarded) + */ + rcu_read_unlock(); + return 0; + } + } + /* This updates sender LWE */ - if (ps->rtx_ctrl && ps->rtt_estimator) - ps->rtt_estimator(ps, pci_control_ack_seq_num(&du->pci)); if (ps->sender_ack(ps, seq)) LOG_ERR("Could not update RTXQ and LWE"); + rcu_read_unlock(); LOG_DBG("DTCP received ACK-FC (CPU: %d)", smp_processor_id()); @@ -507,6 +578,28 @@ static int rcv_ack_and_flow_ctl(struct dtcp * dtcp, return update_window_and_rate(dtcp, du); } +static int rcvr_rendezvous(struct dtcp * dtcp, + struct du * du) +{ + struct dtcp_ps * ps; + int ret; + + rcu_read_lock(); + ps = container_of(rcu_dereference(dtcp->base.ps), + struct dtcp_ps, base); + if (ps->rcvr_rendezvous) + ret = ps->rcvr_rendezvous(ps, &du->pci); + else + ret = 0; + rcu_read_unlock(); + + LOG_DBG("DTCP received Rendezvous (CPU: %d)", smp_processor_id()); + + du_destroy(du); + + return ret; +} + int dtcp_common_rcv_control(struct dtcp * dtcp, struct du * du) { struct dtcp_ps * ps; @@ -585,12 +678,18 @@ int dtcp_common_rcv_control(struct dtcp * dtcp, struct du * du) case PDU_TYPE_ACK_AND_FC: ret = rcv_ack_and_flow_ctl(dtcp, du); break; + case PDU_TYPE_RENDEZVOUS: + ret = rcvr_rendezvous(dtcp, du); + break; default: ret = -1; break; } atomic_dec(&dtcp->cpdus_in_transit); + + dtp_send_pending_ctrl_pdus(dtcp->parent); + return ret; } @@ -687,11 +786,8 @@ EXPORT_SYMBOL(pdu_ctrl_type_get); int dtcp_ack_flow_control_pdu_send(struct dtcp * dtcp, seq_num_t seq) { - struct du * du; pdu_type_t type; - seq_num_t dbg_seq_num; - if (!dtcp) { LOG_ERR("No instance passed, cannot run policy"); return -1; @@ -705,28 +801,20 @@ int dtcp_ack_flow_control_pdu_send(struct dtcp * dtcp, seq_num_t seq) return 0; } - du = pdu_ctrl_generate(dtcp, type); - if (!du) { - atomic_dec(&dtcp->cpdus_in_transit); - return -1; - } - - dbg_seq_num = pci_sequence_number_get(&du->pci); - - LOG_DBG("DTCP Sending ACK %u (CPU: %d)", dbg_seq_num, smp_processor_id()); - dump_we(dtcp, &du->pci); + LOG_DBG("DTCP Sending ACK (CPU: %d)", smp_processor_id()); - if (dtcp_pdu_send(dtcp, du)){ - atomic_dec(&dtcp->cpdus_in_transit); - return -1; - } - - atomic_dec(&dtcp->cpdus_in_transit); - - return 0; + return ctrl_pdu_send(dtcp, type, false); } EXPORT_SYMBOL(dtcp_ack_flow_control_pdu_send); +int dtcp_rendezvous_pdu_send(struct dtcp * dtcp) +{ + atomic_inc(&dtcp->cpdus_in_transit); + LOG_DBG("DTCP Sending Rendezvous (CPU: %d)", smp_processor_id()); + return ctrl_pdu_send(dtcp, PDU_TYPE_RENDEZVOUS, true); +} +EXPORT_SYMBOL(dtcp_rendezvous_pdu_send); + static struct dtcp_sv default_sv = { .pdus_per_time_unit = 0, .next_snd_ctl_seq = 0, @@ -748,6 +836,8 @@ static struct dtcp_sv default_sv = { .flow_ctl = 0, .rtt = 0, .srtt = 0, + .rendezvous_sndr = false, + .rendezvous_rcvr = false, }; /* FIXME: this should be completed with other parameters from the config */ @@ -916,6 +1006,9 @@ int dtcp_select_policy_set(struct dtcp * dtcp, ps->rtt_estimator = default_rtt_estimator_nortx; } } + if (!ps->rcvr_rendezvous) { + ps->rcvr_rendezvous = default_rcvr_rendezvous; + } } base_select_policy_set_finish(&dtcp->base, &trans); @@ -1028,7 +1121,7 @@ EXPORT_SYMBOL(dtcp_set_policy_set_param); struct dtcp * dtcp_create(struct dtp * dtp, struct rmt * rmt, struct dtcp_config * dtcp_cfg, - struct robject * parent) + struct robject * parent) { struct dtcp * tmp; string_t * ps_name; @@ -1073,6 +1166,7 @@ struct dtcp * dtcp_create(struct dtp * dtp, tmp->cfg = dtcp_cfg; tmp->rmt = rmt; atomic_set(&tmp->cpdus_in_transit, 0); + rtimer_init(tf_rendezvous_rcv, &tmp->rendezvous_rcv, tmp); rina_component_init(&tmp->base); @@ -1134,8 +1228,9 @@ int dtcp_destroy(struct dtcp * instance) if (instance->sv) rkfree(instance->sv); if (instance->cfg) dtcp_config_destroy(instance->cfg); + rtimer_destroy(&instance->rendezvous_rcv); rina_component_fini(&instance->base); - robject_del(&instance->robj); + robject_del(&instance->robj); rkfree(instance); LOG_DBG("Instance %pK destroyed successfully", instance); diff --git a/kernel/dtcp.h b/kernel/dtcp.h index 4f58dfdafb..2225a71a58 100644 --- a/kernel/dtcp.h +++ b/kernel/dtcp.h @@ -54,6 +54,7 @@ int dtcp_common_rcv_control(struct dtcp * dtcp, /* Used by DTP to have an ack-control PDU sent by DTCP */ int dtcp_ack_flow_control_pdu_send(struct dtcp * instance, seq_num_t seq); +int dtcp_rendezvous_pdu_send(struct dtcp * instance); /* begin SDK */ int dtcp_select_policy_set(struct dtcp * dtcp, const string_t *path, @@ -68,7 +69,6 @@ struct dtcp *dtcp_from_component(struct rina_component *component); struct dtcp_ps *dtcp_ps_get(struct dtcp *dtcp); pdu_type_t pdu_ctrl_type_get(struct dtcp *dtcp, seq_num_t seq); struct du *pdu_ctrl_create_ni(struct dtcp *dtcp, pdu_type_t type); -void dump_we(struct dtcp *dtcp, struct pci *pci); int dtcp_pdu_send(struct dtcp *dtcp, struct du *du); struct du *pdu_ctrl_ack_create(struct dtcp *dtcp, seq_num_t last_ctrl_seq_rcvd, @@ -82,7 +82,7 @@ bool dtcp_rate_exceeded(struct dtcp *dtcp, int send); /* end SDK */ int pdus_sent_in_t_unit_set(struct dtcp *dtcp, uint_t s); - +int ctrl_pdu_send(struct dtcp * dtcp, pdu_type_t type, bool direct); /*FIXME: wrapper to be called by dtp in the post_worker */ int dtcp_sending_ack_policy(struct dtcp *dtcp); #endif diff --git a/kernel/dtp-ps-default.c b/kernel/dtp-ps-default.c index b5dc0fafa1..395da090e1 100644 --- a/kernel/dtp-ps-default.c +++ b/kernel/dtp-ps-default.c @@ -41,6 +41,7 @@ int default_transmission_control(struct dtp_ps * ps, struct du * du) { struct dtp * dtp; + struct dtcp * dtcp; dtp = ps->dm; if (!dtp) { @@ -48,12 +49,14 @@ int default_transmission_control(struct dtp_ps * ps, struct du * du) du_destroy(du); return -1; } + dtcp = dtp->dtcp; /* Post SDU to RMT */ LOG_DBG("defaultTxPolicy - sending to rmt"); spin_lock_bh(&dtp->sv_lock); dtp->sv->max_seq_nr_sent = pci_sequence_number_get(&du->pci); + dtcp->sv->snd_lft_win = dtp->sv->max_seq_nr_sent; spin_unlock_bh(&dtp->sv_lock); LOG_DBG("local_soft_irq_pending: %d", local_softirq_pending()); @@ -170,6 +173,7 @@ int default_receiver_inactivity_timer(struct dtp_ps * ps) dtcp->sv->rcvr_rt_wind_edge = 0; dtp->sv->rcv_left_window_edge = 0; dtp_squeue_flush(dtp); + dtcp->sv->rendezvous_rcvr = false; dtp->sv->drf_required = true; spin_unlock_bh(&dtp->sv_lock); @@ -208,6 +212,7 @@ int default_sender_inactivity_timer(struct dtp_ps * ps) snd_rt_win = dtcp->sv->snd_rt_wind_edge; next_send = dtp->sv->seq_nr_to_send; dtcp->sv->snd_rt_wind_edge = next_send + init_credit; + dtcp->sv->rendezvous_sndr = false; if (dtp->rttq) { rttq_flush(dtp->rttq); } diff --git a/kernel/dtp-utils.c b/kernel/dtp-utils.c index d87dd01ac8..b4cc2e6830 100644 --- a/kernel/dtp-utils.c +++ b/kernel/dtp-utils.c @@ -37,8 +37,6 @@ #include "rmt.h" #include "dtp-ps.h" -#define RTIMER_ENABLED 1 - /* Maximum retransmission time is 60 seconds */ #define MAX_RTX_WAIT_TIME msecs_to_jiffies(60000) @@ -223,7 +221,6 @@ void cwq_deliver(struct cwq * queue, struct dtp * dtp, struct rmt * rmt) { - struct rtxq * rtxq; struct dtcp * dtcp; struct du * tmp; bool rtx_ctrl; @@ -255,8 +252,7 @@ void cwq_deliver(struct cwq * queue, return; } if (rtx_ctrl) { - rtxq = dtp->rtxq; - if (!rtxq) { + if (!dtp->rtxq) { spin_unlock(&queue->lock); LOG_ERR("Couldn't find the RTX queue"); return; @@ -266,7 +262,7 @@ void cwq_deliver(struct cwq * queue, spin_unlock(&queue->lock); return; } - rtxq_push_ni(rtxq, tmp); + rtxq_push_ni(dtp->rtxq, tmp); } else if (dtp->rttq) { if (rttq_push(dtp->rttq, pci_sequence_number_get(du_pci(du)))) { @@ -289,6 +285,7 @@ void cwq_deliver(struct cwq * queue, } } dtp->sv->max_seq_nr_sent = pci_sequence_number_get(&du->pci); + dtcp->sv->snd_lft_win = dtp->sv->max_seq_nr_sent; spin_unlock(&queue->lock); dtp_pdu_send(dtp, rmt, du); @@ -330,35 +327,6 @@ void cwq_deliver(struct cwq * queue, return; } -/* NOTE: used only by dump_we */ -seq_num_t cwq_peek(struct cwq * queue) -{ - seq_num_t ret; - struct du * du; - - spin_lock_bh(&queue->lock); - if (rqueue_is_empty(queue->q)){ - spin_unlock_bh(&queue->lock); - return 0; - } - - du = (struct du *) rqueue_head_pop(queue->q); - if (!du) { - spin_unlock_bh(&queue->lock); - return -1; - } - - ret = pci_sequence_number_get(&du->pci); - if (rqueue_head_push_ni(queue->q, du)) { - spin_unlock_bh(&queue->lock); - du_destroy(du); - return ret; - } - spin_unlock_bh(&queue->lock); - - return ret; -} - static struct rtxq_entry * rtxq_entry_create_gfp(struct du * du, gfp_t flag) { struct rtxq_entry * tmp; @@ -415,7 +383,7 @@ static struct rtxqueue * rtxqueue_create_gfp(gfp_t flags) static struct rtxqueue * rtxqueue_create(void) { return rtxqueue_create_gfp(GFP_KERNEL); } -static int rtxqueue_flush(struct rtxqueue * q) +static void rtxqueue_flush(struct rtxqueue * q) { struct rtxq_entry * cur, * n; @@ -425,8 +393,6 @@ static int rtxqueue_flush(struct rtxqueue * q) rtxq_entry_destroy(cur); q->len --; } - - return 0; } static int rtxqueue_destroy(struct rtxqueue * q) @@ -453,7 +419,7 @@ static int rtxqueue_entries_ack(struct rtxqueue * q, seq = pci_sequence_number_get(&cur->du->pci); if (seq <= seq_num) { - LOG_DBG("Seq num acked: %u", seq); + LOG_DBG("Seq num acked: %u. Size %d", seq, q->len); rtxq_entry_destroy(cur); q->len--; } else @@ -537,19 +503,21 @@ unsigned long rtxqueue_entry_timestamp(struct rtxqueue * q, seq_num_t sn) list_for_each_entry(cur, &q->head, next) { csn = pci_sequence_number_get(&cur->du->pci); if (csn > sn) { - LOG_WARN("PDU not in rtxq (duplicate ACK). Received " - "SN: %u, RtxQ SN: %u", sn, csn); - return 0; + LOG_WARN("PDU not in rtxq. Received " + "SN: %u, RtxQ SN: %u. Size: %u", + sn, csn, q->len); + return -1; } if (csn == sn) { /* Ignore time_stamps from retransmitted PDUs */ if (cur->retries != 0) return 0; + return cur->time_stamp; } } - return 0; + return -1; } /* push in seq_num order */ @@ -580,12 +548,12 @@ static int rtxqueue_push_ni(struct rtxqueue * q, struct du * du) if (csn == psn) { LOG_ERR("Another PDU with the same seq_num %u, is in " "the rtx queue!", csn); - return 0; + return -1; } if (csn > psn) { list_add_tail(&tmp->next, &q->head); q->len++; - LOG_DBG("Last PDU with seqnum: %u push to rtxq at: %pk", + LOG_DBG("Last PDU with seqnum: %u push to rtxq at: %pk", csn, q); return 0; } @@ -595,20 +563,20 @@ static int rtxqueue_push_ni(struct rtxqueue * q, struct du * du) if (csn == psn) { LOG_ERR("Another PDU with the same seq_num is in " "the rtx queue!"); - return 0; + return -1; } if (csn > psn) { list_add(&tmp->next, &cur->next); q->len++; - LOG_DBG("Middle PDU with seqnum: %u push to " + LOG_DBG("Middle PDU with seqnum: %u push to " "rtxq at: %pk", csn, q); return 0; } } - LOG_DBG("PDU not pushed!"); + LOG_ERR("PDU not pushed!"); - return 0; + return -1; } /* Exponential backoff after each retransmission */ @@ -623,46 +591,53 @@ static unsigned long time_to_rtx(struct rtxq_entry * cur, unsigned int tr) return cur->time_stamp + rtx_wtime; } -static int rtxqueue_rtx(struct rtxqueue * q, - unsigned int tr, - struct dtp * dtp, - struct rmt * rmt, - uint_t data_rtx_max) +/* Called while holding the rtx queueu lock */ +static int rtxqueue_rtx(struct rtxq * q, + unsigned int tr, + struct dtp * dtp, + uint_t data_rtx_max) { struct rtxq_entry * cur, * n; struct du * tmp; seq_num_t seq = 0; // Used by rbfc. struct dtcp * dtcp; - int sz; - uint_t sc; + int sz, res; + uint_t sc, dropped_sn, dropped_pdus, cwq_max_size; + bool start_rv_timer; + timeout_t rv; ASSERT(q); ASSERT(dt); ASSERT(rmt); dtcp = dtp->dtcp; + dropped_pdus = 0; + dropped_sn = 0; - list_for_each_entry_safe(cur, n, &q->head, next) { + list_for_each_entry_safe(cur, n, &q->queue->head, next) { seq = pci_sequence_number_get(&cur->du->pci); + LOG_DBG("Checking RTX PDU %u, now: %lu >?< %lu + %u", seq, jiffies, cur->time_stamp, tr); + if (time_before_eq(time_to_rtx(cur, tr), jiffies)) { cur->retries++; - cur->time_stamp = jiffies; if (cur->retries >= data_rtx_max) { - LOG_ERR("Maximum number of rtx has been " - "achieved for SeqN %u. Can't " - "maintain QoS", seq); + LOG_WARN("Maximum number of rtx has been " + "achieved for SeqN %u. Dropping " + "PDU, data is lost", seq); rtxq_entry_destroy(cur); - q->len--; - q->drop_pdus++; + q->queue->len--; + q->queue->drop_pdus++; + dropped_pdus++; + if (seq > dropped_sn) + dropped_sn = seq; continue; } - if(dtp && - dtcp && - dtcp_rate_based_fctrl(dtcp->cfg)) { + if (dtp && dtcp && + dtcp_rate_based_fctrl(dtcp->cfg)) { sz = du_data_len(cur->du); sc = dtcp->sv->pdus_sent_in_time_unit; @@ -681,11 +656,15 @@ static int rtxqueue_rtx(struct rtxqueue * q, break; } } + tmp = du_dup_ni(cur->du); - if (dtp_pdu_send(dtp, - rmt, - tmp)) - continue; + + spin_unlock(&q->lock); + res = dtp_pdu_send(dtp, q->rmt, tmp); + spin_lock(&q->lock); + + if (res) continue; + LOG_DBG("Retransmitted PDU with seqN %u", seq); } else { LOG_DBG("RTX timer: from here PDUs still have time," @@ -696,6 +675,46 @@ static int rtxqueue_rtx(struct rtxqueue * q, LOG_DBG("RTXQ %pK has delivered until %u", q, seq); + start_rv_timer = false; + if (dropped_pdus) { + spin_lock_bh(&dtcp->parent->sv_lock); + + /* Move LWE */ + if (dtcp->sv->snd_lft_win < dropped_sn) + dtcp->sv->snd_lft_win = dropped_sn; + + /* TODO, check if we need to move RWE? */ + + /* If RTXQ is empty and CWQ is full, activate rendezvous */ + cwq_max_size = dtcp_max_closed_winq_length(dtcp->cfg); + if (q->queue->len == 0 && + cwq_size(dtcp->parent->cwq) == cwq_max_size) { + /* Check if rendezvous PDU needs to be sent*/ + if (!dtcp->sv->rendezvous_sndr) { + dtcp->sv->rendezvous_sndr = true; + + LOG_DBG("RV at the sender (CPU: %d)", + smp_processor_id()); + + /* Start rendezvous timer, wait for Tr to fire */ + start_rv_timer = true; + rv = jiffies_to_msecs(dtcp->parent->sv->tr); + } + } + + spin_unlock_bh(&dtcp->parent->sv_lock); + } + + if (start_rv_timer) { + LOG_DBG("Window is closed. SND LWE: %u | SND RWE: %u | TR: %u", + dtcp->sv->snd_lft_win, + dtcp->sv->snd_rt_wind_edge, + dtcp->parent->sv->tr); + + /* Send rendezvous PDU and start time */ + rtimer_start(&dtcp->parent->timers.rendezvous, rv); + } + return 0; } @@ -733,22 +752,14 @@ static void rtx_timer_func(struct timer_list * tl) tr = dtp->sv->tr; spin_lock(&q->lock); - if (rtxqueue_rtx(q->queue, - tr, - dtp, - q->rmt, + if (rtxqueue_rtx(q, tr, dtp, dtp->dtcp->cfg->rxctrl_cfg->data_retransmit_max)) LOG_ERR("RTX failed"); -#if RTIMER_ENABLED if (!rtxqueue_empty(q->queue)) rtimer_restart(&dtp->timers.rtx, tr); - LOG_DBG("RTX timer ending..."); -#endif spin_unlock(&q->lock); - - return; } int rtxq_destroy(struct rtxq * q) @@ -781,10 +792,7 @@ struct rtxq * rtxq_create(struct dtp * dtp, if (!tmp) return NULL; -#if RTIMER_ENABLED - //data->data_retransmit_max = dtcp_cfg->rxctrl_cfg->data_retransmit_max; rtimer_init(rtx_timer_func, &dtp->timers.rtx, dtp); -#endif tmp->queue = rtxqueue_create(); if (!tmp->queue) { @@ -846,27 +854,32 @@ EXPORT_SYMBOL(rtxq_entry_timestamp); int rtxq_push_ni(struct rtxq * q, struct du * du) { + int res; + spin_lock_bh(&q->lock); -#if RTIMER_ENABLED + /* is the first transmitted PDU */ rtimer_start(&q->parent->timers.rtx, q->parent->sv->tr); -#endif - rtxqueue_push_ni(q->queue, du); + + res = rtxqueue_push_ni(q->queue, du); + spin_unlock_bh(&q->lock); - return 0; + + return res; } +EXPORT_SYMBOL(rtxq_push_ni); int rtxq_flush(struct rtxq * q) { if (!q || !q->queue) return -1; -#if RTIMER_ENABLED rtimer_stop(&q->parent->timers.rtx); -#endif + spin_lock(&q->lock); rtxqueue_flush(q->queue); spin_unlock(&q->lock); + return 0; } @@ -876,17 +889,20 @@ int rtxq_ack(struct rtxq * q, seq_num_t seq_num, unsigned int tr) { + int res; + if (!q) return -1; spin_lock_bh(&q->lock); - rtxqueue_entries_ack(q->queue, seq_num); -#if RTIMER_ENABLED + + res = rtxqueue_entries_ack(q->queue, seq_num); + rtimer_restart(&q->parent->timers.rtx, tr); -#endif + spin_unlock_bh(&q->lock); - return 0; + return res; } EXPORT_SYMBOL(rtxq_ack); @@ -915,12 +931,11 @@ int rtxq_nack(struct rtxq * q, q->rmt, seq_num, data_retransmit_max); -#if RTIMER_ENABLED if (rtimer_restart(&q->parent->timers.rtx, tr)) { spin_unlock(&q->lock); return -1; } -#endif + spin_unlock(&q->lock); return 0; @@ -935,11 +950,16 @@ int dtp_pdu_send(struct dtp * dtp, /* Remote flow case */ if (pci_source(&du->pci) != pci_destination(&du->pci)) { - if (rmt_send(rmt, du)) { - LOG_ERR("Problems sending PDU to RMT"); - return -1; - } - return 0; + if (dtp->dtcp->sv->rendezvous_rcvr) { + LOG_INFO("Sending to RMT in RV at RCVR"); + } + + if (rmt_send(rmt, du)) { + LOG_ERR("Problems sending PDU to RMT"); + return -1; + } + + return 0; } /* Local flow case */ diff --git a/kernel/dtp-utils.h b/kernel/dtp-utils.h index 59ac66c72b..8bdeefb784 100644 --- a/kernel/dtp-utils.h +++ b/kernel/dtp-utils.h @@ -44,7 +44,6 @@ ssize_t cwq_size(struct cwq * q); void cwq_deliver(struct cwq * queue, struct dtp * dtp, struct rmt * rmt); -seq_num_t cwq_peek(struct cwq * queue); struct rtxq; @@ -60,8 +59,6 @@ int rtxq_drop_pdus(struct rtxq * q); unsigned long rtxq_entry_timestamp(struct rtxq * q, seq_num_t sn); int rtxq_entry_destroy(struct rtxq_entry * entry); -int rtxq_push_sn(struct rtxq * q, - seq_num_t sn); int rtxq_push_ni(struct rtxq * q, struct du * du); int rtxq_ack(struct rtxq * q, diff --git a/kernel/dtp.c b/kernel/dtp.c index 221a67f8f0..7464dd6bd4 100644 --- a/kernel/dtp.c +++ b/kernel/dtp.c @@ -493,6 +493,47 @@ static void tf_receiver_inactivity(struct timer_list * tl) return; } +/* Runs the Rendezvous timer */ +#if LINUX_VERSION_CODE < KERNEL_VERSION(4,15,0) +static void tf_rendezvous(void * data) +#else +static void tf_rendezvous(struct timer_list * tl) +#endif +{ + struct dtp * dtp; + bool start_rv_timer; + timeout_t rv; + + LOG_DBG("Running rendezvous timer..."); +#if LINUX_VERSION_CODE < KERNEL_VERSION(4,15,0) + dtp = (struct dtp *) data; +#else + dtp = from_timer(dtp, tl, timers.rendezvous); +#endif + if (!dtp) { + LOG_ERR("No dtp to work with"); + return; + } + + /* Check if rendezvous PDU needs to be send*/ + start_rv_timer = false; + spin_lock_bh(&dtp->sv_lock); + if (dtp->dtcp->sv->rendezvous_sndr) { + /* Start rendezvous timer, wait for Tr to fire */ + start_rv_timer = true; + rv = jiffies_to_msecs(dtp->sv->tr); + } + spin_unlock_bh(&dtp->sv_lock); + + if (start_rv_timer) { + /* Send rendezvous PDU and start timer */ + dtcp_rendezvous_pdu_send(dtp->dtcp); + rtimer_start(&dtp->timers.rendezvous, rv); + } + + return; +} + /* * NOTE: * AF is the factor to which A is divided in order to obtain the @@ -655,24 +696,20 @@ static void tf_a(struct timer_list * tl) LOG_ERR("sending_ack failed"); rtimer_start(&dtp->timers.a, a/AF); } - while (!ringq_is_empty(dtp->to_send)) { - struct du * pdu_ctrl; - pdu_ctrl = ringq_pop(dtp->to_send); - if (pdu_ctrl) { - dtp_pdu_send(dtp, dtp->rmt, pdu_ctrl); - } - } + + dtp_send_pending_ctrl_pdus(dtp); } else { pci = process_A_expiration(dtp, dtcp); - if (pci) pci_release(pci); -#if DTP_INACTIVITY_TIMERS_ENABLE + + if (pci) + pci_release(pci); + if (rtimer_restart(&dtp->timers.sender_inactivity, 3 * (mpl + r + a))) { LOG_ERR("Failed to start sender_inactiviy timer"); rtimer_start(&dtp->timers.a, a/AF); return; } -#endif } if (!seqq_is_empty(dtp->seqq)) { @@ -1034,6 +1071,7 @@ struct dtp * dtp_create(struct efcp * efcp, rtimer_init(tf_receiver_inactivity, &dtp->timers.receiver_inactivity, dtp); rtimer_init(tf_a, &dtp->timers.a, dtp); rtimer_init(tf_rate_window, &dtp->timers.rate_window, dtp); + rtimer_init(tf_rendezvous, &dtp->timers.rendezvous, dtp); dtp->to_post = ringq_create(TO_POST_LENGTH); if (!dtp->to_post) { @@ -1142,6 +1180,8 @@ int dtp_destroy(struct dtp * instance) rtimer_destroy(&instance->timers.receiver_inactivity); rtimer_destroy(&instance->timers.rate_window); rtimer_destroy(&instance->timers.rtx); + rtimer_destroy(&instance->timers.rendezvous); + if (instance->to_post) ringq_destroy(instance->to_post, (void (*)(void *)) du_destroy); if (instance->to_send) ringq_destroy(instance->to_send, @@ -1201,29 +1241,40 @@ static bool window_is_closed(struct dtp * dtp, return retval; } +void dtp_send_pending_ctrl_pdus(struct dtp * dtp) +{ + struct du * du_ctrl; + + while (!ringq_is_empty(dtp->to_send)) { + du_ctrl = ringq_pop(dtp->to_send); + if (du_ctrl && dtp_pdu_send(dtp, dtp->rmt, du_ctrl)) { + LOG_ERR("Problems sending DTCP Ctrl PDU"); + du_destroy(du_ctrl); + } + } +} +EXPORT_SYMBOL(dtp_send_pending_ctrl_pdus); + int dtp_write(struct dtp * instance, struct du * du) { struct dtcp * dtcp; - struct rtxq * rtxq; struct du * cdu; struct dtp_ps * ps; seq_num_t sn, csn; struct efcp * efcp; int sbytes; uint_t sc; - timeout_t mpl, r, a; + timeout_t mpl, r, a, rv; + bool start_rv_timer; efcp = instance->efcp; dtcp = instance->dtcp; -#if DTP_INACTIVITY_TIMERS_ENABLE /* Stop SenderInactivityTimer */ if (rtimer_stop(&instance->timers.sender_inactivity)) { LOG_ERR("Failed to stop timer"); } -#endif - /* Step 1: Delimiting (fragmentation/reassembly) + Protection */ /* * FIXME: The two ways of carrying out flow control @@ -1270,7 +1321,7 @@ int dtp_write(struct dtp * instance, sn = dtcp->sv->snd_lft_win; if (instance->sv->drf_flag || - ( (sn == (csn - 1)) && instance->sv->rexmsn_ctrl) ) { + ((sn == (csn - 1)) && instance->sv->rexmsn_ctrl)) { pdu_flags_t pci_flags; pci_flags = pci_flags_get(&du->pci); pci_flags |= PDU_FLAGS_DATA_RUN; @@ -1298,6 +1349,44 @@ int dtp_write(struct dtp * instance, goto stats_err_exit; } rcu_read_unlock(); + + /* Check if rendezvous PDU needs to be sent*/ + start_rv_timer = false; + spin_lock_bh(&instance->sv_lock); + + /* If there is rtx control and PDUs at the rtxQ + * don't enter the rendezvous state (DTCP will keep + * retransmitting the PDUs until acked or the + * retransmission timeout fires) + */ + if (instance->sv->rexmsn_ctrl && + rtxq_size(instance->rtxq) > 0) { + LOG_DBG("Window is closed but there are PDUs at the RTXQ"); + spin_unlock_bh(&instance->sv_lock); + return 0; + } + + /* Else, check if rendezvous PDU needs to be sent */ + if (!instance->dtcp->sv->rendezvous_sndr) { + instance->dtcp->sv->rendezvous_sndr = true; + + LOG_DBG("RV at the sender %u (CPU: %d)", csn, smp_processor_id()); + + /* Start rendezvous timer, wait for Tr to fire */ + start_rv_timer = true; + rv = jiffies_to_msecs(instance->sv->tr); + } + spin_unlock_bh(&instance->sv_lock); + + if (start_rv_timer) { + LOG_DBG("Window is closed. SND LWE: %u | SND RWE: %u | TR: %u", + instance->dtcp->sv->snd_lft_win, + instance->dtcp->sv->snd_rt_wind_edge, + instance->sv->tr); + /* Send rendezvous PDU and start time */ + rtimer_start(&instance->timers.rendezvous, rv); + } + return 0; } if(instance->sv->rate_based) { @@ -1315,18 +1404,16 @@ int dtp_write(struct dtp * instance, } } if (instance->sv->rexmsn_ctrl) { - /* FIXME: Add timer for PDU */ - rtxq = instance->rtxq; cdu = du_dup_ni(du); if (!cdu) { - LOG_ERR("Failed to copy PDU"); - LOG_ERR("PDU type: %d", pci_type(&du->pci)); - goto pdu_stats_err_exit; + LOG_ERR("Failed to copy PDU. PDU type: %d", + pci_type(&du->pci)); + goto pdu_stats_err_exit; } - if (rtxq_push_ni(rtxq, cdu)) { + if (rtxq_push_ni(instance->rtxq, cdu)) { LOG_ERR("Couldn't push to rtxq"); - goto pdu_stats_err_exit; + goto pdu_stats_err_exit; } } else if (instance->rttq) { if (rttq_push(instance->rttq, csn)) { @@ -1336,22 +1423,22 @@ int dtp_write(struct dtp * instance, if (ps->transmission_control(ps, du)) { LOG_ERR("Problems with transmission control"); - goto stats_err_exit; + goto stats_err_exit; } rcu_read_unlock(); spin_lock_bh(&instance->sv_lock); stats_inc_bytes(tx, instance->sv, sbytes); - spin_unlock_bh(&instance->sv_lock); -#if DTP_INACTIVITY_TIMERS_ENABLE + spin_unlock_bh(&instance->sv_lock); + /* Start SenderInactivityTimer */ if (rtimer_restart(&instance->timers.sender_inactivity, 3 * (mpl + r + a ))) { LOG_ERR("Failed to start sender_inactiviy timer"); - goto stats_nounlock_err_exit; + goto stats_nounlock_err_exit; return -1; } -#endif + return 0; } @@ -1452,13 +1539,15 @@ int dtp_receive(struct dtp * instance, LOG_DBG("DTP receive started..."); dtcp = instance->dtcp; - efcp = instance->efcp; + efcp = instance->efcp; spin_lock_bh(&instance->sv_lock); + a = instance->sv->A; r = instance->sv->R; mpl = instance->sv->MPL; LWE = instance->sv->rcv_left_window_edge; + rcu_read_lock(); ps = container_of(rcu_dereference(instance->base.ps), struct dtp_ps, base); @@ -1478,7 +1567,6 @@ int dtp_receive(struct dtp * instance, seq_num, smp_processor_id()); if (instance->sv->drf_required) { -#if DTP_INACTIVITY_TIMERS_ENABLE /* Start ReceiverInactivityTimer */ if (rtimer_restart(&instance->timers.receiver_inactivity, 2 * (mpl + r + a))) { @@ -1487,36 +1575,39 @@ int dtp_receive(struct dtp * instance, du_destroy(du); return -1; } -#endif + if ((pci_flags_get(&du->pci) & PDU_FLAGS_DATA_RUN)) { - instance->sv->drf_required = false; + LOG_DBG("Data Run Flag"); + + instance->sv->drf_required = false; instance->sv->rcv_left_window_edge = seq_num; dtp_squeue_flush(instance); if (instance->rttq) { rttq_flush(instance->rttq); } + spin_unlock_bh(&instance->sv_lock); + if (dtcp) { if (dtcp_sv_update(dtcp, &du->pci)) { LOG_ERR("Failed to update dtcp sv"); return -1; } } - while (!ringq_is_empty(instance->to_send)) { - struct du * du_ctrl = ringq_pop(instance->to_send); - if (du_ctrl) { - dtp_pdu_send(instance, instance->rmt, du_ctrl); - } - } + + dtp_send_pending_ctrl_pdus(instance); pdu_post(instance, du); stats_inc_bytes(rx, instance->sv, sbytes); - LOG_DBG("Data run flag DRF"); + return 0; } + LOG_ERR("Expecting DRF but not present, dropping PDU %d...", seq_num); + stats_inc(drop, instance->sv); spin_unlock_bh(&instance->sv_lock); + du_destroy(du); return 0; } @@ -1526,14 +1617,15 @@ int dtp_receive(struct dtp * instance, * no need to check presence of in_order or dtcp because in case * they are not, LWE is not updated and always 0 */ - - if ((seq_num <= LWE) || (is_fc_overrun(instance, dtcp, seq_num, sbytes))) - { + if ((seq_num <= LWE) || + (is_fc_overrun(instance, dtcp, seq_num, sbytes))) { /* Duplicate PDU or flow control overrun */ - LOG_ERR("Duplicate PDU or flow control overrun. SN: %u, LWE:%u", + LOG_ERR("Duplicate PDU or flow control overrun.SN: %u, LWE:%u", seq_num, LWE); stats_inc(drop, instance->sv); + spin_unlock_bh(&instance->sv_lock); + du_destroy(du); if (dtcp) { @@ -1549,7 +1641,6 @@ int dtp_receive(struct dtp * instance, return 0; } -#if DTP_INACTIVITY_TIMERS_ENABLE /* Start ReceiverInactivityTimer */ if (rtimer_restart(&instance->timers.receiver_inactivity, 2 * (mpl + r + a ))) { @@ -1558,7 +1649,13 @@ int dtp_receive(struct dtp * instance, du_destroy(du); return -1; } -#endif + + /* This is an acceptable data PDU, stop reliable ACK timer */ + if (dtcp->sv->rendezvous_rcvr) { + LOG_DBG("RV at receiver put to false"); + dtcp->sv->rendezvous_rcvr = false; + rtimer_stop(&dtcp->rendezvous_rcv); + } if (!a) { bool set_lft_win_edge; @@ -1585,13 +1682,7 @@ int dtp_receive(struct dtp * instance, LOG_ERR("Failed to update dtcp sv"); goto fail; } - while (!ringq_is_empty(instance->to_send)) { - struct du * du_ctrl; - du_ctrl = ringq_pop(instance->to_send); - if (du_ctrl) { - dtp_pdu_send(instance, instance->rmt, du_ctrl); - } - } + dtp_send_pending_ctrl_pdus(instance); if (!set_lft_win_edge) { du_destroy(du); return 0; @@ -1630,6 +1721,7 @@ int dtp_receive(struct dtp * instance, instance->sv->rcv_left_window_edge = seq_num; ringq_push(instance->to_post, du); } + spin_unlock_bh(&instance->sv_lock); if (dtcp) { @@ -1638,12 +1730,7 @@ int dtp_receive(struct dtp * instance, } } - while (!ringq_is_empty(instance->to_send)) { - struct du * du_ctrl = ringq_pop(instance->to_send); - if (du_ctrl) { - dtp_pdu_send(instance, instance->rmt, du_ctrl); - } - } + dtp_send_pending_ctrl_pdus(instance); if (list_empty(&instance->seqq->queue->head)) rtimer_stop(&instance->timers.a); diff --git a/kernel/dtp.h b/kernel/dtp.h index 8275b8d637..1472574098 100644 --- a/kernel/dtp.h +++ b/kernel/dtp.h @@ -28,8 +28,6 @@ #include "ps-factory.h" #include "rds/robjects.h" -#define DTP_INACTIVITY_TIMERS_ENABLE 1 - struct dtp * dtp_create(struct efcp * efcp, struct rmt * rmt, struct dtp_config * dtp_cfg, @@ -51,6 +49,8 @@ int dtp_sv_init(struct dtp * dtp, int dtp_write(struct dtp * instance, struct du * du); +void dtp_send_pending_ctrl_pdus(struct dtp * dtp); + /* DTP receives a PDU from RMT */ int dtp_receive(struct dtp * instance, struct du * du); diff --git a/kernel/efcp-str.h b/kernel/efcp-str.h index bbdc09e581..5c9f501155 100644 --- a/kernel/efcp-str.h +++ b/kernel/efcp-str.h @@ -168,7 +168,7 @@ struct dtp { * NOTE: The DTP State Vector is discarded only after and explicit * release by the AP or by the system (if the AP crashes). */ - struct dtp_sv * sv; /* The state-vector */ + struct dtp_sv * sv; /* The state-vector */ spinlock_t sv_lock; /* The state vector lock (DTP & DTCP) */ struct rina_component base; @@ -183,10 +183,11 @@ struct dtp { struct timer_list a; struct timer_list rate_window; struct timer_list rtx; + struct timer_list rendezvous; } timers; - struct robject robj; + struct robject robj; - spinlock_t lock; + spinlock_t lock; }; /* This is the DT-SV part maintained by DTCP */ @@ -280,10 +281,10 @@ struct dtcp_sv { /* Rate based both in and out-bound */ - /* Last time-instant when the credit check has been done. - * This is used by rate-based flow control mechanism. - */ - struct timespec last_time; + /* Last time-instant when the credit check has been done. + * This is used by rate-based flow control mechanism. + */ + struct timespec last_time; /* * Control of duplicated control PDUs @@ -295,6 +296,18 @@ struct dtcp_sv { uint_t rtt; uint_t srtt; uint_t rttvar; + + /* Rendezvous */ + + /* This Boolean indicates whether there is a zero-length window and a + * Rendezvous PDU has been sent. + */ + bool rendezvous_sndr; + + /* This Boolean indicates whether a Rendezvous PDU was received. The + * next DT-PDU is expected to have a DRF bit set to true. + */ + bool rendezvous_rcvr; }; struct dtcp { @@ -308,9 +321,10 @@ struct dtcp { struct rina_component base; struct dtcp_config * cfg; struct rmt * rmt; + struct timer_list rendezvous_rcv; atomic_t cpdus_in_transit; - struct robject robj; + struct robject robj; }; diff --git a/kernel/efcp.c b/kernel/efcp.c index 91c8a25061..1ef89982d0 100644 --- a/kernel/efcp.c +++ b/kernel/efcp.c @@ -671,6 +671,7 @@ cep_id_t efcp_connection_create(struct efcp_container * container, return cep_id_bad(); } efcp->dtp->rtxq = rtxq; + efcp->dtp->rttq = NULL; } else { rttq = rttq_create(); if (!rttq) { @@ -679,6 +680,7 @@ cep_id_t efcp_connection_create(struct efcp_container * container, return cep_id_bad(); } efcp->dtp->rttq = rttq; + efcp->dtp->rtxq = NULL; } efcp->dtp->efcp = efcp; diff --git a/kernel/pci.c b/kernel/pci.c index 1b4d9a9d89..ac4c009abb 100644 --- a/kernel/pci.c +++ b/kernel/pci.c @@ -37,7 +37,6 @@ #define FLAGS_SIZE 1 #define TYPE_SIZE 1 - enum pci_field_index { PCI_BASE_VERSION = 0, PCI_BASE_DST_ADD, @@ -82,6 +81,15 @@ enum pci_field_index { PCI_ACK_FC_SNDR_RATE, PCI_ACK_FC_TIME_FRAME, PCI_ACK_FC_SIZE, + /* pci_rvous */ + PCI_RVOUS_LAST_CSN_RCVD, + PCI_RVOUS_NEW_LWE, + PCI_RVOUS_NEW_RWE, + PCI_RVOUS_MY_LWE, + PCI_RVOUS_MY_RWE, + PCI_RVOUS_SNDR_RATE, + PCI_RVOUS_TIME_FRAME, + PCI_RVOUS_SIZE, /* number of fields */ PCI_FIELD_INDEX_MAX, }; @@ -169,6 +177,11 @@ ssize_t *pci_offset_table_create(struct dt_cons *dt_cons) case PCI_ACK_FC_NEW_RWE: case PCI_ACK_FC_MY_LWE: case PCI_ACK_FC_MY_RWE: + case PCI_RVOUS_LAST_CSN_RCVD: + case PCI_RVOUS_NEW_LWE: + case PCI_RVOUS_NEW_RWE: + case PCI_RVOUS_MY_LWE: + case PCI_RVOUS_MY_RWE: offset += dt_cons->seq_num_length; break; case PCI_CTRL_SN: @@ -182,17 +195,20 @@ ssize_t *pci_offset_table_create(struct dt_cons *dt_cons) case PCI_FC_SNDR_RATE: case PCI_CACK_SNDR_RATE: case PCI_ACK_FC_SNDR_RATE: + case PCI_RVOUS_SNDR_RATE: offset += dt_cons->rate_length; break; case PCI_FC_TIME_FRAME: case PCI_CACK_TIME_FRAME: case PCI_ACK_FC_TIME_FRAME: + case PCI_RVOUS_TIME_FRAME: offset += dt_cons->frame_length; break; case PCI_DT_MGMT_SIZE: case PCI_FC_SIZE: case PCI_CACK_SIZE: case PCI_ACK_SIZE: + case PCI_RVOUS_SIZE: case PCI_ACK_FC_SIZE: offset = base_offset; break; @@ -236,6 +252,7 @@ ssize_t *pci_offset_table_create(struct dt_cons *dt_cons) LOG_DBG("pci_offsets[PCI_ACK_FC_MY_RWE] = %zu", pci_offsets[PCI_ACK_FC_MY_RWE]); LOG_DBG("pci_offsets[PCI_ACK_FC_SNDR_RATE] = %zu", pci_offsets[PCI_ACK_FC_SNDR_RATE]); LOG_DBG("pci_offsets[PCI_ACK_FC_TIME_FRAME] = %zu", pci_offsets[PCI_ACK_FC_TIME_FRAME]); + LOG_DBG("pci_offsets[PCI_RVOUS_SIZE] = %zu", pci_offsets[PCI_RVOUS_SIZE]); LOG_DBG("pci_offsets[PCI_ACK_FC_SIZE] = %zu",pci_offsets[PCI_ACK_FC_SIZE]); return pci_offsets; } @@ -450,6 +467,8 @@ ssize_t pci_calculate_size(struct efcp_config *cfg, pdu_type_t type) return cfg->pci_offset_table[PCI_ACK_FC_SIZE]; case PDU_TYPE_CACK: return cfg->pci_offset_table[PCI_CACK_SIZE]; + case PDU_TYPE_RENDEZVOUS: + return cfg->pci_offset_table[PCI_RVOUS_SIZE]; default: return -1; } @@ -490,6 +509,7 @@ seq_num_t pci_control_new_rt_wind_edge(const struct pci *pci) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_GETTER(pci, PCI_FC_NEW_RWE, seq_num_length, seq_num_t); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_GETTER(pci, PCI_CACK_NEW_RWE, seq_num_length, seq_num_t); case PDU_TYPE_ACK_AND_FC: @@ -503,6 +523,7 @@ EXPORT_SYMBOL(pci_control_new_rt_wind_edge); seq_num_t pci_control_new_left_wind_edge(const struct pci *pci) { switch (pci_type(pci)) { + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_GETTER(pci, PCI_CACK_NEW_LWE, seq_num_length, seq_num_t); case PDU_TYPE_ACK_AND_FC: @@ -518,6 +539,7 @@ seq_num_t pci_control_my_rt_wind_edge(const struct pci *pci) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_GETTER(pci, PCI_FC_MY_RWE, seq_num_length, seq_num_t); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_GETTER(pci, PCI_CACK_MY_RWE, seq_num_length, seq_num_t); case PDU_TYPE_ACK_AND_FC: @@ -533,6 +555,7 @@ seq_num_t pci_control_my_left_wind_edge(const struct pci *pci) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_GETTER(pci, PCI_FC_MY_LWE, seq_num_length, seq_num_t); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_GETTER(pci, PCI_CACK_MY_LWE, seq_num_length, seq_num_t); case PDU_TYPE_ACK_AND_FC: @@ -546,6 +569,7 @@ EXPORT_SYMBOL(pci_control_my_left_wind_edge); seq_num_t pci_control_last_seq_num_rcvd(const struct pci *pci) { switch (pci_type(pci)) { + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_GETTER(pci, PCI_CACK_LAST_CSN_RCVD, seq_num_length, seq_num_t); case PDU_TYPE_ACK_AND_FC: @@ -561,6 +585,7 @@ u_int32_t pci_control_sndr_rate(const struct pci *pci) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_GETTER(pci, PCI_FC_SNDR_RATE, rate_length, u_int32_t); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_GETTER(pci, PCI_CACK_SNDR_RATE, rate_length, u_int32_t); case PDU_TYPE_ACK_AND_FC: @@ -605,6 +630,7 @@ int pci_control_new_rt_wind_edge_set(struct pci *pci, seq_num_t seq) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_SETTER(pci, PCI_FC_NEW_RWE, seq_num_length, seq); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_SETTER(pci, PCI_CACK_NEW_RWE, seq_num_length, seq); case PDU_TYPE_ACK_AND_FC: @@ -618,6 +644,7 @@ EXPORT_SYMBOL(pci_control_new_rt_wind_edge_set); int pci_control_new_left_wind_edge_set(struct pci *pci, seq_num_t seq) { switch (pci_type(pci)) { + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_SETTER(pci, PCI_CACK_NEW_LWE, seq_num_length, seq); case PDU_TYPE_ACK_AND_FC: @@ -633,6 +660,7 @@ int pci_control_my_rt_wind_edge_set(struct pci *pci, seq_num_t seq) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_SETTER(pci, PCI_FC_MY_RWE, seq_num_length, seq); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_SETTER(pci, PCI_CACK_MY_RWE, seq_num_length, seq); case PDU_TYPE_ACK_AND_FC: @@ -648,6 +676,7 @@ int pci_control_my_left_wind_edge_set(struct pci *pci, seq_num_t seq) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_SETTER(pci, PCI_FC_MY_LWE, seq_num_length, seq); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_SETTER(pci, PCI_CACK_MY_LWE, seq_num_length, seq); case PDU_TYPE_ACK_AND_FC: @@ -661,6 +690,7 @@ EXPORT_SYMBOL(pci_control_my_left_wind_edge_set); int pci_control_last_seq_num_rcvd_set(struct pci *pci, seq_num_t seq) { switch (pci_type(pci)) { + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_SETTER(pci, PCI_CACK_LAST_CSN_RCVD, seq_num_length, seq); case PDU_TYPE_ACK_AND_FC: @@ -676,6 +706,7 @@ int pci_control_sndr_rate_set(struct pci *pci, u_int32_t rate) switch (pci_type(pci)) { case PDU_TYPE_FC: PCI_SETTER(pci, PCI_FC_SNDR_RATE, rate_length, rate); + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_CACK: PCI_SETTER(pci, PCI_CACK_SNDR_RATE, rate_length, rate); case PDU_TYPE_ACK_AND_FC: diff --git a/kernel/pci.h b/kernel/pci.h index fe8b5de0f0..2d9c52b374 100644 --- a/kernel/pci.h +++ b/kernel/pci.h @@ -51,6 +51,8 @@ typedef uint8_t pdu_flags_t; #define PDU_TYPE_SNACK 0xCA /* Selective NACK */ #define PDU_TYPE_SACK_AND_FC 0xCD /* Selective ACK and Flow Control */ #define PDU_TYPE_SNACK_AND_FC 0xCE /* Selective NACK and Flow Control */ +/* Rendezvous PDU */ +#define PDU_TYPE_RENDEZVOUS 0xCF /* Rendezvous */ /* Management PDUs */ #define PDU_TYPE_MGMT 0x40 /* Management */ /* Number of different PDU types */ @@ -58,29 +60,31 @@ typedef uint8_t pdu_flags_t; typedef uint8_t pdu_type_t; -#define pdu_type_is_ok(X) \ - ((X == PDU_TYPE_DT) ? true : \ - ((X == PDU_TYPE_CACK) ? true : \ - ((X == PDU_TYPE_SACK) ? true : \ - ((X == PDU_TYPE_NACK) ? true : \ - ((X == PDU_TYPE_FC) ? true : \ - ((X == PDU_TYPE_ACK) ? true : \ - ((X == PDU_TYPE_ACK_AND_FC) ? true : \ - ((X == PDU_TYPE_SACK_AND_FC) ? true : \ - ((X == PDU_TYPE_SNACK_AND_FC) ? true : \ - ((X == PDU_TYPE_MGMT) ? true : \ - false)))))))))) +#define pdu_type_is_ok(X) \ + ((X == PDU_TYPE_DT) ? true : \ + ((X == PDU_TYPE_CACK) ? true : \ + ((X == PDU_TYPE_SACK) ? true : \ + ((X == PDU_TYPE_NACK) ? true : \ + ((X == PDU_TYPE_FC) ? true : \ + ((X == PDU_TYPE_ACK) ? true : \ + ((X == PDU_TYPE_ACK_AND_FC) ? true : \ + ((X == PDU_TYPE_SACK_AND_FC) ? true : \ + ((X == PDU_TYPE_SNACK_AND_FC) ? true : \ + ((X == PDU_TYPE_RENDEZVOUS) ? true : \ + ((X == PDU_TYPE_MGMT) ? true : \ + false))))))))))) -#define pdu_type_is_control(X) \ - ((X == PDU_TYPE_CACK) ? true : \ - ((X == PDU_TYPE_SACK) ? true : \ - ((X == PDU_TYPE_NACK) ? true : \ - ((X == PDU_TYPE_FC) ? true : \ - ((X == PDU_TYPE_ACK) ? true : \ - ((X == PDU_TYPE_ACK_AND_FC) ? true : \ - ((X == PDU_TYPE_SACK_AND_FC) ? true : \ - ((X == PDU_TYPE_SNACK_AND_FC) ? true : \ - false)))))))) +#define pdu_type_is_control(X) \ + ((X == PDU_TYPE_CACK) ? true : \ + ((X == PDU_TYPE_SACK) ? true : \ + ((X == PDU_TYPE_NACK) ? true : \ + ((X == PDU_TYPE_FC) ? true : \ + ((X == PDU_TYPE_ACK) ? true : \ + ((X == PDU_TYPE_ACK_AND_FC) ? true : \ + ((X == PDU_TYPE_SACK_AND_FC) ? true : \ + ((X == PDU_TYPE_RENDEZVOUS) ? true : \ + ((X == PDU_TYPE_SNACK_AND_FC) ? true : \ + false))))))))) struct pci { unsigned char *h; /* do not move from 1st position */ diff --git a/kernel/rmt.c b/kernel/rmt.c index 624d2fd837..e0fa9e3cf7 100644 --- a/kernel/rmt.c +++ b/kernel/rmt.c @@ -1354,6 +1354,7 @@ int rmt_receive(struct rmt *rmt, case PDU_TYPE_FC: case PDU_TYPE_ACK: case PDU_TYPE_ACK_AND_FC: + case PDU_TYPE_RENDEZVOUS: case PDU_TYPE_DT: /* * (FUTURE) diff --git a/plugins/cong_avoidance/dtcp-ps-cas.c b/plugins/cong_avoidance/dtcp-ps-cas.c index 0cd371f99c..67fa6b1755 100644 --- a/plugins/cong_avoidance/dtcp-ps-cas.c +++ b/plugins/cong_avoidance/dtcp-ps-cas.c @@ -250,6 +250,7 @@ dtcp_ps_cas_create(struct rina_component * component) ps->rcvr_control_ack = NULL; ps->no_rate_slow_down = NULL; ps->no_override_default_peak = NULL; + ps->rcvr_rendezvous = NULL; return &ps->base; } diff --git a/plugins/dummy/dtcp-ps-dummy.c b/plugins/dummy/dtcp-ps-dummy.c index 4b1c4fdb6a..92a6a6ec2c 100644 --- a/plugins/dummy/dtcp-ps-dummy.c +++ b/plugins/dummy/dtcp-ps-dummy.c @@ -113,6 +113,7 @@ dtcp_ps_dummy_create(struct rina_component * component) ps->rcvr_control_ack = NULL; ps->no_rate_slow_down = NULL; ps->no_override_default_peak = NULL; + ps->rcvr_rendezvous = NULL; return &ps->base; } diff --git a/plugins/rdsr-ps/cdrr/dtcp-ps-cdrr.c b/plugins/rdsr-ps/cdrr/dtcp-ps-cdrr.c index 186ad80ae5..1f53d0ba83 100644 --- a/plugins/rdsr-ps/cdrr/dtcp-ps-cdrr.c +++ b/plugins/rdsr-ps/cdrr/dtcp-ps-cdrr.c @@ -483,6 +483,7 @@ static struct ps_base * cdrr_create(struct rina_component * component) { ps->rcvr_control_ack = NULL; ps->no_rate_slow_down = NULL; ps->no_override_default_peak = NULL; + ps->rcvr_rendezvous = NULL; ri = rkzalloc(sizeof(struct cdrr_rate_info), GFP_KERNEL); diff --git a/plugins/red/dtcp-ps-red.c b/plugins/red/dtcp-ps-red.c index ae1406b529..131acf3f16 100644 --- a/plugins/red/dtcp-ps-red.c +++ b/plugins/red/dtcp-ps-red.c @@ -147,6 +147,7 @@ dtcp_ps_red_create(struct rina_component * component) ps->rcvr_control_ack = NULL; ps->no_rate_slow_down = NULL; ps->no_override_default_peak = NULL; + ps->rcvr_rendezvous = NULL; return &ps->base; }