-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathmain.go
234 lines (203 loc) · 7.09 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"os"
pkiV1 "github.com/box/error-reporting-with-kubernetes-events/pkg/apis/box.com/v1"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
)
const allowedNamesFilePath string = "/AllowedNames"
const pkisWatchPath string = "/apis/box.com/v1/pkis"
// allowedNames returns the allowed serviceName values in PKI objects.
// The file in allowedNamesFilePath is read and its contents are returned in a
// slice. One line each slice entry.
func allowedNames() []string {
file, err := os.Open(allowedNamesFilePath)
if err != nil {
glog.Fatalf("Could not open file: %s", allowedNamesFilePath)
}
defer file.Close()
rv := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if len(line) != 0 {
rv = append(rv, line)
}
}
if err := scanner.Err(); err != nil {
glog.Fatal(err)
}
return rv
}
// isInSlice returns true if the string e exists in slice s
func isInSlice(s []string, e string) bool {
for _, x := range s {
if x == e {
return true
}
}
return false
}
// watchPkis uses the HTTP watch endpoint to "react" to changes to the
// PKIs. A goroutine with an infinite loop is started that continously
// waits on the HTTP watch endpoint. The errors are logged, the received PKIs
// are inserted into the channel that is returned.
func watchPkis(restClient restclient.Interface) <-chan pkiV1.PkiChange {
events := make(chan pkiV1.PkiChange)
go func() {
for {
// TODO: Instead of this direct HTTP request, the workqueue and
// informer pattern could be used. Similar to here
// https://github.com/kubernetes/sample-controller/blob/258eead08702028194ade1c0a7f958c837d6f081/controller.go#L122
request := restClient.Get().AbsPath(pkisWatchPath).Param("watch", "true")
glog.Infof("HTTP Get request for: %s", request.URL())
body, err := request.Stream()
if err != nil {
glog.Errorf("restClient Stream Error %v.", err)
continue
}
defer body.Close()
decoder := json.NewDecoder(body)
// Incrementally parse the body as long as it grows.
for {
var event pkiV1.PkiChange
err = decoder.Decode(&event)
if err != nil {
glog.Errorf("JSON decode error %v.", err)
break
}
events <- event
}
glog.Infof("Completed one Pki reception and decode")
}
}()
return events
}
// doPkiProcessing is a placeholder function for the business logic in this
// control plane application. Once a valid PKI object is found, additional
// processing would be triggered from this function. At the end the output is
// written back to the API server in a Secret so that the interested parties
// can mount it.
func doPkiProcessing(pki pkiV1.Pki, kubeClient *kubernetes.Clientset) {
//
// .....
// Omitted extra processing for brevity
//
// Generate a Secret with the processing result and write back to the
// API server. The application does a Secret mount and retrieves this
// data.
// A placeholder empty Secret. In real life, the data in this
// Secret would contain the generated PKI information.
emptySecretSpec := v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pki.Name,
Namespace: pki.Namespace,
},
}
// Replace the PKI
err := kubeClient.CoreV1().Secrets(pki.Namespace).Delete(pki.Name,
&metav1.DeleteOptions{})
if err != nil {
glog.Infof("Could not delete Secret with name %s: %v\n", pki.Name, err)
}
_, err = kubeClient.CoreV1().Secrets(pki.Namespace).Create(
&emptySecretSpec)
if err != nil {
glog.Fatalf("Could not generate Secret: %v", err)
}
}
// postEventAboutPki uses Kubernetes Events to write an error message to the event
// steam of pods in the same namespace as the PKI.
func postEventAboutPki(pki pkiV1.Pki, kubeClient *kubernetes.Clientset,
recorder record.EventRecorder, allowedNames []string) {
pods, err := kubeClient.CoreV1().Pods(pki.Namespace).List(metav1.ListOptions{})
if err != nil {
glog.Fatalf(" Could not list pods in namespace %s: %v", pki.Namespace, err)
}
for _, pod := range pods.Items {
// TODO: Even in the same namespace, there may be some pods that do not use
// this PKI. Ideally we only want to send this message to the relevant pods.
// One needs to inspect the volumeMounts done by the containers in pods.
// If a container does a volumeMount with the same name, only then post this
// error message to that Pod's lifecycle. This is left as an exercise for the
// reader.
ref, err := reference.GetReference(scheme.Scheme, &pod)
if err != nil {
glog.Fatalf("Could not get reference for pod %v: %v\n",
pod.Name, err)
}
recorder.Event(ref, v1.EventTypeWarning, "PKI ServiceName error",
fmt.Sprintf("ServiceName: %s in PKI: %s is not found in"+
" allowedNames: %s", pki.Spec.ServiceName, pki.Name,
allowedNames))
}
}
// handlePkiChanges waits for new PKIs to be added or existing ones to be modified.
// When a change happens, it does parameter checking and some external processing.
func handlePkiChanges(pkiEvents <-chan pkiV1.PkiChange, allowedNames []string,
kubeClient *kubernetes.Clientset, recorder record.EventRecorder) {
for e := range pkiEvents {
glog.Infof("Seen PkiChange : %v", e)
t := e.Type
pki := e.Object
if t == watch.Added || t == watch.Modified {
serviceName := pki.Spec.ServiceName
if isInSlice(allowedNames, serviceName) {
// The input parameters are ok. Do extra processing ...
doPkiProcessing(pki, kubeClient)
} else {
// Found some unexpected parameters in the PKI. Generate an error
// message using kubernetes events.
postEventAboutPki(pki, kubeClient, recorder, allowedNames)
}
} else {
// There are other types of changes, Deleted, etc that we do
// not care in this example.
glog.Infof("Received an unhandled PKI change: %s for %v. Ignoring!", t, pki)
}
glog.Flush()
}
}
// eventRecorder returns an EventRecorder type that can be
// used to post Events to different object's lifecycles.
func eventRecorder(
kubeClient *kubernetes.Clientset) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: "controlplane"})
return recorder
}
func main() {
flag.Parse()
allowedNames := allowedNames()
kubeConfig, err := restclient.InClusterConfig()
if err != nil {
glog.Fatalf("Could not get kubeconfig: %v", err)
}
kubeClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %v", err)
}
restClient := kubeClient.RESTClient()
recorder := eventRecorder(kubeClient)
pkiEvents := watchPkis(restClient)
handlePkiChanges(pkiEvents, allowedNames, kubeClient, recorder)
}