From 8890a6c76904897aecf9fabfb21d22ccbb2edf4f Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Thu, 24 Feb 2022 21:56:03 -0700 Subject: [PATCH] Test fix --- salt/transport/tcp.py | 48 +++++++++++-------- salt/transport/zeromq.py | 27 +++++++---- .../zeromq/test_pub_server_channel.py | 37 ++++++++------ tests/pytests/unit/transport/test_tcp.py | 20 ++++---- tests/unit/transport/test_ipc.py | 2 + 5 files changed, 80 insertions(+), 54 deletions(-) diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 9665b98f4f81..f00b3c40eb6a 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -1422,7 +1422,7 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer): TCP publisher """ - def __init__(self, opts, io_loop=None): + def __init__(self, opts, io_loop=None, pack_publish=lambda _: _): super().__init__(ssl_options=opts.get("ssl")) self.io_loop = io_loop self.opts = opts @@ -1449,6 +1449,10 @@ def __init__(self, opts, io_loop=None): ) else: self.event = None + self._pack_publish = pack_publish + + def pack_publish(self, load): + return self._pack_publish(load) def close(self): if self._closing: @@ -1557,6 +1561,7 @@ def handle_stream(self, stream, address): @salt.ext.tornado.gen.coroutine def publish_payload(self, package, _): log.debug("TCP PubServer sending payload: %s", package) + payload = self.pack_publish(package) payload = salt.transport.frame.frame_msg(package["payload"]) to_remove = [] @@ -1632,7 +1637,9 @@ def _publish_daemon(self, **kwargs): self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() # Spin up the publisher - pub_server = PubServer(self.opts, io_loop=self.io_loop) + pub_server = PubServer( + self.opts, io_loop=self.io_loop, pack_publish=self.pack_publish + ) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) _set_tcp_keepalive(sock, self.opts) @@ -1675,10 +1682,7 @@ def pre_fork(self, process_manager, kwargs=None): """ process_manager.add_process(self._publish_daemon, kwargs=kwargs) - def publish(self, load): - """ - Publish "load" to minions - """ + def pack_publish(self, load): payload = {"enc": "aes"} load["serial"] = salt.master.SMaster.get_serial() crypticle = salt.crypt.Crypticle( @@ -1689,20 +1693,6 @@ def publish(self, load): master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem") log.debug("Signing data packet") payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"]) - # Use the Salt IPC server - if self.opts.get("ipc_mode", "") == "tcp": - pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514)) - else: - pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc") - # TODO: switch to the actual asynchronous interface - # pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop) - pub_sock = salt.utils.asynchronous.SyncWrapper( - salt.transport.ipc.IPCMessageClient, - (pull_uri,), - loop_kwarg="io_loop", - ) - pub_sock.connect() - int_payload = {"payload": salt.payload.dumps(payload)} # add some targeting stuff for lists only (for now) @@ -1719,5 +1709,21 @@ def publish(self, load): int_payload["topic_lst"] = match_ids else: int_payload["topic_lst"] = load["tgt"] + return int_payload + + def publish(self, load): + """ + Publish "load" to minions + """ # Send it over IPC! - pub_sock.send(int_payload) + if self.opts.get("ipc_mode", "") == "tcp": + pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514)) + else: + pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc") + pub_sock = salt.utils.asynchronous.SyncWrapper( + salt.transport.ipc.IPCMessageClient, + (pull_uri,), + loop_kwarg="io_loop", + ) + pub_sock.connect() + pub_sock.send(load) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 6e65a44ea2fc..9e61b2325545 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -939,6 +939,8 @@ def _publish_daemon(self, log_queue=None): try: log.debug("Publish daemon getting data from puller %s", pull_uri) package = pull_sock.recv() + package = salt.payload.loads(package) + package = self.pack_publish(package) log.debug("Publish daemon received payload. size=%d", len(package)) unpacked_package = salt.payload.unpackage(package) @@ -1031,8 +1033,8 @@ def pub_connect(self): """ if self.pub_sock: self.pub_close() - ctx = zmq.Context.instance() - self._sock_data.sock = ctx.socket(zmq.PUSH) + self._sock_data._ctx = zmq.Context() + self._sock_data.sock = self._sock_data._ctx.socket(zmq.PUSH) self.pub_sock.setsockopt(zmq.LINGER, -1) if self.opts.get("ipc_mode", "") == "tcp": pull_uri = "tcp://127.0.0.1:{}".format( @@ -1054,14 +1056,10 @@ def pub_close(self): if hasattr(self._sock_data, "sock"): self._sock_data.sock.close() delattr(self._sock_data, "sock") + if hasattr(self._sock_data, "_ctx"): + self._sock_data._ctx.destroy() - def publish(self, load): - """ - Publish "load" to minions. This send the load to the publisher daemon - process with does the actual sending to minions. - - :param dict load: A load to be sent across the wire to minions - """ + def pack_publish(self, load): payload = {"enc": "aes"} load["serial"] = salt.master.SMaster.get_serial() crypticle = salt.crypt.Crypticle( @@ -1094,9 +1092,18 @@ def publish(self, load): load.get("jid", None), len(payload), ) + return payload + + def publish(self, load): + """ + Publish "load" to minions. This send the load to the publisher daemon + process with does the actual sending to minions. + + :param dict load: A load to be sent across the wire to minions + """ if not self.pub_sock: self.pub_connect() - self.pub_sock.send(payload) + self.pub_sock.send(salt.payload.dumps(load)) log.debug("Sent payload to publish daemon.") diff --git a/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py b/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py index 5633bceafec3..e7033f810a20 100644 --- a/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py +++ b/tests/pytests/functional/transport/zeromq/test_pub_server_channel.py @@ -127,6 +127,15 @@ def __init__(self, master_config, minion_config, **collector_kwargs): self.minion_config = minion_config self.collector_kwargs = collector_kwargs self.aes_key = salt.crypt.Crypticle.generate_key_string() + salt.master.SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(self.aes_key), + ), + "serial": multiprocessing.Value( + ctypes.c_longlong, lock=False # We'll use the lock from 'secret' + ), + } self.process_manager = salt.utils.process.ProcessManager( name="ZMQ-PubServer-ProcessManager" ) @@ -145,15 +154,6 @@ def __init__(self, master_config, minion_config, **collector_kwargs): ) def run(self): - salt.master.SMaster.secrets["aes"] = { - "secret": multiprocessing.Array( - ctypes.c_char, - salt.utils.stringutils.to_bytes(self.aes_key), - ), - "serial": multiprocessing.Value( - ctypes.c_longlong, lock=False # We'll use the lock from 'secret' - ), - } try: while True: payload = self.queue.get() @@ -247,12 +247,16 @@ def test_issue_36469_tcp(salt_master, salt_minion): https://github.com/saltstack/salt/issues/36469 """ - def _send_small(server_channel, sid, num=10): + def _send_small(opts, sid, num=10): + server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) for idx in range(num): load = {"tgt_type": "glob", "tgt": "*", "jid": "{}-s{}".format(sid, idx)} server_channel.publish(load) + time.sleep(0.3) + server_channel.close_pub() - def _send_large(server_channel, sid, num=10, size=250000 * 3): + def _send_large(opts, sid, num=10, size=250000 * 3): + server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) for idx in range(num): load = { "tgt_type": "glob", @@ -261,16 +265,19 @@ def _send_large(server_channel, sid, num=10, size=250000 * 3): "xdata": "0" * size, } server_channel.publish(load) + time.sleep(0.3) + server_channel.close_pub() opts = dict(salt_master.config.copy(), ipc_mode="tcp", pub_hwm=0) send_num = 10 * 4 expect = [] with PubServerChannelProcess(opts, salt_minion.config.copy()) as server_channel: + assert "aes" in salt.master.SMaster.secrets with ThreadPoolExecutor(max_workers=4) as executor: - executor.submit(_send_small, server_channel, 1) - executor.submit(_send_large, server_channel, 2) - executor.submit(_send_small, server_channel, 3) - executor.submit(_send_large, server_channel, 4) + executor.submit(_send_small, opts, 1) + executor.submit(_send_large, opts, 2) + executor.submit(_send_small, opts, 3) + executor.submit(_send_large, opts, 4) expect.extend(["{}-s{}".format(a, b) for a in range(10) for b in (1, 3)]) expect.extend(["{}-l{}".format(a, b) for a in range(10) for b in (2, 4)]) results = server_channel.collector.results diff --git a/tests/pytests/unit/transport/test_tcp.py b/tests/pytests/unit/transport/test_tcp.py index d003797d29c9..3b6e17547256 100644 --- a/tests/pytests/unit/transport/test_tcp.py +++ b/tests/pytests/unit/transport/test_tcp.py @@ -210,15 +210,17 @@ def test_tcp_pub_server_channel_publish_filtering(temp_salt_master): SyncWrapper.return_value = wrap # try simple publish with glob tgt_type - channel.publish({"test": "value", "tgt_type": "glob", "tgt": "*"}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "glob", "tgt": "*"} + ) # verify we send it without any specific topic assert "topic_lst" not in payload # try simple publish with list tgt_type - channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "list", "tgt": ["minion01"]} + ) # verify we send it with correct topic assert "topic_lst" in payload @@ -226,8 +228,9 @@ def test_tcp_pub_server_channel_publish_filtering(temp_salt_master): # try with syndic settings opts["order_masters"] = True - channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "list", "tgt": ["minion01"]} + ) # verify we send it without topic for syndics assert "topic_lst" not in payload @@ -257,8 +260,9 @@ def test_tcp_pub_server_channel_publish_filtering_str_list(temp_salt_master): check_minions.return_value = {"minions": ["minion02"]} # try simple publish with list tgt_type - channel.publish({"test": "value", "tgt_type": "list", "tgt": "minion02"}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "list", "tgt": "minion02"} + ) # verify we send it with correct topic assert "topic_lst" in payload diff --git a/tests/unit/transport/test_ipc.py b/tests/unit/transport/test_ipc.py index f663d941b592..79b49f940602 100644 --- a/tests/unit/transport/test_ipc.py +++ b/tests/unit/transport/test_ipc.py @@ -40,6 +40,8 @@ class IPCMessagePubSubCase(salt.ext.tornado.testing.AsyncTestCase): def setUp(self): super().setUp() self.opts = {"ipc_write_buffer": 0} + if not os.path.exists(RUNTIME_VARS.TMP): + os.mkdir(RUNTIME_VARS.TMP) self.socket_path = os.path.join(RUNTIME_VARS.TMP, "ipc_test.ipc") self.pub_channel = self._get_pub_channel() self.sub_channel = self._get_sub_channel()