diff --git a/keycloak-client/lib/keycloak-clients.yaml b/keycloak-client/lib/keycloak-clients.yaml new file mode 100644 index 00000000..4f300438 --- /dev/null +++ b/keycloak-client/lib/keycloak-clients.yaml @@ -0,0 +1,723 @@ +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: keycloak-client + namespace: argo +spec: + templates: + - name: delete-client + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: test-client + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection + import requests + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + + def delete_client(url, realm_name, client_id, token): + if (url[(- 1)] == '/'): + url = url[:(- 1)] + path = f'/admin/realms/{realm_name}/clients/{client_id}' + headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + token['access_token'])} + response = requests.delete((url + path), headers=headers) + if (response.status_code == 204): + print(f'delete client {client_id} success') + elif (response.status_code == 404): + raise Exception(response.text) + else: + raise Exception(response.text) + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(client_id=input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + except Exception as inner_e: + print(inner_e) + raise Exception('get client failed') + try: + delete_client(url=input_params['server_url'], realm_name=input_params['target_realm_name'], client_id=hashed_client_id, token=keycloak_admin.connection.token) + except Exception as inner_e: + print(inner_e) + raise Exception('delete client on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""delete client "{input_params['target_client_id']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: create-client-role + inputs: + parameters: + - name: server_url + value: https://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: test-client2 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: client_role_name + value: admin + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'client_role_name': '{{inputs.parameters.client_role_name}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/dev_kubeconfig/kubeconfig') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(client_id=input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + role_name = input_params['client_role_name'] + keycloak_admin.create_client_role(client_role_id=hashed_client_id, payload={'name': role_name, 'clientRole': True}) + print(f"""create client role "{role_name}" in client "{input_params['target_client_id']}" success""") + except Exception as inner_e: + print(inner_e) + raise Exception('create client role on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""create client role {role_name} in client "{input_params['target_client_id']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: append-client-redirect-uri + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: k8s-oidc7 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: redirect_uri + value: aaaa + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'redirect_uri': '{{inputs.parameters.redirect_uri}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + client = keycloak_admin.get_client(client_id=hashed_client_id) + existing_redirect_uris = client['redirectUris'] + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + redirect_uri = input_params['redirect_uri'] + if (redirect_uri in existing_redirect_uris): + print(f""""{redirect_uri}" already exists in client "{input_params['target_client_id']}".""") + else: + existing_redirect_uris.append(redirect_uri) + client['redirectUris'] = existing_redirect_uris + keycloak_admin.update_client(client_id=hashed_client_id, payload=client) + print(f"""append "{redirect_uri}" in client "{input_params['target_client_id']}" success""") + except Exception as inner_e: + print(inner_e) + raise Exception('update client on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""append redirect uri "{input_params['redirect_uri']}" to client "{input_params['target_client_id']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: remove-client-redirect-uri + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: k8s-oidc7 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: redirect_uri + value: aaaa + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'redirect_uri': '{{inputs.parameters.redirect_uri}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + client = keycloak_admin.get_client(client_id=hashed_client_id) + existing_redirect_uris = client['redirectUris'] + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + if (input_params['redirect_uri'] not in existing_redirect_uris): + print(f"""redirect-uri "{input_params['redirect_uri']}" not exist in client "{hashed_client_id}".""") + else: + existing_redirect_uris.remove(input_params['redirect_uri']) + client['redirectUris'] = existing_redirect_uris + keycloak_admin.update_client(client_id=hashed_client_id, payload=client) + print(f"""remove redirect-uri "{input_params['redirect_uri']}" in client "{hashed_client_id}" success""") + except Exception as inner_e: + print(inner_e) + raise Exception(f'remove redirect-uri in client {hashed_client_id} on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""remove redirect uri "{input_params['redirect_uri']}" to client "{input_params['target_client_id']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: create-client-scope-mapper-client-role + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: k8s-oidc6 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: mapper_name + value: k8s-role-mapper + - name: claim_name + value: groups + - name: add_to_access_token + value: false + - name: add_to_id_token + value: true + - name: add_to_userinfo + value: false + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + import requests + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'mapper_name': '{{inputs.parameters.mapper_name}}', 'claim_name': '{{inputs.parameters.claim_name}}', 'add_to_access_token': '{{inputs.parameters.add_to_access_token}}', 'add_to_id_token': '{{inputs.parameters.add_to_id_token}}', 'add_to_userinfo': '{{inputs.parameters.add_to_userinfo}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + + def create_client_scope_mapper(url, realm_name, client_id, hashed_client_id, token, mapper_name): + if (url[(- 1)] == '/'): + url = url[:(- 1)] + path = f'/admin/realms/{realm_name}/clients/{hashed_client_id}/protocol-mappers/models' + headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + token['access_token'])} + data = {'name': mapper_name, 'protocol': 'openid-connect', 'protocolMapper': 'oidc-usermodel-client-role-mapper', 'config': {'usermodel.clientRoleMapping.clientId': client_id, 'claim.name': input_params['claim_name'], 'access.token.claim': input_params['add_to_access_token'], 'id.token.claim': input_params['add_to_id_token'], 'userinfo.token.claim': input_params['add_to_userinfo'], 'multivalued': 'true', 'jsonType.label': 'String'}} + response = requests.post((url + path), headers=headers, json=data) + if (response.status_code == 201): + print(f'create client scope mapper {client_id} success') + elif (response.status_code == 409): + raise Exception(response.text) + else: + raise Exception(response.text) + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(client_id=input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + create_client_scope_mapper(url=input_params['server_url'], realm_name=input_params['target_realm_name'], client_id=input_params['target_client_id'], hashed_client_id=hashed_client_id, token=keycloak_admin.connection.token, mapper_name=input_params['mapper_name']) + print(f"""create client scope mapper "{input_params['mapper_name']}" in client "{input_params['target_client_id']} success""") + except Exception as inner_e: + print(inner_e) + raise Exception('create client role on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""create client scope mapper "{input_params['mapper_name']}" in client "{input_params['target_client_id']} failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: delete-client-role + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: k8s-oidc6 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: client_role_name + value: admin + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'client_role_name': '{{inputs.parameters.client_role_name}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(client_id=input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + role_name = input_params['client_role_name'] + keycloak_admin.delete_client_role(client_role_id=hashed_client_id, role_name=role_name) + print(f"""create client role {role_name} in client "{input_params['target_client_id']}" success""") + except Exception as inner_e: + print(inner_e) + raise Exception('create client role on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""create client role {role_name} in client "{input_params['target_client_id']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: create-client + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: test-client2 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenID + import requests + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + + def create_client(url, realm_name, client_id, token): + if (url[(- 1)] == '/'): + url = url[:(- 1)] + path = f'/admin/realms/{realm_name}/clients' + headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + token['access_token'])} + data = {'clientId': client_id, 'enabled': True, 'publicClient': True, 'protocol': 'openid-connect', 'standardFlowEnabled': True, 'implicitFlowEnabled': False, 'directAccessGrantsEnabled': True, 'serviceAccountsEnabled': False, 'authorizationServicesEnabled': False, 'fullScopeAllowed': True} + response = requests.post((url + path), headers=headers, json=data) + if (response.status_code == 201): + print(f'create client {client_id} success') + elif (response.status_code == 409): + raise Exception(response.text) + else: + raise Exception(response.text) + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + token = keycloak_openid.token(grant_type='password', username='admin', password=secret) + try: + create_client(input_params['server_url'], input_params['target_realm_name'], input_params['target_client_id'], token) + print(f"""create client "{input_params['target_client_id']}" success""") + keycloak_openid.logout(token['refresh_token']) + except Exception as e: + print(e) + print('create client failed') + keycloak_openid.logout(token['refresh_token']) + sys.exit(1) + - name: update-client-secret + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: test-client2 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: client_role_name + value: admin + - name: client_secret_enabled + value: 'false' + - name: client_secret_value + value: test + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenID, KeycloakAdmin, KeycloakOpenIDConnection + import requests + from kubernetes import client, config + import sys + import base64 + import json + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'client_role_name': '{{inputs.parameters.client_role_name}}', 'client_secret_enabled': '{{inputs.parameters.client_secret_enabled}}', 'client_secret_value': '{{inputs.parameters.client_secret_value}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + + def create_client(url, realm_name, client_id, token): + if (url[(- 1)] == '/'): + url = url[:(- 1)] + path = f'/admin/realms/{realm_name}/clients' + headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ' + token['access_token'])} + data = {'clientId': client_id, 'enabled': True, 'publicClient': True, 'protocol': 'openid-connect', 'standardFlowEnabled': True, 'implicitFlowEnabled': False, 'directAccessGrantsEnabled': True, 'serviceAccountsEnabled': False, 'authorizationServicesEnabled': False, 'fullScopeAllowed': True} + response = requests.post((url + path), headers=headers, json=data) + if (response.status_code == 201): + print(f'create client {client_id} success') + elif (response.status_code == 409): + raise Exception(response.text) + else: + raise Exception(response.text) + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}"""") + client = keycloak_admin.get_client(client_id=hashed_client_id) + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + if (input_params['client_secret_enabled'] == 'true'): + if (input_params['client_secret_value'] == 'null'): + print('null') + client['publicClient'] = False + keycloak_admin.update_client(client_id=hashed_client_id, payload=client) + print(json.dumps(keycloak_admin.get_client(client_id=hashed_client_id), indent=4, sort_keys=True)) + else: + client['publicClient'] = False + keycloak_admin.update_client(client_id=hashed_client_id, payload=client) + client_secret = input_params['client_secret_value'] + client['secret'] = client_secret + keycloak_admin.update_client(client_id=hashed_client_id, payload=client) + print(client_secret) + else: + client['publicClient'] = True + keycloak_admin.update_client(client_id=hashed_client_id, payload=client) + print(f"""delete client secret of client id "{input_params['target_client_id']}" on keycloak""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""update client id "{input_params['target_client_id']} failed""") + print(f"""update client "{input_params['target_client_id']}" success""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print('create client failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) diff --git a/keycloak-client/lib/keycloak-realms.yaml b/keycloak-client/lib/keycloak-realms.yaml new file mode 100644 index 00000000..2010111a --- /dev/null +++ b/keycloak-client/lib/keycloak-realms.yaml @@ -0,0 +1,137 @@ +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: keycloak-realm + namespace: argo +spec: + templates: + - name: delete-realm + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test4 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api() + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + keycloak_admin.delete_realm(realm_name=input_params['target_realm_name']) + print(f"delete realm {input_params['target_realm_name']} success") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"delete realm {input_params['target_realm_name']} failed") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: create-realm + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test4 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api() + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + keycloak_admin.create_realm({'realm': input_params['target_realm_name'], 'enabled': True, 'displayName': input_params['target_realm_name']}) + print(f"create realm {input_params['target_realm_name']} success") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print('create realm failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) diff --git a/keycloak-client/lib/keycloak-users.yaml b/keycloak-client/lib/keycloak-users.yaml new file mode 100644 index 00000000..20d888eb --- /dev/null +++ b/keycloak-client/lib/keycloak-users.yaml @@ -0,0 +1,353 @@ +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: keycloak-user + namespace: argo +spec: + templates: + - name: unassign-client-role-to-user + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: k8s-oidc6 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: client_role_name + value: admin + - name: user_name + value: user1 + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + import requests + from kubernetes import client, config + import sys + import base64 + import json + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'client_role_name': '{{inputs.parameters.client_role_name}}', 'user_name': '{{inputs.parameters.user_name}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(client_id=input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + idOfClientRole = keycloak_admin.get_client_role_id(client_id=hashed_client_id, role_name=input_params['client_role_name']) + print(f"""client role id in client id "{input_params['target_client_id']}" is "{idOfClientRole}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client role "{input_params['client_role_name']}" failed""") + try: + idOfUser = keycloak_admin.get_user_id(username=input_params['user_name']) + print(f"""id of user "{input_params['user_name']}" is "{idOfUser}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get user "{input_params['user_name']}" failed""") + try: + keycloak_admin.delete_client_roles_of_user(client_id=hashed_client_id, user_id=idOfUser, roles=[{'id': idOfClientRole, 'name': input_params['client_role_name']}]) + print(f"""un-assign client role "{input_params['client_role_name']}" to user "{input_params['user_name']}" success""") + except Exception as inner_e: + print(inner_e) + raise Exception('un-assign client role to user on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""un-assign client role "{input_params['client_role_name']}" to user "{input_params['user_name']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: create-user + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: user_name + value: user1 + - name: user_password + value: user1 + - name: user_email + value: test@gmail.com + - name: user_first_name + value: '' + - name: user_last_name + value: '' + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'user_name': '{{inputs.parameters.user_name}}', 'user_password': '{{inputs.parameters.user_password}}', 'user_email': '{{inputs.parameters.user_email}}', 'user_first_name': '{{inputs.parameters.user_first_name}}', 'user_last_name': '{{inputs.parameters.user_last_name}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + user_name = input_params['user_name'] + user_password = input_params['user_password'] + user_email = input_params['user_email'] + user_first_name = input_params['user_first_name'] + user_last_name = input_params['user_last_name'] + keycloak_admin.create_user({'username': user_name, 'email': user_email, 'enabled': True, 'firstName': user_first_name, 'lastName': user_last_name, 'credentials': [{'type': 'password', 'value': user_password, 'temporary': False}]}) + print(f'create user {user_name} success') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f'create user {user_name} failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: assign-client-role-to-user + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: target_client_id + value: k8s-oidc6 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: client_role_name + value: admin + - name: user_name + value: user1 + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'target_client_id': '{{inputs.parameters.target_client_id}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'client_role_name': '{{inputs.parameters.client_role_name}}', 'user_name': '{{inputs.parameters.user_name}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + try: + hashed_client_id = keycloak_admin.get_client_id(client_id=input_params['target_client_id']) + print(f"""hashed_client_id of client id "{input_params['target_client_id']}" is "{hashed_client_id}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client id "{input_params['target_client_id']} failed""") + try: + idOfClientRole = keycloak_admin.get_client_role_id(client_id=hashed_client_id, role_name=input_params['client_role_name']) + print(f"""client role id in client id "{input_params['target_client_id']}" is "{idOfClientRole}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get client role "{input_params['client_role_name']}" failed""") + try: + idOfUser = keycloak_admin.get_user_id(username=input_params['user_name']) + print(f"""id of user "{input_params['user_name']}" is "{idOfUser}".""") + except Exception as inner_e: + print(inner_e) + raise Exception(f"""get user "{input_params['user_name']}" failed""") + try: + keycloak_admin.assign_client_role(client_id=hashed_client_id, user_id=idOfUser, roles=[{'id': idOfClientRole, 'name': input_params['client_role_name']}]) + print(f"""assign client role "{input_params['client_role_name']}" to user "{input_params['user_name']}" success""") + except Exception as inner_e: + print(inner_e) + raise Exception(f'assign client role to user on keycloak failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f"""assign client role "{input_params['client_role_name']}" to user "{input_params['user_name']}" failed""") + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) + - name: delete-user + inputs: + parameters: + - name: server_url + value: http://tks-console-dev.taco-cat.xyz/auth/ + - name: target_realm_name + value: test3 + - name: keycloak_credential_secret_name + value: keycloak + - name: keycloak_credential_secret_namespace + value: keycloak + - name: user_name + value: user1 + script: + command: + - python3 + image: harbor-cicd.taco-cat.xyz/dev/python-keycloak-cli:v0.1.0 + source: |2 + + from keycloak import KeycloakOpenIDConnection, KeycloakAdmin, KeycloakOpenID + from kubernetes import client, config + import sys + import base64 + input_params = {'server_url': '{{inputs.parameters.server_url}}', 'target_realm_name': '{{inputs.parameters.target_realm_name}}', 'keycloak_credential_secret_name': '{{inputs.parameters.keycloak_credential_secret_name}}', 'keycloak_credential_secret_namespace': '{{inputs.parameters.keycloak_credential_secret_namespace}}', 'user_name': '{{inputs.parameters.user_name}}'} + + def get_kubernetes_api(local=False): + if local: + import os + kubeconfig_path = os.path.expandvars('$HOME/.kube/config') + config.load_kube_config(config_file=kubeconfig_path) + else: + config.load_incluster_config() + return client.CoreV1Api() + + def get_secret(k8s_client, secret_name, secret_namespace): + secret_obj = k8s_client.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + encoded_data = secret_obj.data.get('admin-password') + decoded_data = base64.b64decode(encoded_data).decode('utf-8') + return decoded_data + k8s_client = get_kubernetes_api(local=False) + try: + secret_name = input_params['keycloak_credential_secret_name'] + secret_namespace = input_params['keycloak_credential_secret_namespace'] + secret = get_secret(k8s_client, secret_name, secret_namespace) + print(f'get secret "{secret_name}" in "{secret_namespace}" namespace') + except Exception as e: + print(e) + print(f'failed to get secret "{secret_name}" in "{secret_namespace}" namespace') + sys.exit(1) + keycloak_connection = KeycloakOpenIDConnection(server_url=input_params['server_url'], client_id='admin-cli', realm_name=input_params['target_realm_name'], user_realm_name='master', username='admin', password=secret, verify=True) + keycloak_openid = KeycloakOpenID(server_url=input_params['server_url'], client_id='admin-cli', realm_name='master') + try: + keycloak_admin = KeycloakAdmin(connection=keycloak_connection) + print(f"login to {input_params['server_url']} success") + except Exception as e: + print(e) + print(f"login to {input_params['server_url']} failed") + sys.exit(1) + try: + user_name = input_params['user_name'] + try: + user_id = keycloak_admin.get_user_id(user_name) + print(f'user id of {user_name}: {user_id}') + except Exception as e: + print(e) + raise Exception(f'failed to get user id of {user_name}') + try: + keycloak_admin.delete_user(user_id) + print(f'delete user {user_name} success') + except Exception as e: + print(e) + raise Exception(f'failed to remove user {user_name} on keycloak') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + except Exception as e: + print(e) + print(f'delete user {user_name} failed') + keycloak_openid.logout(keycloak_admin.connection.token['refresh_token']) + sys.exit(1) diff --git a/keycloak-client/set-admin-cluster.yaml b/keycloak-client/set-admin-cluster.yaml new file mode 100644 index 00000000..b6b85ca7 --- /dev/null +++ b/keycloak-client/set-admin-cluster.yaml @@ -0,0 +1,106 @@ +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: set-admin-cluster + namespace: argo +spec: + entrypoint: main + templates: + - name: main + inputs: + parameters: + - name: server_url + value: "{{inputs.parameters.server_url}}" + - name: target_realm_name + value: "{{inputs.parameters.target_realm_name}}" + - name: target_client_id + value: "{{inputs.parameters.target_client_id}}" + - name: keycloak_credential_secret_name + value: "{{inputs.parameters.keycloak_credential_secret_name}}" + - name: keycloak_credential_secret_namespace + value: "{{inputs.parameters.keycloak_credential_secret_namespace}}" + - name: client_role_name + value: "{{inputs.parameters.client_role_name}}" + - name: user_name + value: "{{inputs.parameters.user_name}}" + steps: + - - name: create-client + templateRef: + name: keycloak-client + template: create-client + arguments: + parameters: + - name: server_url + value: "{{inputs.parameters.server_url}}" + - name: target_realm_name + value: "{{inputs.parameters.target_realm_name}}" + - name: target_client_id + value: "{{inputs.parameters.target_client_id}}" + - name: keycloak_credential_secret_name + value: "{{inputs.parameters.keycloak_credential_secret_name}}" + - name: keycloak_credential_secret_namespace + value: "{{inputs.parameters.keycloak_credential_secret_namespace}}" + - - name: create-client-role + templateRef: + name: keycloak-client + template: create-client-role + arguments: + parameters: + - name: server_url + value: "{{inputs.parameters.server_url}}" + - name: target_realm_name + value: "{{inputs.parameters.target_realm_name}}" + - name: target_client_id + value: "{{inputs.parameters.target_client_id}}" + - name: keycloak_credential_secret_name + value: "{{inputs.parameters.keycloak_credential_secret_name}}" + - name: keycloak_credential_secret_namespace + value: "{{inputs.parameters.keycloak_credential_secret_namespace}}" + - name: client_role_name + value: "{{inputs.parameters.client_role_name}}" + - - name: create-client-scope-mapper-client-role + templateRef: + name: keycloak-client + template: create-client-scope-mapper-client-role + arguments: + parameters: + - name: server_url + value: "{{inputs.parameters.server_url}}" + - name: target_realm_name + value: "{{inputs.parameters.target_realm_name}}" + - name: target_client_id + value: "{{inputs.parameters.target_client_id}}" + - name: keycloak_credential_secret_name + value: "{{inputs.parameters.keycloak_credential_secret_name}}" + - name: keycloak_credential_secret_namespace + value: "{{inputs.parameters.keycloak_credential_secret_namespace}}" + - name: mapper_name + value: k8s-role-mapper + - name: claim_name + value: groups + - name: add_to_access_token + value: false + - name: add_to_id_token + value: true + - name: add_to_userinfo + value: false + - - name: assign-client-role-to-user + templateRef: + name: keycloak-user + template: assign-client-role-to-user + arguments: + parameters: + - name: server_url + value: "{{inputs.parameters.server_url}}" + - name: target_realm_name + value: "{{inputs.parameters.target_realm_name}}" + - name: target_client_id + value: "{{inputs.parameters.target_client_id}}" + - name: keycloak_credential_secret_name + value: "{{inputs.parameters.keycloak_credential_secret_name}}" + - name: keycloak_credential_secret_namespace + value: "{{inputs.parameters.keycloak_credential_secret_namespace}}" + - name: client_role_name + value: "{{inputs.parameters.client_role_name}}" + - name: user_name + value: "{{inputs.parameters.user_name}}" diff --git a/tks-admin-cluster/tks-install-admin-tools.yaml b/tks-admin-cluster/tks-install-admin-tools.yaml index 514cb345..271e17e8 100644 --- a/tks-admin-cluster/tks-install-admin-tools.yaml +++ b/tks-admin-cluster/tks-install-admin-tools.yaml @@ -53,10 +53,25 @@ spec: ] - - name: configure-keycloak - - - - + templateRef: + name: set-admin-cluster + template: main + arguments: + parameters: + - name: server_url + value: "{{workflow.parameters.keycloak_url}}" + - name: target_realm_name + value: "master" + - name: target_client_id + value: "admin-cluster-k8s-api" + - name: keycloak_credential_secret_name + value: "keycloak" + - name: kecyloak_credential_secret_namespace + value: "keycloak" + - name: client_role_name + value: "system:masters" + - name: user_name + value: "admin" - - name: install-tks-api-and-harbor templateRef: