Skip to content

Commit 75b0788

Browse files
authored
Plugins: Remove registry dependency from process manager (grafana#73241)
simplify
1 parent d935e6f commit 75b0788

File tree

16 files changed

+203
-222
lines changed

16 files changed

+203
-222
lines changed

pkg/plugins/manager/fakes/fakes.go

+73-8
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ func (s *FakePluginStorage) Extract(ctx context.Context, pluginID string, dirNam
248248
}
249249

250250
type FakeProcessManager struct {
251-
StartFunc func(_ context.Context, pluginID string) error
252-
StopFunc func(_ context.Context, pluginID string) error
251+
StartFunc func(_ context.Context, p *plugins.Plugin) error
252+
StopFunc func(_ context.Context, p *plugins.Plugin) error
253253
Started map[string]int
254254
Stopped map[string]int
255255
}
@@ -261,18 +261,18 @@ func NewFakeProcessManager() *FakeProcessManager {
261261
}
262262
}
263263

264-
func (m *FakeProcessManager) Start(ctx context.Context, pluginID string) error {
265-
m.Started[pluginID]++
264+
func (m *FakeProcessManager) Start(ctx context.Context, p *plugins.Plugin) error {
265+
m.Started[p.ID]++
266266
if m.StartFunc != nil {
267-
return m.StartFunc(ctx, pluginID)
267+
return m.StartFunc(ctx, p)
268268
}
269269
return nil
270270
}
271271

272-
func (m *FakeProcessManager) Stop(ctx context.Context, pluginID string) error {
273-
m.Stopped[pluginID]++
272+
func (m *FakeProcessManager) Stop(ctx context.Context, p *plugins.Plugin) error {
273+
m.Stopped[p.ID]++
274274
if m.StopFunc != nil {
275-
return m.StopFunc(ctx, pluginID)
275+
return m.StopFunc(ctx, p)
276276
}
277277
return nil
278278
}
@@ -518,3 +518,68 @@ func (f *FakeTerminator) Terminate(ctx context.Context, pluginID string) error {
518518
}
519519
return nil
520520
}
521+
522+
type FakeBackendPlugin struct {
523+
Managed bool
524+
525+
StartCount int
526+
StopCount int
527+
Decommissioned bool
528+
Running bool
529+
530+
mutex sync.RWMutex
531+
backendplugin.Plugin
532+
}
533+
534+
func NewFakeBackendPlugin(managed bool) *FakeBackendPlugin {
535+
return &FakeBackendPlugin{
536+
Managed: managed,
537+
}
538+
}
539+
540+
func (p *FakeBackendPlugin) Start(_ context.Context) error {
541+
p.mutex.Lock()
542+
defer p.mutex.Unlock()
543+
p.Running = true
544+
p.StartCount++
545+
return nil
546+
}
547+
548+
func (p *FakeBackendPlugin) Stop(_ context.Context) error {
549+
p.mutex.Lock()
550+
defer p.mutex.Unlock()
551+
p.Running = false
552+
p.StopCount++
553+
return nil
554+
}
555+
556+
func (p *FakeBackendPlugin) Decommission() error {
557+
p.mutex.Lock()
558+
defer p.mutex.Unlock()
559+
p.Decommissioned = true
560+
return nil
561+
}
562+
563+
func (p *FakeBackendPlugin) IsDecommissioned() bool {
564+
p.mutex.RLock()
565+
defer p.mutex.RUnlock()
566+
return p.Decommissioned
567+
}
568+
569+
func (p *FakeBackendPlugin) IsManaged() bool {
570+
p.mutex.RLock()
571+
defer p.mutex.RUnlock()
572+
return p.Managed
573+
}
574+
575+
func (p *FakeBackendPlugin) Exited() bool {
576+
p.mutex.RLock()
577+
defer p.mutex.RUnlock()
578+
return !p.Running
579+
}
580+
581+
func (p *FakeBackendPlugin) Kill() {
582+
p.mutex.Lock()
583+
defer p.mutex.Unlock()
584+
p.Running = false
585+
}

pkg/plugins/manager/loader/loader_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/grafana/grafana/pkg/services/org"
2424
)
2525

26-
var compareOpts = []cmp.Option{cmpopts.IgnoreFields(plugins.Plugin{}, "client", "log"), fsComparer}
26+
var compareOpts = []cmp.Option{cmpopts.IgnoreFields(plugins.Plugin{}, "client", "log", "mu"), fsComparer}
2727

2828
var fsComparer = cmp.Comparer(func(fs1 plugins.FS, fs2 plugins.FS) bool {
2929
fs1Files, err := fs1.Files()

pkg/plugins/manager/pipeline/initialization/steps.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,16 @@ func (b *BackendClientInit) Initialize(ctx context.Context, p *plugins.Plugin) (
5757

5858
// BackendClientStarter implements an InitializeFunc for starting a backend plugin process.
5959
type BackendClientStarter struct {
60-
processManager process.Service
60+
processManager process.Manager
6161
log log.Logger
6262
}
6363

6464
// BackendProcessStartStep returns a new InitializeFunc for starting a backend plugin process.
65-
func BackendProcessStartStep(processManager process.Service) InitializeFunc {
65+
func BackendProcessStartStep(processManager process.Manager) InitializeFunc {
6666
return newBackendProcessStarter(processManager).Start
6767
}
6868

69-
func newBackendProcessStarter(processManager process.Service) *BackendClientStarter {
69+
func newBackendProcessStarter(processManager process.Manager) *BackendClientStarter {
7070
return &BackendClientStarter{
7171
processManager: processManager,
7272
log: log.New("plugins.backend.start"),
@@ -75,7 +75,7 @@ func newBackendProcessStarter(processManager process.Service) *BackendClientStar
7575

7676
// Start will start the backend plugin process.
7777
func (b *BackendClientStarter) Start(ctx context.Context, p *plugins.Plugin) (*plugins.Plugin, error) {
78-
if err := b.processManager.Start(ctx, p.ID); err != nil {
78+
if err := b.processManager.Start(ctx, p); err != nil {
7979
b.log.Error("Could not start plugin", "pluginId", p.ID, "error", err)
8080
return nil, err
8181
}

pkg/plugins/manager/pipeline/termination/steps.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,18 @@ func (r *TerminablePluginResolver) Resolve(ctx context.Context, pluginID string)
4444

4545
// BackendProcessTerminator implements a TerminateFunc for stopping a backend plugin process.
4646
//
47-
// It uses the process.Service to stop the backend plugin process.
47+
// It uses the process.Manager to stop the backend plugin process.
4848
type BackendProcessTerminator struct {
49-
processManager process.Service
49+
processManager process.Manager
5050
log log.Logger
5151
}
5252

5353
// BackendProcessTerminatorStep returns a new TerminateFunc for stopping a backend plugin process.
54-
func BackendProcessTerminatorStep(processManager process.Service) TerminateFunc {
54+
func BackendProcessTerminatorStep(processManager process.Manager) TerminateFunc {
5555
return newBackendProcessTerminator(processManager).Terminate
5656
}
5757

58-
func newBackendProcessTerminator(processManager process.Service) *BackendProcessTerminator {
58+
func newBackendProcessTerminator(processManager process.Manager) *BackendProcessTerminator {
5959
return &BackendProcessTerminator{
6060
processManager: processManager,
6161
log: log.New("plugins.backend.termination"),
@@ -64,9 +64,7 @@ func newBackendProcessTerminator(processManager process.Service) *BackendProcess
6464

6565
// Terminate stops a backend plugin process.
6666
func (t *BackendProcessTerminator) Terminate(ctx context.Context, p *plugins.Plugin) error {
67-
t.log.Debug("Stopping plugin process", "pluginId", p.ID)
68-
69-
return t.processManager.Stop(ctx, p.ID)
67+
return t.processManager.Stop(ctx, p)
7068
}
7169

7270
// Deregister implements a TerminateFunc for removing a plugin from the plugin registry.

pkg/plugins/manager/process/ifaces.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package process
22

3-
import "context"
3+
import (
4+
"context"
45

5-
type Service interface {
6+
"github.com/grafana/grafana/pkg/plugins"
7+
)
8+
9+
type Manager interface {
610
// Start executes a backend plugin process.
7-
Start(ctx context.Context, pluginID string) error
11+
Start(ctx context.Context, p *plugins.Plugin) error
812
// Stop terminates a backend plugin process.
9-
Stop(ctx context.Context, pluginID string) error
13+
Stop(ctx context.Context, p *plugins.Plugin) error
1014
}

pkg/plugins/manager/process/process.go

+7-63
Original file line numberDiff line numberDiff line change
@@ -3,54 +3,22 @@ package process
33
import (
44
"context"
55
"errors"
6-
"sync"
76
"time"
87

98
"github.com/grafana/grafana/pkg/plugins"
10-
"github.com/grafana/grafana/pkg/plugins/backendplugin"
11-
"github.com/grafana/grafana/pkg/plugins/log"
12-
"github.com/grafana/grafana/pkg/plugins/manager/registry"
139
)
1410

15-
var _ Service = (*Manager)(nil)
11+
type Service struct{}
1612

17-
type Manager struct {
18-
pluginRegistry registry.Service
19-
20-
mu sync.Mutex
21-
log log.Logger
22-
}
23-
24-
func ProvideService(pluginRegistry registry.Service) *Manager {
25-
return NewManager(pluginRegistry)
13+
func ProvideService() *Service {
14+
return &Service{}
2615
}
2716

28-
func NewManager(pluginRegistry registry.Service) *Manager {
29-
return &Manager{
30-
pluginRegistry: pluginRegistry,
31-
log: log.New("plugin.process.manager"),
32-
}
33-
}
34-
35-
func (m *Manager) Run(ctx context.Context) error {
36-
<-ctx.Done()
37-
m.shutdown(ctx)
38-
return ctx.Err()
39-
}
40-
41-
func (m *Manager) Start(ctx context.Context, pluginID string) error {
42-
p, exists := m.pluginRegistry.Plugin(ctx, pluginID)
43-
if !exists {
44-
return backendplugin.ErrPluginNotRegistered
45-
}
46-
17+
func (*Service) Start(ctx context.Context, p *plugins.Plugin) error {
4718
if !p.IsManaged() || !p.Backend || p.SignatureError != nil {
4819
return nil
4920
}
5021

51-
m.mu.Lock()
52-
defer m.mu.Unlock()
53-
5422
if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil {
5523
return err
5624
}
@@ -59,15 +27,8 @@ func (m *Manager) Start(ctx context.Context, pluginID string) error {
5927
return nil
6028
}
6129

62-
func (m *Manager) Stop(ctx context.Context, pluginID string) error {
63-
p, exists := m.pluginRegistry.Plugin(ctx, pluginID)
64-
if !exists {
65-
return backendplugin.ErrPluginNotRegistered
66-
}
67-
m.log.Debug("Stopping plugin process", "pluginId", p.ID)
68-
m.mu.Lock()
69-
defer m.mu.Unlock()
70-
30+
func (*Service) Stop(ctx context.Context, p *plugins.Plugin) error {
31+
p.Logger().Debug("Stopping plugin process")
7132
if err := p.Decommission(); err != nil {
7233
return err
7334
}
@@ -79,23 +40,6 @@ func (m *Manager) Stop(ctx context.Context, pluginID string) error {
7940
return nil
8041
}
8142

82-
// shutdown stops all backend plugin processes
83-
func (m *Manager) shutdown(ctx context.Context) {
84-
var wg sync.WaitGroup
85-
for _, p := range m.pluginRegistry.Plugins(ctx) {
86-
wg.Add(1)
87-
go func(p backendplugin.Plugin, ctx context.Context) {
88-
defer wg.Done()
89-
p.Logger().Debug("Stopping plugin")
90-
if err := p.Stop(ctx); err != nil {
91-
p.Logger().Error("Failed to stop plugin", "error", err)
92-
}
93-
p.Logger().Debug("Plugin stopped")
94-
}(p, ctx)
95-
}
96-
wg.Wait()
97-
}
98-
9943
func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error {
10044
if err := p.Start(ctx); err != nil {
10145
return err
@@ -123,7 +67,7 @@ func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error {
12367
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
12468
return err
12569
}
126-
return nil
70+
return p.Stop(ctx)
12771
case <-ticker.C:
12872
if p.IsDecommissioned() {
12973
p.Logger().Debug("Plugin decommissioned")

0 commit comments

Comments
 (0)