Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use Rotated Service Account Auth Token #249

Merged
merged 25 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"parserOptions": {
"ecmaVersion": 8,
"ecmaVersion": 2022,
"sourceType": "module"
},
"env": {
Expand All @@ -11,11 +11,11 @@
"extends": "eslint:recommended",
"rules": {
"indent": [
"error",
2,
{
"SwitchCase": 1
}
"error",
2,
{
"SwitchCase": 1
}
],
"linebreak-style": [
"error",
Expand Down
19 changes: 6 additions & 13 deletions lib/EventHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const merge = require('deepmerge');
const touch = require('touch');
const watchman = require('./Watchman');
const objectPath = require('object-path');
Expand All @@ -32,11 +31,6 @@ module.exports = class EventHandler {
this._livenessInterval = 60000; //One minute
}
}
this._livenessEndHours = params.restartPod; // time in hours when we should stop updating the liveness file, which should tell kube to restart our pod. If 0 is defined, pod will not be told to restart. Default: 0.
if (!Number.isInteger(this._livenessEndHours)) {
const livenessEndHours = Number.parseInt(this._livenessEndHours);
this._livenessEndHours = (livenessEndHours >= 0) ? livenessEndHours : 0;
}
this._watchTimeoutSeconds = objectPath.get(params, 'requestOptions.qs.timeoutSeconds', 300); // time in seconds when the watch should be re-initialized. default: 300s
if (!Number.isInteger(this._watchTimeoutSeconds)) {
const watchTimeoutSeconds = Number.parseInt(this._watchTimeoutSeconds);
Expand All @@ -62,21 +56,20 @@ module.exports = class EventHandler {
// Gather variables and use to create watchman
let opt = {
logger: this._logger,
requestOptions: merge(this._kubeResourceMeta.kubeApiConfig, params.requestOptions || {}),
watchUri: this._kubeResourceMeta.uri({ watch: true })
requestOptions: { ...params?.requestOptions, uri: this._kubeResourceMeta.uri({ watch: true }) }
};
this._wm = new watchman(opt, (data) => this.eventHandler(data));
this._wm.watch();

if (this._livenessInterval) {
touch.sync('/tmp/liveness');
let livenessStartTime = Date.now(); // time liveness was first initialized
setInterval(() => {
const now = Date.now();
const millisecondsSinceLivenessStart = now - livenessStartTime;
const hoursSinceLivenessStart = millisecondsSinceLivenessStart / 1000 / 60 / 60;
const restartThresholdHours = this._livenessEndHours; // stop touching liveness file after the specified number of hours, which should tell kube to restart the pod
if (this._wm.watching && (restartThresholdHours == 0 || hoursSinceLivenessStart < restartThresholdHours)) { // (watching && restartThresholdHours not defined) || (watching && hoursSinceLivenessStart < restartThresholdHours)
const millisecondsSinceLastStart = now - this._wm.watchStart;
const resetThresholdMilliseconds = (this._watchTimeoutMilliseconds + 10000); // interval time and kube timing is not exact, so adding 10 seconds to allow for variance in the watch being restarted by kube
this._logger.debug(`Time since last start: ${millisecondsSinceLastStart / 1000}s | WatchTimeoutSeconds: ${resetThresholdMilliseconds / 1000}s.`);
// if watching and time since last start is less than the desired watch restart threshold, then we are still alive.
if (this._wm.watching && (millisecondsSinceLastStart < resetThresholdMilliseconds)) {
touch.sync('/tmp/liveness');
}
}, this._livenessInterval);
Expand Down
27 changes: 17 additions & 10 deletions lib/KubeApiConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,35 @@
*/
const packagejson = require('../package.json');
const fs = require('fs-extra');
const objectPath = require('object-path');
const k8s = require('@kubernetes/client-node');
const kc = new k8s.KubeConfig();


var _kubeApiConfig = {};
const _kubeApiConfig = {};

module.exports = function kubeApiConfig(options = {}) {
var result = {};
const result = {};
const refreshTimestamp = _kubeApiConfig?.refreshTimestamp ?? 0;
const refreshInterval = parseInt(process.env.KUBECONFIG_REFRESH_INTERVAL ?? options.refreshInterval) || 300000; // refresh kubeconfig in refreshInterval or 5 minutes if not defined. matches the default cycle interval.

if (_kubeApiConfig.kubeConfig && options.refreshCache !== true) {
// if kubeconfig is deinfed and refresh cache not requested and we havent hit the refreshTimestamp, then return cached kubeconfig
if (_kubeApiConfig.kubeConfig && options.refreshCache !== true && Date.now() < refreshTimestamp) {
return _kubeApiConfig.kubeConfig;
}

// ===== Kubernetes-Client/Javascript =====
let kubeconfigPath = process.env.KUBECONFIG || options.kubeConfigPath;
let kubeconfigPath = options.kubeConfigPath;

if (options.localhost) {
_kubeApiConfig.kubeConfig = { baseUrl: `http://localhost:${options.port || 8001}` };
_kubeApiConfig.refreshTimestamp = Date.now() + refreshInterval;
return _kubeApiConfig.kubeConfig;
} else if (kubeconfigPath && fs.existsSync(kubeconfigPath)) {
} else if (typeof kubeconfigPath === 'string' && fs.existsSync(kubeconfigPath)) {
kc.loadFromFile(kubeconfigPath);
} else {
kc.loadFromCluster();
// First tries to load from process.env.KUBECONFIG, then from $HOME/.kube/config, then finally loadFromCluster
kc.loadFromDefault();
kubeconfigPath = (typeof process.env.KUBECONFIG === 'string' && fs.existsSync(process.env.KUBECONFIG)) ? process.env.KUBECONFIG : undefined;
}

const cluster = kc.getCurrentCluster();
Expand All @@ -63,11 +68,11 @@ module.exports = function kubeApiConfig(options = {}) {
result.key = base64Decode(user.keyData || user['client-key-data']);
}
if (user.authProvider) {
let tokenFilePath = objectPath.get(user, ['authProvider', 'config', 'tokenFile']);
let tokenFilePath = user?.authProvider?.config?.tokenFile;
if (tokenFilePath) {
result.headers = { 'Authorization': `Bearer ${fs.readFileSync(tokenFilePath, { encoding: 'utf8' })}` };
} else {
let idToken = objectPath.get(user, ['authProvider', 'config', 'id-token']);
let idToken = user?.authProvider?.config?.['id-token'];
result.headers = { 'Authorization': `Bearer ${idToken}` };
}
} else if (user.token) {
Expand All @@ -78,8 +83,10 @@ module.exports = function kubeApiConfig(options = {}) {
const userAgentName = process.env.USER_AGENT_NAME || 'razee-io/kubernetes-util';
const userAgentVersion = process.env.USER_AGENT_VERSION || packagejson.version;

objectPath.set(result, 'headers.User-Agent', `${userAgentName}/${userAgentVersion}`);
result.headers = { ...result.headers, 'User-Agent': `${userAgentName}/${userAgentVersion}` };

_kubeApiConfig.kubeConfig = result;
_kubeApiConfig.refreshTimestamp = Date.now() + refreshInterval; // refresh in refreshInterval or 5 minutes if not defined
return result;
};

Expand Down
25 changes: 13 additions & 12 deletions lib/KubeResourceMeta.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ const objectPath = require('object-path');
const request = require('request-promise-native');
const merge = require('deepmerge');

const KubeApiConfig = require('./KubeApiConfig');

module.exports = class KubeResourceMeta {
constructor(path, rm, kubeApiConfig) {
constructor(path, rm) {
this._path = path;
this._resourceMeta = rm;
this._kubeApiConfig = kubeApiConfig;
this._extraHeaders = {};

this._logger = require('./bunyan-api').createLogger('KubeResourceMeta');
}

clone() {
return new KubeResourceMeta(this._path, this._resourceMeta, this._kubeApiConfig);
return new KubeResourceMeta(this._path, this._resourceMeta);
}

uri(options = {}) {
Expand Down Expand Up @@ -72,7 +73,7 @@ module.exports = class KubeResourceMeta {
return Array.isArray(this._resourceMeta.verbs) ? this._resourceMeta.verbs : [];
}
get kubeApiConfig() {
return this._kubeApiConfig;
return KubeApiConfig;
}
get extraHeaders() {
return this._extraHeaders;
Expand All @@ -94,33 +95,33 @@ module.exports = class KubeResourceMeta {

async request(reqOpt) {
this._logger.debug(`Request ${reqOpt.method || 'GET'} ${reqOpt.uri || reqOpt.url}`);
return request(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt]));
return request(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt]));
}

async get(name, ns, reqOpt = {}) {
const uri = this.uri({ name: name, namespace: ns });
this._logger.debug(`Get ${uri}`);
return request.get(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt, { uri: uri, json: true }]));
return request.get(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt, { uri: uri, json: true }]));
}

async put(file, reqOpt = {}) {
const uri = this.uri({ name: objectPath.get(file, 'metadata.name'), namespace: objectPath.get(file, 'metadata.namespace') });
this._logger.debug(`Put ${uri}`);
return request.put(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt, { uri: uri, json: file }]));
return request.put(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt, { uri: uri, json: file }]));
}

async post(file, reqOpt = {}) {
const uri = this.uri({ namespace: objectPath.get(file, 'metadata.namespace') });
this._logger.debug(`Post ${uri}`);
return request.post(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt, { uri: uri, json: file }]));
return request.post(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt, { uri: uri, json: file }]));
}

async patch(name, ns, jPatch, reqOpt = {}) {
const uri = this.uri({ name: name, namespace: ns, status: reqOpt.status });
this._logger.debug(`Json Patch ${uri}`);
reqOpt = merge(reqOpt, { uri: uri, json: jPatch });
objectPath.set(reqOpt, ['headers', 'content-type'], objectPath.get(reqOpt, ['headers', 'content-type']) || 'application/json-patch+json');
return request.patch(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt]));
return request.patch(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt]));
}

async jsonPatch(name, ns, jPatch, reqOpt = {}) {
Expand All @@ -132,21 +133,21 @@ module.exports = class KubeResourceMeta {
this._logger.debug(`MergePatch ${uri}`);
reqOpt = merge(reqOpt, { uri: uri, json: mPatch });
objectPath.set(reqOpt, ['headers', 'content-type'], objectPath.get(reqOpt, ['headers', 'content-type']) || 'application/merge-patch+json');
return request.patch(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt]));
return request.patch(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt]));
}

async strategicMergePatch(name, ns, smPatch, reqOpt = {}) {
const uri = this.uri({ name: name, namespace: ns, status: reqOpt.status });
this._logger.debug(`StrategicMergePatch ${uri}`);
reqOpt = merge(reqOpt, { uri: uri, json: smPatch });
objectPath.set(reqOpt, ['headers', 'content-type'], objectPath.get(reqOpt, ['headers', 'content-type']) || 'application/strategic-merge-patch+json');
return request.patch(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt]));
return request.patch(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt]));
}

async delete(name, ns, reqOpt = {}) {
const uri = this.uri({ name: name, namespace: ns });
this._logger.debug(`Delete ${uri}`);
return request.delete(merge.all([this._kubeApiConfig, this._extraHeaders, reqOpt, { uri: uri, json: true }]));
return request.delete(merge.all([this.kubeApiConfig(), this._extraHeaders, reqOpt, { uri: uri, json: true }]));
}

};
31 changes: 20 additions & 11 deletions lib/Watchman.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,35 @@ const JSONStream = require('JSONStream');
const delay = require('delay');
const merge = require('deepmerge');

const KubeApiConfig = require('./KubeApiConfig');

module.exports = class Watchman {
constructor(options, objectHandler) {
if ((typeof objectHandler) !== 'function') {
throw 'Watchman objectHandler must be a function.';
}
this._objectHandler = objectHandler;
this._requestOptions = merge({
headers: {
'User-Agent': 'razee-watchman'
},
json: true, // Automatically parses the JSON string in the response
resolveWithFullResponse: true,
simple: false
}, options.requestOptions || {});
this._requestOptions.uri = options.watchUri;
const kac = KubeApiConfig();
this._requestOptions = merge.all(
[
{
headers: {
'User-Agent': 'razee-watchman'
},
baseUrl: kac.baseUrl, // needs the baseurl for validUrl check, but i dont want the other KubeApiConfig values here so that we can fetch them before each call to watch().
json: true, // Automatically parses the JSON string in the response
resolveWithFullResponse: true,
simple: false
},
options.requestOptions ?? {}
]
);

if ((options.logger) && ((typeof options.logger) !== 'object')) {
throw 'options.logger must be an object.';
}
this._logger = options.logger;
if (!validUrl.isUri(`${this._requestOptions.baseUrl}${this._requestOptions.uri}`) || !this._requestOptions.uri.includes('watch')) {
if (!validUrl.isUri(`${this._requestOptions.baseUrl}${this._requestOptions.uri}`) || !this._requestOptions?.uri?.includes('watch')) {
throw `uri '${this._requestOptions.baseUrl}${this._requestOptions.uri}' not valid watch uri.`;
}

Expand Down Expand Up @@ -82,7 +90,8 @@ module.exports = class Watchman {
this._logger.debug('Watchman: initializing watch');
this.end(this._rewatchOnTimeout);
this._logger.debug('Watchman: attempting new watch ');
this._requestStream = request(this._requestOptions)
// this._requestOptions must not contain a prior KubeApiConfig(), otherwise the old values will overrite the newly fetched ones
this._requestStream = request(merge(KubeApiConfig(), this._requestOptions))
.on('response', (response) => {
if (response.statusCode !== 200) {
if (this._logger) {
Expand Down
17 changes: 11 additions & 6 deletions lib/kubeClass.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,26 @@ const clone = require('clone');
const objectPath = require('object-path');

const KubeResourceMeta = require('./KubeResourceMeta');
const KubeApiConfig = require('./KubeApiConfig');

const kubeResourceMetaCache = {};

module.exports = class KubeClass {

constructor(kubeApiConfig, logger) {
this._kubeApiConfig = kubeApiConfig;
constructor(logger) {
this._log = logger || require('./bunyan-api').createLogger('kubeClass');
this._baseOptions = merge({
}

get _baseOptions() {
// need to re-compute base options every call in case KubeApiConfig needs to get a fresh token
return merge({
headers: {
Accept: 'application/json'
},
json: true,
simple: false,
resolveWithFullResponse: true
}, this._kubeApiConfig);
}, KubeApiConfig());
}

async getCoreApis() {
Expand All @@ -43,7 +48,7 @@ module.exports = class KubeClass {
}
let apiResourceList = coreApiList.body;
let resourceMetaList = apiResourceList.resources;
let result = resourceMetaList.map(r => new KubeResourceMeta(`/api/${apiResourceList.groupVersion}`, r, this._kubeApiConfig));
let result = resourceMetaList.map(r => new KubeResourceMeta(`/api/${apiResourceList.groupVersion}`, r));
return result;
}

Expand Down Expand Up @@ -109,7 +114,7 @@ module.exports = class KubeClass {
if (response.statusCode == 200 && objectPath.get(response, 'body.kind') === 'APIResourceList') {
let resources = objectPath.get(response, 'body.resources');
resources.map((r) => {
let krm = new KubeResourceMeta(`/apis/${groupVersion}`, r, this._kubeApiConfig);
let krm = new KubeResourceMeta(`/apis/${groupVersion}`, r);
krmHandler(krm);
});
return { statusCode: response.statusCode, groupVersion: groupVersion };
Expand Down
Loading