2
2
# Copyright (c) Microsoft Corporation.
3
3
# Licensed under the MIT License.
4
4
# ------------------------------------
5
+ import base64
5
6
import os
6
7
import sys
7
8
@@ -82,20 +83,31 @@ def live_service_principal():
82
83
}
83
84
84
85
85
- def get_certificate_parameters (content , password_protected_content , password , extension ):
86
- # type: (bytes, bytes, str, str) -> dict
86
+ def get_certificate_parameters (content : bytes , extension : str ) -> dict :
87
87
current_directory = os .path .dirname (__file__ )
88
88
parameters = {
89
89
"cert_bytes" : content ,
90
90
"cert_path" : os .path .join (current_directory , "certificate." + extension ),
91
+ }
92
+
93
+ try :
94
+ with open (parameters ["cert_path" ], "wb" ) as f :
95
+ f .write (parameters ["cert_bytes" ])
96
+ except IOError as ex :
97
+ pytest .skip ("Failed to write a file: {}" .format (ex ))
98
+
99
+ return parameters
100
+
101
+
102
+ def get_certificate_with_password_parameters (password_protected_content : bytes , password : str , extension : str ) -> dict :
103
+ current_directory = os .path .dirname (__file__ )
104
+ parameters = {
91
105
"cert_with_password_bytes" : password_protected_content ,
92
106
"cert_with_password_path" : os .path .join (current_directory , "certificate-with-password." + extension ),
93
107
"password" : password ,
94
108
}
95
109
96
110
try :
97
- with open (parameters ["cert_path" ], "wb" ) as f :
98
- f .write (parameters ["cert_bytes" ])
99
111
with open (parameters ["cert_with_password_path" ], "wb" ) as f :
100
112
f .write (parameters ["cert_with_password_bytes" ])
101
113
except IOError as ex :
@@ -110,31 +122,45 @@ def live_pem_certificate(live_service_principal):
110
122
password_protected_content = os .environ .get ("PEM_CONTENT_PASSWORD_PROTECTED" )
111
123
password = os .environ .get ("CERTIFICATE_PASSWORD" )
112
124
113
- if content and password_protected_content and password :
114
- parameters = get_certificate_parameters (
115
- content .encode ("utf-8" ), password_protected_content .encode ("utf-8" ), password , "pem"
125
+ cert_info = {}
126
+
127
+ if content :
128
+ content = content .replace ("\\ n" , "\r \n " )
129
+ parameters = get_certificate_parameters (content .encode ("utf-8" ), "pem" )
130
+ cert_info .update (parameters )
131
+
132
+ if password_protected_content and password :
133
+ parameters = get_certificate_with_password_parameters (
134
+ password_protected_content .encode ("utf-8" ), password , "pem"
116
135
)
117
- return dict ( live_service_principal , ** parameters )
136
+ cert_info . update ( parameters )
118
137
138
+ if cert_info :
139
+ return dict (live_service_principal , ** cert_info )
119
140
pytest .skip ("Missing PEM certificate configuration" )
120
141
121
142
122
143
@pytest .fixture ()
123
144
def live_pfx_certificate (live_service_principal ):
124
145
# PFX bytes arrive base64 encoded because Key Vault secrets have string values
125
- encoded_content = os .environ .get ("PFX_CONTENT " )
146
+ encoded_content = os .environ .get ("PFX_CONTENTS " )
126
147
encoded_password_protected_content = os .environ .get ("PFX_CONTENT_PASSWORD_PROTECTED" )
127
148
password = os .environ .get ("CERTIFICATE_PASSWORD" )
128
149
129
- if encoded_content and encoded_password_protected_content and password :
130
- import base64
150
+ cert_info = {}
131
151
152
+ if encoded_content :
132
153
content = base64 .b64decode (encoded_content .encode ("utf-8" ))
133
- password_protected_content = base64 .b64decode (encoded_password_protected_content .encode ("utf-8" ))
154
+ parameters = get_certificate_parameters (content , "pfx" )
155
+ cert_info .update (parameters )
134
156
135
- parameters = get_certificate_parameters (content , password_protected_content , password , "pfx" )
136
- return dict (live_service_principal , ** parameters )
157
+ if encoded_password_protected_content and password :
158
+ password_protected_content = base64 .b64decode (encoded_password_protected_content .encode ("utf-8" ))
159
+ parameters = get_certificate_with_password_parameters (password_protected_content , password , "pfx" )
160
+ cert_info .update (parameters )
137
161
162
+ if cert_info :
163
+ return dict (live_service_principal , ** cert_info )
138
164
pytest .skip ("Missing PFX certificate configuration" )
139
165
140
166
0 commit comments