diff --git a/.gitignore b/.gitignore index 3e401ad6..c66719f8 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,13 @@ amqp-vertx/target/* terraform.tfstate.* terraform.tfstate +# Associated with the Python code +venv* +Python/tmp/ + +# Standard Eclipse control files +.project +.settings +.classpath + diff --git a/Python/README.md b/Python/README.md index 3ede44e6..ab397979 100644 --- a/Python/README.md +++ b/Python/README.md @@ -1,62 +1,91 @@ # IBM MQ Python samples -The python samples are based on https://dsuch.github.io/pymqi/ -and have been tested with python 3.10.12,3.11.9 and 3.12.5 +These samples use a Python library for the MQI to demonstrate basic messaging operations. -Python PyMQI library uses the IBM MQ C client libraries through the MQI interface. +The Python `ibmmq` library uses the IBM MQ C client libraries through the MQI interface. -The library needs to be compiled with a C compiler which you need to have installed in your development environment. +The Python library needs to be compiled with a C compiler which you need to have installed in your development +environment. For example, on MacOS we used `XCode`, on Windows the `Desktop development with C++` module inside Visual +Studio and on Ubuntu the `gcc` GNU Compiler Collection. -For example, on MacOS we used `XCode`, on Windows the `Desktop development with C++` module inside Visual Studio and on Ubuntu the `gcc` GNU Compiler Collection. +The samples use the same configuration file as other language samples in this repository. -Install/unzip IBM MQ client +## Client and SDK installation +### MacOS +Follow Step 1 from [this page](https://developer.ibm.com/tutorials/mq-macos-dev/) to install the SDK using brew. None of +the other steps on that page are required in order to run these Python samples. -## Mac +Alternatively you can download the IBM MQ MacOS toolkit from +[here](https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/mactoolkit/) -[IBM MQ MacOS toolkit for developers download](https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/mactoolkit/) +### Windows +The MQ Redistributable Client for Windows can be downloaded from +[here](https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/redist/) -Add -`/opt/mqm/bin` and -`/opt/mqm/samp/bin`, to the PATH by editing `/etc/paths` +### Linux +The MQ Redistributed Client for Linux x64 can be downloaded from +[here](https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/redist/) -execute the following command: -`export DYLD_LIBRARY_PATH=/opt/mqm/lib64` +For other platforms, you can use the regular MQ iamges to install, at minimum, the MQ Client and SDK components. + +## IBM MQ Python package installation +You may like to work inside a Python virtual environment. If so, create and initialise that in the usual ways. +For example: + +``` +python -m venv my_venv +. my_venv/bin/activate +``` + +Then install the prerequsite package by running: `pip install ibmmq`. -## Windows +## Sample Configuration +All of the programs read a JSON-formatted configuration file. The name of the file can be given by setting the +`JSON_CONFIG` environment variable. If that is not set, the _env.json_ file from the parent directory is used. Edit the +configuration to match the configuration of the queue manager you are going to work with. -[Windows MQ redist client download](https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/redist/) +## Running the programs +There are no parameters to any of the programs. -## Linux Ubuntu +You might need to run `setmqenv` to create environment variables pointing at your MQ installation libraries. + +On MacOS, the `DYLD_LIBRARY_PATH` will usually need to be set to include the `/opt/mqm/lib64` directory: + +`export DYLD_LIBRARY_PATH=/opt/mqm/lib64` -[Linux MQ redist client download](https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqdev/redist/) +If you are on Linux, you might need set the `LD_LIBRARY_PATH` to include the `/opt/mqm/lib64` directory: -For installation instructions please go to -#### [linux installation](../../mq-dev-patterns/installationDocs/linuxUbuntu-installationSteps.md) +`export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/mqm/lib64` +See [here](https://www.ibm.com/docs/en/ibm-mq/latest?topic=reference-setmqenv-set-mq-environment) for +more information about `setmqenv`. -## Run samples +On some systems, you might need to explicitly use the `python3` command instead of `python`. -To run the examples cd to the Python directory, and install the prerequsites by running : +### Put/Get +The `basicput` application places a short string message onto the queue. -`pip install pymqi` +`python ./basicput` -### Put / Get -`python basicput.py` +The `basicget` application reads all messages from the queue and displays the contents. -and +`python ./basicget` -`python basicget.py` +### Publish/Subscribe +Run these samples as a pair. -### Publish / Subscribe -`python basicpublish.py` +Start the `basicsubcribe` program in one window (or in the background) and immediately afterwards start the +`basicpublish` program in another window. -and +`python ./basicsubscribe` -`python basicsubscribe.py` +`python ./basicpublish` -### Request / Response +### Request/Response +Run these samples as a pair. -`python basicrequest.py` +Start the `basicresponse` program in one window (or in the background) and immediately afterwards start the +`basicrequest` program in another window. -and +`python ./basicresponse` -`python basicresponse.py` +`python ./basicrequest` \ No newline at end of file diff --git a/Python/__init__.py b/Python/__init__.py index 8b137891..556a7056 100644 --- a/Python/__init__.py +++ b/Python/__init__.py @@ -1 +1,5 @@ +# pylint: disable=invalid-name +# pylint doesn't like us being in a "Python" subdirectory, so disable that check +"""This file is deliberately left blank. +""" diff --git a/Python/basicget.py b/Python/basicget.py index 3ece2137..ea986478 100644 --- a/Python/basicget.py +++ b/Python/basicget.py @@ -1,5 +1,8 @@ +"""Example of getting a message from an IBM MQ queue +""" + # -*- coding: utf-8 -*- -# Copyright 2019 IBM +# Copyright 2019, 2025 IBM # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,122 +15,115 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from utils.env import EnvStore -import os -import json -import pymqi import logging +import ibmmq as mq +from utils.env import EnvStore + logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = logging.getLogger('Get') + +WAIT_INTERVAL = 5 # Seconds +def connect(index): + """ Establish connection to MQ Queue Manager """ + logger.info('Establishing connection with MQ Server') -# function to establish connection to MQ Queue Manager + # Set credentials if they have been provided + csp = mq.CSP() + csp.CSPUserId = EnvStore.getenv_value(EnvStore.APP_USER, index) + csp.CSPPassword = EnvStore.getenv_value(EnvStore.APP_PASSWORD, index) + if csp.CSPUserId is None: + csp = None -def connect(): - logger.info('Establising Connection with MQ Server') try: cd = None - if not EnvStore.ccdtCheck(): - logger.info('CCDT URL export is not set, will be using json envrionment client connections settings') + if not EnvStore.is_ccdt_available(): + logger.info('CCDT URL export is not set, will be using json environment client connections settings') - cd = pymqi.CD(Version=pymqi.CMQXC.MQCD_VERSION_11) + cd = mq.CD(Version=mq.CMQXC.MQCD_VERSION_11) cd.ChannelName = MQDetails[EnvStore.CHANNEL] cd.ConnectionName = conn_info - cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN - cd.TransportType = pymqi.CMQC.MQXPT_TCP + cd.ChannelType = mq.CMQXC.MQCHT_CLNTCONN + cd.TransportType = mq.CMQXC.MQXPT_TCP - logger.info('Checking Cypher details') + logger.info('Checking cipher details') # If a cipher is set then set the TLS settings if MQDetails[EnvStore.CIPHER]: - logger.info('Making use of Cypher details') + logger.info('Making use of cipher details') cd.SSLCipherSpec = MQDetails[EnvStore.CIPHER] - - # Key repository is not specified in CCDT so look in envrionment settings + # Key repository is not specified in CCDT so look in environment settings # Create an empty SCO object - sco = pymqi.SCO() + sco = mq.SCO() if MQDetails[EnvStore.KEY_REPOSITORY]: logger.info('Setting Key repository') sco.KeyRepository = MQDetails[EnvStore.KEY_REPOSITORY] - #options = pymqi.CMQC.MQPMO_NO_SYNCPOINT | pymqi.CMQC.MQPMO_NEW_MSG_ID | pymqi.CMQC.MQPMO_NEW_CORREL_ID - options = pymqi.CMQC.MQPMO_NEW_CORREL_ID - - qmgr = pymqi.QueueManager(None) + qmgr = mq.QueueManager(None) qmgr.connect_with_options(MQDetails[EnvStore.QMGR], - user=credentials[EnvStore.USER], - password=credentials[EnvStore.PASSWORD], - opts=options, cd=cd, sco=sco) + csp=csp, + cd=cd, sco=sco) return qmgr - except pymqi.MQMIError as e: - logger.error("Error connecting") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error connecting: %s', e) return None -# function to establish connection to Queue - - -def getQueue(): - logger.info('Connecting to Queue') +def get_queue(): + """Establish access to a Queue""" + logger.info('Opening a Queue') try: - # Works with single call, but object Descriptor - # provides other options - # q = pymqi.Queue(qmgr, MQDetails[EnvStore.QUEUE_NAME]) - q = pymqi.Queue(qmgr) + q = mq.Queue(qmgr) - od = pymqi.OD() + od = mq.OD() od.ObjectName = MQDetails[EnvStore.QUEUE_NAME] - odOptions = pymqi.CMQC.MQOO_INPUT_AS_Q_DEF - q.open(od, odOptions) + open_options = mq.CMQC.MQOO_INPUT_AS_Q_DEF + q.open(od, open_options) return q - except pymqi.MQMIError as e: - logger.error("Error getting queue") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error opening queue: %s', e) return None -# function to get message from Queue - - -def getMessages(): +def get_messages(): + """Get messages from the queue""" logger.info('Attempting gets from Queue') + # Message Descriptor - md = pymqi.MD() + md = mq.MD() # Get Message Options # MQGMO_NO_PROPERTIES indicates that JMS headers are to be stripped # off the message during the get. This can also be done by calling - # .get_no_jms on the queue instead of .get - gmo = pymqi.GMO() - gmo.Options = pymqi.CMQC.MQGMO_WAIT | \ - pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING | \ - pymqi.CMQC.MQGMO_NO_PROPERTIES + # get_no_jms() on the queue instead of get(). + gmo = mq.GMO() + gmo.Options = (mq.CMQC.MQGMO_WAIT | + mq.CMQC.MQGMO_FAIL_IF_QUIESCING | + mq.CMQC.MQGMO_NO_SYNCPOINT | + mq.CMQC.MQGMO_NO_PROPERTIES) - gmo.WaitInterval = 5000 # 5 seconds + gmo.WaitInterval = WAIT_INTERVAL * 1000 # Convert to milliseconds keep_running = True while keep_running: try: # Reset the MsgId, CorrelId & GroupId so that we can reuse # the same 'md' object again. - md.MsgId = pymqi.CMQC.MQMI_NONE - md.CorrelId = pymqi.CMQC.MQCI_NONE - md.GroupId = pymqi.CMQC.MQGI_NONE + md.MsgId = mq.CMQC.MQMI_NONE + md.CorrelId = mq.CMQC.MQCI_NONE + md.GroupId = mq.CMQC.MQGI_NONE # Wait up to to gmo.WaitInterval for a new message. # message = queue.get_no_jms(None, md, gmo) message = queue.get(None, md, gmo) # Process the message here.. - msgObject = json.loads(message.decode()) - logger.info('Have message from Queue') - logger.info(msgObject) + logger.info('Have message from input queue: %s', message.decode()) - except pymqi.MQMIError as e: - if e.comp == pymqi.CMQC.MQCC_FAILED and e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE: + except mq.MQMIError as e: + if e.comp == mq.CMQC.MQCC_FAILED and e.reason == mq.CMQC.MQRC_NO_MSG_AVAILABLE: # No messages, we should more on to next connection endpoint if there is one. logger.info('No more messages found on this connection') keep_running = False @@ -145,57 +141,43 @@ def getMessages(): logger.info('Have received a keyboard interrupt') keep_running = False - -def buildMQDetails(index): +def build_mq_details(index): + """Create the connection details for the queue manager""" for key in [EnvStore.QMGR, EnvStore.QUEUE_NAME, EnvStore.CHANNEL, EnvStore.HOST, EnvStore.PORT, EnvStore.KEY_REPOSITORY, EnvStore.CIPHER]: - MQDetails[key] = EnvStore.getEnvValue(key, index) - - -def setCredentials(index): - credentials[EnvStore.USER] = EnvStore.getEnvValue(EnvStore.APP_USER, index) - credentials[EnvStore.PASSWORD] = EnvStore.getEnvValue(EnvStore.APP_PASSWORD, index) + MQDetails[key] = EnvStore.getenv_value(key, index) - -# Application Logic starts here -logger.info("Application is Starting") +# Application logic starts here +logger.info('Application "BasicGet" is starting') envStore = EnvStore() -envStore.setEnv() +envStore.set_env() MQDetails = {} -credentials = {} - -logger.info('Credentials are set') -#logger.info(credentials) - -#conn_info = "%s(%s)" % (MQDetails[EnvStore.HOST], MQDetails[EnvStore.PORT]) -#conn_info = EnvStore.getConnection(EnvStore.HOST, EnvStore.PORT) qmgr = None queue = None -numEndPoints = envStore.getEndpointCount() -logger.info('There are %d connections' % numEndPoints) - +numEndPoints = envStore.get_endpoint_count() +logger.info('There are %d connections', numEndPoints) -for index, conn_info in envStore.getNextConnectionString(): - logger.info('Using Connection String %s' % conn_info) +# Loop through the connection options. If one succeeds, do the +# work and then quit. +for index, conn_info in envStore.get_next_connection_string(): + logger.info('Trying connection: %s', conn_info) - buildMQDetails(index) - setCredentials(index) + build_mq_details(index) - qmgr = connect() - if (qmgr): - queue = getQueue() - if (queue): - getMessages() - queue.close() + qmgr = connect(index) + if qmgr is not None: + queue = get_queue() + if queue is not None: + get_messages() + queue.close() - if (qmgr): qmgr.disconnect() + break - MQDetails.clear() - credentials.clear() +MQDetails.clear() -logger.info("Application is closing") +logger.info('Application is ending') diff --git a/Python/basicpublish.py b/Python/basicpublish.py index f35b5221..88355313 100644 --- a/Python/basicpublish.py +++ b/Python/basicpublish.py @@ -1,5 +1,7 @@ +"""Example of publishing a message to an IBM MQ topic +""" # -*- coding: utf-8 -*- -# Copyright 2019 IBM +# Copyright 2019, 2025 IBM # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,135 +14,122 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from utils.env import EnvStore -import os + import json import datetime -import pymqi - import logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +import ibmmq as mq +from utils.env import EnvStore -# function to establish connection to MQ Queue Manager +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger('Pub') def connect(): - logger.info('Establising Connection with MQ Server') + """ Establish connection to MQ Queue Manager """ + logger.info('Establishing connection with MQ Server') try: cd = None - if not EnvStore.ccdtCheck(): - logger.info('CCDT URL export is not set, will be using json envrionment client connections settings') + if not EnvStore.is_ccdt_available(): + logger.info('CCDT URL export is not set, will be using json environment client connections settings') - cd = pymqi.CD(Version=pymqi.CMQXC.MQCD_VERSION_11) + cd = mq.CD(Version=mq.CMQXC.MQCD_VERSION_11) cd.ChannelName = MQDetails[EnvStore.CHANNEL] cd.ConnectionName = conn_info - cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN - cd.TransportType = pymqi.CMQC.MQXPT_TCP + cd.ChannelType = mq.CMQXC.MQCHT_CLNTCONN + cd.TransportType = mq.CMQXC.MQXPT_TCP - logger.info('Checking Cypher details') + logger.info('Checking cipher details') # If a cipher is set then set the TLS settings if MQDetails[EnvStore.CIPHER]: - logger.info('Making use of Cypher details') + logger.info('Making use of cipher details') cd.SSLCipherSpec = MQDetails[EnvStore.CIPHER] - - # Key repository is not specified in CCDT so look in envrionment settings + # Key repository is not specified in CCDT so look in environment settings # Create an empty SCO object - sco = pymqi.SCO() + sco = mq.SCO() if MQDetails[EnvStore.KEY_REPOSITORY]: logger.info('Setting Key repository') sco.KeyRepository = MQDetails[EnvStore.KEY_REPOSITORY] - #options = pymqi.CMQC.MQPMO_NO_SYNCPOINT | pymqi.CMQC.MQPMO_NEW_MSG_ID | pymqi.CMQC.MQPMO_NEW_CORREL_ID - options = pymqi.CMQC.MQPMO_NEW_CORREL_ID + qmgr = mq.QueueManager(None) + + # Set credentials + csp = mq.CSP() + csp.CSPUserId = EnvStore.getenv_value(EnvStore.APP_USER) + csp.CSPPassword = EnvStore.getenv_value(EnvStore.APP_PASSWORD) + if csp.CSPUserId is None: + csp = None - qmgr = pymqi.QueueManager(None) qmgr.connect_with_options(MQDetails[EnvStore.QMGR], - user=credentials[EnvStore.USER], - password=credentials[EnvStore.PASSWORD], - opts=options, cd=cd, sco=sco) + csp=csp, + cd=cd, sco=sco) return qmgr - except pymqi.MQMIError as e: - logger.error("Error connecting") - logger.error(e) - return None + except mq.MQMIError as e: + logger.error('Error connecting: %s', e) + return None -# function to establish connection to Queue -def getTopic(): - logger.info('Connecting to Topic') +def get_topic(): + """Establish access to a Topic""" + logger.info('Opening a topic') try: - t = pymqi.Topic(qmgr, topic_string=MQDetails[EnvStore.TOPIC_NAME]) - t.open(open_opts=pymqi.CMQC.MQOO_OUTPUT) + t = mq.Topic(qmgr, topic_string=MQDetails[EnvStore.TOPIC_NAME]) + t.open(open_opts=mq.CMQC.MQOO_OUTPUT) return t - except pymqi.MQMIError as e: - logger.error("Error getting topic") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error opening topic: %s', e) return None -# function to put message onto Queue - - -def publishMessage(): - logger.info('Attempting publish to Topic') +def publish_message(): + """Publish a message""" + logger.info('Attempting publish to topic') try: - md = pymqi.MD() - md.Format = pymqi.CMQC.MQFMT_STRING - # queue.put(json.dumps(msgObject).encode()) - # queue.put(json.dumps(msgObject)) - # topic.pub(str(json.dumps(msgObject))) - topic.pub(EnvStore.stringForVersion(json.dumps(msgObject)), md) - - logger.info("Publish message successful") - except pymqi.MQMIError as e: - logger.error("Error in publish to topic") - logger.error(e) - - -def buildMQDetails(): + md = mq.MD() + md.Format = mq.CMQC.MQFMT_STRING + msg = str(json.dumps(msg_object)) + + pmo = mq.PMO() + pmo.Options = mq.CMQC.MQPMO_NO_SYNCPOINT + topic.pub(msg, md, pmo) + logger.info('Publish message successful: %s', msg) + except mq.MQMIError as e: + logger.error('Error in publish to topic: %s', e) + +def build_mq_details(): + """Create the connection details for the queue manager""" for key in [EnvStore.QMGR, EnvStore.CHANNEL, EnvStore.HOST, EnvStore.PORT, EnvStore.KEY_REPOSITORY, EnvStore.CIPHER, EnvStore.TOPIC_NAME]: - MQDetails[key] = EnvStore.getEnvValue(key) - + MQDetails[key] = EnvStore.getenv_value(key) # Application Logic starts here -logger.info("Application is Starting") +logger.info('Application "BasicPublish" is starting') envStore = EnvStore() -envStore.setEnv() +envStore.set_env() MQDetails = {} -credentials = { - EnvStore.USER: EnvStore.getEnvValue(EnvStore.APP_USER), - EnvStore.PASSWORD: EnvStore.getEnvValue(EnvStore.APP_PASSWORD) -} - -buildMQDetails() -logger.info('Credentials are set') -#logger.info(credentials) +build_mq_details() -#conn_info = "%s(%s)" % (MQDetails[EnvStore.HOST], MQDetails[EnvStore.PORT]) -conn_info = EnvStore.getConnection(EnvStore.HOST, EnvStore.PORT) +conn_info = EnvStore.get_connection(EnvStore.HOST, EnvStore.PORT) -msgObject = { - 'Greeting': "Hello from Python! " + str(datetime.datetime.now()) +msg_object = { + 'Greeting': 'Hello from Python! ' + str(datetime.datetime.now()) } qmgr = None topic = None qmgr = connect() -if (qmgr): - topic = getTopic() - # queue.put(message.encode()) -if (topic): - publishMessage() +if qmgr is not None: + topic = get_topic() +if topic is not None: + publish_message() topic.close() -if (qmgr): +if qmgr is not None: qmgr.disconnect() -logger.info("Application is closing") +logger.info('Application is ending') diff --git a/Python/basicput.py b/Python/basicput.py index 9422cb45..4899e952 100644 --- a/Python/basicput.py +++ b/Python/basicput.py @@ -1,5 +1,7 @@ +"""Example of putting a message to an IBM MQ queue +""" # -*- coding: utf-8 -*- -# Copyright 2018, 2022 IBM Corp. +# Copyright 2018, 2025 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,146 +15,128 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from utils.env import EnvStore -import os import json import datetime -import pymqi - import logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +import ibmmq as mq +from utils.env import EnvStore + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger('Put') -# function to establish connection to MQ Queue Manager def connect(): - logger.info('Establising Connection with MQ Server') + """ Establish connection to MQ Queue Manager """ + logger.info('Establishing connection with MQ Server') try: cd = None - if not EnvStore.ccdtCheck(): - logger.info('CCDT URL export is not set, will be using json envrionment client connections settings') + if not EnvStore.is_ccdt_available(): + logger.info('CCDT URL export is not set, will be using json environment client connections settings') - cd = pymqi.CD(Version=pymqi.CMQXC.MQCD_VERSION_11) + cd = mq.CD(Version=mq.CMQXC.MQCD_VERSION_11) cd.ChannelName = MQDetails[EnvStore.CHANNEL] cd.ConnectionName = conn_info - cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN - cd.TransportType = pymqi.CMQC.MQXPT_TCP + cd.ChannelType = mq.CMQXC.MQCHT_CLNTCONN + cd.TransportType = mq.CMQXC.MQXPT_TCP - logger.info('Checking Cypher details') + logger.info('Checking cipher details') # If a cipher is set then set the TLS settings if MQDetails[EnvStore.CIPHER]: - logger.info('Making use of Cypher details') + logger.info('Making use of cipher details') cd.SSLCipherSpec = MQDetails[EnvStore.CIPHER] - # Key repository is not specified in CCDT so look in envrionment settings + # Key repository is not specified in CCDT so look in environment settings # Create an empty SCO object - sco = pymqi.SCO() + sco = mq.SCO() if MQDetails[EnvStore.KEY_REPOSITORY]: logger.info('Setting Key repository') sco.KeyRepository = MQDetails[EnvStore.KEY_REPOSITORY] - #options = pymqi.CMQC.MQPMO_NO_SYNCPOINT | pymqi.CMQC.MQPMO_NEW_MSG_ID | pymqi.CMQC.MQPMO_NEW_CORREL_ID - options = pymqi.CMQC.MQPMO_NEW_CORREL_ID + qmgr = mq.QueueManager(None) + + # Set credentials + csp = mq.CSP() + csp.CSPUserId = EnvStore.getenv_value(EnvStore.APP_USER) + csp.CSPPassword = EnvStore.getenv_value(EnvStore.APP_PASSWORD) + if csp.CSPUserId is None: + csp = None - qmgr = pymqi.QueueManager(None) - qmgr.connect_with_options(MQDetails[EnvStore.QMGR], - user=credentials[EnvStore.USER], - password=credentials[EnvStore.PASSWORD], - opts=options, cd=cd, sco=sco) + csp=csp, cd=cd, sco=sco) return qmgr - except pymqi.MQMIError as e: - logger.error("Error connecting") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error connecting: %s', e) return None - -# function to establish connection to Queue -def getQueue(): - logger.info('Connecting to Queue') +def get_queue(): + """Establish access to queue""" + logger.info('Opening queue') try: - # Can do this in one line, but with an Object Descriptor - # can or in more options. - # q = pymqi.Queue(qmgr, MQDetails[EnvStore.QUEUE_NAME]) - q = pymqi.Queue(qmgr) + # Different ways to open a queue. + q = mq.Queue(qmgr) - od = pymqi.OD() + od = mq.OD() od.ObjectName = MQDetails[EnvStore.QUEUE_NAME] - q.open(od, pymqi.CMQC.MQOO_OUTPUT) + q.open(od, mq.CMQC.MQOO_OUTPUT) return q - except pymqi.MQMIError as e: - logger.error("Error getting queue") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error opening queue: %s', e) return None -# function to put message onto Queue -def putMessage(): - logger.info('Attempting put to Queue') +def put_message(): + """Put message onto queue""" + logger.info('Attempting put to queue') try: - md = pymqi.MD() - md.Format = pymqi.CMQC.MQFMT_STRING - # queue.put(json.dumps(msgObject).encode()) - # queue.put(json.dumps(msgObject)) - # queue.put(str(json.dumps(msgObject))) - #queue.put(bytes(json.dumps(msgObject), 'utf-8')) - queue.put(EnvStore.stringForVersion(json.dumps(msgObject)),md) + md = mq.MD() + md.Format = mq.CMQC.MQFMT_STRING + msg = str(json.dumps(msg_object)) - logger.info("Put message successful") - except pymqi.MQMIError as e: - logger.error("Error in put to queue") - logger.error(e) + pmo = mq.PMO() + pmo.Options = mq.CMQC.MQPMO_NO_SYNCPOINT + queue.put(msg, md, pmo) + logger.info('Put message successful: %s', msg) + except mq.MQMIError as e: + logger.error('Error in put to queue: %s', e) -def buildMQDetails(): +def build_mq_details(): + """Create the connection details for the queue manager""" for key in [EnvStore.QMGR, EnvStore.QUEUE_NAME, EnvStore.CHANNEL, EnvStore.HOST, EnvStore.PORT, EnvStore.KEY_REPOSITORY, EnvStore.CIPHER]: - MQDetails[key] = EnvStore.getEnvValue(key) - + MQDetails[key] = EnvStore.getenv_value(key) # Application Logic starts here -logger.info("Application is Starting") +logger.info('Application "BasicPut" is starting') envStore = EnvStore() -envStore.setEnv() +envStore.set_env() MQDetails = {} -credentials = { - EnvStore.USER: EnvStore.getEnvValue(EnvStore.APP_USER), - EnvStore.PASSWORD: EnvStore.getEnvValue(EnvStore.APP_PASSWORD) -} - -buildMQDetails() - -logger.info('Credentials are set') -#logger.info(credentials) - -conn_info = EnvStore.getConnection(EnvStore.HOST, EnvStore.PORT) -#conn_info = "%s(%s)" % (MQDetails[EnvStore.HOST], MQDetails[EnvStore.PORT]) -logger.info('Connection is %s' % conn_info) +build_mq_details() +conn_info = EnvStore.get_connection(EnvStore.HOST, EnvStore.PORT) +logger.info('Connection is %s', conn_info) -msgObject = { - 'Greeting': "Hello from Python! " + str(datetime.datetime.now()) +msg_object = { + 'Greeting': 'Hello from Python! ' + str(datetime.datetime.now()) } qmgr = None queue = None qmgr = connect() -if (qmgr): - queue = getQueue() - # queue.put(message.encode()) -if (queue): - putMessage() +if qmgr is not None: + queue = get_queue() +if queue is not None: + put_message() queue.close() -if (qmgr): +if qmgr is not None: qmgr.disconnect() -logger.info("Application is closing") +logger.info('Application is ending') diff --git a/Python/basicrequest.py b/Python/basicrequest.py index 5bdc94c9..b3307727 100644 --- a/Python/basicrequest.py +++ b/Python/basicrequest.py @@ -1,5 +1,8 @@ +"""Example of using the request/response pattern with IBM MQ. +This is the requesting part. +""" # -*- coding: utf-8 -*- -# Copyright 2018, 2022 IBM Corp. +# Copyright 2018, 2025 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,155 +15,156 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from utils.env import EnvStore -import os + import json import datetime import random -import pymqi - import logging +import ibmmq as mq + +from utils.env import EnvStore + logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = logging.getLogger('Req') +WAIT_INTERVAL = 5 # seconds -# function to establish connection to MQ Queue Manager def connect(): - logger.info('Establising Connection with MQ Server') + """ Establish connection to MQ Queue Manager """ + + logger.info('Establishing connection with MQ Server') try: cd = None - if not EnvStore.ccdtCheck(): - logger.info('CCDT URL export is not set, will be using json envrionment client connections settings') + if not EnvStore.is_ccdt_available(): + logger.info('CCDT URL export is not set, will be using json environment client connections settings') - cd = pymqi.CD(Version=pymqi.CMQXC.MQCD_VERSION_11) + cd = mq.CD(Version=mq.CMQXC.MQCD_VERSION_11) cd.ChannelName = MQDetails[EnvStore.CHANNEL] cd.ConnectionName = conn_info - cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN - cd.TransportType = pymqi.CMQC.MQXPT_TCP + cd.ChannelType = mq.CMQXC.MQCHT_CLNTCONN + cd.TransportType = mq.CMQXC.MQXPT_TCP - logger.info('Checking Cypher details') + logger.info('Checking cipher details') # If a cipher is set then set the TLS settings if MQDetails[EnvStore.CIPHER]: - logger.info('Making use of Cypher details') + logger.info('Making use of cipher details') cd.SSLCipherSpec = MQDetails[EnvStore.CIPHER] - # Key repository is not specified in CCDT so look in envrionment settings + # Key repository is not specified in CCDT so look in environment settings # Create an empty SCO object - sco = pymqi.SCO() + sco = mq.SCO() if MQDetails[EnvStore.KEY_REPOSITORY]: logger.info('Setting Key repository') sco.KeyRepository = MQDetails[EnvStore.KEY_REPOSITORY] - #options = pymqi.CMQC.MQPMO_NO_SYNCPOINT | pymqi.CMQC.MQPMO_NEW_MSG_ID | pymqi.CMQC.MQPMO_NEW_CORREL_ID - options = pymqi.CMQC.MQPMO_NEW_CORREL_ID + # Set credentials + csp = mq.CSP() + csp.CSPUserId = EnvStore.getenv_value(EnvStore.APP_USER) + csp.CSPPassword = EnvStore.getenv_value(EnvStore.APP_PASSWORD) + if csp.CSPUserId is None: + csp = None - qmgr = pymqi.QueueManager(None) - qmgr.connect_with_options(MQDetails[EnvStore.QMGR], - user=credentials[EnvStore.USER], - password=credentials[EnvStore.PASSWORD], - opts=options, cd=cd, sco=sco) + qmgr = mq.QueueManager(None) + qmgr.connect_with_options(MQDetails[EnvStore.QMGR], csp=csp, + cd=cd, sco=sco) return qmgr - except pymqi.MQMIError as e: - logger.error("Error connecting") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error connecting: %s', e) return None +def get_queue(): + """Establish access to a Queue""" -# function to establish connection to Queue -def getQueue(): - logger.info('Connecting to Queue') + logger.info('Opening queue') try: - # Can do this in one line, but with an Object Descriptor - # can or in more options. - # q = pymqi.Queue(qmgr, MQDetails[EnvStore.QUEUE_NAME]) - q = pymqi.Queue(qmgr) + q = mq.Queue(qmgr) - od = pymqi.OD() + od = mq.OD() od.ObjectName = MQDetails[EnvStore.QUEUE_NAME] - q.open(od, pymqi.CMQC.MQOO_OUTPUT) - logger.info('Connected to queue ' + str(MQDetails[EnvStore.QUEUE_NAME])) + q.open(od, mq.CMQC.MQOO_OUTPUT) + logger.info('Opened queue %s', str(MQDetails[EnvStore.QUEUE_NAME])) return q - except pymqi.MQMIError as e: - logger.error("Error getting queue") - logger.error(e) - return None - -# function to establish connection to Queue + except mq.MQMIError as e: + logger.error('Error opening queue: %s', e) + return None -def getDynamicQueue(): - logger.info('Connecting to Dynmic Queue') +def get_dynamic_queue(): + """create a Dynamic queue, used for replies, by opening a Model queue""" + logger.info('Opening model queue') try: - # Dynamic queue's object descriptor. - dyn_od = pymqi.OD() - logger.info(MQDetails[EnvStore.MODEL_QUEUE_NAME]) - logger.info(MQDetails[EnvStore.DYNAMIC_QUEUE_PREFIX]) - dyn_od.ObjectName = MQDetails[EnvStore.MODEL_QUEUE_NAME] - dyn_od.DynamicQName = MQDetails[EnvStore.DYNAMIC_QUEUE_PREFIX] + # Model queue's object descriptor. + od = mq.OD() + od.ObjectName = MQDetails[EnvStore.MODEL_QUEUE_NAME] + od.DynamicQName = MQDetails[EnvStore.DYNAMIC_QUEUE_PREFIX] # Open the dynamic queue. - dyn_input_open_options = pymqi.CMQC.MQOO_INPUT_EXCLUSIVE - dyn_queue = pymqi.Queue(qmgr, dyn_od, dyn_input_open_options) - logger.info("CREATED DYN QUEUE: " + str(dyn_queue)) - dynamicQueueName = dyn_od.ObjectName.strip() - logger.info('Dynamic Queue Details are') - logger.info(dynamicQueueName) - - return dyn_queue, dynamicQueueName - - except pymqi.MQMIError as e: - logger.error("Error getting queue") - logger.error(e) + open_options = mq.CMQC.MQOO_INPUT_EXCLUSIVE + dynamic_queue_object = mq.Queue(qmgr, od, open_options) + + # An alternative could be to use dynamic_queue_object.get_name() + dynamic_queue_name = od.ObjectName.strip() + logger.info('Created dynamic queue called %s', dynamic_queue_name) + + return dynamic_queue_object, dynamic_queue_name + + except mq.MQMIError as e: + logger.error('Error opening queue: %s', e) return None +def put_message(): + """Put message onto queue""" -# function to put message onto Queue -def putMessage(): - logger.info('Attempting put to Queue') + logger.info('Attempting put to queue') try: - # queue.put(json.dumps(msgObject).encode()) - # queue.put(json.dumps(msgObject)) - # Prepare a Message Descriptor for the request message. - logger.info('Dynamic Queue Name is ') - logger.info(dynamic['name']) - md = pymqi.MD() + # Set the ReplyToQ as the dynamic queue we just created + md = mq.MD() md.ReplyToQ = dynamic['name'] - md.MsgType = pymqi.CMQC.MQMT_REQUEST - md.Format = pymqi.CMQC.MQFMT_STRING + md.MsgType = mq.CMQC.MQMT_REQUEST + md.Format = mq.CMQC.MQFMT_STRING + + # Tell the responder how to set correlators + report_options = mq.CMQC.MQRO_COPY_MSG_ID_TO_CORREL_ID + md.ReportOptions = report_options + + pmo = mq.PMO() + pmo.Options = mq.CMQC.MQPMO_NO_SYNCPOINT - # Send the message and ReplyToQ destination - queue.put(EnvStore.stringForVersion(json.dumps(msgObject)), md) - - logger.info("Put message successful") - #logger.info(md.CorrelID) - return md.MsgId, md.CorrelId - # return md.CorrelId - except pymqi.MQMIError as e: - logger.error("Error in put to queue") - logger.error(e) + # Send the message + msg = str(json.dumps(msg_object)) + queue.put(msg, md, pmo) -# Function to wait for resonse on reply to Queue + logger.info('Put message successful: %s', msg) + return md.MsgId + except mq.MQMIError as e: + logger.error('Error in put to queue: %s', e) -def awaitResponse(msgId, correlId): + return None + +def await_response(msgid): + """Wait for a response on the replyToQ. + Use the CorrelId to ensure it corresponds to the original request. + """ logger.info('Attempting get from Reply Queue') # Message Descriptor - md = pymqi.MD() - md.MsgId = msgId - md.CorrelId = correlId + md = mq.MD() + # Set the field we are using to connect the reply to the request based on + # the request's ReportOptions + md.CorrelId = msgid # Get Message Options - gmo = pymqi.GMO() - gmo.Options = pymqi.CMQC.MQGMO_WAIT | \ - pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING | \ - pymqi.CMQC.MQGMO_NO_PROPERTIES - gmo.WaitInterval = 5000 # 5 seconds - #gmo.MatchOptions = pymqi.CMQC.MQMO_MATCH_MSG_ID - gmo.MatchOptions = pymqi.CMQC.MQMO_MATCH_CORREL_ID - gmo.Version = pymqi.CMQC.MQGMO_VERSION_2 + gmo = mq.GMO() + gmo.Options = (mq.CMQC.MQGMO_WAIT | + mq.CMQC.MQGMO_FAIL_IF_QUIESCING | + mq.CMQC.MQGMO_NO_SYNCPOINT | + mq.CMQC.MQGMO_NO_PROPERTIES) + gmo.WaitInterval = WAIT_INTERVAL * 1000 # Convert to milliseconds + gmo.MatchOptions = mq.CMQC.MQMO_MATCH_CORREL_ID + gmo.Version = mq.CMQC.MQGMO_VERSION_2 keep_running = True while keep_running: @@ -169,17 +173,15 @@ def awaitResponse(msgId, correlId): message = dynamic['queue'].get(None, md, gmo) # Process the message here.. - msgObject = json.loads(message.decode()) - logger.info('Have reply message from Queue') - logger.info(msgObject) + logger.info('Have message from reply queue: %s', message.decode()) # Not expecting any more messages keep_running = False - except pymqi.MQMIError as e: - if e.comp == pymqi.CMQC.MQCC_FAILED and e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE: + except mq.MQMIError as e: + if e.comp == mq.CMQC.MQCC_FAILED and e.reason == mq.CMQC.MQRC_NO_MSG_AVAILABLE: # No messages, that's OK, we can ignore it. - pass + keep_running = False else: # Some other error condition. raise @@ -195,40 +197,34 @@ def awaitResponse(msgId, correlId): keep_running = False -def buildMQDetails(): +def build_mq_details(): + """Create the connection details for the queue manager""" for key in [EnvStore.QMGR, EnvStore.QUEUE_NAME, EnvStore.CHANNEL, EnvStore.HOST, EnvStore.PORT, EnvStore.MODEL_QUEUE_NAME, EnvStore.DYNAMIC_QUEUE_PREFIX, EnvStore.KEY_REPOSITORY, EnvStore.CIPHER]: - MQDetails[key] = EnvStore.getEnvValue(key) + MQDetails[key] = EnvStore.getenv_value(key) # Application Logic starts here -logger.info("Application is Starting") +logger.info('Application "BasicRequest" is starting') envStore = EnvStore() -envStore.setEnv() +envStore.set_env() MQDetails = {} -credentials = { - EnvStore.USER: EnvStore.getEnvValue(EnvStore.APP_USER), - EnvStore.PASSWORD: EnvStore.getEnvValue(EnvStore.APP_PASSWORD) -} - -buildMQDetails() -logger.info('Credentials are set') -#logger.info(credentials) +build_mq_details() -#conn_info = "%s(%s)" % (MQDetails[EnvStore.HOST], MQDetails[EnvStore.PORT]) -conn_info = EnvStore.getConnection(EnvStore.HOST, EnvStore.PORT) +conn_info = EnvStore.get_connection(EnvStore.HOST, EnvStore.PORT) -msgObject = { - 'Greeting': "Hello from Python! " + str(datetime.datetime.now()), +msg_object = { + 'Greeting': 'Hello from Python! ' + str(datetime.datetime.now()), 'value': random.randint(1, 101) } qmgr = None queue = None +tdq = None dynamic = { 'queue': None, 'name': None @@ -237,25 +233,26 @@ def buildMQDetails(): correlid = None qmgr = connect() -if (qmgr): - queue = getQueue() - -if (queue): - dynamic['queue'], dynamic['name'] = getDynamicQueue() - -if (dynamic['queue']): - logger.info('Checking dynamic Queue Name') - logger.info(dynamic['name']) - msgid, correlid = putMessage() - if msgid: - awaitResponse(msgid, correlid) +if qmgr is not None: + queue = get_queue() + +if queue is not None: + reply_queue = get_dynamic_queue() + if reply_queue is not None: + dynamic['queue'] = reply_queue[0] + dynamic['name'] = reply_queue[1] + +if reply_queue is not None: + msgid = put_message() + if msgid is not None: + await_response(msgid) dynamic['queue'].close() -if (queue): +if queue is not None: queue.close() -if (qmgr): +if qmgr is not None: qmgr.disconnect() -logger.info("Application is closing") +logger.info('Application is ending') diff --git a/Python/basicresponse.py b/Python/basicresponse.py index 08d8aee5..e76f08f5 100644 --- a/Python/basicresponse.py +++ b/Python/basicresponse.py @@ -1,5 +1,8 @@ +"""Example of using the request/response pattern with IBM MQ. +This is the responding part. +""" # -*- coding: utf-8 -*- -# Copyright 2018, 2022 IBM Corp. +# Copyright 2018, 2025 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,232 +16,260 @@ # See the License for the specific language governing permissions and # limitations under the License. -from utils.env import EnvStore -import os import json import datetime import random import math -import pymqi - import logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +import ibmmq as mq + +from utils.env import EnvStore +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger('Rsp') -# function to establish connection to MQ Queue Manager +WAIT_INTERVAL = 5 # seconds def connect(): - logger.info('Establising Connection with MQ Server') + """ Establish connection to MQ Queue Manager """ + + logger.info('Establishing connection with MQ Server') try: cd = None - if not EnvStore.ccdtCheck(): - logger.info('CCDT URL export is not set, will be using json envrionment client connections settings') + if not EnvStore.is_ccdt_available(): + logger.info('CCDT URL export is not set, will be using json environment client connections settings') - cd = pymqi.CD(Version=pymqi.CMQXC.MQCD_VERSION_11) + cd = mq.CD(Version=mq.CMQXC.MQCD_VERSION_11) cd.ChannelName = MQDetails[EnvStore.CHANNEL] cd.ConnectionName = conn_info - cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN - cd.TransportType = pymqi.CMQC.MQXPT_TCP + cd.ChannelType = mq.CMQXC.MQCHT_CLNTCONN + cd.TransportType = mq.CMQXC.MQXPT_TCP - logger.info('Checking Cypher details') + logger.info('Checking cipher details') # If a cipher is set then set the TLS settings if MQDetails[EnvStore.CIPHER]: - logger.info('Making use of Cypher details') + logger.info('Making use of cipher details') cd.SSLCipherSpec = MQDetails[EnvStore.CIPHER] - # Key repository is not specified in CCDT so look in envrionment settings + # Key repository is not specified in CCDT so look in environment settings # Create an empty SCO object - sco = pymqi.SCO() + sco = mq.SCO() if MQDetails[EnvStore.KEY_REPOSITORY]: logger.info('Setting Key repository') sco.KeyRepository = MQDetails[EnvStore.KEY_REPOSITORY] - #options = pymqi.CMQC.MQPMO_NO_SYNCPOINT | pymqi.CMQC.MQPMO_NEW_MSG_ID | pymqi.CMQC.MQPMO_NEW_CORREL_ID - options = pymqi.CMQC.MQPMO_NEW_CORREL_ID + # Set credentials + csp = mq.CSP() + csp.CSPUserId = EnvStore.getenv_value(EnvStore.APP_USER) + csp.CSPPassword = EnvStore.getenv_value(EnvStore.APP_PASSWORD) + if csp.CSPUserId is None: + csp = None - qmgr = pymqi.QueueManager(None) + qmgr = mq.QueueManager(None) qmgr.connect_with_options(MQDetails[EnvStore.QMGR], - user=credentials[EnvStore.USER], - password=credentials[EnvStore.PASSWORD], - opts=options, cd=cd, sco=sco) + csp=csp, + cd=cd, sco=sco) return qmgr - except pymqi.MQMIError as e: - logger.error("Error connecting") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error connecting: %s', e) return None -# function to establish connection to Queue - +def get_queue(queue_name): + """Access the input queue""" -def getQueue(queueName, forInput): - logger.info('Connecting to Queue') + logger.info('Opening queue') try: - # Works with single call, but object Descriptor - # provides other options - # q = pymqi.Queue(qmgr, MQDetails[EnvStore.QUEUE_NAME]) - q = pymqi.Queue(qmgr) - od = pymqi.OD() - od.ObjectName = queueName - - if (forInput): - odOptions = pymqi.CMQC.MQOO_INPUT_AS_Q_DEF - else: - od.ObjectType = pymqi.CMQC.MQOT_Q - odOptions = pymqi.CMQC.MQOO_OUTPUT + q = mq.Queue(qmgr) + od = mq.OD() + od.ObjectName = queue_name + open_options = mq.CMQC.MQOO_INPUT_AS_Q_DEF - q.open(od, odOptions) + q.open(od, open_options) return q - except pymqi.MQMIError as e: - logger.error("Error getting queue") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error opening queue: %s', e) return None -# function to get message from Queue - - -def getMessages(qmgr): - logger.info('Attempting gets from Queue') - # Message Descriptor - +def get_messages(qmgr): + """Get messages from the input queue""" + logger.info('Attempting gets from queue') # Get Message Options - gmo = pymqi.GMO() - gmo.Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING | pymqi.CMQC.MQGMO_SYNCPOINT - gmo.WaitInterval = 5000 # 5 seconds + gmo = mq.GMO() + + # Use the SYNCPOINT option so that any inbound request is in the same transaction as the + # response message + gmo.Options = mq.CMQC.MQGMO_WAIT | mq.CMQC.MQGMO_FAIL_IF_QUIESCING | mq.CMQC.MQGMO_SYNCPOINT + gmo.WaitInterval = WAIT_INTERVAL * 1000 # convert to milliseconds keep_running = True - - while keep_running: - backoutCounter = 0 - ok = True - msgObject = None + ok = True + + while keep_running and ok: + backout_counter = 0 + msg_object = None + + md = mq.MD() try: # Reset the MsgId, CorrelId & GroupId so that we can reuse # the same 'md' object again. - md = pymqi.MD() - md.MsgId = pymqi.CMQC.MQMI_NONE - md.CorrelId = pymqi.CMQC.MQCI_NONE - md.GroupId = pymqi.CMQC.MQGI_NONE - + md.MsgId = mq.CMQC.MQMI_NONE + md.CorrelId = mq.CMQC.MQCI_NONE + md.GroupId = mq.CMQC.MQGI_NONE + # Wait up to to gmo.WaitInterval for a new message. message = queue.get(None, md, gmo) - backoutCounter = md.BackoutCount + backout_counter = md.BackoutCount # Process the message here.. - msgObject = json.loads(message.decode()) - logger.info('Have message from Queue') - logger.info(msgObject) - ok = respondToRequest(md, msgObject) - - except pymqi.MQMIError as e: - if e.comp == pymqi.CMQC.MQCC_FAILED and e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE: - # No messages, that's OK, we can ignore it. - ok = True + msg_object = json.loads(message.decode()) + logger.info('Have message from input queue: %s', message.decode()) + ok = respond_to_request(md, msg_object) + + except mq.MQMIError as e: + if e.comp == mq.CMQC.MQCC_FAILED and e.reason == mq.CMQC.MQRC_NO_MSG_AVAILABLE: + # No messages, that's OK, but we will now end the program. Might want to + # keep running forever in a real application, but for this sample, we will + # now give up. + keep_running = False + ok = False else: # Some other error condition. - ok = False + ok = False except (UnicodeDecodeError, ValueError) as e: logger.info('Message is not valid json') logger.info(e) logger.info(message) ok = False - continue except KeyboardInterrupt: logger.info('Have received a keyboard interrupt') keep_running = False - except: - ok = False - - - if ok == True: - #Commiting - qmgr.commit() - elif ok == False: - keep_running=rollback(qmgr, md, msgObject, backoutCounter) - - - - - -def rollback(qmgr , md, msg, backoutCounter): - # get the backout queue from the Environment --> fix this - BACKOUT_QUEUE = MQDetails[EnvStore.BACKOUT_QUEUE] - - ok = False - - # if the backout counter is greater than 5 - # handle possible poisoning message scenario - if (backoutCounter >= 5): - logger.info("POSIONING MESSAGE DETECTED! ") - logger.info("REDIRECTING THE MESSAGE TO THE BACKOUT QUEUE " + str(BACKOUT_QUEUE)) - backoutQueue = getQueue(BACKOUT_QUEUE, False) + if ok: + # Committing the GET and PUT as part of the same transaction + qmgr.commit() + else: + keep_running = rollback(qmgr, md, msg_object, backout_counter) + +def rollback(qmgr, md, msg, backout_counter): + """Deal with a problem processing the message. + If there have been multiple attempts at processing the same message, + send it to a separate backout queue. + """ + # Get the backout queue from the Environment. In a production + # system, you would probably find the backout queue from the BOQNAME attribute + # of the input queue. But for simplicity here, we're reading it from an external + # configuration. + backout_queue = MQDetails[EnvStore.BACKOUT_QUEUE] + + ok = False + pmo = mq.PMO() + pmo.Options = mq.CMQC.MQPMO_SYNCPOINT + + # if the backout counter is greater than 5, + # handle possible poisoned message scenario by redirecting the + # the message to another queue. + if backout_counter >= 5: + logger.info('Poison message detected!') try: - msg = EnvStore.stringForVersion(json.dumps(msg)) - backoutQueue.put(msg,md) - qmgr.commit() + msg = backout_queue.stringForVersion(json.dumps(msg)) + qmgr.put1(backout_queue, msg, md, pmo) + qmgr.commit() ok = True - logger.info("Message sent to the backout queue" + str(BACKOUT_QUEUE)) - except: - logger.info("Error on redirecting the message") + logger.info('Message sent to the backout queue: %s', str(backout_queue)) + except mq.MQMIError as e: + logger.info('Error on redirecting the message : %s', e) ok = False - else: - + else: try: - qmgr.backout() + qmgr.backout() ok = True - except: - logger.error("Error on rollback") + except mq.MQMIError as e: + logger.error('Error on rollback: %s', e) ok = False - + return ok - +def respond_to_request(in_md, msg_object): + """Repond to the request. + Set correlators based on the inbound request's report options + """ + out_md = mq.MD() + pmo = mq.PMO() + + # Make the response part of the same transaction as the request + pmo.Options |= mq.CMQC.MQPMO_SYNCPOINT + + od = mq.OD() + + # This value is a bit-field so we can use bitwise operations to test it. + ro = in_md.Report & (mq.CMQC.MQRO_COPY_MSG_ID_TO_CORREL_ID | + mq.CMQC.MQRO_PASS_MSG_ID | + mq.CMQC.MQRO_PASS_CORREL_ID | + mq.CMQC.MQRO_NEW_MSG_ID) + + # The default behaviour is to copy the inbound MsgId into the outbound CorrelId and create a new MsgId + if (ro & mq.CMQC.MQRO_COPY_MSG_ID_TO_CORREL_ID != 0) or (ro & mq.CMQC.MQRO_NEW_MSG_ID != 0) or ro == 0: + out_md.CorrelId = in_md.MsgId + pmo.Options |= mq.CMQC.MQPMO_NEW_MSG_ID + + # But there are options to allow a direct return of the MsgId and/or CorrelId + if ro & mq.CMQC.MQRO_PASS_MSG_ID != 0: + out_md.MsgId = in_md.MsgId + + if ro & mq.CMQC.MQRO_PASS_CORREL_ID != 0: + out_md.CorrelId = in_md.CorrelId + + # Also set report options that should be inherited + if in_md.Report & mq.CMQC.MQRO_PASS_DISCARD_AND_EXPIRY != 0: + out_md.Expiry = in_md.Expiry + if in_md.Report & mq.CMQC.MQRO_DISCARD_MSG != 0: + out_md.Report = mq.CMQC.MQRO_DISCARD_MSG + else: + out_md.Report = mq.CMQC.MQRO_NONE + else: + out_md.Report = mq.CMQC.MQRO_NONE -def respondToRequest(md, msgObject): - # Create a response message descriptor with the CorrelId - # set to the value of MsgId of the original request message. - response_md = pymqi.MD() - response_md.CorrelId = md.CorrelId - response_md.MsgId = md.MsgId - response_md.Format = pymqi.CMQC.MQFMT_STRING - response_md.ReplyToQ= md.ReplyToQ + # Set the reply message to be the same persistence as input + out_md.MsgType = mq.CMQC.MQMT_REPLY + out_md.Persistence = in_md.Persistence + out_md.Format = in_md.Format - msgReply = { - 'Greeting': "Reply from Python! " + str(datetime.datetime.now()), + msg_reply = { + 'Greeting': 'Reply from Python! ' + str(datetime.datetime.now()), 'value': random.randint(1, 101) } - replyQueue = getQueue(response_md.ReplyToQ, False) - - if (msgObject['value']): - msgReply['value'] = performCalc(msgObject['value']) + if msg_object['value'] is not None: + msg_reply['value'] = perform_calc(msg_object['value']) + + od.ObjectName = in_md.ReplyToQ + od.ObjectQMgrName = in_md.ReplyToQMgr try: - replyQueue.put(EnvStore.stringForVersion(json.dumps(msgReply)), response_md) + qmgr.put1(od, str(json.dumps(msg_reply)), out_md, pmo) return True - except: - #Roll back on exception + except mq.MQMIError as e: + # Returning False will cause the calling function to backout the operation + logger.error('Error putting message to reply queue: %s', e) return False - - -def performCalc(n): - sqRoot = math.floor(math.sqrt(n)) +def perform_calc(n): + """Do something with the input value to show that it has been processed""" + sq_root = math.floor(math.sqrt(n)) a = [] i = 2 j = 1 - while (sqRoot <= n and i <= sqRoot): - if (0 == n % i): + while i <= sq_root <= n: + if 0 == n % i: a.append(i) n /= i else: @@ -246,46 +277,37 @@ def performCalc(n): i += j return a - -def buildMQDetails(): +def build_mq_details(): + """Create the connection details for the queue manager""" for key in [EnvStore.QMGR, EnvStore.QUEUE_NAME, EnvStore.CHANNEL, EnvStore.HOST, EnvStore.PORT, EnvStore.KEY_REPOSITORY, EnvStore.CIPHER, EnvStore.BACKOUT_QUEUE]: - MQDetails[key] = EnvStore.getEnvValue(key) + MQDetails[key] = EnvStore.getenv_value(key) # Application Logic starts here -logger.info("Application is Starting") +logger.info('Application "BasicResponse" is starting') envStore = EnvStore() -envStore.setEnv() +envStore.set_env() MQDetails = {} -credentials = { - EnvStore.USER: EnvStore.getEnvValue(EnvStore.APP_USER), - EnvStore.PASSWORD: EnvStore.getEnvValue(EnvStore.APP_PASSWORD) -} +build_mq_details() -buildMQDetails() - -logger.info('Credentials are set') -#logger.info(credentials) - -#conn_info = "%s(%s)" % (MQDetails[EnvStore.HOST], MQDetails[EnvStore.PORT]) -conn_info = EnvStore.getConnection(EnvStore.HOST, EnvStore.PORT) +conn_info = EnvStore.get_connection(EnvStore.HOST, EnvStore.PORT) qmgr = None queue = None qmgr = connect() -if (qmgr): - queue = getQueue(MQDetails[EnvStore.QUEUE_NAME], True) - -if (queue): - getMessages(qmgr) +if qmgr is not None: + queue = get_queue(MQDetails[EnvStore.QUEUE_NAME]) + +if queue is not None: + get_messages(qmgr) queue.close() -if (qmgr): +if qmgr is not None: qmgr.disconnect() -logger.info("Application is closing") +logger.info('Application is ending') diff --git a/Python/basicsubscribe.py b/Python/basicsubscribe.py index dda909c2..3d8846f3 100644 --- a/Python/basicsubscribe.py +++ b/Python/basicsubscribe.py @@ -1,5 +1,7 @@ +"""Example of subscribing to an IBM MQ topic. +""" # -*- coding: utf-8 -*- -# Copyright 2019 IBM +# Copyright 2019,2025 IBM # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,122 +14,120 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from utils.env import EnvStore -import os -import json -import pymqi import logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +import ibmmq as mq +from utils.env import EnvStore -# function to establish connection to MQ Queue Manager +WAIT_INTERVAL = 5 # seconds + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger('Sub') def connect(): - logger.info('Establising Connection with MQ Server') + """Establish connection to the queue manager""" + logger.info('Establishing connection with MQ Server') try: cd = None - if not EnvStore.ccdtCheck(): - logger.info('CCDT URL export is not set, will be using json envrionment client connections settings') + if not EnvStore.is_ccdt_available(): + logger.info('CCDT URL export is not set, will be using json environment client connections settings') - cd = pymqi.CD(Version=pymqi.CMQXC.MQCD_VERSION_11) + cd = mq.CD(Version=mq.CMQXC.MQCD_VERSION_11) cd.ChannelName = MQDetails[EnvStore.CHANNEL] cd.ConnectionName = conn_info - cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN - cd.TransportType = pymqi.CMQC.MQXPT_TCP + cd.ChannelType = mq.CMQXC.MQCHT_CLNTCONN + cd.TransportType = mq.CMQXC.MQXPT_TCP - logger.info('Checking Cypher details') + logger.info('Checking cipher details') # If a cipher is set then set the TLS settings if MQDetails[EnvStore.CIPHER]: - logger.info('Making use of Cypher details') + logger.info('Making use of cipher details') cd.SSLCipherSpec = MQDetails[EnvStore.CIPHER] - # Key repository is not specified in CCDT so look in envrionment settings + # Key repository is not specified in CCDT so look in environment settings # Create an empty SCO object - sco = pymqi.SCO() + sco = mq.SCO() if MQDetails[EnvStore.KEY_REPOSITORY]: logger.info('Setting Key repository') sco.KeyRepository = MQDetails[EnvStore.KEY_REPOSITORY] - #options = pymqi.CMQC.MQPMO_NO_SYNCPOINT | pymqi.CMQC.MQPMO_NEW_MSG_ID | pymqi.CMQC.MQPMO_NEW_CORREL_ID - options = pymqi.CMQC.MQPMO_NEW_CORREL_ID + # Set credentials + csp = mq.CSP() + csp.CSPUserId = EnvStore.getenv_value(EnvStore.APP_USER) + csp.CSPPassword = EnvStore.getenv_value(EnvStore.APP_PASSWORD) + if csp.CSPUserId is None: + csp = None - qmgr = pymqi.QueueManager(None) + qmgr = mq.QueueManager(None) qmgr.connect_with_options(MQDetails[EnvStore.QMGR], - user=credentials[EnvStore.USER], - password=credentials[EnvStore.PASSWORD], - opts=options, cd=cd, sco=sco) + csp=csp, + cd=cd, sco=sco) return qmgr - except pymqi.MQMIError as e: - logger.error("Error connecting") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error connecting: %s', e) return None -# function to establish connection to Topic - - -def getSubscription(): - logger.info('Connecting to Subscription') +def get_subscription(): + """Get access to the topic via a subscription. + Using a MANAGED subscription means that the queue manager will create and + return the queue to which publications are delivered. + """ + logger.info('Creating subscription') try: - sub_desc = pymqi.SD() - sub_desc["Options"] = pymqi.CMQC.MQSO_CREATE + pymqi.CMQC.MQSO_RESUME + \ - pymqi.CMQC.MQSO_DURABLE + pymqi.CMQC.MQSO_MANAGED - sub_desc.set_vs("SubName", "MySub") - sub_desc.set_vs("ObjectString", MQDetails[EnvStore.TOPIC_NAME]) - - sub = pymqi.Subscription(qmgr) + sub_desc = mq.SD() + sub_desc.Options = mq.CMQC.MQSO_CREATE | \ + mq.CMQC.MQSO_MANAGED | \ + mq.CMQC.MQSO_NON_DURABLE | \ + mq.CMQC.MQSO_FAIL_IF_QUIESCING + sub_desc.set_vs('ObjectString', MQDetails[EnvStore.TOPIC_NAME]) + + sub = mq.Subscription(qmgr) sub.sub(sub_desc=sub_desc) return sub - except pymqi.MQMIError as e: - logger.error("Error getting queue") - logger.error(e) + except mq.MQMIError as e: + logger.error('Error opening queue: %s', e) return None -# function to get messages from subscription - +def get_messages(): + """Get publications from the subscription queue""" + logger.info('Attempting gets from subscription queue') -def getMessages(): - logger.info('Attempting gets from Subscription') + get_options = mq.CMQC.MQGMO_NO_SYNCPOINT | \ + mq.CMQC.MQGMO_FAIL_IF_QUIESCING | \ + mq.CMQC.MQGMO_WAIT | \ + mq.CMQC.MQGMO_NO_PROPERTIES - subOptions = pymqi.CMQC.MQGMO_NO_SYNCPOINT + \ - pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING + \ - pymqi.CMQC.MQGMO_WAIT + \ - pymqi.CMQC.MQGMO_NO_PROPERTIES - - gmo = pymqi.GMO(Options=subOptions) - gmo["WaitInterval"] = 30 * 1000 + gmo = mq.GMO(Options=get_options) + gmo.WaitInterval = WAIT_INTERVAL * 1000 # convert to milliseconds # Message Descriptor - md = pymqi.MD() + md = mq.MD() keep_running = True while keep_running: try: # Reset the MsgId, CorrelId & GroupId so that we can reuse # the same 'md' object again. - md.MsgId = pymqi.CMQC.MQMI_NONE - md.CorrelId = pymqi.CMQC.MQCI_NONE - md.GroupId = pymqi.CMQC.MQGI_NONE + md.MsgId = mq.CMQC.MQMI_NONE + md.CorrelId = mq.CMQC.MQCI_NONE + md.GroupId = mq.CMQC.MQGI_NONE - #message = subscription.get(None, pymqi.md(), gmo) message = subscription.get(None, md, gmo) # Process the message here.. - msgObject = json.loads(message.decode()) - logger.info('Have message from Queue') - logger.info(msgObject) - - except pymqi.MQMIError as e: - if e.comp == pymqi.CMQC.MQCC_FAILED and e.reason == pymqi.CMQC.MQRC_NO_MSG_AVAILABLE: - # No messages, that's OK, we can ignore it. - pass + logger.info('Have publication on topic: %s', message.decode()) + + except mq.MQMIError as e: + if e.comp == mq.CMQC.MQCC_FAILED and e.reason == mq.CMQC.MQRC_NO_MSG_AVAILABLE: + # No messages, that's OK, we can just exit + keep_running = False else: # Some other error condition. raise - except (UnicodeDecodeError, ValueError) as e: + except (UnicodeDecodeError, ValueError) as e: logger.info('Message is not valid json') logger.info(e) logger.info(message) @@ -138,44 +138,35 @@ def getMessages(): keep_running = False -def buildMQDetails(): +def build_mq_details(): + """Create the connection details for the queue manager""" for key in [EnvStore.QMGR, EnvStore.TOPIC_NAME, EnvStore.CHANNEL, EnvStore.HOST, EnvStore.PORT, EnvStore.KEY_REPOSITORY, EnvStore.CIPHER]: - MQDetails[key] = EnvStore.getEnvValue(key) + MQDetails[key] = EnvStore.getenv_value(key) # Application Logic starts here -logger.info("Application is Starting") +logger.info('Application "BasicSubscribe" is starting') envStore = EnvStore() -envStore.setEnv() +envStore.set_env() MQDetails = {} -credentials = { - EnvStore.USER: EnvStore.getEnvValue(EnvStore.APP_USER), - EnvStore.PASSWORD: EnvStore.getEnvValue(EnvStore.APP_PASSWORD) -} +build_mq_details() -buildMQDetails() - -logger.info('Credentials are set') -#logger.info(credentials) - -#conn_info = "%s(%s)" % (MQDetails[EnvStore.HOST], MQDetails[EnvStore.PORT]) -conn_info = EnvStore.getConnection(EnvStore.HOST, EnvStore.PORT) +conn_info = EnvStore.get_connection(EnvStore.HOST, EnvStore.PORT) qmgr = None subscription = None qmgr = connect() -if (qmgr): - subscription = getSubscription() -if (subscription): - getMessages() - subscription.close( - sub_close_options=pymqi.CMQC.MQCO_KEEP_SUB, close_sub_queue=True) - -if (qmgr): +if qmgr is not None: + subscription = get_subscription() +if subscription is not None: + get_messages() + subscription.close(close_sub_queue=True) + +if qmgr is not None: qmgr.disconnect() -logger.info("Application is closing") +logger.info('Application is ending') diff --git a/Python/utils/__init__.py b/Python/utils/__init__.py index 8b137891..e3207758 100644 --- a/Python/utils/__init__.py +++ b/Python/utils/__init__.py @@ -1 +1,2 @@ - +"""This file left intentionally blank +""" diff --git a/Python/utils/env.py b/Python/utils/env.py index 89a6d87a..d001c9a2 100644 --- a/Python/utils/env.py +++ b/Python/utils/env.py @@ -1,5 +1,6 @@ +"""Setup the environment for connections to IBM MQ""" # -*- coding: utf-8 -*- -# Copyright 2019 IBM +# Copyright 2019, 2025 IBM # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,15 +16,17 @@ import os import json -import sys import logging -logger = logging.getLogger(__name__) +from typing import Any, Generator, Union +logger = logging.getLogger(__name__) -class EnvStore(object): +# The env variable has an unknown structure here because it's created from JSON. So we disable some linter checks +# pylint: disable=unsubscriptable-object,unsupported-membership-test +class EnvStore(): """ - Load Envrionment Exports from local store + Load configuration from local store """ env = None @@ -48,106 +51,99 @@ class EnvStore(object): FILEPREFIX = "file://" def __init__(self): - super(EnvStore, self).__init__() + super().__init__() if EnvStore.env is None: module_dir = os.path.dirname(__file__) file_path = os.path.join(module_dir, '../../', 'env.json') - logger.info( - "Looking for file %s for envrionment variables" % file_path) + # See if there's an override environment variable for the config file try: - with open(file_path) as f: + file_path = os.environ['JSON_CONFIG'] + except KeyError: + # If the env variable was not set, that's OK. Use the default. + pass + + logger.info("Looking for config file: %s", file_path) + try: + with open(file_path, encoding='utf-8') as f: EnvStore.env = json.loads(f.read()) - # Python 2 - except IOError as e: - logger.info( - 'I/O error reading file ({0}): {1}' % (e.errno, e.strerror)) - except ValueError: - logger.info('Parsing error') - except: - logger.info('Unexpected error:') - # Python 3 - # except FileNotFoundError: - # logger.info("Envrionment File was not found") - - def checkEndPointIsList(self): - if (EnvStore.env - and EnvStore.MQ_ENDPOINTS in EnvStore.env - and isinstance( EnvStore.env[EnvStore.MQ_ENDPOINTS], list)): - return True + except Exception: + logger.info('Error reading/parsing file: %s', file_path) + raise + + def is_endpoint_list(self) -> bool: + """Do we have a list of endpoints?""" + if EnvStore.env is not None \ + and EnvStore.MQ_ENDPOINTS in EnvStore.env \ + and isinstance(EnvStore.env[EnvStore.MQ_ENDPOINTS], list): + return True return False - def setEnv(self): - if self.checkEndPointIsList(): - logger.info('Have File so ready to set envrionment variables') + def set_env(self) -> None: + """Set the attributes from the configuration file as environment variables. + Most values come from only the first block of details in the JSON file. + """ + if self.is_endpoint_list(): + logger.info('Have file, so ready to set environment variables for configuration') for e in EnvStore.env[EnvStore.MQ_ENDPOINTS][0]: os.environ[e] = EnvStore.env[EnvStore.MQ_ENDPOINTS][0][e] if EnvStore.PASSWORD not in e: - logger.info('Checking %s value is %s ' % (e, EnvStore.env[EnvStore.MQ_ENDPOINTS][0][e])) - # Check if there are multiple endpoints defined + logger.debug('Checking %s value is %s ', e, EnvStore.env[EnvStore.MQ_ENDPOINTS][0][e]) + # Check if there are multiple endpoints defined. If so, build a string containing all of them. if len(EnvStore.env[EnvStore.MQ_ENDPOINTS]) > 0: - os.environ[EnvStore.CONNECTION_STRING] = self.buildConnectionString(EnvStore.env[EnvStore.MQ_ENDPOINTS]) + os.environ[EnvStore.CONNECTION_STRING] = self.build_connection_string(EnvStore.env[EnvStore.MQ_ENDPOINTS]) else: - logger.info('No envrionment variables to set') + logger.info('No environment variables to set') - def buildConnectionString(self, points): + def build_connection_string(self, points: list) -> str: + """Return the CONNAME string built from the configuration values""" logger.info('Building a connection string') - l = [] + conn_string = [] for point in points: if EnvStore.HOST in point and EnvStore.PORT in point: p = '%s(%s)' % (point[EnvStore.HOST], point[EnvStore.PORT]) - logger.info('endpoint is %s' % p) - l.append(p) - s = ','.join(l) - logger.info('Connection string is %s' % s) + logger.info('endpoint is %s', p) + conn_string.append(p) + s = ','.join(conn_string) + logger.info('Connection string is %s', s) return s - def getEndpointCount(self): - if self.checkEndPointIsList(): + def get_endpoint_count(self) -> int: + """How many endpoints are configured""" + if self.is_endpoint_list(): return len(EnvStore.env[EnvStore.MQ_ENDPOINTS]) return 1 - def getNextConnectionString(self): + def get_next_connection_string(self) -> Generator[Any, int, str]: + """Return the next in the list""" for i, p in enumerate(EnvStore.env[EnvStore.MQ_ENDPOINTS]): - info = "%s(%s)" % (p[EnvStore.HOST], p[EnvStore.PORT]) - if sys.version_info[0] < 3: - yield i, str(info) - else: - yield i, bytes(info, 'utf-8') - + info = "%s(%s)" % (p[EnvStore.HOST], p[EnvStore.PORT]) + yield i, str(info) - # function to retrieve variable from Envrionment + # function to retrieve variable from Environment @staticmethod - def getEnvValue(key, index = 0): + def getenv_value(key: str, index: int = 0) -> Union[str, None]: + """Return the value of an attribute either from the environment variable or configuration file. + If no index is given, the returned value comes from the first connection's entry in the JSON file. + """ v = os.getenv(key) if index == 0 else EnvStore.env[EnvStore.MQ_ENDPOINTS][index].get(key) - if sys.version_info[0] < 3: - return str(v) if v else None - else: - return bytes(v, 'utf-8') if v else None + return str(v) if v else None @staticmethod - def getConnection(host, port): + def get_connection(host: str, port: str) -> str: + """Return the ConnName directly""" info = os.getenv(EnvStore.CONNECTION_STRING) if not info: - info = "%s(%s)" % (os.getenv(host), os.getenv(port)) - if sys.version_info[0] < 3: - return str(info) - else: - return bytes(info, 'utf-8') - - @staticmethod - def stringForVersion(data): - if sys.version_info[0] < 3: - return str(data) - else: - return bytes(data, 'utf-8') + info = "%s(%s)" % (os.getenv(host), os.getenv(port)) + return str(info) @staticmethod - def ccdtCheck(): - fPath = EnvStore.getEnvValue(EnvStore.CCDT) - if fPath: - ccdtFile = fPath if not fPath.startswith(EnvStore.FILEPREFIX) else fPath[len(EnvStore.FILEPREFIX):] - if os.path.isfile(ccdtFile): - logger.info('CCDT file found at %s ' % ccdtFile) + def is_ccdt_available() -> bool: + """Is there a CCDT configured""" + file_path = EnvStore.getenv_value(EnvStore.CCDT) + if file_path: + ccdt_file = file_path if not file_path.startswith(EnvStore.FILEPREFIX) else file_path[len(EnvStore.FILEPREFIX):] + if os.path.isfile(ccdt_file): + logger.info('CCDT file found at %s ', ccdt_file) return True return False