Skip to content

Commit dd771e8

Browse files
Also call storage on mode1. Measure latency (grafana#87739)
* Also call storage on mode1. Add metrics * Update comment * Don't use compare function for now * Remove very important space * Finish add logging in mode2. Also call US in mode1 in a non blocking way * Improve code readability on modes 1 and 2 * Fix tests * Rename vars * Lint * Return error from legacy write * Renume useless defer * [REVIEW] improvements * Pass kind instead of name * Use kind instead of name in metrics * Only call latency metrics once * Return error on writes to legacystore in mode1 * Move accesssor logic into the goroutine as well
1 parent 7c5c62f commit dd771e8

File tree

5 files changed

+238
-64
lines changed

5 files changed

+238
-64
lines changed

pkg/apiserver/rest/dualwriter_mode1.go

+177-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package rest
22

33
import (
44
"context"
5+
"errors"
6+
"time"
57

8+
"k8s.io/apimachinery/pkg/api/meta"
69
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
710
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
811
"k8s.io/apimachinery/pkg/runtime"
@@ -17,6 +20,10 @@ type DualWriterMode1 struct {
1720
*dualWriterMetrics
1821
}
1922

23+
const (
24+
mode1Str = "1"
25+
)
26+
2027
// NewDualWriterMode1 returns a new DualWriter in mode 1.
2128
// Mode 1 represents writing to and reading from LegacyStorage.
2229
func NewDualWriterMode1(legacy LegacyStorage, storage Storage) *DualWriterMode1 {
@@ -27,36 +34,195 @@ func NewDualWriterMode1(legacy LegacyStorage, storage Storage) *DualWriterMode1
2734

2835
// Create overrides the behavior of the generic DualWriter and writes only to LegacyStorage.
2936
func (d *DualWriterMode1) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
30-
ctx = klog.NewContext(ctx, d.Log)
31-
return d.Legacy.Create(ctx, obj, createValidation, options)
37+
log := d.Log.WithValues("kind", options.Kind)
38+
ctx = klog.NewContext(ctx, log)
39+
var method = "create"
40+
41+
startLegacy := time.Now()
42+
res, err := d.Legacy.Create(ctx, obj, createValidation, options)
43+
if err != nil {
44+
log.Error(err, "unable to create object in legacy storage")
45+
d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
46+
return res, err
47+
}
48+
d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)
49+
50+
go func() {
51+
accessorCreated, err := meta.Accessor(res)
52+
if err != nil {
53+
log.Error(err, "unable to get accessor for created object")
54+
}
55+
56+
accessorOld, err := meta.Accessor(obj)
57+
if err != nil {
58+
log.Error(err, "unable to get accessor for old object")
59+
}
60+
61+
enrichObject(accessorOld, accessorCreated)
62+
startStorage := time.Now()
63+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage create timeout"))
64+
defer cancel()
65+
_, errObjectSt := d.Storage.Create(ctx, obj, createValidation, options)
66+
d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage)
67+
}()
68+
69+
return res, nil
3270
}
3371

3472
// Get overrides the behavior of the generic DualWriter and reads only from LegacyStorage.
3573
func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
36-
ctx = klog.NewContext(ctx, d.Log)
37-
return d.Legacy.Get(ctx, name, options)
74+
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
75+
ctx = klog.NewContext(ctx, log)
76+
var method = "get"
77+
78+
startLegacy := time.Now()
79+
res, errLegacy := d.Legacy.Get(ctx, name, options)
80+
if errLegacy != nil {
81+
log.Error(errLegacy, "unable to get object in legacy storage")
82+
}
83+
d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy)
84+
85+
go func() {
86+
startStorage := time.Now()
87+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage get timeout"))
88+
defer cancel()
89+
_, err := d.Storage.Get(ctx, name, options)
90+
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
91+
}()
92+
93+
return res, errLegacy
3894
}
3995

4096
// List overrides the behavior of the generic DualWriter and reads only from LegacyStorage.
4197
func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
42-
ctx = klog.NewContext(ctx, d.Log)
43-
return d.Legacy.List(ctx, options)
98+
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
99+
ctx = klog.NewContext(ctx, log)
100+
var method = "list"
101+
102+
startLegacy := time.Now()
103+
res, errLegacy := d.Legacy.List(ctx, options)
104+
if errLegacy != nil {
105+
log.Error(errLegacy, "unable to list object in legacy storage")
106+
}
107+
d.recordLegacyDuration(errLegacy != nil, mode1Str, options.Kind, method, startLegacy)
108+
109+
go func() {
110+
startStorage := time.Now()
111+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage list timeout"))
112+
defer cancel()
113+
_, err := d.Storage.List(ctx, options)
114+
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
115+
}()
116+
117+
return res, errLegacy
44118
}
45119

46120
func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
121+
log := d.Log.WithValues("name", name, "kind", options.Kind)
47122
ctx = klog.NewContext(ctx, d.Log)
48-
return d.Legacy.Delete(ctx, name, deleteValidation, options)
123+
var method = "delete"
124+
125+
startLegacy := time.Now()
126+
res, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
127+
if err != nil {
128+
log.Error(err, "unable to delete object in legacy storage")
129+
d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
130+
return res, async, err
131+
}
132+
d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)
133+
134+
go func() {
135+
startStorage := time.Now()
136+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage delete timeout"))
137+
defer cancel()
138+
_, _, err := d.Storage.Delete(ctx, name, deleteValidation, options)
139+
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
140+
}()
141+
142+
return res, async, nil
49143
}
50144

51145
// DeleteCollection overrides the behavior of the generic DualWriter and deletes only from LegacyStorage.
52146
func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
53-
ctx = klog.NewContext(ctx, d.Log)
54-
return d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
147+
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
148+
ctx = klog.NewContext(ctx, log)
149+
var method = "delete-collection"
150+
151+
startLegacy := time.Now()
152+
res, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
153+
if err != nil {
154+
log.Error(err, "unable to delete collection in legacy storage")
155+
d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
156+
return res, err
157+
}
158+
d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)
159+
160+
go func() {
161+
startStorage := time.Now()
162+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage deletecollection timeout"))
163+
defer cancel()
164+
_, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
165+
d.recordStorageDuration(err != nil, mode1Str, options.Kind, method, startStorage)
166+
}()
167+
168+
return res, nil
55169
}
56170

57171
func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
58-
ctx = klog.NewContext(ctx, d.Log)
59-
return d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
172+
log := d.Log.WithValues("name", name, "kind", options.Kind)
173+
ctx = klog.NewContext(ctx, log)
174+
var method = "update"
175+
176+
startLegacy := time.Now()
177+
res, async, err := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
178+
if err != nil {
179+
log.Error(err, "unable to update in legacy storage")
180+
d.recordLegacyDuration(true, mode1Str, options.Kind, method, startLegacy)
181+
return res, async, err
182+
}
183+
d.recordLegacyDuration(false, mode1Str, options.Kind, method, startLegacy)
184+
185+
go func() {
186+
updated, err := objInfo.UpdatedObject(ctx, res)
187+
if err != nil {
188+
log.WithValues("object", updated).Error(err, "could not update or create object")
189+
}
190+
191+
// get the object to be updated
192+
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
193+
if err != nil {
194+
log.WithValues("object", foundObj).Error(err, "could not get object to update")
195+
}
196+
197+
// if the object is found, create a new updateWrapper with the object found
198+
if foundObj != nil {
199+
accessorOld, err := meta.Accessor(foundObj)
200+
if err != nil {
201+
log.Error(err, "unable to get accessor for original updated object")
202+
}
203+
204+
accessor, err := meta.Accessor(res)
205+
if err != nil {
206+
log.Error(err, "unable to get accessor for updated object")
207+
}
208+
209+
accessor.SetResourceVersion(accessorOld.GetResourceVersion())
210+
accessor.SetUID(accessorOld.GetUID())
211+
212+
enrichObject(accessorOld, accessor)
213+
objInfo = &updateWrapper{
214+
upstream: objInfo,
215+
updated: res,
216+
}
217+
}
218+
startStorage := time.Now()
219+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage update timeout"))
220+
defer cancel()
221+
_, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
222+
d.recordStorageDuration(errObjectSt != nil, mode1Str, options.Kind, method, startStorage)
223+
}()
224+
225+
return res, async, nil
60226
}
61227

62228
func (d *DualWriterMode1) Destroy() {

pkg/apiserver/rest/dualwriter_mode1_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ import (
1313
"k8s.io/apiserver/pkg/apis/example"
1414
)
1515

16-
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
17-
var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
18-
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
19-
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
20-
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
16+
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
17+
var exampleObjDifferentRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "3"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
18+
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
19+
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2"}, Spec: example.PodSpec{}, Status: example.PodStatus{}}
20+
var exampleList = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{}, Items: []example.Pod{*exampleObj}}
2121
var anotherList = &example.PodList{Items: []example.Pod{*anotherObj}}
2222

2323
func TestMode1_Create(t *testing.T) {

pkg/apiserver/rest/dualwriter_mode2.go

+32-33
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@ func NewDualWriterMode2(legacy LegacyStorage, storage Storage) *DualWriterMode2
3232

3333
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
3434
func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
35-
ctx = klog.NewContext(ctx, d.Log)
35+
log := d.Log.WithValues("kind", options.Kind)
36+
ctx = klog.NewContext(ctx, log)
3637

3738
created, err := d.Legacy.Create(ctx, obj, createValidation, options)
3839
if err != nil {
39-
d.Log.Error(err, "unable to create object in legacy storage")
40+
log.Error(err, "unable to create object in legacy storage")
4041
return created, err
4142
}
4243

@@ -58,15 +59,15 @@ func (d *DualWriterMode2) Create(ctx context.Context, obj runtime.Object, create
5859

5960
rsp, err := d.Storage.Create(ctx, created, createValidation, options)
6061
if err != nil {
61-
d.Log.WithValues("name", accessorCreated.GetName(), "resourceVersion", accessorCreated.GetResourceVersion()).Error(err, "unable to create object in duplicate storage")
62+
log.WithValues("name", accessorCreated.GetName(), "resourceVersion", accessorCreated.GetResourceVersion()).Error(err, "unable to create object in storage")
6263
}
6364
return rsp, err
6465
}
6566

6667
// Get overrides the behavior of the generic DualWriter.
6768
// It retrieves an object from Storage if possible, and if not it falls back to LegacyStorage.
6869
func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
69-
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion)
70+
log := d.Log.WithValues("name", name, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
7071
ctx = klog.NewContext(ctx, log)
7172
s, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
7273
if err != nil {
@@ -83,7 +84,7 @@ func (d *DualWriterMode2) Get(ctx context.Context, name string, options *metav1.
8384
// List overrides the behavior of the generic DualWriter.
8485
// It returns Storage entries if possible and falls back to LegacyStorage entries if not.
8586
func (d *DualWriterMode2) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
86-
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion)
87+
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "kind", options.Kind)
8788
ctx = klog.NewContext(ctx, log)
8889
ll, err := d.Legacy.List(ctx, options)
8990
if err != nil {
@@ -167,7 +168,7 @@ func (d *DualWriterMode2) DeleteCollection(ctx context.Context, deleteValidation
167168
}
168169

169170
func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
170-
log := d.Log.WithValues("name", name)
171+
log := d.Log.WithValues("name", name, "kind", options.Kind)
171172
ctx = klog.NewContext(ctx, log)
172173

173174
deletedLS, async, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
@@ -190,23 +191,21 @@ func (d *DualWriterMode2) Delete(ctx context.Context, name string, deleteValidat
190191

191192
// Update overrides the generic behavior of the Storage and writes first to the legacy storage and then to storage.
192193
func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
193-
var notFound bool
194-
log := d.Log.WithValues("name", name)
194+
log := d.Log.WithValues("name", name, "kind", options.Kind)
195195
ctx = klog.NewContext(ctx, log)
196196

197-
// get old and new (updated) object so they can be stored in legacy store
198-
old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
197+
// get foundObj and new (updated) object so they can be stored in legacy store
198+
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
199199
if err != nil {
200200
if !apierrors.IsNotFound(err) {
201-
log.WithValues("object", old).Error(err, "could not get object to update")
201+
log.WithValues("object", foundObj).Error(err, "could not get object to update")
202202
return nil, false, err
203203
}
204-
notFound = true
205204
log.Info("object not found for update, creating one")
206205
}
207206

208207
// obj can be populated in case it's found or empty in case it's not found
209-
updated, err := objInfo.UpdatedObject(ctx, old)
208+
updated, err := objInfo.UpdatedObject(ctx, foundObj)
210209
if err != nil {
211210
log.WithValues("object", updated).Error(err, "could not update or create object")
212211
return nil, false, err
@@ -218,31 +217,28 @@ func (d *DualWriterMode2) Update(ctx context.Context, name string, objInfo rest.
218217
return obj, created, err
219218
}
220219

221-
if notFound {
222-
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
223-
}
220+
// if the object is found, create a new updateWrapper with the object found
221+
if foundObj != nil {
222+
accessorOld, err := meta.Accessor(foundObj)
223+
if err != nil {
224+
log.Error(err, "unable to get accessor for original updated object")
225+
}
224226

225-
accessor, err := meta.Accessor(obj)
226-
if err != nil {
227-
return nil, false, err
228-
}
227+
accessor, err := meta.Accessor(obj)
228+
if err != nil {
229+
log.Error(err, "unable to get accessor for updated object")
230+
}
229231

230-
// only if object exists
231-
accessorOld, err := meta.Accessor(old)
232-
if err != nil {
233-
return nil, false, err
234-
}
232+
enrichObject(accessorOld, accessor)
235233

236-
enrichObject(accessorOld, accessor)
234+
accessor.SetResourceVersion(accessorOld.GetResourceVersion())
235+
accessor.SetUID(accessorOld.GetUID())
237236

238-
// keep the same UID and resource_version
239-
accessor.SetResourceVersion(accessorOld.GetResourceVersion())
240-
accessor.SetUID(accessorOld.GetUID())
241-
objInfo = &updateWrapper{
242-
upstream: objInfo,
243-
updated: obj,
237+
objInfo = &updateWrapper{
238+
upstream: objInfo,
239+
updated: obj,
240+
}
244241
}
245-
246242
// TODO: relies on GuaranteedUpdate creating the object if
247243
// it doesn't exist: https://github.com/grafana/grafana/pull/85206
248244
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
@@ -309,6 +305,9 @@ func enrichObject(accessorO, accessorC metav1.Object) {
309305
accessorC.SetLabels(accessorO.GetLabels())
310306

311307
ac := accessorC.GetAnnotations()
308+
if ac == nil {
309+
ac = map[string]string{}
310+
}
312311
for k, v := range accessorO.GetAnnotations() {
313312
ac[k] = v
314313
}

0 commit comments

Comments
 (0)