diff --git a/conf/pafd-example.conf b/conf/pafd-example.conf index 0d34a50..c673bdd 100644 --- a/conf/pafd-example.conf +++ b/conf/pafd-example.conf @@ -15,6 +15,9 @@ # the effects of a server failure. domains: - name: domain0 # The name is optional, and used only for logging. + idle: # Maximum idle time limits (v3 clients only) + min: 10 # Lower limit. + max: 60 # Upper limit. sockets: # A list of server sockets for this domain. Having # multiple sockets (using different transport protocol) # may be useful is some client are local, and some diff --git a/doc/man/pafd.8.md b/doc/man/pafd.8.md index 85440a6..1e56786 100644 --- a/doc/man/pafd.8.md +++ b/doc/man/pafd.8.md @@ -140,10 +140,18 @@ the *name* key. This domain name is only used for logging and documentation and is not seen in the by clients to the Pathfinder server. -The default max client idle time (in seconds) may by configured by -adding a *max_idle_time* key to the domain dictionary. The default is -30 seconds. The actual maximum idle time may be lower (e.g., if a -client has published services with a low time-to-live [TTL]). +The max idle time for Pathfinder Protocol v3 clients may be controlled +on a per-domain basis by configuring a dictionary *idle* with +either/both of the *max* and *min* keys. + +*max* represents an upper bound for the maximum idle time (in seconds) +that will ever be employed for any client. This maximum value is also +used as the initial value for clients connecting to that domain. The +actual maximum idle time may be lower (e.g., if a client has published +services with a low time-to-live [TTL]). *max* default is 30 seconds. + +*min* represents a lower bound for the maximum idle time. The default +value is 4 seconds. *min* may not be set lower than 1 second. Each element of the *sockets* list is a socket. A socket is either the address in the form of a string (in XCM format), or a dictionary, @@ -176,10 +184,14 @@ command-line options for details. Configuration file example: domains: - - addrs: # Domain which may be access via two server sockets + - name: domain0 + addrs: # Domain which may be access via two server sockets - tls:*:4711 - ux:local - - addrs: # Second domain, only available at one socket + - name: domain1 + idle: + max: 15 + addrs: # Second domain, only available at one socket - tls:192.168.1.59:5711 resources: diff --git a/paf/conf.py b/paf/conf.py index 41569dc..f72c46e 100644 --- a/paf/conf.py +++ b/paf/conf.py @@ -12,7 +12,8 @@ DEFAULT_LOG_SYSLOG = True DEFAULT_LOG_FACILITY = logging.handlers.SysLogHandler.LOG_DAEMON DEFAULT_LOG_FILTER = logging.INFO -DEFAULT_MAX_IDLE_TIME = 30 +DEFAULT_IDLE_MIN = 4 +DEFAULT_IDLE_MAX = 30 class Error(Exception): @@ -26,6 +27,12 @@ def __init__(self, dict_path, dict_key): path(dict_path, dict_key)) +class DuplicateFieldError(Error): + def __init__(self, dict_path, dict_key): + Error.__init__(self, "parameter '%s' was used in combination " + "with one of its aliases" % path(dict_path, dict_key)) + + class FormatError(Error): def __init__(self, field_name, illegal_value, valid_values=None): message = "invalid %s: '%s'" % (field_name, illegal_value) @@ -185,9 +192,9 @@ def __repr__(self): class DomainConf: - def __init__(self, name, max_idle_time): + def __init__(self, name, idle_limit): self.name = name - self.max_idle_time = max_idle_time + self.idle_limit = idle_limit self.sockets = [] def add_socket(self, addr, tls_attrs={}): @@ -195,11 +202,16 @@ def add_socket(self, addr, tls_attrs={}): def __str__(self): s = {} + if self.name is not None: s["name"] = self.name + s["sockets"] = self.sockets - s["max_idle_time"] = self.max_idle_time + s["idle"] = { + "min": self.idle_limit.idle_min, + "max": self.idle_limit.idle_max + } return str(s) @@ -210,8 +222,10 @@ def __init__(self): self.domains = [] self.resources = ResourcesConf() - def add_domain(self, name=None, max_idle_time=DEFAULT_MAX_IDLE_TIME): - domain_conf = DomainConf(name, max_idle_time) + def add_domain(self, name=None, + idle_limit=sd.IdleLimit(DEFAULT_IDLE_MIN, + DEFAULT_IDLE_MAX)): + domain_conf = DomainConf(name, idle_limit) self.domains.append(domain_conf) return domain_conf @@ -241,9 +255,19 @@ def assure_type(value, value_type, path): "'%s')" % (path, type(value), value_type)) -def dict_lookup(dict_value, dict_key, value_type, dict_path, required=False, +def dict_lookup(dict_value, dict_keys, value_type, dict_path, required=False, default=None): - value = dict_value.get(dict_key) + if (isinstance(dict_keys, str)): + dict_keys = [dict_keys] + + value = None + + for dict_key in dict_keys: + if dict_key in dict_value: + if value is not None: + raise DuplicateFieldError(dict_path, dict_key) + value = dict_value.get(dict_key) + if value is None: if required: assert default is None @@ -291,20 +315,33 @@ def domains_populate(conf, domains, path): name = dict_lookup(domain, "name", str, domain_path, required=False) - max_idle_time = dict_lookup(domain, "max_idle_time", int, - domain_path, default=DEFAULT_MAX_IDLE_TIME, - required=False) + idle_min = DEFAULT_IDLE_MIN + + # 'max_idle_time' is a legacy name for 'idle_max' + idle_max = dict_lookup(domain, "max_idle_time", int, + domain_path, default=DEFAULT_IDLE_MAX, + required=False) + + idle = domain.get("idle") + if idle is not None: + idle_path = "%s.idle" % domain_path + + idle_min = dict_lookup(idle, "min", int, idle_path, + default=DEFAULT_IDLE_MIN, required=False) + + if "max_idle_time" in domain and "max" in idle: + raise DuplicateFieldError(domain_path, "max_idle_time") + + idle_max = dict_lookup(idle, "max", int, idle_path, + default=DEFAULT_IDLE_MAX, required=False) - # 'addrs' is an alternative name, supported for backward - # compatibility reasons - sockets = dict_lookup(domain, "addrs", list, domain_path, - required=False) + # 'addrs' is a legacy name for 'sockets' + sockets = dict_lookup(domain, ["sockets", "addrs"], list, domain_path, + required=True) - if sockets is None: - sockets = dict_lookup(domain, "sockets", list, domain_path, - required=True) + idle_limit = sd.IdleLimit(idle_min, idle_max) - domain_conf = conf.add_domain(name, max_idle_time) + domain_conf = conf.add_domain(name, idle_limit) for socket_num, socket in enumerate(sockets): socket_path = "%s.sockets[%d]" % (domain_path, socket_num) diff --git a/paf/daemon.py b/paf/daemon.py index c8f9519..d04bf90 100644 --- a/paf/daemon.py +++ b/paf/daemon.py @@ -80,7 +80,7 @@ def run(conf, hook=None): user = conf.resources.user.resources total = conf.resources.total.resources server = paf.server.create(domain.name, domain.sockets, user, - total, domain.max_idle_time, event_loop) + total, domain.idle_limit, event_loop) servers.append(server) if hook is not None: diff --git a/paf/sd.py b/paf/sd.py index fd1f2c9..54a7a34 100644 --- a/paf/sd.py +++ b/paf/sd.py @@ -152,6 +152,27 @@ def transfer(self, from_user_id, to_user_id, resource_type): raise e +MIN_IDLE_MIN = 1 + + +class IdleLimit: + def __init__(self, idle_min, idle_max): + if idle_min < MIN_IDLE_MIN: + raise ValueError("Lower bound for max idle time must be set " + ">= %d" % MIN_IDLE_MIN) + if idle_min > idle_max: + raise ValueError("Max idle time lower bound must be equal to " + "or lower than the upper bound") + self.idle_min = idle_min + self.idle_max = idle_max + + def limit(self, value): + return min(self.idle_max, max(self.idle_min, value)) + + def idle_default(self): + return self.idle_max + + class Subscription: def __init__(self, sub_id, filter, client_id, user_id, match_cb): self.sub_id = sub_id @@ -291,7 +312,6 @@ def assure_not_connected(fun): WARNING_THRESHOLD = 0.5 WARNING_JITTER = 0.1 -MIN_IDLE_TIME = 4 class IdleState(enum.Enum): @@ -306,10 +326,10 @@ def jitter(base, max_jitter): class Connection: - def __init__(self, client, timer_manager, max_idle_time, idle_cb): + def __init__(self, client, timer_manager, idle_limit, idle_cb): self.client = client self.timer_manager = timer_manager - self.max_idle_time = max_idle_time + self.idle_limit = idle_limit self.idle_cb = idle_cb self.subscriptions = {} self.services = {} @@ -318,7 +338,7 @@ def __init__(self, client, timer_manager, max_idle_time, idle_cb): self.idle_state = IdleState.ACTIVE self.idle_timer = None self.last_seen = time.time() - if max_idle_time is not None: + if idle_limit is not None: self.install_idle_warning_timer() def client_id(self): @@ -374,38 +394,37 @@ def active(self): if self.idle_state != IdleState.TIMED_OUT: self.idle_state = IdleState.ACTIVE - if self.max_idle_time is not None: + if self.idle_limit is not None: self.install_idle_warning_timer() self.last_seen = time.time() def check_idle(self): - assert self.max_idle_time is not None + assert self.idle_limit is not None if self.idle_state == IdleState.ACTIVE: self.issue_idle_warning() def install_idle_timer(self, t): - assert self.max_idle_time is not None + assert self.idle_limit is not None self.uninstall_idle_timer() self.idle_timer = \ self.timer_manager.add(self.idle_timer_fired, t, relative=True) - def idle_time(self): - if self.max_idle_time is not None: - t = self.max_idle_time + def max_idle_time(self): + if self.idle_limit is not None: if len(self.services) > 0: - t = min(t, self.get_lowest_ttl()) - t = max(t, MIN_IDLE_TIME) - return t + return self.idle_limit.limit(self.get_lowest_ttl()) + else: + return self.idle_limit.idle_default() def install_idle_warning_timer(self): - warning_time = jitter(WARNING_THRESHOLD * self.idle_time(), + warning_time = jitter(WARNING_THRESHOLD * self.max_idle_time(), WARNING_JITTER) self.install_idle_timer(warning_time) def install_idle_timeout_timer(self): - self.install_idle_timer((1 - WARNING_THRESHOLD) * self.idle_time()) + self.install_idle_timer((1 - WARNING_THRESHOLD) * self.max_idle_time()) def uninstall_idle_timer(self): if self.idle_timer is not None: @@ -507,7 +526,7 @@ def is_stale(self): return False return True - def connect(self, user_id, max_idle_time, conn_idle_cb): + def connect(self, user_id, idle_limit, conn_idle_cb): if self.is_connected(): # The active connection may be down but this has not yet # noticed by the server. @@ -522,7 +541,7 @@ def connect(self, user_id, max_idle_time, conn_idle_cb): self.db.add_client(self) self.active_connection = \ - Connection(self, self.timer_manager, max_idle_time, conn_idle_cb) + Connection(self, self.timer_manager, idle_limit, conn_idle_cb) @assure_connected def disconnect(self): @@ -737,15 +756,14 @@ def __init__(self, name, timer_manager, max_user_resources, self.db = DB() self.orphan_timers = {} - def client_connect(self, client_id, user_id, max_idle_time, - conn_idle_cb): + def client_connect(self, client_id, user_id, idle_limit, conn_idle_cb): client = self.db.get_client(client_id) if client is None: client = Client(client_id, user_id, self.db, self.resource_manager, self.timer_manager) - client.connect(user_id, max_idle_time, conn_idle_cb) + client.connect(user_id, idle_limit, conn_idle_cb) def client_disconnect(self, client_id): client = self._get_connected_client(client_id) diff --git a/paf/server.py b/paf/server.py index 644ef82..b6dcd0a 100644 --- a/paf/server.py +++ b/paf/server.py @@ -144,7 +144,7 @@ def is_two_way(self): class Connection: def __init__(self, sd, conn_sock, event_loop, server, handshake_cb, - max_idle_time, idle_cb, term_cb): + idle_limit, idle_cb, term_cb): self.client_id = None self.proto_version = None self.conn_addr = conn_sock.get_attr("xcm.remote_addr") @@ -155,7 +155,7 @@ def __init__(self, sd, conn_sock, event_loop, server, handshake_cb, self.event_loop = event_loop self.server = server self.handshake_cb = handshake_cb - self.max_idle_time = max_idle_time + self.idle_limit = idle_limit self.idle_cb = idle_cb self.term_cb = term_cb self.update_source() @@ -344,22 +344,22 @@ def hello_request(self, ta, client_id, min_version, max_version): if self.proto_version is not None: try: if self.proto_version >= 3: - max_idle_time = self.max_idle_time + idle_limit = self.idle_limit else: - max_idle_time = None + idle_limit = None self.sd.client_connect(self.client_id, user_id, - max_idle_time, self.idle_cb) + self.idle_limit, self.idle_cb) self.debug("Handshake producedure finished for client from " "\"%s\"." % self.conn_addr, LogCategory.PROTOCOL) self.debug("Protocol version %d is selected." % self.proto_version, LogCategory.PROTOCOL) - if max_idle_time is not None: + if idle_limit is not None: self.debug("Initial max idle time is %d s." % - max_idle_time, LogCategory.PROTOCOL) - self.max_idle_time = max_idle_time + idle_limit.idle_default(), LogCategory.PROTOCOL) + self.idle_limit = idle_limit self.handshaked = True self.handshake_cb(self) @@ -594,7 +594,7 @@ def clients_request(self, ta): extended = self.proto_version >= 3 for conn in self.server.client_connections.values(): - idle = now - self.sd.client_last_seen(conn.client_id) + idle_time = now - self.sd.client_last_seen(conn.client_id) optargs = {} if conn.is_tracked(): @@ -602,7 +602,7 @@ def clients_request(self, ta): if extended: yield ta.notify(conn.client_id, conn.conn_addr, - int(conn.connect_time), idle, + int(conn.connect_time), idle_time, conn.proto_version, **optargs) else: yield ta.notify(conn.client_id, conn.conn_addr, @@ -691,11 +691,11 @@ def time_out(self): class Server: def __init__(self, name, sockets, max_user_resources, max_total_resources, - max_idle_time, event_loop): + idle_limit, event_loop): self.timer_manager = paf.timer.TimerManager(self.timer_changed) self.sd = sd.ServiceDiscovery(name, self.timer_manager, max_user_resources, max_total_resources) - self.max_idle_time = max_idle_time + self.idle_limit = idle_limit self.event_loop = event_loop self.server_socks = {} for socket in sockets: @@ -803,7 +803,7 @@ def sock_activate(self): self.update_source(source) conn = Connection(self.sd, conn_sock, self.event_loop, self, self.conn_handshake_completed, - self.max_idle_time, self.client_idle, + self.idle_limit, self.client_idle, self.conn_terminated) self.clientless_connections.add(conn) self.schedule_clean_out() @@ -875,6 +875,6 @@ def terminate(self): def create(name, sockets, max_user_resources, max_total_resources, - max_idle_time, event_loop): + idle_limit, event_loop): return Server(name, sockets, max_user_resources, max_total_resources, - max_idle_time, event_loop) + idle_limit, event_loop) diff --git a/test/test_paf.py b/test/test_paf.py index d93b2ef..8dbb9a5 100644 --- a/test/test_paf.py +++ b/test/test_paf.py @@ -109,10 +109,11 @@ def random_addr(): class Domain: - def __init__(self, name, addrs, max_idle_time): + def __init__(self, name, addrs, idle_min, idle_max): self.name = name self.addrs = addrs - self.max_idle_time = max_idle_time + self.idle_min = idle_min + self.idle_max = idle_max self.file = "%s/%s" % (DOMAINS_DIR, self.name) self.set_mapped_addr(self.default_addr()) @@ -165,14 +166,14 @@ def random_domain(self): def default_domain(self): return self.domains[0] - def configure_domain(self, name, addrs, max_idle_time=None): + def configure_domain(self, name, addrs, idle_min=None, idle_max=None): if isinstance(addrs, str): addrs = [addrs] - if max_idle_time is not None: + if idle_max is not None or idle_min is not None: self.use_config_file = True - domain = Domain(name, addrs, max_idle_time) + domain = Domain(name, addrs, idle_min, idle_max) self.domains.append(domain) return domain @@ -250,8 +251,17 @@ def _write_config_file(self, use_tls_attrs): if random_bool(): domain_conf["name"] = "domain-%d" % random.randint(0, 10000) - if domain.max_idle_time is not None: - domain_conf["max_idle_time"] = domain.max_idle_time + if domain.idle_min is not None or domain.idle_max is not None: + if random_bool() and domain.idle_min is None: + domain_conf["max_idle_time"] = domain.idle_max + else: + idle = { + } + if domain.idle_min is not None: + idle["min"] = domain.idle_min + if domain.idle_min is not None: + idle["max"] = domain.idle_max + domain_conf["idle"] = idle domains_conf.append(domain_conf) conf["domains"] = domains_conf @@ -516,7 +526,8 @@ def limited_subscriptions_server(request): server.stop() -IMPATIENT_MAX_IDLE_TIME = 4 +IMPATIENT_IDLE_MIN = 2 +IMPATIENT_IDLE_MAX = 4 @pytest.fixture(scope='function') @@ -524,7 +535,8 @@ def impatient_server(request): server = server_by_request(request) if not server.supports(ServerFeature.PROTO_V3): pytest.skip("Server does not support protocol version 3") - server.configure_random_domain(1, max_idle_time=IMPATIENT_MAX_IDLE_TIME) + server.configure_random_domain(1, idle_min=IMPATIENT_IDLE_MIN, + idle_max=IMPATIENT_IDLE_MAX) server.start() yield server server.stop() @@ -732,7 +744,7 @@ def test_server_tracking_client(impatient_server): # Verify a track query is being sent start = time.time() - timeout = IMPATIENT_MAX_IDLE_TIME + timeout = IMPATIENT_IDLE_MAX query_time = timeout * 0.5 wait(conn, criteria=lambda: track_recorder.count_notifications() > 0) @@ -777,7 +789,7 @@ def test_client_activity_avoids_track_queries(impatient_server): wait(conn, criteria=track_recorder.accepted) iter = 10 - timeout = (IMPATIENT_MAX_IDLE_TIME + 1) / iter + timeout = (IMPATIENT_IDLE_MAX + 1) / iter for _ in range(iter): wait(conn, timeout=timeout) conn.ping()