-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathK8sServiceAccount.py
95 lines (74 loc) · 3.09 KB
/
K8sServiceAccount.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.md', which is part of this source code package.
#
from kubernetes_py.K8sObject import K8sObject
from kubernetes_py.K8sSecret import K8sSecret
from kubernetes_py.models.v1.LocalObjectReference import LocalObjectReference
from kubernetes_py.models.v1.ServiceAccount import ServiceAccount
class K8sServiceAccount(K8sObject):
    """K8sObject wrapper whose ``obj_type`` is ``"ServiceAccount"``.

    ``self.model`` holds a ``ServiceAccount`` model once ``get()`` has run;
    the properties below read from and write to that model.
    """

    def __init__(self, config=None, name=None):
        """Initialise the underlying K8sObject with obj_type "ServiceAccount"."""
        super(K8sServiceAccount, self).__init__(config=config, name=name, obj_type="ServiceAccount")

    # ------------------------------------------------------------------------------------- override

    def get(self):
        """Rebuild ``self.model`` from ``self.get_model()`` and return ``self``."""
        self.model = ServiceAccount(self.get_model())
        return self

    def create(self):
        """Delegate to ``K8sObject.create()``, refresh via ``get()``, return ``self``."""
        super(K8sServiceAccount, self).create()
        self.get()
        return self

    def update(self):
        """Delegate to ``K8sObject.update()``, refresh via ``get()``, return ``self``."""
        super(K8sServiceAccount, self).update()
        self.get()
        return self

    def list(self, pattern=None, labels=None):
        """Return K8sServiceAccount wrappers for the listed service accounts.

        :param pattern: when not None, keep only accounts whose name
            contains this substring.
        :param labels: forwarded unchanged to ``K8sObject.list()``.
        :return: a list of K8sServiceAccount objects, each built with this
            object's config and populated through ``from_model()``.
        """
        raw_items = super(K8sServiceAccount, self).list(labels=labels)

        # Every raw entry is converted to a model first; filtering by name
        # happens only afterwards, on the converted models.
        models = [ServiceAccount(item) for item in raw_items]
        if pattern is not None:
            models = [acct for acct in models if pattern in acct.name]

        return [
            K8sServiceAccount(config=self.config, name=acct.name).from_model(m=acct)
            for acct in models
        ]

    # ------------------------------------------------------------------------------------- add

    def add_api_token(self):
        """Return the result of ``K8sSecret.create_service_account_api_token``
        called with this object's config and name."""
        return K8sSecret.create_service_account_api_token(config=self.config, name=self.name)

    def add_image_pull_secret(self, secret=None):
        """Append a reference to ``secret`` to the image pull secrets and update.

        :param secret: a K8sSecret instance; only its ``name`` is used.
        :raises SyntaxError: if ``secret`` is not a K8sSecret.
        """
        if not isinstance(secret, K8sSecret):
            raise SyntaxError("K8sServiceAccount.add_image_pull_secret() secret: [ {} ] is invalid.".format(secret))

        new_ref = LocalObjectReference()
        new_ref.name = secret.name

        # The reference list comes straight from the model; it is extended
        # in place and then written back through the property setter.
        current_refs = self.image_pull_secrets_refs
        current_refs.append(new_ref)
        self.image_pull_secrets = current_refs

        self.update()

    # ------------------------------------------------------------------------------------- secrets

    @property
    def secrets(self):
        """Result of ``K8sSecret.api_tokens_for_service_account`` for this
        object's config and name."""
        return K8sSecret.api_tokens_for_service_account(config=self.config, name=self.name)

    @secrets.setter
    def secrets(self, s=None):
        """Assignment is not supported; always raises NotImplementedError."""
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------- imagePullSecrets

    @property
    def image_pull_secrets_refs(self):
        """The model's ``image_pull_secrets`` value, returned as-is."""
        return self.model.image_pull_secrets

    @image_pull_secrets_refs.setter
    def image_pull_secrets_refs(self, s=None):
        """Assignment is not supported; always raises NotImplementedError."""
        raise NotImplementedError()

    @property
    def image_pull_secrets(self):
        """A list with one fetched K8sSecret per reference on the model.

        Each reference's ``name`` is used to build a K8sSecret with this
        object's config, and ``get()`` is called on it.
        """
        return [
            K8sSecret(config=self.config, name=ref.name).get()
            for ref in self.model.image_pull_secrets
        ]

    @image_pull_secrets.setter
    def image_pull_secrets(self, s=None):
        """Store ``s`` on the model's ``image_pull_secrets`` attribute."""
        self.model.image_pull_secrets = s