forked from grafana/grafana
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdatabase_storage.go
131 lines (107 loc) · 3.36 KB
/
database_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package remotecache
import (
"context"
"time"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
)
var getTime = time.Now
const databaseCacheType = "database"
type databaseCache struct {
SQLStore db.DB
log log.Logger
}
func newDatabaseCache(sqlstore db.DB) *databaseCache {
dc := &databaseCache{
SQLStore: sqlstore,
log: log.New("remotecache.database"),
}
return dc
}
func (dc *databaseCache) Run(ctx context.Context) error {
ticker := time.NewTicker(time.Minute * 10)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
dc.internalRunGC()
}
}
}
func (dc *databaseCache) internalRunGC() {
err := dc.SQLStore.WithDbSession(context.Background(), func(session *db.Session) error {
now := getTime().Unix()
sql := `DELETE FROM cache_data WHERE (? - created_at) >= expires AND expires <> 0`
_, err := session.Exec(sql, now)
return err
})
if err != nil {
dc.log.Error("failed to run garbage collect", "error", err)
}
}
func (dc *databaseCache) Get(ctx context.Context, key string) ([]byte, error) {
cacheHit := CacheData{}
err := dc.SQLStore.WithDbSession(ctx, func(session *db.Session) error {
exist, err := session.Where("cache_key= ?", key).Get(&cacheHit)
if err != nil {
return err
}
if !exist {
return ErrCacheItemNotFound
}
if cacheHit.Expires > 0 {
existedButExpired := getTime().Unix()-cacheHit.CreatedAt >= cacheHit.Expires
if existedButExpired {
err = dc.Delete(ctx, key) // ignore this error since we will return `ErrCacheItemNotFound` anyway
if err != nil {
dc.log.Debug("Deletion of expired key failed: %v", err)
}
return ErrCacheItemNotFound
}
}
return nil
})
return cacheHit.Data, err
}
func (dc *databaseCache) Set(ctx context.Context, key string, data []byte, expire time.Duration) error {
return dc.SQLStore.WithDbSession(ctx, func(session *db.Session) error {
var expiresInSeconds int64
if expire != 0 {
expiresInSeconds = int64(expire) / int64(time.Second)
}
// attempt to insert the key
sql := `INSERT INTO cache_data (cache_key,data,created_at,expires) VALUES(?,?,?,?)`
_, err := session.Exec(sql, key, data, getTime().Unix(), expiresInSeconds)
if err != nil {
// attempt to update if a unique constrain violation or a deadlock (for MySQL) occurs
// if the update fails propagate the error
// which eventually will result in a key that is not finally set
// but since it's a cache does not harm a lot
if dc.SQLStore.GetDialect().IsUniqueConstraintViolation(err) || dc.SQLStore.GetDialect().IsDeadlock(err) {
sql := `UPDATE cache_data SET data=?, created_at=?, expires=? WHERE cache_key=?`
_, err = session.Exec(sql, data, getTime().Unix(), expiresInSeconds, key)
if err != nil && dc.SQLStore.GetDialect().IsDeadlock(err) {
// most probably somebody else is upserting the key
// so it is safe enough not to propagate this error
return nil
}
}
}
return err
})
}
func (dc *databaseCache) Delete(ctx context.Context, key string) error {
return dc.SQLStore.WithDbSession(ctx, func(session *db.Session) error {
sql := "DELETE FROM cache_data WHERE cache_key=?"
_, err := session.Exec(sql, key)
return err
})
}
// CacheData is the struct representing the table in the database
type CacheData struct {
CacheKey string
Data []byte
Expires int64
CreatedAt int64
}